Wat is GenServer en waarom zou u om u geven?

In dit artikel leer je de basisprincipes van concurrency in Elixir en zie je hoe je processen kunt spawnen, berichten kunt verzenden en ontvangen en langlopende processen kunt maken. Ook leert u meer over GenServer, ziet u hoe het in uw toepassing kan worden gebruikt en ontdekt u enkele goodies die het voor u biedt.

Zoals u waarschijnlijk weet, is Elixir een functionele taal die wordt gebruikt om fouttolerante, gelijktijdige systemen te bouwen die veel gelijktijdige verzoeken afhandelen. BEAM (Erlang virtual machine) gebruikt processen om verschillende taken gelijktijdig uit te voeren, wat betekent dat bijvoorbeeld het dienen van een verzoek niet een ander blokkeert. Processen zijn lichtgewicht en geïsoleerd, wat betekent dat ze geen geheugen delen en zelfs als een proces crasht, kunnen anderen blijven werken.

BEAM-processen zijn heel anders dan de OS-processen. In feite werkt BEAM in één OS-proces en gebruikt het zijn eigen proces planners. Elke planner neemt er een in beslag CPU kern, werkt in een aparte thread en kan duizenden processen gelijktijdig verwerken (die om de beurt worden uitgevoerd). U kunt meer lezen over BEAM en multithreading op StackOverflow.

Dus, zoals je ziet, zijn de BEAM-processen (ik zeg gewoon "processen" vanaf nu) erg belangrijk in Elixir. De taal biedt u enkele hulpprogramma's op laag niveau om processen handmatig te spawnen, de status te behouden en de aanvragen af ​​te handelen. Weinig mensen gebruiken ze echter. Het is gebruikelijker om te vertrouwen op de Open Telecom Platform (OTP) kader om dat te doen. 

OTP heeft tegenwoordig niets meer te maken met telefoons - het is een algemeen kader om complexe gelijktijdige systemen te bouwen. Het definieert hoe uw applicaties moeten worden gestructureerd en biedt een database en een hele reeks zeer nuttige tools om serverprocessen te creëren, te herstellen van fouten, logboekregistratie uit te voeren, etc. In dit artikel zullen we het hebben over een server gedrag genaamd GenServer die wordt aangeboden door OTP.  

U kunt GenServer zien als een abstractie of een hulp die het werken met serverprocessen vereenvoudigt. Allereerst zul je zien hoe je processen kunt spawnen met behulp van sommige functies op laag niveau. Dan zullen we overschakelen naar GenServer en zien hoe het ons vereenvoudigt door de noodzaak te verwijderen om elke keer saaie (en vrij generieke) code te schrijven. Laten we beginnen!

Alles begint met spawn

Als je me zou vragen hoe je een proces in Elixir kunt maken, zou ik antwoorden: paaien het! spawn / 1 is een functie die is gedefinieerd in de pit module die een nieuw proces retourneert. Deze functie accepteert een lambda die in het gecreëerde proces zal worden uitgevoerd. Zodra de uitvoering is voltooid, wordt het proces ook afgesloten:

spawn (fn -> IO.puts ("hi") einde) |> IO.inspect # => hi # => #PID<0.72.0>

Dus hier paaien heeft een nieuw proces-ID geretourneerd. Als je een vertraging toevoegt aan de lambda, wordt de string "hi" na enige tijd afgedrukt:

spawn (fn ->: timer.sleep (5000) IO.puts ("hi") einde) |> IO.inspect # => #PID<0.82.0> # => (na 5 seconden) "hallo"

Nu kunnen we zoveel processen spawnen als we willen, en ze zullen gelijktijdig worden uitgevoerd:

spawn_it = fn (num) -> spawn (fn ->: timer.sleep (5000) IO.puts ("hi # num") einde) einde Enum.each (1 ... 10, fn (_) -> spawn_it . (: rand.uniform (100)) einde) # => (allemaal afgedrukt op hetzelfde moment, na 5 seconden) # => hi 5 # => hi 10 enz ... 

Hier spawnen we tien processen en drukken we een testreeks af met een willekeurig nummer. :rand is een module geleverd door Erlang, dus de naam is een atoom. Wat cool is, is dat alle berichten tegelijkertijd worden afgedrukt, na vijf seconden. Het gebeurt omdat alle tien processen gelijktijdig worden uitgevoerd.

Vergelijk het met het volgende voorbeeld dat dezelfde taak uitvoert, maar dan zonder te gebruiken kuit / 1:

dont_spawn_it = fn (num) ->: timer.sleep (5000) IO.puts ("hi # num") einde Enum.each (1 ... 10, fn (_) -> dont_spawn_it. (: rand.uniform ( 100)) einde) # => (na 5 seconden) hi 70 # => (na nog eens 5 seconden) hi 45 # => etc ... 

Terwijl deze code wordt uitgevoerd, kunt u naar de keuken gaan en nog een kop koffie zetten, want het duurt ongeveer een minuut om te voltooien. Elk bericht wordt opeenvolgend weergegeven, wat natuurlijk niet optimaal is!

Je zou je kunnen afvragen: "Hoeveel geheugen verbruikt een proces?" Nou, het hangt ervan af, maar in eerste instantie neemt het een paar kilobytes in beslag, wat een heel klein aantal is (zelfs mijn oude laptop heeft 8 GB aan geheugen, om nog maar te zwijgen van gave moderne servers).

Tot nu toe, zo goed. Voordat we echter gaan samenwerken met GenServer, laten we nog een ander belangrijk punt bespreken: doorgeven en ontvangen berichten.

Werken met berichten

Het is geen verrassing dat processen (die geïsoleerd zijn, zoals u zich herinnert) op de een of andere manier moeten communiceren, vooral als het gaat om het bouwen van meer of minder complexe systemen. Om dit te bereiken, kunnen we berichten gebruiken.

Een bericht kan verzonden worden met een functie met een voor de hand liggende naam: send / 2. Het accepteert een bestemming (poort, proces-id of procesnaam) en het eigenlijke bericht. Nadat het bericht is verzonden, verschijnt het in de postbus van een proces en kan worden verwerkt. Zoals je ziet, lijkt het algemene idee erg op onze dagelijkse activiteit van het uitwisselen van e-mails.

Een mailbox is in feite een wachtrij "first in first out" (FIFO). Nadat het bericht is verwerkt, wordt het uit de wachtrij verwijderd. Om berichten te ontvangen, moet je - wat denk je - een macro ontvangen. Deze macro bevat een of meer clausules en een bericht is hierop afgestemd. Als een overeenkomst wordt gevonden, wordt het bericht verwerkt. Anders wordt het bericht teruggezet in de mailbox. Daarbovenop kun je een optionele instellen na clausule die wordt uitgevoerd als een bericht niet binnen de opgegeven tijd is ontvangen. U kunt meer lezen over verzenden / 2 en te ontvangen in de officiële documenten.

Oké, genoeg met de theorie - laten we proberen met de berichten te werken. Allereerst, stuur iets naar het huidige proces:

send (self (), "hallo!")

De zelf / 0-macro retourneert een pid van het aanroepproces, wat precies is wat we nodig hebben. Laat ronde haakjes na de functie niet weg omdat u een waarschuwing krijgt met betrekking tot de dubbelzinnigheidsmatch.

Ontvang nu het bericht tijdens het instellen van de na clausule:

receive do msg -> IO.puts "Yay, een bericht: # msg" msg after 1000 -> IO.puts: stderr, "I want messages!" end |> IO.puts # => Yay, een bericht: hallo! # => hallo!

Merk op dat de clausule het resultaat van het evalueren van de laatste regel retourneert, dus we krijgen de "hallo!" draad.

Vergeet niet dat u zoveel clausules als nodig kunt invoeren:

send (self (), : ok, "hallo!") receive do : ok, msg -> IO.puts "Yay, een bericht: # msg" msg : error, msg -> IO .puts: stderr, "Oh nee, er is iets ergs gebeurd: # msg" _ -> IO.puts "I dunno what this message is ..." after 1000 -> IO.puts: stderr, "I want messages!" einde |> IO.puts

Hier hebben we vier clausules: een om een ​​succesbericht af te handelen, een andere om fouten af ​​te handelen, en dan een "fallback" -clausule en een time-out.

Als het bericht niet overeenkomt met een van de clausules, wordt het in de mailbox bewaard, wat niet altijd wenselijk is. Waarom? Omdat telkens wanneer een nieuw bericht arriveert, de oude worden verwerkt in de eerste kop (omdat de mailbox een FIFO-wachtrij is), waardoor het programma wordt vertraagd. Daarom kan een "fallback" -clausule van pas komen.

Nu je weet hoe je processen kunt spawnen, berichten kunt verzenden en ontvangen, laten we eens kijken naar een iets complexer voorbeeld waarbij een eenvoudige server wordt gemaakt die reageert op verschillende berichten.

Werken met serverproces

In het vorige voorbeeld hebben we slechts één bericht verzonden, ontvangen en wat werk uitgevoerd. Dat is prima, maar niet erg functioneel. Meestal gebeurt er wat we doen met een server die op verschillende berichten kan reageren. Met "server" bedoel ik een langlopend proces gebouwd met een terugkerende functie. Laten we bijvoorbeeld een server maken om een ​​aantal wiskundige vergelijkingen uit te voeren. Het ontvangt een bericht met de gevraagde bewerking en enkele argumenten.

Begin met het maken van de server en de looping-functie:

defmodule MathServer do def start do spawn & listen / 0 end defp listen do receive do : sqrt, caller, arg -> IO.puts arg _ -> IO.puts: stderr, "Not executed." einde luister () einde

Dus we spawnen een proces dat blijft luisteren naar de binnenkomende berichten. Nadat het bericht is ontvangen, wordt de luisteren / 0 de functie wordt opnieuw gebeld, waardoor een eindeloze lus ontstaat. Binnen in de luisteren / 0 functie, we voegen ondersteuning toe voor de : sqrt bericht, waarmee de vierkantswortel van een getal wordt berekend. De arg bevat het werkelijke aantal om de bewerking uit te voeren. We definiëren ook een fallback-clausule.

U kunt nu de server starten en de proces-ID aan een variabele toewijzen:

math_server = MathServer.start IO.inspect math_server # => #PID<0.85.0>

Briljant! Laten we nu een toevoegen implementatie functie om de berekening daadwerkelijk uit te voeren:

defmodule MathServer do # ... def sqrt (server, arg) do send (: some_name, : sqrt, self (), arg) end end

Gebruik deze functie nu:

MathServer.sqrt (math_server, 3) # => 3

Voor nu wordt gewoon het gepasseerde argument afgedrukt, dus voer je code zo uit om de wiskundige bewerking uit te voeren:

defmodule MathServer do # ... defp listen do receive do : sqrt, caller, arg -> send (: some_name, : result, do_sqrt (arg)) _ -> IO.puts: stderr, "Not executed." end listen () end defp do_sqrt (arg) do: math.sqrt (arg) end end

Nu wordt nog een ander bericht naar de server verzonden met het resultaat van de berekening. 

Wat interessant is, is dat de sqrt / 2 functie verzendt eenvoudigweg een bericht naar de server met de vraag om een ​​bewerking uit te voeren zonder op het resultaat te wachten. Dus, in principe voert het een asynchrone oproep.

Uiteraard willen we het resultaat op een bepaald moment in de tijd grijpen, dus codeer een andere openbare functie:

def grab_result do do do : result, result -> result after 5000 -> IO.puts: stderr, "Timeout" end end

Gebruik het nu:

math_server = MathServer.start MathServer.sqrt (math_server, 3) MathServer.grab_result |> IO.puts # => 1.7320508075688772

Het werkt! Natuurlijk kunt u zelfs een pool van servers maken en taken daartussen verdelen, waardoor gelijktijdigheid wordt bereikt. Het is handig wanneer de verzoeken niet op elkaar betrekking hebben.

Maak kennis met GenServer

Goed, we hebben een handvol functies behandeld waarmee we langlopende serverprocessen kunnen maken en berichten kunnen verzenden en ontvangen. Dit is geweldig, maar we moeten te veel boilerplate-code schrijven die een server-lus start (start / 0), reageert op berichten (luisteren / 0 privéfunctie) en geeft een resultaat (grab_result / 0). In meer complexe situaties, moeten we mogelijk ook een gedeelde status beheren of de fouten afhandelen.

Zoals ik aan het begin van het artikel al zei, is het niet nodig om een ​​fiets opnieuw uit te vinden. In plaats daarvan kunnen we GenServer-gedrag gebruiken dat al de boilerplate-code voor ons biedt en heeft het geweldige ondersteuning voor serverprocessen (zoals we in de vorige sectie hebben gezien).

Gedrag in Elixir is een code die een gemeenschappelijk patroon implementeert. Om GenServer te gebruiken, moet u een special definiëren callback-module die voldoet aan het contract zoals gedicteerd door het gedrag. Concreet zou het enkele callback-functies moeten implementeren, en de daadwerkelijke implementatie is aan jou. Nadat de callbacks zijn geschreven, de gedragsmodule kan ze gebruiken.

Zoals gesteld door de documenten, vereist GenServer zes callbacks om te worden geïmplementeerd, hoewel ze ook een standaardimplementatie hebben. Het betekent dat je alleen die kunt herdefiniëren die wat aangepaste logica vereisen.

Allereerst: we moeten de server starten voordat we iets anders gaan doen, dus ga naar het volgende gedeelte!

Start de server

Om het gebruik van GenServer te demonstreren, laten we een CalcServer waarmee gebruikers verschillende bewerkingen op een argument kunnen toepassen. Het resultaat van de bewerking wordt opgeslagen in een server staat, en dan kan er ook een andere bewerking op worden toegepast. Of een gebruiker kan een eindresultaat van de berekeningen krijgen.

Gebruik om te beginnen de gebruiksmacro om GenServer aan te sluiten:

defmodule CalcServer gebruikt GenServer-uiteinde

Nu zullen we sommige callbacks opnieuw moeten definiëren.

De eerste is init / 1, die wordt aangeroepen wanneer een server wordt gestart. Het gepasseerde argument wordt gebruikt om de status van een initiële server in te stellen. In het eenvoudigste geval zou dit terugbellen de : ok, initial_state tuple, hoewel er andere mogelijke retourwaarden zijn zoals : stop, reden, waardoor de server onmiddellijk stopt.

Ik denk dat we gebruikers de initiële status voor onze server kunnen laten bepalen. We moeten echter controleren of het aangenomen argument een getal is. Gebruik daarom een ​​bewakingsbeding:

defmodule CalcServer gebruikt GenServer def init (initial_value) wanneer is_number (initial_value) do : ok, initial_value end def init (_) do : stop, "De waarde moet een geheel getal zijn!" einde

Start nu gewoon de server met behulp van de start / 3-functie en geef uw CalcServer als een callback-module (het eerste argument). Het tweede argument is de oorspronkelijke status:

GenServer.start (CalcServer, 5.1) |> IO.inspect # => : ok, #PID<0.85.0>

Als u probeert een niet-nummer als een tweede argument door te geven, wordt de server niet gestart en dat is precies wat we nodig hebben.

Super goed! Nu onze server actief is, kunnen we beginnen met het coderen van wiskundige bewerkingen.

Asynchrone verzoeken verwerken

Asynchrone verzoeken worden aangeroepen afgietsels in termen van GenServer. Om een ​​dergelijk verzoek uit te voeren, gebruikt u de functie cast / 2, die een server en het daadwerkelijke verzoek accepteert. Het is vergelijkbaar met de sqrt / 2 functie die we hebben gecodeerd bij het praten over serverprocessen. Het maakt ook gebruik van de "vuur en vergeet" -benadering, wat betekent dat we niet wachten tot het verzoek is voltooid.

Om de asynchrone berichten af ​​te handelen, wordt een call_cast / 2 callback gebruikt. Het accepteert een verzoek en een staat en zou moeten reageren met een tuple : noreply, new_state in het eenvoudigste geval (of : stop, reason, new_state om de serverlus te stoppen). Laten we bijvoorbeeld een asynchroon afhandelen : sqrt gegoten:

def handle_cast (: sqrt, state) do : noreply,: math.sqrt (state) end 

Dat is hoe we de staat van onze server onderhouden. Aanvankelijk was het nummer (doorgegeven toen de server werd gestart) 5.1. Nu updaten we de status en stellen deze in : Math.sqrt (5.1).

Codeer de interface-functie die wordt gebruikt cast / 2:

Def sqrt (pid) do GenServer.cast (pid,: sqrt) einde

Voor mij lijkt dit op een kwaadaardige tovenaar die een spreuk uitstraalt maar niet geeft om de impact die het veroorzaakt.

Merk op dat we een proces-id nodig hebben om de cast uit te voeren. Onthoud dat wanneer een server met succes is gestart, een tuple : ok, pid wordt teruggestuurd. Laten we daarom patroonaanpassing gebruiken om de proces-ID te extraheren:

: ok, pid = GenServer.start (CalcServer, 5.1) CalcServer.sqrt (pid)

Leuk! Dezelfde aanpak kan worden gebruikt om bijvoorbeeld vermenigvuldiging te implementeren. De code zal wat ingewikkelder zijn omdat we het tweede argument, een vermenigvuldigingsfactor, moeten doorgeven:

def vermenigvuldigen (pid, vermenigvuldiger) do GenServer.cast (pid, : vermenigvuldigen, vermenigvuldigen) einde

De gegoten functie ondersteunt slechts twee argumenten, dus ik moet een tuple maken en daar een extra argument doorgeven.

Nu de callback:

def handle_cast (: vermenigvuldigen, vermenigvuldigen, staat) do : noreply, state * multiplier end

We kunnen ook een single schrijven handle_cast callback die zowel de werking als het stoppen van de server ondersteunt als de bewerking onbekend is:

def handle_cast (operation, state) do case operatie do: sqrt -> : noreply,: math.sqrt (state) : multiply, multiplier -> : noreply, state * multiplier _ -> : stop, "Niet geïmplementeerd", "state einde

Gebruik nu de nieuwe interface-functie:

CalcServer.multiply (pid, 2)

Geweldig, maar momenteel is er geen manier om een ​​resultaat van de berekeningen te krijgen. Daarom is het tijd om nog een callback te definiëren.

Omgaan met synchrone aanvragen

Als asynchrone verzoeken worden uitgebracht, worden synchrone verzoeken benoemd calls. Om dergelijke verzoeken uit te voeren, gebruikt u de call / 3-functie, die een server, een verzoek en een optionele time-out accepteert die standaard overeenkomt met vijf seconden.

Synchrone verzoeken worden gebruikt wanneer we willen wachten totdat het antwoord daadwerkelijk van de server komt. De typische use-case is het verkrijgen van informatie zoals het resultaat van berekeningen, zoals in het voorbeeld van vandaag (onthoud het grab_result / 0 functie van een van de voorgaande secties).

Om synchrone verzoeken te verwerken, handle_call / 3 callback wordt gebruikt. Het accepteert een verzoek, een tuple die de pid van de server bevat, en een term die de oproep identificeert, evenals de huidige status. In het eenvoudigste geval zou het moeten reageren met een tuple : reply, reply, new_state

Codeer deze callback nu:

def handle_call (: result, _, state) do : reply, state, state end

Zoals je ziet, niets complex. De antwoord en de nieuwe staat is gelijk aan de huidige staat omdat ik niets wil wijzigen nadat het resultaat is teruggestuurd.

Nu de interface resultaat / 1 functie:

def result (pid) do GenServer.call (pid,: result) einde

Dit is het! Het uiteindelijke gebruik van de CalcServer wordt hieronder gedemonstreerd:

: ok, pid = GenServer.start (CalcServer, 5.1) CalcServer.sqrt (pid) CalcServer.multiply (pid, 2) CalcServer.result (pid) |> IO.puts # => 4.516635916254486

aliasing

Het wordt een beetje vervelend om altijd een proces-ID op te geven bij het aanroepen van de interfacefuncties. Gelukkig is het mogelijk om je proces een naam te geven, of een alias. Dit wordt gedaan bij het starten van de server door in te stellen naam:

GenServer.start (CalcServer, 5.1, naam:: calc) CalcServer.sqrt CalcServer.multiply (2) CalcServer.result |> IO.puts

Merk op dat ik nu geen pid opsla, hoewel je misschien patroonherkenning wilt doen om er zeker van te zijn dat de server daadwerkelijk is gestart.

Nu worden de interface-functies een beetje eenvoudiger:

def sqrt do GenServer.cast (: calc,: sqrt) einde def vermenigvuldigen (vermenigvuldiger) do GenServer.cast (: calc, : vermenigvuldigen, vermenigvuldigen) einde def resultaat do GenServer.call (: calc,: result) end

Vergeet alleen niet dat u niet twee servers met dezelfde alias kunt starten.

Als alternatief kunt u nog een andere interface-functie introduceren start / 1 in uw module en profiteer van de macro __MODULE __ / 0, die de naam van de huidige module als een atoom retourneert:

defmodule CalcServer gebruikt GenServer def start (initial_value) do GenServer.start (CalcServer, initial_value, name: __MODULE__) end def sqrt do GenServer.cast (__ MODULE__,: sqrt) einde def vermenigvuldigen (multiplier) do GenServer.cast (__ MODULE__,  : vermenigvuldigen, vermenigvuldigen) einde def-resultaat do GenServer.call (__ MODULE__,: result) end # ... end CalcServer.start (6.1) CalcServer.sqrt CalcServer.multiply (2) CalcServer.result |> IO.puts

Beëindiging

Een andere callback die in uw module opnieuw kan worden gedefinieerd, wordt terminate / 2 genoemd. Het accepteert een reden en de huidige staat, en het wordt opgeroepen wanneer een server op het punt staat te verlaten. Dit kan gebeuren wanneer u bijvoorbeeld een onjuist argument doorgeeft aan de vermenigvuldig / 1 interface functie:

# ... CalcServer.multiply (2)

De callback ziet er ongeveer zo uit:

Def terminate (_reason, _state) do IO.puts "The server terminated" end

Conclusie

In dit artikel hebben we de basisprincipes van concurrency besproken in Elixir en besproken functies en macro's zoals paaien, te ontvangen, en sturen. U hebt geleerd welke processen zijn, hoe u ze kunt maken en hoe u berichten kunt verzenden en ontvangen. We hebben ook gezien hoe we een eenvoudig, langlopend serverproces kunnen bouwen dat reageert op zowel synchrone als asynchrone berichten.

Daarbovenop hebben we GenServer-gedrag besproken en hebben we gezien hoe het de code vereenvoudigt door verschillende callbacks te introduceren. We hebben met de. Gewerkt in het, beëindigen, handle_call en handle_cast callbacks en creëerde een eenvoudige calculatieserver. Als u iets onduidelijk lijkt, aarzel dan niet om uw vragen te stellen!

Er is meer aan GenServer, en het is natuurlijk onmogelijk om alles in één artikel te behandelen. In mijn volgende post zal ik uitleggen wat supervisors zijn en hoe u ze kunt gebruiken om uw processen te bewaken en ze te herstellen van fouten. Tot die tijd gelukkig coderen!