Adatvezeték építése a Kafka, a Spark Streaming és a Cassandra segítségével

1. Áttekintés

Az Apache Kafka egy skálázható, nagy teljesítményű, alacsony késési platform lehetővé teszi az adatfolyamok olvasását és írását, mint egy üzenetkezelő rendszer. A Java-ban lévő Kafka-val meglehetősen könnyen kezdhetünk.

A Spark Streaming az Apache Spark platform része lehetővé teszi az adatfolyamok skálázható, nagy áteresztőképességű, hibatűrő feldolgozását. Bár a Scala nyelven íródott, a Spark Java API-kat kínál a munkához.

Apache Cassandra a elosztott és széles oszlopú NoSQL adattár. A Cassandráról további részletek az előző cikkünkben találhatók.

Ebben az oktatóanyagban ezeket egyesítjük a nagymértékben méretezhető és hibatűrő adatcsatorna valós idejű adatfolyamhoz.

2. Telepítések

Először is szükségünk lesz Kafka, Spark és Cassandra telepítésre a gépünkre az alkalmazás futtatásához. Majd meglátjuk, hogyan lehet e platformok segítségével kifejleszteni egy adatcsatornát.

Hagyunk azonban minden alapértelmezett konfigurációt, beleértve a portokat is, minden telepítéshez, ami elősegíti az oktatóanyag zökkenőmentes futtatását.

2.1. Kafka

A Kafka telepítése helyi gépünkre meglehetősen egyszerű és megtalálható a hivatalos dokumentáció részeként. A Kafka 2.1.0 kiadását fogjuk használni.

Továbbá, Kafka az Apache Zookeepert futtatja de ennek az oktatóanyagnak a felhasználásával a Kafkához csomagolt egyetlen csomópontú Zookeeper példányt fogjuk felhasználni.

Miután sikerült elindítanunk a Zookeepert és a Kafkát a hivatalos útmutató nyomán, folytathatjuk az "üzenetek" nevű témánk létrehozását:

 $ KAFKA_HOME $ \ bin \ windows \ kafka-topics.bat --create \ --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 \ --topic messages

Ne feledje, hogy a fenti szkript Windows platformra vonatkozik, de vannak hasonló szkriptek Unix-szerű platformokhoz is.

2.2. Szikra

A Spark a Hadoop kliens könyvtárait használja a HDFS és a YARN számára. Következésképpen, nagyon bonyolult összeállítani mindezek kompatibilis verzióit. A Spark hivatalos letöltése azonban előre csomagolva van a Hadoop népszerű verzióival. Ehhez az oktatóanyaghoz az „Apache Hadoop 2.7 és újabb verziók számára előre elkészített” 2.3.0 csomagot fogjuk használni.

Miután kibontotta a megfelelő Spark csomagot, a rendelkezésre álló szkriptek felhasználhatók pályázatok benyújtására. Ezt később látni fogjuk, amikor kifejlesztjük a Spring Boot alkalmazásunkat.

2.3. Cassandra

A DataStax elérhetővé teszi a Cassandra közösségi kiadását különböző platformokhoz, beleértve a Windows rendszert is. A hivatalos dokumentációt követve ezt nagyon egyszerűen letölthetjük és telepíthetjük helyi gépünkre. A 3.9.0 verziót fogjuk használni.

Miután sikerült telepíteni és elindítani a Cassandrát a helyi gépünkön, folytathatjuk a kulcstér és a táblázat létrehozását. Ez a telepítéssel együtt szállított CQL Shell használatával történhet:

CREPA KEYSPACE szókincs replikációval = {'class': 'SimpleStrategy', 'replication_factor': 1}; USE szókincs; CREATE TABLE szavak (szószöveg ELSŐKULCS, számlálás int);

Ne feledje, hogy létrehoztunk egy névteret szójegyzék és egy benne lévő asztal hívott szavak két oszloppal, szó, és számol.

3. Függőségek

A Kafka és a Spark függőségeket Mavenen keresztül integrálhatjuk alkalmazásunkba. Ezeket a függőségeket a Maven Central-ból vesszük ki:

  • Core Spark
  • SQL Spark
  • Spark közvetítése
  • Kafka Spark közvetítése
  • Cassandra Spark
  • Cassandra Java Spark

És ennek megfelelően hozzáadhatjuk őket a pomunkhoz:

 org.apache.spark spark-core_2.11 2.3.0 biztosított org.apache.spark spark-sql_2.11 2.3.0 biztosított org.apache.spark spark-streaming_2.11 2.3.0 biztosított org.apache.spark spark-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark spark-cassandra-connector_2.11 2.3.0 com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2 

Ne feledje, hogy ezek a függőségek némelyike ​​jelölve van biztosítani terjedelmében. Ez azért van, mert ezeket a Spark telepítés teszi elérhetővé, ahol a végrehajtás iránti kérelmet szikra-beküldéssel nyújtjuk be.

4. Spark Streaming - Kafka integrációs stratégiák

Ezen a ponton érdemes röviden beszélni a Spark és Kafka integrációs stratégiáiról.

A Kafka új fogyasztói API-t vezetett be a 0.8 és 0.10 verziók között. Ezért a megfelelő Spark Streaming csomagok mindkét bróker verzióhoz elérhetők. Fontos a megfelelő csomag kiválasztása a rendelkezésre álló brókertől és a kívánt funkcióktól függően.

4.1. Spark Streaming Kafka 0.8

Az 0.8 verzió a stabil integrációs API a vevő-alapú vagy a közvetlen megközelítés használatának lehetőségeivel. Nem foglalkozunk e megközelítések részleteivel, amelyeket a hivatalos dokumentációban találhatunk. Fontos megjegyezni, hogy ez a csomag kompatibilis a Kafka Broker 0.8.2.1 vagy újabb verzióival.

4.2. Spark Streaming Kafka 0.10

Ez jelenleg kísérleti állapotban van, és csak a Kafka Broker 0.10.0 vagy újabb verzióival kompatibilis. Ez a csomag csak a közvetlen megközelítést kínálja, most az új Kafka fogyasztói API-t használja. Erről további részleteket a hivatalos dokumentációban találhatunk. Fontos, hogy az nem kompatibilis a régebbi Kafka Broker verziókkal.

Felhívjuk figyelmét, hogy ehhez az oktatóanyaghoz a 0.10-es csomagot fogjuk használni. Az előző szakaszban említett függőség csak erre utal.

5. Adatvezeték fejlesztése

Létrehozunk egy egyszerű alkalmazást a Java-ban a Spark segítségével, amely integrálódik a korábban létrehozott Kafka témába. Az alkalmazás elolvassa az üzeneteket kiküldöttként, és minden üzenetben megszámolja a szavak gyakoriságát. Ezt majd a korábban létrehozott Cassandra táblában frissítjük.

Gyorsan szemléljük az adatok áramlását:

5.1. Szerzés JavaStreamingContext

Először a. Inicializálásával kezdjük JavaStreamingContext amely az összes Spark Streaming alkalmazás belépési pontja:

SparkConf sparkConf = új SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = új JavaStreamingContext (sparkConf, Durations.seconds (1));

5.2. Szerzés DStream Kafkától

Most kapcsolódhatunk a Kafka témához a JavaStreamingContext:

Térkép kafkaParams = new HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("value.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "legújabb"); kafkaParams.put ("enable.auto.commit", hamis); Gyűjtemény témák = Arrays.asList ("üzenetek"); JavaInputDStream üzenetek = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Feliratkozás (témák, kafkaParams));

Felhívjuk figyelmét, hogy itt meg kell adnunk a kulcs és az érték leeresztését. Az általános adattípusokhoz, mint például Húr, a deserializer alapértelmezés szerint elérhető. Ha azonban egyedi adattípusokat szeretnénk beolvasni, akkor egyedi deserializátorokat kell megadnunk.

Itt megszereztük JavaInputDStream amely a diszkretizált folyamok vagy A DStreams, a Spark Streaming által biztosított alapvető absztrakció. Belsőleg a DStreams nem más, mint az RDD-k folyamatos sorozata.

5.3. Feldolgozás megszerezve DStream

Most egy sor műveletet hajtunk végre a JavaInputDStream a szavak frekvenciájának megszerzéséhez az üzenetekben:

JavaPairDStream eredmények = üzenetek .mapToPair (rekord -> új Tuple2 (rekord.kulcs (), rekord.érték ())); JavaDStream vonalak = results .map (tuple2 -> tuple2._2 ()); JavaDStream szavak = vonalak .flatMap (x -> Tömbök.asList (x.split ("\ s +")). Iterator ()); JavaPairDStream wordCounts = szavak .mapToPair (s -> új Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);

5.4. Folyamatosan feldolgozott DStream be Cassandra

Végül iterálhatunk a feldolgozottakon JavaPairDStream hogy beillessze őket a Cassandra asztalunkba:

wordCounts.foreachRDD (javaRdd -> {Térkép wordCountMap = javaRdd.collectAsMap (); for (String kulcs: wordCountMap.keySet ()) {List wordList = Arrays.asList (új Word (kulcs, wordCountMap.get (kulcs))); JavaRDD rdd = streamingContext.sparkContext (). Parallelize (wordList); javaFunctions (rdd) .writerBuilder ("szókincs", "szavak", mapToRow (Word.class)). SaveToCassandra ();}});

5.5. Az alkalmazás futtatása

Mivel ez egy adatfolyam-feldolgozó alkalmazás, ezt szeretnénk folyamatosan működtetni:

streamingContext.start (); streamingContext.awaitTermination ();

6. Az ellenőrzőpontok kihasználása

Egy adatfolyam-feldolgozó alkalmazásban gyakran hasznos megőrizni az állapotot a feldolgozott adatcsomagok között.

Például előző kísérletünkben csak a szavak aktuális gyakoriságát tudjuk tárolni. Mi van, ha inkább a kumulatív frekvenciát akarjuk tárolni? A Spark Streaming egy ellenőrzőpont nevű koncepció révén teszi lehetővé.

Most módosítjuk a korábban létrehozott folyamatot az ellenőrzési pontok kihasználása érdekében:

Felhívjuk figyelmét, hogy ellenőrzőpontokat csak az adatfeldolgozás munkamenetéhez használunk. Ez nem nyújt hibatűrést. Az ellenőrző pontozás azonban használható a hibatűrésre is.

Néhány változtatást kell végrehajtanunk alkalmazásunkban az ellenőrzési pontok kiaknázására. Ez magában foglalja a JavaStreamingContext ellenőrzőpont helyével:

streamingContext.checkpoint ("./. ellenőrzőpont");

Itt a helyi fájlrendszert használjuk az ellenőrzőpontok tárolására. A robusztusság érdekében azonban ezt olyan helyen kell tárolni, mint a HDFS, az S3 vagy a Kafka. További információ a hivatalos dokumentációban található.

Ezután be kell töltenünk az ellenőrző pontot, és létre kell hoznunk a szavak összesített számát, miközben minden partíciót leképező függvény segítségével feldolgozunk:

JavaMapWithStateDStream cumulativeWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); Tuple2 output = new Tuple2 (szó, összeg); állapot.frissítés (összeg); visszatérő kimenet;}));

Amint megkapjuk a kumulatív szószámokat, folytathatjuk az ismétlést és elmenthetjük őket a Cassandra-ba, mint korábban.

Felhívjuk figyelmét, hogy míg Az adatellenőrzés az állapotfeldolgozáshoz hasznos, késleltetési költséggel jár. Ezért ezt okosan kell használni, az optimális ellenőrzési intervallum mellett.

7. Az ellentételezések megértése

Ha felidézzük a korábban beállított Kafka-paramétereket:

kafkaParams.put ("auto.offset.reset", "legújabb"); kafkaParams.put ("enable.auto.commit", hamis);

Ezek alapvetően ezt jelentik nem akarunk automatikusan elvégezni az eltolást, és minden fogyasztói csoport inicializálásakor szeretnénk kiválasztani a legújabb eltolást. Következésképpen alkalmazásunk csak az általa futtatott időszakban küldhet üzeneteket.

Ha az összes elküldött üzenetet el akarjuk fogyasztani, függetlenül attól, hogy az alkalmazás futott-e vagy sem, és nyomon akarjuk tartani a már elküldött üzeneteket is, megfelelően kell beállítanunk az eltolást az eltolás állapotának mentésével együtt, bár ez kissé kívül esik ennek az oktatóanyagnak.

Ez a Spark Streaming egyfajta garanciát is kínál, például „pontosan egyszer”. Ez alapvetően azt jelenti, hogy a Kafka témában közzétett minden üzenetet csak egyszer dolgozza fel a Spark Streaming.

8. Alkalmazás telepítése

Tudunk telepítse az alkalmazásunkat a Spark-submit szkript segítségével amely a Spark telepítéssel előre van csomagolva:

$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies .befőttes üveg

Felhívjuk figyelmét, hogy a Maven használatával létrehozott edénynek tartalmaznia kell azokat a függőségeket, amelyek nincsenek megjelölve biztosítani terjedelmében.

Miután elküldtük ezt a pályázatot és elküldtünk néhány üzenetet a korábban létrehozott Kafka témában, látnunk kell, hogy a kumulatív szószámok felkerülnek-e a korábban létrehozott Cassandra táblába.

9. Következtetés

Összefoglalva, ebben az oktatóanyagban megtanultuk, hogyan hozhatunk létre egy egyszerű adatvezetéket a Kafka, a Spark Streaming és a Cassandra segítségével. Megtanultuk azt is, hogyan lehet az ellenőrzőpontokat kihasználni a Spark Streaming szolgáltatásban a kötegek közötti állapot fenntartása érdekében.

Mint mindig, a példák kódja elérhető a GitHub oldalon.