Az ellennyomás kezelése az RxJava-val

1. Áttekintés

Ebben a cikkben megvizsgáljuk, hogyan segít az RxJava könyvtár az ellennyomás kezelésében.

Egyszerűen fogalmazva - az RxJava a reaktív áramok koncepcióját használja a bevezetés révén Megfigyelhetők, melyikhez egy vagy sok Megfigyelők feliratkozhat. Az esetlegesen végtelen áramok kezelése nagyon kihívást jelent, mivel az ellennyomás problémájával kell szembenéznünk.

Nem nehéz olyan helyzetbe kerülni, amelyben egy Megfigyelhető gyorsabban bocsát ki elemeket, mint amennyit az előfizető el tud fogyasztani. Meg fogjuk vizsgálni a fogyasztási cikkek növekvő puffere problémájának különböző megoldásait.

2. Forró Megfigyelhetők Versus hideg Megfigyelhetők

Először hozzunk létre egy egyszerű fogyasztói függvényt, amelyet a (z) elemei fogyasztóként fogunk használni Megfigyelhetők amelyet később meghatározunk:

public class ComputeFunction {public static void compute (Integer v) {try {System.out.println ("v v egész számítása:" + v); Szál.alszik (1000); } catch (InterruptedException e) {e.printStackTrace (); }}}

A mi kiszámít() függvény egyszerűen kinyomtatja az argumentumot. A fontos dolog, amit itt észre kell venni, az a Thread.sleep (1000) módszer - azért csinálunk, hogy utánanézzünk valamilyen hosszú távon futó feladatot, amelyet kiváltunk Megfigyelhető hogy gyorsabban feltöltsön elemeket Megfigyelő fogyaszthatja őket.

Kétféle típusú Megfigyelhetők - Forró és Hideg - ezek teljesen különböznek az ellennyomás kezeléséről.

2.1. Hideg Megfigyelhetők

Megfázás Megfigyelhető egy adott tételsorozatot bocsát ki, de elkezdheti kibocsátani ezt a szekvenciát, amikor annak Megfigyelő kényelmesnek találja, és bármilyen sebességgel is Megfigyelő vágyak, anélkül, hogy megzavarnák a sorozat integritását. Hideg Megfigyelhető tárgyakat nyújt lustán.

A Megfigyelő csak akkor vesz elemeket, amikor készen áll az adott elem feldolgozására, és az elemeket nem kell pufferelni egy Megfigyelhető mert húzó módon kérik őket.

Például, ha létrehoz egy Megfigyelhető statikus elemtartomány alapján, egytől egymillióig, az Megfigyelhető ugyanolyan tételsorozatot bocsátana ki, függetlenül attól, hogy milyen gyakran figyelik ezeket az elemeket:

Observable.range (1, 1_000_000) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute);

Amikor elindítjuk programunkat, az elemeket kiszámítja Megfigyelő lustán és húzóformán fogják kérni. A Schedulers.computation () módszer azt jelenti, hogy szeretnénk futtatni a mi Megfigyelő számítási szálkészletén belül RxJava.

A program kimenete a következményből áll kiszámít() metódus meghívva egyenként az an Megfigyelhető:

kiszámítja az v egész számot: 1 kiszámítja az egész v számot: 2 kiszámítja az egész v számot: 3 kiszámítja a v egész számot: 4

Hideg Megfigyelhetők nem kell semmiféle ellennyomás, mert húzóan működnek. Példák a megfázás által kibocsátott tárgyakra Megfigyelhető tartalmazhat adatbázis-lekérdezés, fájlkeresés vagy webes kérés eredményeit.

2.2. Forró Megfigyelhetők

Egy forró Megfigyelhető megkezdi az elemek generálását és létrehozásuk után azonnal kibocsátja azokat. Ez ellentmond a megfázásnak Megfigyelhetők a feldolgozás húzó modellje. Forró Megfigyelhető a saját ütemében bocsát ki elemeket, és a megfigyelők feladata, hogy lépést tartsanak.

Amikor az Megfigyelő nem képes olyan gyorsan fogyasztani az elemeket, mint amennyit egy Megfigyelhető pufferelni vagy más módon kell kezelni őket, mivel feltöltik a memóriát, végül okozzák OutOfMemoryException.

Vegyünk egy példát a forró Megfigyelhető, amely 1 millió darabot állít elő egy olyan végfelhasználó számára, amely ezeket a tételeket feldolgozza. Amikor a kiszámít() módszer a Megfigyelő Minden elem feldolgozása eltart egy ideig, a Megfigyelhető elkezdi feltölteni a memóriát elemekkel, ami a program meghibásodását okozza:

PublishSubject forrás = PublishSubject.create (); source.observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); IntStream.range (1, 1_000_000) .forEach (forrás :: onNext); 

A program futtatása sikertelen lesz a MissingBackpressureException mert nem határoztuk meg a túltermelés kezelésének módját Megfigyelhető.

Példák a forró készülék által kibocsátott tárgyakra Megfigyelhető tartalmazhatnak egér és billentyűzet eseményeket, rendszereseményeket vagy részvényárfolyamokat.

3. Pufferelő túltermelés Megfigyelhető

Az első módszer a túltermelés kezelésére Megfigyelhető egyfajta puffer definiálása azokhoz az elemekhez, amelyeket egy Megfigyelő.

Megtehetjük a puffer () módszer:

PublishSubject forrás = PublishSubject.create (); source.buffer (1024) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

1024 méretű puffer definiálása ad Megfigyelő egy kis idő, hogy utolérjék a túltermelő forrásokat. A puffer olyan elemeket tárol, amelyek még nem voltak feldolgozva.

Növelhetjük a puffer méretét, hogy elegendő hely álljon rendelkezésre az előállított értékekhez.

Ne feledje azonban, hogy általában ez csak ideiglenes javítás lehet mivel a túlcsordulás akkor is megtörténhet, ha a forrás túltermeli az előre jelzett pufferméretet.

4. Kibocsátott tételek kötegelése

Túltermelt elemeket tételezhetünk fel N elem ablakaiban.

Mikor Megfigyelhető gyorsabban gyárt elemeket, mint Megfigyelő feldolgozni tudja ezeket, enyhíthetjük, ha a gyártott elemeket csoportosítjuk, és egy tétel elemet küldünk a címre Megfigyelő amely egyesével képes elemgyűjteményt feldolgozni:

PublishSubject forrás = PublishSubject.create (); source.window (500) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace); 

Használata ablak() módszer érveléssel 500, el fogja mondani Megfigyelhető hogy elemeket csoportosítson az 500 méretű kötegekbe. Ez a technika csökkentheti a túltermelés problémáját Megfigyelhető mikor Megfigyelő gyorsabban képes feldolgozni az elemek egy tételét, összehasonlítva az egyes elemek feldolgozásával.

5. Az elemek kihagyása

Ha az által termelt értékek egy része Megfigyelhető biztonságosan figyelmen kívül hagyható, felhasználhatjuk a mintavételt egy meghatározott időn belül és a fojtás operátorokat.

A módszerek minta() és throttleFirst () az időtartamot veszik paraméterként:

  • Az Sbőséges() A módszer periodikusan megvizsgálja az elemek sorrendjét, és kiadja az utolsó elemet, amelyet a paraméterként megadott időtartamon belül állítottak elő
  • A throttleFirst () metódus adja ki az első elemet, amelyet a paraméterként megadott időtartam után állítottak elő

Az időtartam az az idő, amely után egy adott elem kiválasztásra kerül a létrehozott elemek sorrendjéből. Megadhatunk egy stratégiát az ellennyomás kezelésére az elemek kihagyásával:

PublishSubject forrás = PublishSubject.create (); source.sample (100, TimeUnit.MILLISECONDS) .observeOn (Schedulers.computation ()) .subscribe (ComputeFunction :: compute, Throwable :: printStackTrace);

Meghatároztuk, hogy az elemek kihagyásának stratégiája a minta() módszer. Szeretnénk egy 100 milliszekundumos időtartamú mintát. Ez az elem kibocsátásra kerül a Megfigyelő.

Ne feledje azonban, hogy ezek az operátorok csak csökkentik a downstream értékének vételét Megfigyelő és így még vezethetnek MissingBackpressureException.

6. A töltelék kezelése Megfigyelhető Puffer

Abban az esetben, ha a mintavételi vagy kötegelt stratégiáink nem segítenek egy puffer kitöltésében, stratégiát kell végrehajtanunk az esetek kezelésére, amikor puffer töltődik fel.

Használnunk kell egy onBackpressureBuffer () módszer a megelőzésre BufferOverflowException.

A onBackpressureBuffer () metódus három érvet vesz fel: egy an kapacitása Megfigyelhető puffer, egy puffer feltöltésekor meghívott módszer és stratégia a pufferből elvetendő elemek kezelésére. A túlcsordulás stratégiái a Ellennyomás Túlfolyás osztály.

A puffer feltöltésekor 4féle művelet hajtható végre:

  • ON_OVERFLOW_ERROR - ez az alapértelmezett viselkedés jelzi a BufferOverflowException amikor a puffer megtelt
  • ON_OVERFLOW_DEFAULT - jelenleg ugyanaz, mint ON_OVERFLOW_ERROR
  • ON_OVERFLOW_DROP_LATEST - ha túlcsordulás történne, akkor az aktuális értéket egyszerűen figyelmen kívül hagyják, és csak a régi értékeket adják meg, ha a downstream Megfigyelő kéréseket
  • ON_OVERFLOW_DROP_OLDEST - eldobja a puffer legrégebbi elemét, és hozzáadja az aktuális értéket

Nézzük meg, hogyan adható meg ez a stratégia:

Observable.range (1, 1_000_000) .onBackpressureBuffer (16, () -> {}, BackpressureOverflow.ON_OVERFLOW_DROP_OLDEST) .observeOn (Schedulers.computation ()) .subscribe (e -> {}, Throwable :: printStackTraceTraceTraceTraceTrace 

Itt a túlcsorduló puffer kezelésére vonatkozó stratégiánk a puffer legrégebbi elemének eldobása és a Megfigyelhető.

Ne feledje, hogy az utolsó két stratégia megszakításokat okoz a folyamban, amikor az elemeket kidobják. Ezen kívül nem jeleznek BufferOverflowException.

7. Minden túltermelt elem ledobása

Bármikor a lefelé Megfigyelő nem áll készen egy elem fogadására, használhatunk egy onBackpressureDrop () módszer az elem elhagyására a sorozatból.

Gondolhatunk arra a módszerre, mint egy onBackpressureBuffer () módszer, amelynek puffer kapacitása nulla, stratégiával ON_OVERFLOW_DROP_LATEST.

Ez az operátor akkor hasznos, ha biztonságosan figyelmen kívül hagyhatjuk a forrásból származó értékeket Megfigyelhető (például az egér mozgatásával vagy az aktuális GPS-helyzetjelekkel), mivel később naprakészebb értékek lesznek:

Observable.range (1, 1_000_000) .onBackpressureDrop () .observeOn (Schedulers.computation ()) .doOnNext (ComputeFunction :: compute) .subscribe (v -> {}, Throwable :: printStackTrace);

A módszer, a metódus onBackpressureDrop () megszünteti a túltermelés problémáját Megfigyelhető de óvatosan kell használni.

8. Következtetés

Ebben a cikkben a túltermelés problémáját vizsgáltuk Megfigyelhető és az ellennyomás kezelésének módjai. Megvizsgáltuk az elemek pufferelésének, adagolásának és kihagyásának stratégiáit, amikor a Megfigyelő nem képes olyan gyorsan fogyasztani az elemeket, mint amennyit egy Megfigyelhető.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.


$config[zx-auto] not found$config[zx-overlay] not found