Java 9 reaktív adatfolyamok

1. Áttekintés

Ebben a cikkben a Java 9 reaktív folyamokat vesszük szemügyre. Egyszerűen fogalmazva, képesek leszünk használni a Folyam osztály, amely magában foglalja az elsődleges építőelemeket a reaktív folyam feldolgozási logika felépítéséhez.

Reaktív folyamok a nem blokkoló ellennyomású aszinkron folyamfeldolgozás szabványa. Ezt a specifikációt a Reaktív kiáltvány, és különféle megvalósításai vannak, például RxJava vagy Akka-patakok.

2. Reaktív API áttekintése

A Folyam, három fő absztrakciót használhatunk és aszinkron feldolgozási logikába állíthatjuk össze.

Minden Folyam feldolgoznia kell azokat az eseményeket, amelyeket egy Publisher-példány közzétett neki; a Kiadó egy módja van - Iratkozz fel().

Ha valamelyik előfizető az általa közzétett eseményeket szeretné kapni, akkor fel kell iratkoznia az adottra Kiadó.

Az üzenetek fogadójának végre kell hajtania a Előfizető felület. Általában ez a vége mindenkinek Folyam feldolgozás, mert a példánya nem küld tovább üzeneteket.

Gondolkodhatunk Előfizető mint a Mosogató. Ennek négy módszere van, amelyeket felül kell vizsgálni - onSubscribe (), onNext (), onError (), és onComplete (). A következő szakaszban megnézzük azokat.

Ha át akarjuk alakítani a bejövő üzenetet, és továbbadjuk a következőnek Előfizető, végre kell hajtanunk a Processzor felület. Ez mind a Előfizető mert üzeneteket fogad, és mint a Kiadó mert ezeket az üzeneteket feldolgozza és további feldolgozásra küldi.

3. Üzenetek közzététele és fogyasztása

Tegyük fel, hogy egy egyszerűt akarunk létrehozni Folyam, amelyben van egy Kiadó üzenetek közzététele, és egy egyszerű Előfizető üzeneteket fogyaszt, amint megérkeznek - egyenként.

Hozzunk létre egy EndSubscriber osztály. Meg kell valósítanunk a Előfizető felület. Ezután felülírjuk a szükséges módszereket.

A onSubscribe () metódust hívják meg a feldolgozás megkezdése előtt. A Feliratkozás érvelésként kerül átadásra. Ez egy osztály, amely az üzenetek közötti áramlás vezérlésére szolgál Előfizető és a Kiadó:

public class EndSubscriber megvalósítja Subscriber {private Subscription előfizetés; public List usedElements = new LinkedList (); @Orride public void onSubscribe (Subscription subscription) {this.subscription = előfizetés; előfizetés.kérelem (1); }}

Inicializáltunk egy üreset is Lista nak,-nek elfogyasztott elemek ezt felhasználják a tesztekben.

Most végre kell hajtanunk a Előfizető felület. A fő módszer itt az onNext () - ezt hívják, amikor a Kiadó új üzenetet tesz közzé:

@Orride public void onNext (T item) {System.out.println ("Got:" + item); előfizetés.kérelem (1); }

Ne feledje, hogy amikor megkezdtük az előfizetést az onSubscribe () metódust, és amikor feldolgoztunk egy üzenetet, meg kell hívnunk a kérés() módszer a Feliratkozás jelezni, hogy az áram Előfizető készen áll további üzenetek elfogyasztására.

Végül végre kell hajtanunk onError () - amelyet akkor hívnak meg, amikor valamilyen kivétel lesz a feldolgozás dobása, valamint onComplete () - hívták, amikor a Kiadó zárva:

@Orride public void onError (Throwable t) {t.printStackTrace (); } @Orride public void onComplete () {System.out.println ("Kész"); }

Írjunk egy tesztet a feldolgozáshoz Folyam. Használjuk a SubmissionPublisher osztály - egy konstrukció a java.util.egyidejű - amely végrehajtja a Kiadó felület.

Beküldjük N elemei a Kiadó - amely a mi EndSubscriber fog kapni:

@Test public void whenSubscribeToIt_thenShouldConsumeAll () dobja az InterruptedException {// adott SubmissionPublisher publisher = new SubmissionPublisher (); EndSubscriber előfizető = new EndSubscriber (); kiadó.subscribe (előfizető); Lista elemek = List.of ("1", "x", "2", "x", "3", "x"); // when assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (kiadó :: beküldés); kiadó.zárja (); // majd várjon (). atMost (1000, TimeUnit.MILLISECONDS). amíg (() -> assertThat (Subscriber.consumedElements) .containsExactlyElementsOf (items)); }

Ne feledje, hogy hívjuk a Bezárás() metódus a EndSubscriber. Ez hivatkozni fog onComplete () visszahívás alatt minden Előfizető az adott Kiadó.

A program futtatása a következő kimenetet eredményezi:

Got: 1 Got: x Got: 2 Got: x Got: 3 Got: x Kész

4. Üzenetek átalakítása

Tegyük fel, hogy hasonló logikát akarunk kiépíteni az a között Kiadó és a Előfizető, hanem alkalmazzon némi átalakítást.

Létrehozzuk a TransformProcessor osztály, amely megvalósítja Processzor és kiterjeszti SubmissionPublisher - mivel ez mindkettő lesz Publisher és Sfeliratkozó.

Átmegyünk a Funkció amely átalakítja a bemeneteket kimenetekké:

a TransformProcessor nyilvános osztály kiterjeszti a SubmissionPublisher megvalósítja a Flow.Processor {private Function függvényt; privát Flow.Fizetési előfizetés; public TransformProcessor (Funkció függvény) {super (); this. function = function; } @Orride public void onSubscribe (Flow.Subscription subscription) {this.subscription = előfizetés; előfizetés.kérelem (1); } @Orride public void onNext (T elem) {submit (function.apply (item)); előfizetés.kérelem (1); } @Orride public void onError (Throwable t) {t.printStackTrace (); } @Orride public void onComplete () {bezárás (); }}

Nézzük most írj egy gyors tesztet olyan feldolgozási folyamattal, amelyben a Kiadó kiadja Húr elemek.

A mi TransformProcessor elemzi a Húr mint Egész szám - ami azt jelenti, hogy itt átalakításra van szükség:

@Test public void whenSubscribeAndTransformElements_thenShouldConsumeAll () thrumps InterruptedException {// adott SubmissionPublisher publisher = new SubmissionPublisher (); TransformProcessor transformProcessor = új TransformProcessor (Integer :: parseInt); EndSubscriber előfizető = new EndSubscriber (); Lista elemek = List.of ("1", "2", "3"); List várható eredménye: List (1, 2, 3); // when publisher.subscribe (transformProcessor); transformProcessor.subscribe (előfizető); items.forEach (kiadó :: beküldés); kiadó.zárja (); // majd várjuk meg (). atMost (1000, TimeUnit.MILLISECONDS). amíg (() -> assertThat (Subscriber.consumedElements) .containsExactlyElementsOf (várhatóRezult)); }

Ne feledje, hogy a Bezárás() módszer az alapon Kiadó okozza a onComplete () módszer a TransformProcessor hogy hívják.

Ne feledje, hogy a feldolgozási láncban szereplő összes megjelenítőt be kell zárni.

5. Az üzenetek iránti kereslet ellenőrzése a Feliratkozás

Tegyük fel, hogy az Előfizetésből csak az első elemet szeretnénk elfogyasztani, alkalmazni egy kis logikát és befejezni a feldolgozást. Használhatjuk a kérés() módszer ennek elérésére.

Módosítsuk a EndSubscriber hogy csak N számú üzenetet fogyasszon. Át fogjuk adni ezt a számot howMuchMessagesCumsume konstruktor érv:

public class EndSubscriber implementálja Subscriber {private AtomicInteger howMuchMessagesConsume; privát előfizetés előfizetés; public List usedElements = new LinkedList (); public EndSubscriber (Integer howMuchMessagesConsume) {this.howMuchMessagesConsume = new AtomicInteger (howMuchMessagesConsume); } @Orride public void onSubscribe (Subscription subscription) {this.subscription = előfizetés; előfizetés.kérelem (1); } @Orride public void onNext (T elem) {howMuchMessagesConsume.decrementAndGet (); System.out.println ("Megvan:" + elem); felhasználtElementek.add (elem); if (howMuchMessagesConsume.get ()> 0) {előfizetés.request (1); }} // ...}

Kérhetünk elemeket, ameddig csak akarjuk.

Írjunk egy tesztet, amelyben csak egy elemet akarunk elfogyasztani az adottból Feliratkozás:

@Test public void whenRequestForOnlyOneElement_thenShouldConsumeOne () dobja az InterruptedException {// adott SubmissionPublisher Publisher = new SubmissionPublisher (); EndSubscriber előfizető = new EndSubscriber (1); kiadó.subscribe (előfizető); Lista elemek = List.of ("1", "x", "2", "x", "3", "x"); Várható lista = List.of ("1"); // when assertThat (publisher.getNumberOfSubscribers ()). isEqualTo (1); items.forEach (kiadó :: beküldés); kiadó.zárja (); // majd várjuk meg (). atMost (1000, TimeUnit.MILLISECONDS). amíg (() -> assertThat (Subscriber.consumedElements) .containsExactlyElementsOf (várható)); }

Habár a kiadó hat elemet tesz közzé, a mi EndSubscriber csak egy elemet fog fogyasztani, mert csak az egyetlen elem feldolgozásának igényét jelzi.

A kérés() módszer a Feliratkozás, kifinomultabb ellennyomás-mechanizmust valósíthatunk meg az üzenetfogyasztás sebességének szabályozására.

6. Következtetés

Ebben a cikkben a Java 9 reaktív folyamokat tekintettük meg.

Láttuk, hogyan lehet létrehozni egy feldolgozást Folyam amely a Kiadó és a Előfizető. Összetettebb feldolgozási folyamatot hoztunk létre az elemek átalakításával Processzorok.

Végül a Feliratkozás hogy az elemek iránti igényt az Előfizető.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.