Megfigyelhető elemek egyesítése az RxJava-ban

1. Bemutatkozás

Ebben a gyors bemutatóban a kombinálás különböző módjait tárgyaljuk Megfigyelhetők az RxJava-ban.

Ha még nem ismeri az RxJava alkalmazást, mindenképpen nézze meg először ezt a bevezető oktatóanyagot.

Most ugorjunk be.

2. Megfigyelhetők

Megfigyelhető szekvenciák, vagy egyszerűen Megfigyelhetők, aszinkron adatfolyamok ábrázolása.

Ezek a Megfigyelő mintán alapulnak, ahol egy objektum nevű Megfigyelő, feliratkozik egy Megfigyelhető.

Az előfizetés nem blokkoló, mivel Megfigyelő reagál, bármire is Megfigyelhető a jövőben bocsát ki. Ez pedig megkönnyíti az egyidejűséget.

Íme egy egyszerű bemutató az RxJava-ban:

Megfigyelhető .from (új karakterlánc [] {"John", "Doe"}) .subscribe (név -> System.out.println ("Hello" + név))

3. Megfigyelhető elemek egyesítése

A reaktív keretrendszer használatával történő programozáskor gyakran előfordul, hogy különbözőeket kombinálunk Megfigyelhetők.

Egy webalkalmazásban például két aszinkron adatfolyamot kell beszereznünk, amelyek függetlenek egymástól.

Ahelyett, hogy megvárnánk az előző adatfolyam befejezését, mielőtt a következő adatfolyamot kérnénk, egyszerre hívhatjuk mindkettőt, és feliratkozhatunk az egyesített streamekre.

Ebben a részben megvitatjuk a többféle kombinálás különböző módjait Megfigyelhetők az RxJava és az egyes módszerek különböző felhasználási esetei.

3.1. Összeolvad

Használhatjuk a összeolvad operátor kombinálni a többszörös kimenetét Megfigyelhetők hogy úgy viselkedjenek, mint egy:

@Test public void givenTwoObservables_whenMerged_shouldEmitCombinedResults () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.merge (Observable.from (new String [] {"Hello", "World"}), Observable.from (new String [] {"I love", "RxJava"})) .subscribe (testSubscriber); testSubscriber.assertValues ​​("Hello", "World", "I love", "RxJava"); }

3.2. MergeDelayError

A mergeDelayError módszer megegyezik a összeolvad annyiban egyesíti Megfigyelhetők egybe, de ha az egyesítés során hibák jelentkeznek, akkor a hibamentes elemek folytatódhatnak a hibák terjesztése előtt:

@Test public void givenMutipleObservablesOneThrows_whenMerged_thenCombineBeforePropagatingError () {TestSubscriber testSubscriber = new TestSubscriber (); Observable.mergeDelayError (Observable.from (new String [] {"helló", "world"}), Observable.error (új RuntimeException ("Néhány kivétel")), Observable.from (új String [] {"rxjava"}) )) .subscribe (testSubscriber); testSubscriber.assertValues ​​("hello", "world", "rxjava"); testSubscriber.assertError (RuntimeException.class); }

A fenti példa kiadja az összes hibamentes értéket:

hello world rxjava

Vegye figyelembe, hogy ha használjuk összeolvad ahelyett mergeDelayError, a Húrrxjava ” nem bocsátják ki, mert összeolvad azonnal leállítja az adatfolyamot innen Megfigyelhetők amikor hiba lép fel.

3.3. Postai irányítószám

A postai irányítószám kiterjesztési módszer két értéksorozatot hoz össze párként:

@Test public void givenTwoObservables_whenZipped_thenReturnCombinedResults () {List zippedStrings = new ArrayList (); Observable.zip (Observable.from (új karakterlánc [] {"Egyszerű", "Mérsékelt", "Komplex"}), Observable.from (új karakterlánc [] {"Megoldások", "Siker", "Hierarchia"}), (str1, str2) -> str1 + "" + str2) .subscribe (zippedStrings :: add); assertThat (zippedStrings) .isNotEmpty (); assertThat (zippedStrings.size ()). isEqualTo (3); assertThat (zippedStrings) .contains ("Egyszerű megoldások", "Mérsékelt siker", "Komplex hierarchia"); }

3.4. Zip Intervallummal

Ebben a példában egy adatfolyamot fogunk tömöríteni intervallum ami valójában késlelteti az első áram elemeinek kibocsátását:

@Test public void givenAStream_whenZippedWithInterval_shouldDelayStreamEmmission () {TestSubscriber testSubscriber = new TestSubscriber (); Megfigyelhető adatok = Megfigyelhető.csak ("egy", "kettő", "három", "négy", "öt"); Megfigyelhető intervallum = Megfigyelhető.intervál (1L, TimeUnit.SECONDS); Megfigyelhető .zip (adatok, intervallum, (strData, pipa) -> String.format ("[% d] =% s", tick, strData)) .toBlocking (). Előfizetés (testSubscriber); testSubscriber.assertCompleted (); testSubscriber.assertValueCount (5); testSubscriber.assertValues ​​("[0] = egy", "[1] = kettő", "[2] = három", "[3] = négy", "[4] = öt"); }

4. Összefoglalás

Ebben a cikkben néhány kombinációs módszert láthattunk Megfigyelhetők RxJava-val. Megismerhet más módszereket, például combLestest, csatlakozik, groupJoin, switchOnNext, a hivatalos RxJava dokumentációban.

Mint mindig, a cikk forráskódja is elérhető a GitHub repóban.