Reactieve programmeeroperatoren in RxJava 2

Als uw Android-app die vijfsterrenrecensies op Google Play verzamelt, moet deze multi-tasken.

Als een absoluut minimum verwachten de mobiele gebruikers van vandaag dat ze nog steeds kunnen communiceren met uw app terwijl deze op de achtergrond aan het werk is. Dit klinkt misschien eenvoudig, maar Android is standaard voorzien van één thread, dus als je aan de verwachtingen van je publiek wilt voldoen, zul je vroeg of laat wat extra threads moeten maken.

In het vorige artikel in deze serie kregen we een inleiding tot RxJava, een reactieve bibliotheek voor de JVM die je kan helpen Android-applicaties te maken die reageren op gegevens en gebeurtenissen terwijl ze zich voordoen. Maar u kunt deze bibliotheek ook gebruiken om te reageren op gegevens en gebeurtenissen gelijktijdig.

In dit bericht laat ik je zien hoe je de operators van RxJava kunt gebruiken om uiteindelijk gelijktijdigheid op Android een pijnvrije ervaring te maken. Aan het einde van dit artikel weet u hoe u RxJava-operators kunt gebruiken om extra threads te maken, het werk op te geven dat moet plaatsvinden in deze threads en de resultaten vervolgens weer in de allerbelangrijkste UI-thread van Android te plaatsen - alles met slechts een enkele regels code.

En aangezien geen enkele technologie perfect is, zal ik u ook vertellen over een belangrijke potentiële valkuil bij het toevoegen van de RxJava-bibliotheek aan uw projecten - voordat u wordt getoond hoe u operators kunt gebruiken om dit probleem te voorkomen nooit komt voor in uw eigen Android-projecten.

Introductie van Operators

RxJava heeft een enorme verzameling operatoren die voornamelijk zijn bedoeld om u te helpen bij het wijzigen, filteren, samenvoegen en transformeren van de gegevens die door uw waarneembaars. Je vindt de volledige lijst met RxJava-operatoren bij de officiële documenten, en hoewel niemand verwacht dat je uit het hoofd leert elke operator, het is de moeite waard om wat tijd te besteden aan het lezen van deze lijst, alleen al om een ​​idee te hebben van de verschillende gegevenstransformaties die u kunt uitvoeren.

De lijst met operators van RxJava is al behoorlijk uitgebreid, maar als u de perfecte operator voor de datatransformatie die u in gedachten had, niet kunt vinden, kunt u meerdere operatoren altijd aan elkaar koppelen. Een operator op een waarneembaar meestal geeft een ander terug waarneembaar, u kunt dus gewoon operators blijven gebruiken totdat u de gewenste resultaten krijgt.

Er zijn veel te veel RxJava-operatoren om in één artikel te behandelen, en de officiële RxJava-documenten doen al goed werk bij het introduceren van alle operatoren die u kunt gebruiken voor datatransformaties, dus ik zal me concentreren op twee operatoren die de het meeste potentieel om uw leven als Android-ontwikkelaar eenvoudiger te maken: subscribeOn () en observeOn ()

Multithreading met RxJava-operators

Als uw app de best mogelijke gebruikerservaring biedt, moet deze in staat zijn om intensieve of langdurige taken uit te voeren en meerdere taken tegelijk uit te voeren, zonder de belangrijkste UI-thread van Android te blokkeren.

Stel dat uw app bijvoorbeeld informatie moet ophalen uit twee verschillende databases. Als u beide taken één voor één uitvoert in de hoofdthread van Android, zal dit niet alleen een aanzienlijke hoeveelheid tijd vergen, maar de gebruikersinterface reageert pas als uw app alle informatie uit beide databases heeft opgehaald. . Niet echt een geweldige gebruikerservaring!

Een veel betere oplossing is om twee extra threads te maken waarin u beide taken tegelijkertijd kunt uitvoeren zonder dat de hoofd UI-thread wordt geblokkeerd. Deze aanpak betekent dat het werk twee keer zo snel wordt voltooid, en de gebruiker kan blijven communiceren met de gebruikersinterface van uw app. Uw gebruikers zijn mogelijk niet eens op de hoogte van het feit dat uw app op de achtergrond intensief en langdurig werk uitvoert - alle database-informatie zal eenvoudig verschijnen in de gebruikersinterface van uw toepassing, alsof het magie is!

Out of the box biedt Android een aantal hulpmiddelen die u kunt gebruiken om extra threads te maken, waaronder Services en IntentServices, maar deze oplossingen zijn lastig te implementeren en kunnen snel resulteren in complexe, uitgebreide code. Plus, als u multithreading niet correct implementeert, is het mogelijk dat u een toepassing tegenkomt die geheugen lekt en allerlei soorten fouten veroorzaakt.

Om multithreading op Android nog meer hoofdpijn te geven, is Android's belangrijkste UI-thread de enige thread die de gebruikersinterface van uw app kan bijwerken. Als u de gebruikersinterface van uw app wilt bijwerken met het resultaat van het uitgevoerde werk een andere thread, dan moet je meestal een maken handler op de hoofd UI-thread en gebruik deze handler om gegevens van uw achtergrondthread over te brengen naar de hoofdthread. Dit betekent meer code, meer complexiteit en meer kans dat fouten in uw project terechtkomen.

Maar RxJava beschikt over twee operatoren die u kunnen helpen om veel van deze complexiteit en kans op fouten te voorkomen.

Merk op dat u deze operatoren in combinatie met gebruikt planners, die in essentie componenten zijn waarmee je kunt specificeren threads. Denk er nu aan scheduler als synoniem met het woord draad.

  • subscribeOn (Scheduler): Standaard, een waarneembaar zendt zijn gegevens uit op de thread waar het abonnement werd gedeclareerd, d.w.z. waar je het hebt geroepen .abonneren methode. In Android is dit meestal de belangrijkste UI-thread. U kunt de subscribeOn () operator om een ​​ander te definiëren Scheduler waar de waarneembaar zou zijn gegevens moeten uitvoeren en uitzenden.
  • observeOn (Scheduler): U kunt deze operator gebruiken om uw waarneembaarde emissies naar een andere Scheduler, effectief de draad veranderen waar de waarneembaarmeldingen worden verzonden, en bij uitbreiding de thread waar de gegevens worden gebruikt.

RxJava wordt geleverd met een aantal planners die u kunt gebruiken om verschillende threads te maken, waaronder:

  • Schedulers.io (): Ontworpen om te worden gebruikt voor IO-gerelateerde taken. 
  • Schedulers.computation (): Ontworpen om te worden gebruikt voor computationele taken. Standaard is het aantal threads in de berekeningsplanner beperkt tot het aantal beschikbare CPU's op uw apparaat.
  • Schedulers.newThread (): Creëert een nieuwe thread.

Nu heb je een overzicht van alle bewegende delen, laten we eens kijken naar enkele voorbeelden van hoe subscribeOn () en observeOn () worden gebruikt en zien sommige planners in actie.

subscribeOn ()

In Android gebruik je meestal subscribeOn () en een begeleidende Scheduler om de thread te wijzigen waar een aantal langlopende of intensieve taken worden uitgevoerd, dus er is geen risico dat de hoofd UI-thread wordt geblokkeerd. U kunt bijvoorbeeld besluiten om een ​​grote hoeveelheid gegevens te importeren op de io() planner of voer een aantal berekeningen uit op de berekening () scheduler.

In de volgende code maken we een nieuwe thread waarbij de waarneembaar zal zijn operaties uitvoeren en de waarden uitzenden 1, 2, en 3.

Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer); 

Hoewel dit alles is wat je nodig hebt om een ​​thread te maken en begin met het uitzenden van gegevens over die thread, wil je misschien een bevestiging dat deze waarneembaar echt werkt op een nieuwe thread. Eén methode is om de naam van de thread af te drukken die uw toepassing momenteel gebruikt, in Android Studio'sLogcat Monitor.

Handig genoeg, in het vorige bericht, Aan de slag met RxJava, hebben we een toepassing gemaakt die berichten naar Logcat Monitor verzendt in verschillende stadia tijdens de levenscyclus van Observable, dus we kunnen veel van deze code hergebruiken.

Open het project dat je in die post hebt gemaakt en pas je code aan zodat deze het bovenstaande gebruikt waarneembaar als de bron waarneembaar. Voeg vervolgens de subscribeOn () Operator en geef op dat de berichten die naar Logcat worden verzonden, de naam van de huidige thread moeten bevatten.

Uw voltooide project zou er ongeveer zo uit moeten zien:

import android.support.v7.app.AppCompatActivity; import android.os.Bundle; import android.util.Log; import io.reactivex.Observable; importeer io.reactivex.Observer; importeer io.reactivex.disposables.Disposable; import io.reactivex.schedulers.Schedulers; public class MainActivity breidt AppCompatActivity uit public static final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Observable.just (1, 2, 3) .subscribeOn (Schedulers.newThread ()) .subscribe (Observer);  Waarnemer Observer = nieuwe waarnemer() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe" + Thread.currentThread (). GetName ());  @Override public void onNext (Integer value) Log.e (TAG, "onNext:" + value + Thread.currentThread (). GetName ());  @Override public void onError (Throwable e) Log.e (TAG, "onError:");  @Override public void onComplete () Log.e (TAG, "onComplete: All Done!" + Thread.currentThread (). GetName ()); ; 

Zorg ervoor dat de Logcat Monitor van Android Studio open is (door de Android-monitor tabblad, gevolgd door logcat) en voer uw project vervolgens uit op een fysiek Android-apparaat of een AVD. U zou de volgende uitvoer in de Logcat Monitor moeten zien:

Hier kun je dat zien .abonneren wordt aangeroepen op de hoofd UI-thread, maar de waarneembare werkt op een geheel andere thread.

De subscribeOn () operator zal hetzelfde effect hebben, ongeacht waar u het in de waarneembare ketting plaatst; u kunt echter niet meerdere gebruiken subscribeOn () operatoren in dezelfde keten. Als u er meer dan één toevoegt subscribeOn (), dan zal je ketting het doen enkel en alleen gebruik de subscribeOn () dat is het dichtst bij de waarneembare bron.

observeOn ()

anders subscribeOn (), waar je plaatst observeOn () in je ketting doet materie, omdat deze operator alleen de thread wijzigt die wordt gebruikt door de waarneembare waarnemingen die verschijnen stroomafwaarts

Als u bijvoorbeeld het volgende in uw ketting hebt ingevoegd, gebruikt elke waarneembare die vanaf dit punt in de ketting wordt weergegeven de nieuwe thread.

.observeOn (Schedulers.newThread ())

Deze keten blijft op de nieuwe thread lopen totdat deze een andere ontmoet observeOn () operator, waarna het overschakelt naar de thread die door die operator is opgegeven. U kunt de draad besturen waar specifieke waarnemingen hun meldingen verzenden door er meerdere in te voegen observeOn () operators in uw keten.

Bij het ontwikkelen van Android-apps zult u dit over het algemeen gebruiken observeOn () om het resultaat van uitgevoerde werkzaamheden op achtergrondthreads naar de belangrijkste UI-thread van Android te verzenden. De eenvoudigste manier om emissies om te leiden naar de belangrijkste UI-thread van Android is om de AndroidSchedulers.mainThread Scheduler, die is opgenomen als onderdeel van de RxAndroid-bibliotheek, in plaats van de RxJava-bibliotheek. 

De RxAndroid-bibliotheek bevat Android-specifieke bindingen voor RxJava 2, waardoor het een waardevolle aanvullende bron voor Android-ontwikkelaars is (en iets waar we in het volgende bericht van deze serie in veel meer detail naar zullen kijken).

Als u RxAndroid aan uw project wilt toevoegen, opent u uw moduleniveau build.gradle bestand en voeg de nieuwste versie van de bibliotheek toe aan het gebied afhankelijkheden. Op het moment van schrijven was de nieuwste release van RxAndroid 2.0.1, dus ik voeg het volgende toe:

afhankelijkheden ... compileer 'io.reactivex.rxjava2: rxandroid: 2.0.1'

Nadat u deze bibliotheek aan uw project hebt toegevoegd, kunt u opgeven dat de resultaten van een waarneembaar bericht naar de belangrijkste UI-thread van uw app moeten worden verzonden, met behulp van een enkele regel code:

.observeOn (AndroidSchedulers.mainThread ())

Aangezien communicatie met de belangrijkste UI-thread van uw app een volledige pagina van de officiële Android-documenten in beslag neemt, is dit een enorme verbetering die u mogelijk veel tijd zou kunnen besparen bij het maken van multithreaded Android-apps.

RxJava's grootste nadeel

Hoewel RxJava veel Android-ontwikkelaars te bieden heeft, is geen enkele technologie perfect en heeft RxJava één grote valkuil die het potentieel heeft om uw app te laten crashen.

Standaard heeft RxJava een push-gebaseerde workflow: gegevens worden stroomopwaarts geproduceerd door een waarneembaar, en wordt dan stroomafwaarts naar de toegewezen geduwd Waarnemer. Het belangrijkste probleem met een push-gebaseerde workflow is hoe gemakkelijk het is voor de producent (in dit geval de waarneembaar) om voorwerpen te snel uit te stoten voor de consument (Waarnemer) verwerken.

Een spraakzaam waarneembaar en een traag Waarnemer kan snel resulteren in een achterstand van niet-verbruikte items, die systeembronnen gaan opslokken en zelfs resulteren in een OutOfMemoryException. Dit probleem staat bekend als tegendruk.

Als u vermoedt dat tegendruk optreedt in uw app, dan zijn er enkele mogelijke oplossingen, waaronder het gebruik van een operator om het aantal geproduceerde items te verminderen.

Samplingperioden maken met monster() en throttlefirst ()

Als een waarneembaar zendt een groot aantal items uit, dan is het misschien niet nodig voor de toegewezen Waarnemer ontvangen elk een van die items.

Als je een deel van een veilig kunt negeren waarneembaarde emissies, dan zijn er een paar operators die u kunt gebruiken om bemonsteringsperioden te maken en vervolgens specifieke waarden te selecteren die tijdens deze perioden worden uitgezonden:

  • De monster() De operator controleert de uitvoer van de waarneembare met door u opgegeven intervallen en neemt vervolgens het meest recente item dat werd uitgestraald tijdens die bemonsteringsperiode. Als u bijvoorbeeld opneemt .monster (5, SECONDEN) in uw project zal de waarnemer de laatste waarde ontvangen die werd uitgezonden gedurende elk interval van vijf seconden. 
  • De throttleFirst () De operator neemt de eerste waarde die werd uitgestraald tijdens de bemonsteringsperiode. Als u bijvoorbeeld opneemt .throttlefirst (5, SECONDS) dan zal de waarnemer de eerste waarde ontvangen die wordt uitgezonden gedurende elk interval van vijf seconden.  

Batching Emissies met buffer()

Als je niet veilig emissies kunt overslaan, dan kun je nog steeds een beetje druk uit je worstelen halen Waarnemer door emissies in batches te groeperen en vervolgens door te sturen En masse. Het verwerken van gecombineerde emissies is doorgaans efficiënter dan het afzonderlijk verwerken van meerdere emissies, dus deze aanpak zou het consumptieniveau moeten verbeteren.

U kunt gecombineerde emissies creëren met behulp van de buffer() operator. Hier gebruiken we buffer() om alle items te groeperen die gedurende een periode van drie seconden worden uitgezonden:

Observable.range (0, 10) .buffer (3, SECONDS) .subscribe (System.out :: println);

Als alternatief kunt u gebruiken buffer() om een ​​batch te maken die bestaat uit een specifiek aantal emissies. Hier vertellen we het bijvoorbeeld buffer() Emissies bundelen in groepen van vier:

Observable.range (0, 10) .buffer (4) .subscribe (System.out :: println);

Observables vervangen door flowables

Een alternatieve methode om het aantal emissies te verminderen, is het vervangen van de waarneembaar dat veroorzaakt je problemen met een flowable.

In RxJava 2 heeft het RxJava-team besloten de standaard te splitsen waarneembaar in twee soorten: de gewone soort waar we in deze serie naar hebben gekeken, en flowables.

flowables functioneert op vrijwel dezelfde manier als waarneembaars, maar met één groot verschil: flowables stuur alleen zoveel items als de waarnemer vraagt. Als je een hebt waarneembaar dat is meer items uitzenden dan de toegewezen waarnemer kan consumeren, dan zou je misschien willen overwegen om over te schakelen naar een flowable in plaats daarvan.

Voordat u kunt beginnen met gebruiken flowables in uw projecten, moet u de volgende importverklaring toevoegen:

importeer io.reactivex.Flowable;

Je kunt dan creëren flowables met exact dezelfde technieken die werden gebruikt om te creëren waarneembaars. Elk van de volgende codefragmenten maakt bijvoorbeeld een flowable die in staat is om gegevens uit te zenden:

flowable flowable = Flowable.fromArray (nieuwe tekenreeks [] "zuid", "noord", "west", "oost"); ... flowable.subscribe ()
flowable stroombaar = Flowable.range (0, 20); ... flowable.subscribe ()

Op dit punt vraag je je misschien af: waarom zou ik ooit gebruiken? waarneembaars wanneer ik gewoon kan gebruiken flowableen hoeft u zich geen zorgen te maken over de tegendruk? Het antwoord is dat a flowable maakt meer van een overhead dan van een regulier waarneembaar, dus in het belang van het creëren van een goed presterende app, moet je vasthouden aan waarneembaars tenzij u vermoedt dat uw toepassing worstelt met tegendruk.

singles

EEN flowable is niet de enige variatie op waarneembaar dat vind je in RxJava, omdat de bibliotheek ook het single klasse.

singles zijn handig wanneer u slechts één waarde hoeft uit te stoten. In deze scenario's maakt u een waarneembaar kan voelen als overkill, maar een single is ontworpen om gewoon een enkele waarde uit te zenden en vervolgens te voltooien, ofwel door te bellen naar:

  • onSuccess (): De single heeft zijn enige waarde.  
  • OnError (): Als de single is niet in staat om zijn item uit te zenden, dan zal het deze methode doorgeven als resultaat Throwable.

EEN single zal alleen een van deze methoden aanroepen en dan onmiddellijk beëindigen.

Laten we eens kijken naar een voorbeeld van een single in actie - nogmaals, om tijd te besparen, hergebruiken we code:

import android.os.Bundle; import android.support.v7.app.AppCompatActivity; import android.util.Log; importeer io.reactivex.Single; importeer io.reactivex.SingleObserver; importeer io.reactivex.disposables.Disposable; public class MainActivity breidt AppCompatActivity uit public static final String TAG = "MainActivity"; @Override protected void onCreate (Bundle savedInstanceState) super.onCreate (savedInstanceState); setContentView (R.layout.activity_main); Single.just ("Hello World") .subscribe (getSingleObserver ());  private SingleObserver getSingleObserver () retourneer nieuwe SingleObserver() @Override public void onSubscribe (Disposable d) Log.e (TAG, "onSubscribe");  @Override public void onSuccess (String value) Log.e (TAG, "onSuccess:" + value);  @Override public void onError (Throwable e) Log.e (TAG, "onError:"); ; 

Voer uw project uit op een AVD- of fysiek Android-apparaat en u ziet de volgende uitvoer in de Logcat Monitor van Android Studio:

Als je van gedachten verandert en een wilt converteren single in een waarneembaar op elk moment, dan heeft RxJava wederom alle operators die u nodig hebt, waaronder:

  • samensmelten met(): Voegt meerdere samen singles in een single waarneembaar
  • concatWith (): Kettingen de items die door meerdere worden uitgezonden singles samen vormen een waarneembaar emissie. 
  • toObservable (): Converteert een single in een waarneembaar die het item uitzendt dat oorspronkelijk door de single is uitgezonden en vervolgens wordt voltooid.

Samenvatting

In deze post hebben we enkele RxJava-operators onderzocht die je kunt gebruiken om meerdere threads te maken en te beheren, zonder de complexiteit en het potentieel voor fouten die traditioneel gepaard gaan met multithreading op Android. We hebben ook gezien hoe u de RxAndroid-bibliotheek kunt gebruiken om te communiceren met de belangrijkste UI-thread van Android via een enkele regel code en hoe u kunt garanderen dat tegendruk geen probleem wordt in uw toepassing.

We hebben de RxAndroid-bibliotheek een paar keer in deze serie aangeraakt, maar deze bibliotheek zit vol met Android-specifieke RxJava-bindingen die van onschatbare waarde kunnen zijn bij het werken met RxJava op het Android-platform, dus in het laatste bericht in deze serie zullen we kijk in veel meer detail naar de RxAndroid-bibliotheek.

Bekijk tot die tijd enkele van onze andere berichten over codering voor Android!