Bevezetés a Hazelcast Jet-be
1. Bemutatkozás
Ebben az oktatóanyagban megismerhetjük a Hazelcast Jet-et. Ez egy elosztott adatfeldolgozó motor, amelyet a Hazelcast, Inc. biztosít, és a Hazelcast IMDG tetejére épül.
Ha szeretne többet megtudni a Hazelcast IMDG-ről, itt van egy cikk az induláshoz.
2. Mi az a Hazelcast Jet?
A Hazelcast Jet egy elosztott adatfeldolgozó motor, amely az adatokat streamként kezeli. Feldolgozhatja az adatbázisban vagy fájlokban tárolt adatokat, valamint a Kafka szerver által közvetített adatokat.
Ezenkívül összesített funkciókat végezhet végtelen adatfolyamokon, úgy, hogy felosztja a folyamokat részhalmazokra és összesítést alkalmaz az egyes részhalmazokra. Ezt a koncepciót a Jet terminológiájában ablakosításnak nevezik.
Telepíthetjük a Jet-et egy gépcsoportba, majd elküldhetjük neki az adatfeldolgozási feladatainkat. A Jet a fürt összes tagját automatikusan feldolgozza. A fürt minden tagja elfogyasztja az adatok egy részét, és ez megkönnyíti az átviteli képesség bármely szintjére történő skálázást.
Íme a Hazelcast Jet tipikus felhasználási esetei:
- Valós idejű adatfolyam-feldolgozás
- Gyors kötegelt feldolgozás
- Java 8 adatfolyamok feldolgozása elosztott módon
- Adatfeldolgozás a mikroszolgáltatásokban
3. Beállítás
A Hazelcast Jet környezetünkbe történő beállításához csak egyetlen Maven-függőséget kell hozzáadnunk a sajátunkhoz pom.xml.
Így csináljuk:
com.hazelcast.jet hazelcast-jet 4.2
Ennek a függőségnek a feltöltésével letölthetünk egy 10 Mb-os jar fájlt, amely biztosítja számunkra az összes infrastruktúrát, amelyre szükségünk van egy elosztott adatfeldolgozó folyamat kiépítéséhez.
A Hazelcast Jet legújabb verziója itt található.
4. Minta alkalmazás
Annak érdekében, hogy többet megtudjon a Hazelcast Jet-ről, létrehozunk egy mintaalkalmazást, amely beveszi a mondatokat és egy szót, hogy megtalálja ezeket a mondatokat, és visszaadja a megadott szavak számát a mondatokban.
4.1. A csővezeték
A Pipeline alkotja a Jet alkalmazás alapkonstrukcióját. A csővezetéken belüli feldolgozás a következő lépéseket követi:
- forrásból származó adatok olvasása
- átalakítsa az adatokat
- írjon adatokat egy mosogatóba
Alkalmazásunkhoz a csővezeték elosztottról fog olvasni Lista, alkalmazza a csoportosítás és az összesítés átalakítását, és végül írjon egy elosztottra Térkép.
Így írjuk a csővezetéket:
privát csővezeték createPipeLine () {Pipeline p = Pipeline.create (); p.readFrom (Források.lista (LIST_NAME)) .flatMap (szó -> traverseArray (word.toLowerCase (). split ("\ W +"))) .filter (word ->! word.isEmpty ()) .groupingKey (egészItem ()) .aggregate (counting ()) .writeTo (Sink.map (MAP_NAME)); visszatérő p; }
Miután olvastunk a forrásból, bejárjuk az adatokat, és szabályos kifejezéssel felosztjuk a térben. Ezt követően kiszűrjük az üres részeket.
Végül csoportosítjuk a szavakat, összesítjük és az eredményeket a Térkép.
4.2. A munka
Most, hogy meghatározzuk a csővezetékünket, létrehozunk egy munkát a vezeték végrehajtására.
Így írjuk a countWord függvény, amely elfogadja a paramétereket és visszaadja a számlálást:
public Hosszú countWord (Mondatok felsorolása, String szó) {long count = 0; JetInstance jet = Jet.newJetInstance (); próbálja ki a {List textList = jet.getList (LIST_NAME) listát; textList.addAll (mondatok); Pipeline p = createPipeLine (); jet.newJob (p) .csatlakozzon (); Térképszám = jet.getMap (MAP_NAME); count = count.get (szó); } végül {Jet.shutdownAll (); } visszatérés száma; }
Először létrehozunk egy Jet példányt a munkánk létrehozása és a csővezeték használata érdekében. Ezután bemásoljuk a bemenetet Lista elosztott listára, hogy az az összes példányon elérhető legyen.
Ezután benyújtunk egy munkát a fent épített csővezeték segítségével. A módszer, a metódus új Munka() olyan futtatható feladatot ad vissza, amelyet a Jet aszinkron módon indított el. A csatlakozik metódus várja a munka befejezését és dob egy kivétel ha a munka hibával fejeződik be.
Amikor a munka befejeződik, az eredmények elosztott módon kerülnek lekérésre Térkép, ahogy a csővezetékünkben meghatároztuk. Szóval, megkapjuk a Térkép a Jet-példányból, és vegye be a szó számát.
Végül leállítottuk a Jet példányt. Fontos, hogy a kivégzésünk befejezése után állítsuk le, as A Jet-példány elindítja a saját szálait. Ellenkező esetben a Java folyamatunk akkor is életben marad, ha a módszerünk kilép.
Itt van egy egység teszt, amely teszteli a Jet számára írt kódot:
@Test public void whenGivenSentencesAndWord_ThenReturnCountOfWord () {Mondatok felsorolása = new ArrayList (); mondatok.add ("Az első másodperc rendben volt, de a második másodperc kemény volt."); WordCounter wordCounter = új WordCounter (); long countSecond = wordCounter.countWord (mondatok, "második"); assertEquals (3, countSecond); }
5. Következtetés
Ebben a cikkben megtudtuk a Hazelcast Jet-et. Ha többet szeretne megtudni róla és annak jellemzőiről, olvassa el a kézikönyvet.
Szokás szerint az ebben a cikkben használt példák kódja a Github oldalon található.