Bevezetés a Hazelcast Jet-be

1. Bemutatkozás

Ebben az oktatóanyagban megismerhetjük a Hazelcast Jet-et. Ez egy elosztott adatfeldolgozó motor, amelyet a Hazelcast, Inc. biztosít, és a Hazelcast IMDG tetejére épül.

Ha szeretne többet megtudni a Hazelcast IMDG-ről, itt van egy cikk az induláshoz.

2. Mi az a Hazelcast Jet?

A Hazelcast Jet egy elosztott adatfeldolgozó motor, amely az adatokat streamként kezeli. Feldolgozhatja az adatbázisban vagy fájlokban tárolt adatokat, valamint a Kafka szerver által közvetített adatokat.

Ezenkívül összesített funkciókat végezhet végtelen adatfolyamokon, úgy, hogy felosztja a folyamokat részhalmazokra és összesítést alkalmaz az egyes részhalmazokra. Ezt a koncepciót a Jet terminológiájában ablakosításnak nevezik.

Telepíthetjük a Jet-et egy gépcsoportba, majd elküldhetjük neki az adatfeldolgozási feladatainkat. A Jet a fürt összes tagját automatikusan feldolgozza. A fürt minden tagja elfogyasztja az adatok egy részét, és ez megkönnyíti az átviteli képesség bármely szintjére történő skálázást.

Íme a Hazelcast Jet tipikus felhasználási esetei:

  • Valós idejű adatfolyam-feldolgozás
  • Gyors kötegelt feldolgozás
  • Java 8 adatfolyamok feldolgozása elosztott módon
  • Adatfeldolgozás a mikroszolgáltatásokban

3. Beállítás

A Hazelcast Jet környezetünkbe történő beállításához csak egyetlen Maven-függőséget kell hozzáadnunk a sajátunkhoz pom.xml.

Így csináljuk:

 com.hazelcast.jet hazelcast-jet 4.2 

Ennek a függőségnek a feltöltésével letölthetünk egy 10 Mb-os jar fájlt, amely biztosítja számunkra az összes infrastruktúrát, amelyre szükségünk van egy elosztott adatfeldolgozó folyamat kiépítéséhez.

A Hazelcast Jet legújabb verziója itt található.

4. Minta alkalmazás

Annak érdekében, hogy többet megtudjon a Hazelcast Jet-ről, létrehozunk egy mintaalkalmazást, amely beveszi a mondatokat és egy szót, hogy megtalálja ezeket a mondatokat, és visszaadja a megadott szavak számát a mondatokban.

4.1. A csővezeték

A Pipeline alkotja a Jet alkalmazás alapkonstrukcióját. A csővezetéken belüli feldolgozás a következő lépéseket követi:

  • forrásból származó adatok olvasása
  • átalakítsa az adatokat
  • írjon adatokat egy mosogatóba

Alkalmazásunkhoz a csővezeték elosztottról fog olvasni Lista, alkalmazza a csoportosítás és az összesítés átalakítását, és végül írjon egy elosztottra Térkép.

Így írjuk a csővezetéket:

privát csővezeték createPipeLine () {Pipeline p = Pipeline.create (); p.readFrom (Források.lista (LIST_NAME)) .flatMap (szó -> traverseArray (word.toLowerCase (). split ("\ W +"))) .filter (word ->! word.isEmpty ()) .groupingKey (egészItem ()) .aggregate (counting ()) .writeTo (Sink.map (MAP_NAME)); visszatérő p; }

Miután olvastunk a forrásból, bejárjuk az adatokat, és szabályos kifejezéssel felosztjuk a térben. Ezt követően kiszűrjük az üres részeket.

Végül csoportosítjuk a szavakat, összesítjük és az eredményeket a Térkép.

4.2. A munka

Most, hogy meghatározzuk a csővezetékünket, létrehozunk egy munkát a vezeték végrehajtására.

Így írjuk a countWord függvény, amely elfogadja a paramétereket és visszaadja a számlálást:

public Hosszú countWord (Mondatok felsorolása, String szó) {long count = 0; JetInstance jet = Jet.newJetInstance (); próbálja ki a {List textList = jet.getList (LIST_NAME) listát; textList.addAll (mondatok); Pipeline p = createPipeLine (); jet.newJob (p) .csatlakozzon (); Térképszám = jet.getMap (MAP_NAME); count = count.get (szó); } végül {Jet.shutdownAll (); } visszatérés száma; }

Először létrehozunk egy Jet példányt a munkánk létrehozása és a csővezeték használata érdekében. Ezután bemásoljuk a bemenetet Lista elosztott listára, hogy az az összes példányon elérhető legyen.

Ezután benyújtunk egy munkát a fent épített csővezeték segítségével. A módszer, a metódus új Munka() olyan futtatható feladatot ad vissza, amelyet a Jet aszinkron módon indított el. A csatlakozik metódus várja a munka befejezését és dob egy kivétel ha a munka hibával fejeződik be.

Amikor a munka befejeződik, az eredmények elosztott módon kerülnek lekérésre Térkép, ahogy a csővezetékünkben meghatároztuk. Szóval, megkapjuk a Térkép a Jet-példányból, és vegye be a szó számát.

Végül leállítottuk a Jet példányt. Fontos, hogy a kivégzésünk befejezése után állítsuk le, as A Jet-példány elindítja a saját szálait. Ellenkező esetben a Java folyamatunk akkor is életben marad, ha a módszerünk kilép.

Itt van egy egység teszt, amely teszteli a Jet számára írt kódot:

@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord () {Mondatok felsorolása = new ArrayList (); mondatok.add ("Az első másodperc rendben volt, de a második másodperc kemény volt."); WordCounter wordCounter = új WordCounter (); long countSecond = wordCounter.countWord (mondatok, "második"); assertEquals (3, countSecond); }

5. Következtetés

Ebben a cikkben megtudtuk a Hazelcast Jet-et. Ha többet szeretne megtudni róla és annak jellemzőiről, olvassa el a kézikönyvet.

Szokás szerint az ebben a cikkben használt példák kódja a Github oldalon található.


$config[zx-auto] not found$config[zx-overlay] not found