Útmutató az Akka-patakokhoz

1. Áttekintés

Ebben a cikkben megnézzük a akka-patakok könyvtár, amely az Akka színészkeret tetején épül, amely ragaszkodik a reaktív folyamok manifesztumához. Az Akka Streams API lehetővé teszi számunkra, hogy könnyen összeállítsuk az adatátalakítási folyamatokat független lépésekből.

Ezenkívül az összes feldolgozás reaktív, nem blokkoló és aszinkron módon történik.

2. Maven-függőségek

A kezdéshez hozzá kell adnunk a akka-patak és akka-stream-testkit könyvtárakat pom.xml:

 com.typesafe.akka akka-stream_2.11 2.5.2 com.typesafe.akka akka-stream-testkit_2.11 2.5.2 

3. Akka Streams API

Az Akka Streamekkel való együttműködéshez tisztában kell lennünk az alapvető API-fogalmakkal:

  • Forrás - a feldolgozás belépési pontja a akka-patak könyvtár - több forrásból is létrehozhatunk ennek az osztálynak egy példányát; például használhatjuk a egyetlen() metódust, ha a Forrás egyetlen Húr, vagy létrehozhatunk egy Forrás egy an Iterálható elemekből
  • Folyam - a fő feldolgozó építőelem - minden Folyam a példánynak van egy bemeneti és egy kimeneti értéke
  • Materializer - we használhat egyet, ha akarjuk Folyam hogy legyen néhány mellékhatása, például naplózása vagy eredményeinek mentése; leggyakrabban a Nem használt álnév mint a Materializer jelölni, hogy a mi Folyam nem lehetnek mellékhatásai
  • Mosogató működés - amikor építünk egy Folyam, addig nem hajtják végre, amíg nem regisztráljuk a Mosogató művelet rajta - ez egy terminális művelet, amely az egész számítást kiváltja Folyam

4. Teremtés Áramlik az Akka-patakokban

Kezdjük egy egyszerű példa felépítésével, ahol megmutatjuk, hogyan kell többszörös létrehozása és kombinálása Folyams - egész számfolyam feldolgozása és az egész párok átlagos mozgó ablakának kiszámítása a folyamból.

Pontosítóval elválasztott elemzést fogunk végezni Húr egész számok bemenetként a mi létrehozásához akka-patak Forrás például.

4.1. Használva Folyam a bemenet elemzéséhez

Először hozzunk létre egy DataImporter osztály, amely példányt fog venni a ActorSystem hogy később felhasználjuk a sajátunk létrehozására Folyam:

nyilvános osztály DataImporter {private ActorSystem activSystem; // szabványos kivitelezők, szerelők ...}

Ezután hozzunk létre egy parseLine módszer, amely a Lista nak,-nek Egész szám behatárolt bemenetünkből Húr. Ne feledje, hogy a Java Stream API-t itt csak elemzéshez használjuk:

privát lista parseLine (String line) {String [] mezők = line.split (";"); return Arrays.stream (mezők) .map (Integer :: parseInt) .collect (Collectors.toList ()); }

Kezdõlapunk Folyam alkalmazni fogja parseLine hozzájárulásunkhoz a Folyam bemeneti típussal Húr és a kimenet típusa Egész szám:

privát Flow parseContent () {return Flow.of (String.class) .mapConcat (this :: parseLine); }

Amikor felhívjuk a parseLine () módszerrel, a fordító tudja, hogy a lambda függvény argumentuma a Húr - ugyanaz, mint a bemenet típusa Folyam.

Ne feledje, hogy a mapConcat () módszer - egyenértékű a Java 8-zal flatMap () módszer - mert le akarjuk lapítani a Lista nak,-nek Egész szám által visszatért parseLine () ba be Folyam nak,-nek Egész szám hogy feldolgozásunk további lépéseinek ne kelljen foglalkozniuk a Lista.

4.2. Használva Folyam a számítások elvégzéséhez

Ezen a ponton megvan a miénk Folyam elemzett egész számok. Most meg kell implementálja a logikát, amely az összes beviteli elemet párokba csoportosítja, és kiszámítja ezen párok átlagát.

Most meg fogjuk hozzon létre egy Folyam nak,-nek Egész száms és csoportosítsa őket a csoportosítva () módszer.

Ezután egy átlagot akarunk kiszámítani.

Mivel minket nem érdekel az átlagok feldolgozásának sorrendje, igen a párhuzamosan több szál felhasználásával kiszámított átlagokat a mapAsyncUnordered () módszer, argumentumként átadva a szálak számát ennek a módszernek.

Az a művelet, amelyet lambda néven adnak át a Folyam vissza kell adnia a CompletableFuture mert ezt a műveletet aszinkron módon számoljuk ki a különálló szálban:

private Flow computeAverage () {return Flow.of (Integer.class) .grouped (2) .mapAsyncUnordered (8, egész szám -> CompletableFuture.supplyAsync (() -> integers.stream () .mapToDouble (v -> v). átlag () .vagyElse (-1,0))); }

Nyolc párhuzamos szálban számoljuk az átlagokat. Ne feledje, hogy a Java 8 Stream API-t használjuk az átlag kiszámításához.

4.3. Többszörös komponálása Áramlik szingliként Folyam

A Folyam Az API folyékony absztrakció, amely lehetővé teszi számunkra többszöröset komponálni Folyam a végső feldolgozási cél eléréséhez. Granulált folyamataink lehetnek, ahol például az egyik elemzi JSON, egy másik átalakul, egy másik pedig statisztikákat gyűjt.

Az ilyen részletesség segít abban, hogy több tesztelhető kódot hozzunk létre, mert minden egyes feldolgozási lépést függetlenül tesztelhetünk.

Két fenti áramlást hoztunk létre, amelyek egymástól függetlenül működhetnek. Most együtt szeretnénk összeállítani őket.

Először szeretnénk elemezni a bemenetünket Húr, és ezután egy elemáram átlagát akarjuk kiszámítani.

Összeállíthatjuk az áramlásainkat a keresztül() módszer:

Flow calcAverage () {return Flow.of (String.class) .via (parseContent ()) .via (computeAverage ()); }

Létrehoztuk a Folyam amelynek bemeneti típusa van Húr és két másik áramlik utána. A parseContent ()Folyam vesz egy Húr beviszi és visszatér egy Egész szám kimenetként. A computeAverage () Flow ezt veszi Egész szám és kiszámítja az átlagos hozamot Kettős mint a kimenet típusa.

5. Hozzáadás Mosogató hoz Folyam

Mint említettük, e pontig az egész Folyam még nem hajtják végre, mert lusta. A. Végrehajtásának megkezdéséhez Folyam meg kell határoznunk a Mosogató. A Mosogató A művelet például adatokat menthet egy adatbázisba, vagy eredményeket küldhet valamilyen külső webszolgáltatásnak.

Tegyük fel, hogy van egy AverageRepository osztály a következőkkel mentés() módszer, amely eredményeket ír az adatbázisunkba:

CompletionStage save (Dupla átlag) {return CompletableFuture.supplyAsync (() -> {// írjon az adatbázis visszatérési átlagához;}); }

Most szeretnénk létrehozni egy Mosogató művelet, amely ezt a módszert használja a Folyam feldolgozás. Hogy létrehozza a mi Mosogató, először nekünk kell hozzon létre egy Folyam amely feldolgozásunk eredményét veszi input típusnak. Ezután minden eredményünket el akarjuk menteni az adatbázisba.

Megint nem törődünk az elemek sorrendjével, ezért megtehetjük végre a mentés() párhuzamosan végzett műveletek használni a mapAsyncUnordered () módszer.

A Mosogató tól Folyam hívnunk kell a toMat () val vel Sink.ignore () első érvként és Tarts jobbra() másodikként, mert vissza akarjuk adni a feldolgozás állapotát:

privát Mosogató storeAverages () {return Flow.of (Double.class) .mapAsyncUnordered (4, averageRepository :: save) .toMat (Sink.ignore (), Keep.right ()); }

6. Forrás meghatározása a Folyam

Az utolsó dolog, amit meg kell tennünk, az hozzon létre egy Forrás a bemenetből Húr. Alkalmazhatjuk a calcAverage ()Folyam erre a forrásra a keresztül() módszer.

Ezután adja hozzá a Mosogató a feldolgozáshoz meg kell hívnunk a runWith () módszer és adja át a storeAverages () Mosogató amit most hoztunk létre:

CompletionStage calcAverageForContent (String content) {return Source.single (content) .via (calcAverage ()) .runWith (storeAverages (), ActorMaterializer.create (actorSystem)) .whenComplete ((d, e) -> {if (d! = null) {System.out.println ("Importálás befejeződött");} else {e.printStackTrace ();}}); }

Vegye figyelembe, hogy a feldolgozás befejeztével hozzáadjuk a whenComplete () visszahívás, amelyben a feldolgozás eredményétől függően bizonyos műveleteket hajthatunk végre.

7. Tesztelés Akka-patakok

Kipróbálhatjuk feldolgozásunkat a akka-stream-testkit.

A feldolgozás tényleges logikájának tesztelésének legjobb módja az összes tesztelése Folyam logika és használat TestSink a számítás kiváltására és az eredmények érvényesítésére.

Tesztünk során létrehozzuk a Folyam hogy tesztelni akarjuk, és ezután létrehozunk egy Forrás a teszt bemeneti tartalomból:

@Test public void givenStreamOfIntegers_whenCalculateAverageOfPairs_thenShouldReturnProperResults () {// megadott Flow tesztelve = új DataImporter (actorSystem) .calculateAverage (); Karakterlánc bemenet = "1; 9; 11; 0"; // amikor Forrásáram = Forrás.egyszeres (bemenet) .via (tesztelve); // majd folyjon .runWith (TestSink.probe (actorSystem), ActorMaterializer.create (actorSystem)) .request (4) .expectNextUnordered (5d, 5.5); }

Ellenőrizzük, hogy négy bemeneti argumentumra számítunk-e, és két átlagértékű eredmény bármilyen sorrendben érkezhet, mivel a feldolgozásunk aszinkron és párhuzamos módon történik.

8. Következtetés

Ebben a cikkben a akka-patak könyvtár.

Meghatároztunk egy folyamatot, amely többszöröset ötvöz Áramlik az elemek mozgó átlagának kiszámításához. Ezután meghatároztuk a Forrás vagyis az adatfolyam feldolgozásának belépési pontja és a Mosogató ami kiváltja a tényleges feldolgozást.

Végül írtunk egy tesztet a feldolgozásunkra a akka-stream-testkit.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.


$config[zx-auto] not found$config[zx-overlay] not found