Az RxJava API és a Java 9 Flow API közötti különbség

1. Bemutatkozás

A Java Flow API-t a Java 9-ben vezették be a Reactive Stream Specification megvalósításaként.

Ebben az oktatóanyagban először a reaktív folyamokat vizsgáljuk meg. Ezután megismerjük az RxJava és a Flow API kapcsolatát.

2. Mik azok a reaktív folyamok?

A Reaktív Kiáltvány bevezette a Reaktív Áramlatokat, hogy meghatározza a nem blokkoló ellennyomású aszinkron folyamfeldolgozás szabványát.

A reaktív folyam specifikáció hatókörének meghatározása minimális interfészkészlet e célok elérése érdekében:

  • org.reactivestreams.Publisher olyan adatszolgáltató, amely az igényeik alapján adatot tesz közzé az előfizetőknek

  • org.reactivestreams.Subscriber az adatok fogyasztója - adatokat kaphat, miután feliratkozott egy kiadóra

  • org.reactivestreams.Subscription akkor jön létre, amikor a kiadó elfogadja az előfizetőt

  • org.reactivestreams.Processzor előfizető és kiadó egyaránt - feliratkozik egy kiadóra, feldolgozza az adatokat, majd továbbítja a feldolgozott adatokat az előfizetőnek

A Flow API a specifikációból származik. Az RxJava megelőzi, de a 2.0 óta az RxJava is támogatja a specifikációt.

Mélyre megyünk mindkettőbe, de először nézzünk meg egy gyakorlati felhasználási esetet.

3. Használjon esetet

Ehhez az oktatóanyaghoz élő közvetítésű video szolgáltatást fogunk használni.

Az élő közvetítésű videó, ellentétben az igény szerinti videofolyamokkal, nem a fogyasztótól függ. Ezért a szerver a saját ütemében teszi közzé az adatfolyamot, és a fogyasztó felelőssége, hogy alkalmazkodjon.

A legegyszerűbb formában modellünk egy videofolyam-kiadóból és egy videolejátszóból áll, mint előfizető.

Tegyük végre VideoFrame mint adatelemünk:

public class VideoFrame {privát hosszú szám; // további adatmezők // konstruktor, getterek, beállítók}

Ezután menjünk végig egyenként a Flow API és RxJava implementációinkon.

4. Megvalósítás Flow API-val

A JDK 9-ben található Flow API-k megfelelnek a reaktív folyamok specifikációjának. A Flow API segítségével, ha az alkalmazás kezdetben N elemet kér, akkor a kiadó legfeljebb N elemet tol el az előfizetőnek.

A Flow API interfészek mind a java.util.concurrent.Flow felület. Szemantikailag ekvivalensek a megfelelő reaktív folyam társaikkal.

Tegyük végre VideoStreamServer mint a kiadó VideoFrame.

public class VideoStreamServer kiterjeszti a SubmissionPublisher {public VideoStreamServer () {super (Executors.newSingleThreadExecutor (), 5); }}

Meghosszabbítottuk VideoStreamServer tól től SubmissionPublisher közvetlen végrehajtás helyett Flow :: Kiadó. SubmissionPublisher a JDK megvalósítása Flow :: Kiadó az aszinkron kommunikációhoz az előfizetőkkel, így lehetővé teszi a mi VideoStreamServer a saját ütemében kibocsátani.

Ez is hasznos az ellennyomás és a puffer kezelésében, mert mikor SubmissionPublisher :: feliratkozás nevű, létrehozza a BufferedSubscription, majd hozzáadja az új előfizetést az előfizetésláncához. BufferedSubscription pufferelheti a kiadott tételeket SubmissionPublisher # maxBufferCapacity.

Most határozzuk meg Videó lejátszó, amely egy áramot emészt fel VideoFrame. Ezért végre kell hajtania Flow :: Előfizető.

public class VideoPlayer végrehajtja a Flow.Subscriber {Flow.Subscription előfizetés = null; @Orride public void onSubscribe (Flow.Subscription subscription) {this.subscription = előfizetés; előfizetés.kérelem (1); } @Orride public void onNext (VideoFrame item) {log.info ("play # {}", item.getNumber ()); előfizetés.kérelem (1); } @Orride public void onError (Throwable dobható) {log.error ("Hiba történt a video streamingben: {}", dobható.getMessage ()); } @Orride public void onComplete () {log.error ("A videó véget ért"); }}

Videó lejátszó feliratkozik VideoStreamServer, majd sikeres előfizetés után Videó lejátszó::onSubscribe metódust hívják meg, és egy keretet kér. Videó lejátszó:: onNext megkapja a keretet és az új kéréseket. A kért képkockák száma függ a felhasználási esettől és Előfizető megvalósítások.

Végül rakjuk össze a dolgokat:

VideoStreamServer streamServer = új VideoStreamServer (); streamServer.subscribe (új VideoPlayer ()); // videokeretek beküldése ScheduledExecutorService végrehajtó = Executors.newScheduledThreadPool (1); AtomicLong frameNumber = új AtomicLong (); végrehajtó.scheduleWithFixedDelay (() -> {streamServer.offer (új VideoFrame (frameNumber.getAndIncrement ()), (előfizető, videoFrame) -> {subscriber.onError (új RuntimeException ("Frame #" + videoFrame.getNumber () +) az ellennyomás miatt elesett ")); return true;});}, 0, 1, TimeUnit.MILLISECONDS); alvás (1000);

5. Megvalósítás RxJava-val

Az RxJava a ReactiveX Java implementációja. A ReactiveX (vagy Reactive Extensions) projekt célja egy reaktív programozási koncepció kidolgozása. Ez az Observer mintázat, az Iterator minta és a funkcionális programozás kombinációja.

Az RxJava legújabb legfrissebb verziója a 3.x. Az RxJava a 2.x verzió óta támogatja a reaktív adatfolyamokat Folyékony alaposztály, de jelentősebb halmaz, mint a reaktív folyamok több alaposztályával, mint például Folyékony, Megfigyelhető, Egyetlen, Teljes.

Folyékony mivel a reaktív áram megfelelőségének összetevője 0-tól N-ig terjedő áramlás ellennyomás-kezeléssel. Folyékony kiterjed Kiadó a reaktív folyamokból. Ezért sok RxJava operátor elfogadja Kiadó közvetlenül, és lehetővé teszi a közvetlen interoperációt más reaktív áramok megvalósításával.

Készítsük el a videofolyamat-generátorunkat, amely egy végtelen lusta stream:

Stream videoStream = Stream.iterate (new VideoFrame (0), videoFrame -> {// sleep for 1ms; return new VideoFrame (videoFrame.getNumber () + 1);});

Ezután meghatározzuk a Folyékony példány keretek létrehozására külön szálon:

Folyékony .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))

Fontos megjegyezni, hogy egy végtelen adatfolyam elegendő számunkra, de ha rugalmasabb módszerre van szükségünk az adatfolyamunk létrehozásához, akkor Folyékony.hozza létre jó választás.

Flowable .create (new FlowableOnSubscribe () {AtomicLong frame = new AtomicLong (); @Orride public void subscribe (@NonNull FlowableEmitter emitter) {while (true) {emitter.onNext (new VideoFrame (frame.incrementAndGet ())) / / alvás 1 ms-ig a szimultán késleltetésig}}}, / * Itt állítsa be az ellennyomás-stratégiát

Ezután a következő lépésben a VideoPlayer feliratkozik erre a Folyamatosra, és külön szálon figyeli az elemeket.

videoFlowable .observeOn (Schedulers.from (Executors.newSingleThreadExecutor ()))) .subscribe (item -> {log.info ("play #" + item.getNumber ()); // aludjon 30 ms-ot a keret megjelenítésének szimulálásához}) ;

És végül konfiguráljuk az ellennyomás stratégiáját. Ha keretkiesés esetén le akarjuk állítani a videót, akkor használnunk kell EllennyomásOverflowStrategy :: HIBA amikor a puffer megtelt.

Folyékony .fromStream (videoStream) .subscribeOn (Schedulers.from (Executors.newSingleThreadExecutor ())) .onBackpressureBuffer (5, null, BackpressureOverflowStrategy.ERROR) .observeOn (Schedulers.from (Executors). > {log.info ("play #" + item.getNumber ()); // aludjon 30 ms-ig a keret megjelenítésének szimulálásához});

6. Az RxJava és a Flow API összehasonlítása

Még ebben a két egyszerű megvalósításban is láthatjuk, hogy az RxJava API milyen gazdag, különösen a pufferkezelés, a hibakezelés és az ellennyomás stratégia szempontjából. Több lehetőséget és kevesebb kódsort ad nekünk folyékony API-jával. Most vegyük fontolóra a bonyolultabb eseteket.

Tegyük fel, hogy lejátszónk nem képes videokereteket megjeleníteni kodek nélkül. Ezért a Flow API-val meg kell valósítanunk a Processzor a kodek szimulálására és a szerver és a lejátszó között ülésre. Az RxJava segítségével meg tudjuk csinálni Folyékony :: flatMap vagy Folyékony :: térkép.

Vagy képzeljük el, hogy a lejátszónk élő fordító audiót is sugároz, ezért egyesítenünk kell a különféle kiadóktól származó video- és hangfolyamokat. Az RxJava segítségével használhatjuk Folyékony :: combLatest, de a Flow API-val nem könnyű feladat.

Bár lehet szokást írni Processzor amely feliratkozik mindkét adatfolyamra és elküldi az egyesített adatokat a mi Videó lejátszó. A megvalósítás azonban fejtörést okoz.

7. Miért Flow API?

Ezen a ponton kérdésünk lehet, mi a filozófia a Flow API mögött?

Ha a JDK-ban Flow API-használatra keresünk, találhatunk benne valamit java.net.http és jdk.internal.net.http.

Ezenkívül adaptereket is találhatunk a reaktor projektben vagy a reaktív áram csomagban. Például, org.reactivestreams.FlowAdapters rendelkezik módszerekkel a Flow API-csatolók átalakítására reaktív adatfolyamokká és fordítva. Ezért reaktív adatfolyam támogatással segíti a Flow API és a könyvtárak közötti átjárhatóságot.

Mindezek a tények segítenek megérteni a Flow API célját: A JDK reaktív specifikációs interfészeinek csoportjaként hozták létre, harmadik fél közvetítése nélkül. Ezenkívül a Java elvárja, hogy a Flow API-t elfogadják a reaktív specifikációk szabványos interfészeiként, és hogy azokat JDK-ban vagy más Java-alapú könyvtárakban használják, amelyek megvalósítják a köztes eszközök és segédprogramok reaktív specifikációját.

8. Következtetések

Ebben az oktatóanyagban bemutatjuk a reaktív adatfolyam specifikációját, a Flow API-t és az RxJava alkalmazást.

Ezenkívül láttunk egy gyakorlati példát a Flow API és az RxJava megvalósításairól egy élő videofolyamra.

De a Flow API és az RxJava minden szempontja tetszik Flow :: Processzor, Folyékony :: térkép és Folyékony :: flatMap vagy az ellennyomás stratégiák nem tartoznak ide.

Mint mindig, a bemutató teljes kódját a GitHubon találja meg.