Ütemezők az RxJava-ban

1. Áttekintés

Ebben a cikkben különféle típusokra fogunk összpontosítani Ütemezők hogy az RxJava alapú többszálas programok megírásához fogjuk használni Megfigyelhető előfizetése és figyeldOn mód.

Ütemezők lehetőséget ad arra, hogy meghatározza, hol és milyen valószínűséggel kell végrehajtani a Megfigyelhető lánc.

Megszerezhetjük a Ütemező osztályban leírt gyári módszerekből Ütemezők.

2. Alapértelmezett menetmenet

Alapértelmezés szerint,Az Rx egyszálú ami arra utal, hogy egy Megfigyelhető és az operátorok láncolata, amelyet alkalmazhatunk rá, ugyanazon a szálon értesíti a megfigyelőket Iratkozz fel() módszert nevezzük.

A figyeldOn és feliratkozás a módszerek érvként veszik a Ütemező, ez, amint a neve is mutatja, olyan eszköz, amelyet felhasználhatunk az egyes műveletek ütemezéséhez.

Létrehozzuk a Ütemező a teremtMunkás metódus, amely a Ütemező.Munkás. A munkás elfogadja a műveleteket, és egymás után hajtja végre azokat egyetlen szálon.

Bizonyos értelemben a munkás egy Smaga az ütemező, de nem hivatkozunk rá a-ra Ütemező a zavart elkerülése érdekében.

2.1. Művelet ütemezése

Bármelyiknek ütemezhetünk munkát Ütemező új létrehozásával munkás és néhány művelet ütemezése:

Scheduler ütemező = Schedulers.immediate (); Scheduler.Worker worker = ütemező.createWorker (); munkás.ütemezés (() -> eredmény + = "akció"); Assert.assertTrue (eredmény.egyenlő ("cselekvés"));

Ezután a művelet sorba kerül azon a szálon, amelyhez a dolgozó hozzárendelve van.

2.2. Művelet törlése

Ütemező.Munkás kiterjed Feliratkozás. Felhívás a leiratkozás módszer a munkás a sor kiürülését és az összes függőben lévő feladat törlését eredményezi. Példával láthatjuk:

Scheduler ütemező = Schedulers.newThread (); Scheduler.Worker worker = ütemező.createWorker (); worker.schedule (() -> {result + = "First_Action"; worker.unsubscribe ();}); worker.schedule (() -> eredmény + = "Second_Action"); Assert.assertTrue (eredmény.egyenlő ("Első_Akció"));

A második feladatot soha nem hajtják végre, mert az előtte lévő az egész műveletet törölte. A végrehajtás alatt álló műveletek megszakadnak.

3. Schedulers.newThread

Ez az ütemező egyszerűen elindít egy új szálat, valahányszor igénylik subscribeOn () vagy observOn ().

Ez aligha jó választás, nemcsak a szál indításakor bekövetkező késés miatt, hanem azért is, mert ezt a szálat nem használják fel újra:

Observable.just ("Hello") .observeOn (Schedulers.newThread ()) .doOnNext (s -> result2 + = Thread.currentThread (). GetName ()) .observeOn (Schedulers.newThread ()) .subscribe (s - > eredmény1 + = Szál.currentThread (). getName ()); Szál.alszik (500); Assert.assertTrue (eredmény1.egyenlő ("RxNewThreadScheduler-1")); Assert.assertTrue (eredmény2.egyenlő ("RxNewThreadScheduler-2"));

Amikor az Munkás kész, a szál egyszerűen véget ér. Ez Ütemező csak akkor használható, ha a feladatok durva szemcséjűek: sok időbe telik a teljesítésük, de nagyon kevés van belőlük, így a szálak valószínűleg egyáltalán nem használhatók fel újra.

Scheduler ütemező = Schedulers.newThread (); Scheduler.Worker worker = ütemező.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> eredmény + = "_worker_"); eredmény + = "_End";} ); Szál.alszik (3000); Assert.assertTrue (eredmény.egyenlő ("RxNewThreadScheduler-1_Start_End_worker_"));

Amikor megterveztük a munkás rajta NewThreadScheduler, láttuk, hogy a munkás egy adott szálhoz van kötve.

4. Ütemezők.közvetlen

Ütemezők.közvetlen egy speciális ütemező, amely blokkolással meghív egy feladatot az ügyfélszálon belül, nem pedig aszinkron módon, és a művelet befejezésével tér vissza:

Scheduler ütemező = Schedulers.immediate (); Scheduler.Worker worker = ütemező.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "_Start"; worker.schedule (() -> eredmény + = "_worker_"); eredmény + = "_End";} ); Szál.alszik (500); Assert.assertTrue (eredmény.egyenlő ("main_Start_worker__End"));

Valójában feliratkozás egy Megfigyelhető keresztül azonnali Ütemező jellemzően ugyanolyan hatású, mint ha nem iratkozik fel valamelyikre Sütemező egyáltalán:

Megfigyelhető.just ("Hello") .subscribeOn (Schedulers.immediate ()) .subscribe (s -> eredmény + = Szál.currentThread (). GetName ()); Szál.alszik (500); Assert.assertTrue (eredmény.egyenlő ("main"));

5. Ütemezők.trambulin

A trambulinÜtemező nagyon hasonlít a azonnali mert a feladatokat ugyanabban a szálban ütemezi, hatékonyan blokkolva.

A soron következő feladat azonban akkor hajtódik végre, amikor az összes korábban ütemezett feladat befejeződött:

Megfigyelhető.just (2, 4, 6, 8) .subscribOO (Schedulers.trampoline ()) .subscribe (i -> eredmény + = "" + i); Megfigyelhető.just (1, 3, 5, 7, 9) .subscribeOn (Schedulers.trampoline ()) .subscribe (i -> eredmény + = "" + i); Szál.alszik (500); Assert.assertTrue (eredmény.egyenlő ("246813579"));

Azonnali azonnal meghív egy adott feladatot, míg trambulin várja az aktuális feladat befejezését.

A trambulin’S munkás végrehajtja az összes feladatot azon a szálon, amely ütemezte az első feladatot. Az első hívás menetrend blokkol, amíg a sor kiürül:

Scheduler ütemező = Schedulers.trampoline (); Scheduler.Worker worker = ütemező.createWorker (); worker.schedule (() -> {result + = Thread.currentThread (). getName () + "Start"; worker.schedule (() -> {result + = "_middleStart"; worker.schedule (() -> eredmény + = "_munkás_"); eredmény + = "_middleEnd";}); eredmény + = "_mainEnd";}); Szál.alszik (500); Assert.assertTrue (eredmény .egyenlő ("mainStart_mainEnd_middleStart_middleEnd_worker_"));

6. Ütemezők.-től

Ütemezők belül bonyolultabbak, mint Végrehajtók tól től java.util.egyidejű - ezért külön absztrakcióra volt szükség.

De mivel fogalmilag meglehetősen hasonlóak, nem meglepő, hogy van egy burkoló, amely megfordulhat Végrehajtó -ba Ütemező használni a tól től gyári módszer:

privát ThreadFactory threadFactory (karakterlánc minta) {return new ThreadFactoryBuilder () .setNameFormat (pattern) .build (); } @Test public void givenExecutors_whenSchedulerFrom_thenReturnElements () thrugs InterruptedException {ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched-A-% d")); Scheduler SchedulerA = Schedulers.from (poolA); ExecutorService poolB = newFixedThreadPool (10, threadFactory ("Sched-B-% d")); Scheduler SchedulerB = Schedulers.from (poolB); Megfigyelhető megfigyelhető = Megfigyelhető.teremt (előfizető -> {előfizető.onNext ("Alfa"); előfizető.onNext ("Béta"); előfizető.onCompleted ();}) ;; megfigyelhető .subscribeOn (ütemezőA) .subscribeOn (ütemezőB) .subscribe (x -> eredmény + = Szál.currentThread (). getName () + x + "_", Dobható :: printStackTrace, () -> eredmény + = "_Completed "); Szál.alszik (2000); Assert.assertTrue (eredmény.egyenlő ("Sched-A-0Alfa_Sched-A-0Beta__Completed")); }

ÜtemezőB rövid ideig használják, de alig ütemezi be az új műveletet ütemezőA, amely elvégzi az összes munkát. Így többszörös subscribeOn módszereket nem csak figyelmen kívül hagyják, hanem egy kis rezsit is bevezetnek.

7. Ütemezők.io

Ez Ütemező hasonló a új téma kivéve azt a tényt, hogy a már elindított szálakat újrahasznosítják, és képesek kezelni a jövőbeni kéréseket.

Ez a megvalósítás hasonlóan működik ThreadPoolExecutor tól től java.util.egyidejű korlátlan szálmedencével. Minden alkalommal, amikor egy új munkás vagy egy új szálat indítanak (és később egy ideig tétlenek), vagy a tétlenet újra felhasználják:

Megfigyelhető.just ("io") .subscribeOn (Schedulers.io ()) .subscribe (i -> eredmény + = Szál.currentThread (). GetName ()); Assert.assertTrue (eredmény.egyenlő ("RxIoScheduler-2"));

Óvatosnak kell lennünk bármilyen korlátlan erőforrással - lassú vagy nem reagáló külső függőségek esetén, mint például a webszolgáltatások, ioütemező hatalmas számú szálat indíthat el, ami ahhoz vezet, hogy a saját alkalmazásunk nem reagál.

A gyakorlatban követve Ütemezők.io szinte mindig jobb választás.

8. Ütemezők.számítás

Számítás Sütemező alapértelmezés szerint korlátozza a párhuzamosan futó szálak számát processzorok (), amint az a Runtime.getRuntime () hasznossági osztály.

Tehát a számítási ütemező amikor a feladatok teljes egészében CPU-hoz vannak kötve; vagyis számítási teljesítményre van szükségük, és nincs blokkoló kódjuk.

Minden szál előtt korlátlan sort használ, így ha a feladat ütemezett, de az összes mag foglalt, akkor sorba kerül. Azonban az egyes szálak előtti sor folyamatosan növekszik:

Megfigyelhető.just ("számítás") .subscribeOn (Schedulers.computation ()) .subscribe (i -> eredmény + = Szál.currentThread (). GetName ()); Assert.assertTrue (eredmény.egyenlő ("RxComputationScheduler-1"));

Ha valamilyen okból az alapértelmezettől eltérő számú szálra van szükségünk, akkor mindig használhatjuk a rx.scheduler.max-computation-threads rendszer tulajdonság.

Kevesebb szál használatával biztosíthatjuk, hogy mindig egy vagy több CPU-mag legyen alapjáraton, és még nagy terhelés mellett is, számítás a szálkészlet nem telíti a szervert. Egyszerűen nem lehet több számítási szál, mint mag.

9. Ütemezők.teszt

Ez Ütemező csak tesztelési célokra használják, és soha nem fogjuk látni a gyártási kódban. Fő előnye az óra előrehaladásának képessége, az önkényesen múló idő szimulálása:

Lista betűk = Arrays.asList ("A", "B", "C"); TestScheduler ütemező = Schedulers.test (); TestSubscriber előfizető = new TestSubscriber (); Megfigyelhető pipa = Megfigyelhető .interval (1, TimeUnit.SECONDS, ütemező); Megfigyelhető.from (betűk) .zipWith (pipa, (karakterlánc, index) -> index + "-" + karakterlánc) .subscribeOn (ütemező) .subscribe (előfizető); előfizető.assertNoValues ​​(); subscriber.assertNotCompleted (); ütemező.advanceTimeBy (1, TimeUnit.SECONDS); subscriber.assertNoErrors (); subscriber.assertValueCount (1); subscriber.assertValues ​​("0-A"); ütemező.advanceTimeTo (3, TimeUnit.SECONDS); subscriber.assertCompleted (); subscriber.assertNoErrors (); subscriber.assertValueCount (3); assertThat (előfizető.getOnNextEvents (), hasItems ("0-A", "1-B", "2-C");

10. Alapértelmezett ütemezők

Néhány Megfigyelhető az RxJava operátorainak vannak alternatív formái, amelyek lehetővé teszik számunkra, hogy melyiket állítsuk be Ütemező az üzemeltető felhasználja a működéséhez. Mások nem operálnak különösebbet Ütemező vagy egy adott alapértelmezés szerint működnek Ütemező.

Például a késleltetés üzemeltető veszi az upstream eseményeket, és adott idő után lefelé tolja őket. Nyilvánvaló, hogy ebben az időszakban nem tudja megtartani az eredeti szálat, ezért mást kell használnia Ütemező:

ExecutorService poolA = newFixedThreadPool (10, threadFactory ("Sched1-")); Scheduler SchedulerA = Schedulers.from (poolA); Megfigyelhető.just ('A', 'B') .delay (1, TimeUnit.SECONDS, ütemezőA) .subscribe (i -> eredmény + = Szál.currentThread (). GetName () + i + ""); Szál.alszik (2000); Assert.assertTrue (eredmény.egyenlő ("Sched1-A Sched1-B"));

Anélkül, hogy beszállítana egy szokást ütemezőA, az összes üzemeltető alatt késleltetés használná a számítási ütemező.

Egyéb fontos operátorok, amelyek támogatják a szokásokat Ütemezők vannak puffer, intervallum, hatótávolság, időzítő, kihagy, vesz, időtúllépés, és még sokan mások. Ha nem adjuk meg a Ütemező az ilyen üzemeltetőknek, számítás ütemezőt használnak, ami a legtöbb esetben biztonságos alapértelmezés.

11. Következtetés

Az igazán reaktív alkalmazásokban, amelyeknél az összes régóta futó művelet aszinkron, nagyon kevés szál és így Ütemezők szükségesek.

Az ütemezők elsajátítása elengedhetetlen a skálázható és biztonságos kód RxJava használatával történő írásához. A különbség feliratkozás és figyeldOn különösen fontos nagy terhelés alatt, ahol minden feladatot pontosan akkor kell végrehajtani, amikor elvárjuk.

Végül, de nem utolsósorban biztosnak kell lennünk ebben Ütemezők a downstream felhasználva lépést tud tartani a Ütemezők upstrea m. További információért olvassa el ezt a cikket az ellennyomásról.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.