Bevezetés a reaktor magjába

1. Bemutatkozás

A Reactor Core egy Java 8 könyvtár, amely a reaktív programozási modellt valósítja meg. A reaktív folyamok specifikációjának tetejére épül, amely a reaktív alkalmazások építésének szabványa.

A nem reaktív Java fejlesztés hátterében a reaktívvá válás meglehetősen meredek tanulási görbét jelenthet. Ez még nagyobb kihívást jelent, ha összehasonlítjuk a Java 8-zal Folyam API, mivel összetéveszthetik, hogy ugyanazok a magas szintű absztrakciók.

Ebben a cikkben megpróbáljuk demisztifikálni ezt a paradigmát. Kis lépéseket teszünk a Reaktoron keresztül, amíg elkészítünk egy képet a reaktív kód összeállításáról, megalapozva a fejlettebb cikkeket egy későbbi sorozatban.

2. Reaktív áramok specifikációja

Mielőtt megnéznénk a Reactort, meg kell vizsgálnunk a Reactive Streams specifikációt. A Reactor ezt valósítja meg, és ez alapozza meg a könyvtárat.

Lényegében a reaktív folyamok az aszinkron folyamfeldolgozás specifikációi.

Más szavakkal, egy olyan rendszer, ahol rengeteg eseményt állítanak elő és fogyasztanak aszinkron módon. Gondoljon arra, hogy másodpercenként ezernyi részvényfrissítés folyik be egy pénzügyi alkalmazásba, és annak időben kell reagálnia ezekre a frissítésekre.

Ennek egyik fő célja az ellennyomás problémájának kezelése. Ha van olyan termelőnk, amely gyorsabban bocsát ki eseményeket a fogyasztó számára, mint amennyit képes feldolgozni, akkor a fogyasztót végül elárasztják az események, elfogynak a rendszer erőforrásai.

Az ellennyomás azt jelenti, hogy fogyasztóinknak képesnek kell lenniük megmondani a gyártónak, hogy mennyi adatot kell elküldeniük ennek megakadályozása érdekében, és ezt írja le a specifikáció.

3. Maven-függőségek

Mielőtt belekezdenénk, adjuk hozzá a Maven-függőségeinket:

 io.projectreactor reaktor-mag 3.3.9. KÖZLEMÉNY ch.qos.logback logback-classic 1.1.3 

Felvesszük a Logbacket is függőségként. Ennek oka az, hogy naplózni fogjuk a Reactor kimenetét, hogy jobban megértsük az adatfolyamot.

4. Adatfolyam létrehozása

Annak érdekében, hogy egy alkalmazás reaktív legyen, az első dolog, amire képesnek kell lennie, az az, hogy adatfolyamot állít elő.

Ez valami hasonló lehet a részvényfrissítés példájához, amelyet korábban adtunk. Ezen adatok nélkül nem lenne mire reagálnunk, ezért ez egy logikus első lépés.

A reaktív mag két adattípust ad nekünk, amelyek lehetővé teszik ezt.

4.1. Fényáram

Ennek első módja a Fényáram. Ez egy patak, amely kibocsáthat 0..n elemek. Próbáljunk meg létrehozni egy egyszerűt:

Flux just = Flux.just (1, 2, 3, 4);

Ebben az esetben négy elemből áll egy statikus áram.

4.2. Monó

Ennek második módja az a Monó, ami egy patak 0..1 elemek. Próbáljuk meg példázni:

Mono just = Mono.just (1);

Ez szinte pontosan ugyanúgy néz ki és viselkedik, mint a Fényáram, csak ezúttal legfeljebb egy elemre korlátozódunk.

4.3. Miért nem csak a Flux?

Mielőtt tovább kísérletezne, érdemes kiemelni, hogy miért van ez a két adattípus.

Először is meg kell jegyezni, hogy mind a Fényáram és Monó a reaktív folyamok megvalósításai Kiadó felület. Mindkét osztály megfelel a specifikációnak, és ezt a felületet használhatnánk a helyükön:

Kiadó csak = Mono.just ("foo");

De valóban, ennek a kardinalitásnak az ismerete hasznos. Ez azért van, mert néhány műveletnek csak a két típus egyikére van értelme, és mert kifejezőbb is lehet (képzeld el találj egyet() tárolóban).

5. Feliratkozás egy adatfolyamra

Most magas szintű áttekintésünk van arról, hogyan lehet adatfolyamot előállítani, fel kell iratkoznunk rá, hogy kibocsássa az elemeket.

5.1. Elemek gyűjtése

Használjuk a Iratkozz fel() módszer az összes elem adatgyűjtésére egy adatfolyamban:

Lista elemek = new ArrayList (); Flux.just (1, 2, 3, 4) .log () .subscribe (elements :: add); assertThat (elemek) .contact (1, 2, 3, 4);

Az adatok csak akkor kezdenek áramlani, ha feliratkozunk. Vegye figyelembe, hogy felvettünk néhány naplózást is, ez hasznos lehet, ha megnézzük, mi történik a kulisszák mögött.

5.2. Az elemek áramlása

A helyben történő naplózás segítségével azt szemléltethetjük, hogyan áramlanak az adatok a mi adatfolyamunkon:

20: 25: 19.550 [fő] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | kérés (korlátlan) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 25: 19.553 [main] INFO reaktor.Flux.Array.1 - | onComplete ()

Először is minden a főszálon fut. Ne menjünk bele ennek részleteibe, mivel a cikk későbbi szakaszában tovább vizsgáljuk a párhuzamosságot. A dolgokat mégis egyszerűsíti, mivel mindent rendben tudunk kezelni.

Most menjünk végig azon a sorrenden, amelyet egyesével naplózunk:

  1. onSubscribe () - Ezt akkor hívják, amikor feliratkozunk az adatfolyamunkra
  2. kérelem (korlátlan) - Amikor hívunk Iratkozz fel, a kulisszák mögött létrehozunk egy Feliratkozás. Ez az előfizetés elemeket kér a falról. Ebben az esetben alapértelmezés szerint határtalan, vagyis minden elérhető elemet igényel
  3. onNext () - Ezt hívják meg minden egyes elemre
  4. onComplete () - Ezt utoljára hívják, miután megkapta az utolsó elemet. Valójában van egy onError () is, amelyet hívnának, ha van kivétel, de ebben az esetben nincs

Ez a folyamat lefektetett Előfizető felületet a reaktív folyamok specifikációjának részeként, és a valóságban ezt hívták fel a kulisszák mögé a onSubscribe (). Hasznos módszer, de hogy jobban megértsük, mi történik, adjuk meg a Előfizető interfész közvetlenül:

Flux.just (1, 2, 3, 4) .log () .subscribe (new Subscriber () {@Orride public void onSubscribe (Subscription s) {s.request (Long.MAX_VALUE);} @Orride public void onNext ( Egész egész szám) {elements.add (integer);} @Orride public void onError (Throwable t) {} @Orride public void onComplete () {}});

Láthatjuk, hogy a fenti folyamat minden lehetséges szakasza a Előfizető végrehajtás. Csak úgy történik, hogy a Fényáram segítő módszert biztosított számunkra e bőbeszéd csökkentésére.

5.3. Összehasonlítás a Java 8-zal Patakok

Még mindig úgy tűnhet, hogy van valami szinonimája a Java 8-nak Folyam gyűjt:

Összegyűjtött lista = Stream.of (1, 2, 3, 4) .collect (toList ());

Csak mi nem.

A legfontosabb különbség az, hogy a Reactive egy push modell, míg a Java 8 Patakok pull modellek. Reaktív megközelítésben az események meglökött az előfizetőknek, amint bejönnek.

A következő dolog, amit észre kell venni: a Patakok a terminálüzemeltető csak annyi, hogy terminál, az összes adatot lekéri és eredményt ad vissza. A Reactive segítségével végtelen adatfolyam érkezhet egy külső erőforrásból, több előfizetőt ad hoc alapon csatolva és eltávolítva. Tehetünk olyan dolgokat is, mint például az áramok egyesítése, a fojtószelepek áramlása és az ellennyomás alkalmazása, amelyeket ezután kitérünk.

6. Ellennyomás

A következő dolog, amit figyelembe kell vennünk, az az ellennyomás. Példánkban az előfizető azt mondja a gyártónak, hogy minden egyes elemet nyomjon egyszerre. Ez az előfizető számára elsöprővé válhat, és minden erőforrását felemészti.

Az ellennyomás az, amikor a downstream azt mondhatja egy upstream-nek, hogy küldjön neki kevesebb adatot, hogy megakadályozza a túlterhelést.

Módosíthatjuk Előfizető megvalósítás az ellennyomás alkalmazásához. Mondjuk az upstream-nek, hogy egyszerre csak két elemet küldjön a használatával kérés():

Flux.just (1, 2, 3, 4) .log () .subscribe (new Subscriber () {private Subscription s; int onNextAmount; @Orride public void onSubscribe (Subscription s) {this.s = s; s.request (2);} @Orride public void onNext (Integer integer) {elements.add (integer); onNextAmount ++; if (onNextAmount% 2 == 0) {s.request (2);}} @Override public void onError (Throwable) t) {} @Override public void onComplete () {}});

Most, ha újra futtatjuk a kódunkat, látni fogjuk a kérés (2) hívják, majd kettő következik onNext () akkor hív kérés (2) újra.

23: 31: 15.395 [main] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 23: 31: 15.397 [main] INFO reaktor.Flux.Array.1 - | kérés (2) 23: 31: 15.397 [main] INFO reaktor.Flux.Array.1 - | onNext (1) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | onNext (2) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | kérés (2) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | onNext (3) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | onNext (4) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | kérés (2) 23: 31: 15.398 [main] INFO reaktor.Flux.Array.1 - | onComplete ()

Lényegében ez a reaktív ellennyomás. Azt kérjük az upstream-től, hogy csak bizonyos mennyiségű elemet toljon be, és csak akkor, ha készen állunk.

Ha azt képzeljük, hogy tweeteket közvetítenek a twitterről, akkor az upstream feladata lenne eldönteni, hogy mit tegyen. Ha bejöttek a tweetek, de az alsóbb rétegektől nem érkezik kérés, akkor az upstream eldobhatja az elemeket, tárolhatja őket egy pufferben vagy más stratégiában.

7. Streamelés

Műveleteket is végezhetünk az adatfolyamunkban lévő adatokkal, reagálva az eseményekre, ahogy jónak látjuk.

7.1. Adatok feltérképezése egy adatfolyamban

Egy egyszerű művelet, amelyet elvégezhetünk, egy transzformáció alkalmazása. Ebben az esetben csak duplázzuk meg a streamünk összes számát:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribe (elements :: add);

térkép() mikor kell alkalmazni onNext () nak, nek hívják.

7.2. Két áramlat egyesítése

Ezután érdekesebbé tehetjük a dolgokat, ha egy másik adatfolyamot egyesítünk ezzel. Próbálja ki ezt a használatával postai irányítószám() funkció:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .zipWith (Flux.range (0, Integer.MAX_VALUE), (one, two) -> String.format ("Első fluxus:% d, Második fluxus:% d", egy, kettő)) .subscribe (elements :: add); assertThat (elements) .conactExactly ("Első fluxus: 2, Második fluxus: 0", "Első fluxus: 4, Második fluxus: 1", "Első fluxus: 6, Második fluxus: 2", "Első fluxus: 8, Második Flux: 3 ");

Itt létrehozunk egy másikat Fényáram amely folyamatosan növekszik eggyel és továbbítja az eredetivel. A naplók ellenőrzésével láthatjuk, hogyan működnek ezek együtt:

20: 04: 38.064 [main] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 04: 38.065 [main] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 04: 38.066 [main] INFO reaktor.Flux.Range.2 - | onSubscribe ([Synchronous Fuseable] FluxRange.RangeSubscription) 20: 04: 38.066 [main] INFO reaktor.Flux.Range.2 - | onNext (0) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 04: 38.067 [main] INFO reaktor.Flux.tartomány.2 - | onNext (1) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 04: 38.067 [main] INFO reaktor.Flux.Range.2 - | onNext (2) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 04: 38.067 [main] INFO reaktor.Flux.Range.2 - | onNext (3) 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | onComplete () 20: 04: 38.067 [main] INFO reaktor.Flux.Array.1 - | törlés () 20: 04: 38.067 [main] INFO reaktor.Flux.Range.2 - | megszünteti()

Vegye figyelembe, hogy mostantól egy előfizetésünk van Fényáram. A onNext () a hívások szintén váltakoznak, így a stream minden elemének indexe megegyezik, amikor alkalmazzuk a postai irányítószám() funkció.

8. Források

Jelenleg elsősorban a hideg folyamokra koncentráltunk. Ezek statikus, rögzített hosszúságú folyamok, amelyekkel könnyű kezelni. A reaktív reálisabb felhasználási esete végtelenül történhet.

Például rendelkezhetünk egérmozgásokkal, amelyekre folyamatosan reagálni kell, vagy rendelkezhetünk twitter hírcsatornával. Az ilyen típusú streameket hot streameknek nevezzük, mivel ezek mindig futnak, és bármikor feliratkozhatnak rájuk, hiányozva az adatok kezdetétől.

8.1. A. Létrehozása ConnectableFlux

A forró adatfolyam létrehozásának egyik módja az, ha a hideg folyamot eggyé konvertálja. Hozzunk létre egy Fényáram örökké tartó, az eredmények kinyomtatása a konzol számára, amely egy külső erőforrásból származó végtelen adatfolyamot szimulálna:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .publish ();

Hívással közzétenni () kapunk egy ConnectableFlux. Ez azt a hívást jelenti Iratkozz fel() nem okoz kibocsátást, lehetővé téve számunkra több előfizetés hozzáadását:

publish.subscribe (System.out :: println); publish.subscribe (System.out :: println);

Ha megpróbáljuk futtatni ezt a kódot, semmi sem fog történni. Csak addig hívunk connect (), hogy a Fényáram kibocsátani kezd:

publish.connect ();

8.2. Fojtás

Ha futtatjuk a kódunkat, akkor a konzolunkat elárasztja a naplózás. Ez olyan helyzetet szimulál, amikor túl sok adatot továbbítanak fogyasztóinknak. Próbáljuk meg ezt megkerülni fojtással:

ConnectableFlux publish = Flux.create (fluxSink -> {while (true) {fluxSink.next (System.currentTimeMillis ());}}) .sample (ofSeconds (2)) .publish ();

Itt bemutattuk a minta() módszer két másodperces intervallummal. Az értékeket csak két másodpercenként juttatjuk el előfizetőnkhöz, vagyis a konzol sokkal kevésbé lesz mozgalmas.

Természetesen számos stratégia létezik a továbbiakban elküldött adatok mennyiségének csökkentésére, mint például az ablakolás és a pufferelés, de ezek a cikk hatályán kívül maradnak.

9. Egyidejűség

A fenti példák mindegyike jelenleg a fő szálon fut. Ha akarjuk, szabályozhatjuk azonban, hogy melyik szálon fut a kódunk. A Ütemező interfész absztrakciót biztosít az aszinkron kód körül, amelyhez számos megvalósítás biztosított számunkra. Próbáljunk meg feliratkozni egy másik szálra a main-ra:

Flux.just (1, 2, 3, 4) .log () .map (i -> i * 2) .subscribeOn (Schedulers.parallel ()) .subscribe (elements :: add);

A Párhuzamos Az ütemező az előfizetésünket egy másik szálon futtatja, amit a naplók megtekintésével bizonyíthatunk. Látjuk, hogy az első bejegyzés a fő- szál, és a Flux egy másik szálban fut, az úgynevezett párhuzamos-1.

20:03:27.505 [fő] DEBUG reaktor.util.Loggers $ LoggerFactory - Az Slf4j naplózási keretrendszer használata 20: 03: 27.529 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | onSubscribe ([Synchronous Fuseable] FluxArray.ArraySubscription) 20: 03: 27.531 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | kérés (korlátlan) 20: 03: 27.531 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | onNext (1) 20: 03: 27.531 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | onNext (2) 20: 03: 27.531 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | onNext (3) 20: 03: 27.531 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | onNext (4) 20: 03: 27.531 [párhuzamos-1] INFO reaktor.Flux.Array.1 - | onComplete ()

A párhuzamossági lekérdezés ennél érdekesebb, és érdemes lesz felfedeznünk egy másik cikkben.

10. Következtetés

Ebben a cikkben magas szintű, végpontok közötti áttekintést adtunk a reaktív magról. Elmagyaráztuk, hogyan lehet közzétenni és feliratkozni az adatfolyamokra, alkalmazni az ellennyomást, működtetni az adatfolyamokat és az adatokat aszinkron módon kezelni. Ez remélhetőleg megalapozza a reaktív alkalmazások megírását.

A sorozat későbbi cikkei a fejlettebb párhuzamosságra és más reaktív koncepciókra terjednek ki. Van még egy cikk, amely a Reactort tavasszal ismerteti.

Alkalmazásunk forráskódja a GitHub-on keresztül elérhető; ez egy Maven projekt, amelynek képesnek kell lennie a jelenlegi állapotban történő futtatásra.