Adatvezeték építése a Kafka, a Spark Streaming és a Cassandra segítségével
1. Áttekintés
Az Apache Kafka egy skálázható, nagy teljesítményű, alacsony késési platform lehetővé teszi az adatfolyamok olvasását és írását, mint egy üzenetkezelő rendszer. A Java-ban lévő Kafka-val meglehetősen könnyen kezdhetünk.
A Spark Streaming az Apache Spark platform része lehetővé teszi az adatfolyamok skálázható, nagy áteresztőképességű, hibatűrő feldolgozását. Bár a Scala nyelven íródott, a Spark Java API-kat kínál a munkához.
Apache Cassandra a elosztott és széles oszlopú NoSQL adattár. A Cassandráról további részletek az előző cikkünkben találhatók.
Ebben az oktatóanyagban ezeket egyesítjük a nagymértékben méretezhető és hibatűrő adatcsatorna valós idejű adatfolyamhoz.
2. Telepítések
Először is szükségünk lesz Kafka, Spark és Cassandra telepítésre a gépünkre az alkalmazás futtatásához. Majd meglátjuk, hogyan lehet e platformok segítségével kifejleszteni egy adatcsatornát.
Hagyunk azonban minden alapértelmezett konfigurációt, beleértve a portokat is, minden telepítéshez, ami elősegíti az oktatóanyag zökkenőmentes futtatását.
2.1. Kafka
A Kafka telepítése helyi gépünkre meglehetősen egyszerű és megtalálható a hivatalos dokumentáció részeként. A Kafka 2.1.0 kiadását fogjuk használni.
Továbbá, Kafka az Apache Zookeepert futtatja de ennek az oktatóanyagnak a felhasználásával a Kafkához csomagolt egyetlen csomópontú Zookeeper példányt fogjuk felhasználni.
Miután sikerült elindítanunk a Zookeepert és a Kafkát a hivatalos útmutató nyomán, folytathatjuk az "üzenetek" nevű témánk létrehozását:
$ KAFKA_HOME $ \ bin \ windows \ kafka-topics.bat --create \ --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 \ --topic messages
Ne feledje, hogy a fenti szkript Windows platformra vonatkozik, de vannak hasonló szkriptek Unix-szerű platformokhoz is.
2.2. Szikra
A Spark a Hadoop kliens könyvtárait használja a HDFS és a YARN számára. Következésképpen, nagyon bonyolult összeállítani mindezek kompatibilis verzióit. A Spark hivatalos letöltése azonban előre csomagolva van a Hadoop népszerű verzióival. Ehhez az oktatóanyaghoz az „Apache Hadoop 2.7 és újabb verziók számára előre elkészített” 2.3.0 csomagot fogjuk használni.
Miután kibontotta a megfelelő Spark csomagot, a rendelkezésre álló szkriptek felhasználhatók pályázatok benyújtására. Ezt később látni fogjuk, amikor kifejlesztjük a Spring Boot alkalmazásunkat.
2.3. Cassandra
A DataStax elérhetővé teszi a Cassandra közösségi kiadását különböző platformokhoz, beleértve a Windows rendszert is. A hivatalos dokumentációt követve ezt nagyon egyszerűen letölthetjük és telepíthetjük helyi gépünkre. A 3.9.0 verziót fogjuk használni.
Miután sikerült telepíteni és elindítani a Cassandrát a helyi gépünkön, folytathatjuk a kulcstér és a táblázat létrehozását. Ez a telepítéssel együtt szállított CQL Shell használatával történhet:
CREPA KEYSPACE szókincs replikációval = {'class': 'SimpleStrategy', 'replication_factor': 1}; USE szókincs; CREATE TABLE szavak (szószöveg ELSŐKULCS, számlálás int);
Ne feledje, hogy létrehoztunk egy névteret szójegyzék és egy benne lévő asztal hívott szavak két oszloppal, szó, és számol.
3. Függőségek
A Kafka és a Spark függőségeket Mavenen keresztül integrálhatjuk alkalmazásunkba. Ezeket a függőségeket a Maven Central-ból vesszük ki:
- Core Spark
- SQL Spark
- Spark közvetítése
- Kafka Spark közvetítése
- Cassandra Spark
- Cassandra Java Spark
És ennek megfelelően hozzáadhatjuk őket a pomunkhoz:
org.apache.spark spark-core_2.11 2.3.0 biztosított org.apache.spark spark-sql_2.11 2.3.0 biztosított org.apache.spark spark-streaming_2.11 2.3.0 biztosított org.apache.spark spark-streaming -kafka-0-10_2.11 2.3.0 com.datastax.spark spark-cassandra-connector_2.11 2.3.0 com.datastax.spark spark-cassandra-connector-java_2.11 1.5.2
Ne feledje, hogy ezek a függőségek némelyike jelölve van biztosítani terjedelmében. Ez azért van, mert ezeket a Spark telepítés teszi elérhetővé, ahol a végrehajtás iránti kérelmet szikra-beküldéssel nyújtjuk be.
4. Spark Streaming - Kafka integrációs stratégiák
Ezen a ponton érdemes röviden beszélni a Spark és Kafka integrációs stratégiáiról.
A Kafka új fogyasztói API-t vezetett be a 0.8 és 0.10 verziók között. Ezért a megfelelő Spark Streaming csomagok mindkét bróker verzióhoz elérhetők. Fontos a megfelelő csomag kiválasztása a rendelkezésre álló brókertől és a kívánt funkcióktól függően.
4.1. Spark Streaming Kafka 0.8
Az 0.8 verzió a stabil integrációs API a vevő-alapú vagy a közvetlen megközelítés használatának lehetőségeivel. Nem foglalkozunk e megközelítések részleteivel, amelyeket a hivatalos dokumentációban találhatunk. Fontos megjegyezni, hogy ez a csomag kompatibilis a Kafka Broker 0.8.2.1 vagy újabb verzióival.
4.2. Spark Streaming Kafka 0.10
Ez jelenleg kísérleti állapotban van, és csak a Kafka Broker 0.10.0 vagy újabb verzióival kompatibilis. Ez a csomag csak a közvetlen megközelítést kínálja, most az új Kafka fogyasztói API-t használja. Erről további részleteket a hivatalos dokumentációban találhatunk. Fontos, hogy az nem kompatibilis a régebbi Kafka Broker verziókkal.
Felhívjuk figyelmét, hogy ehhez az oktatóanyaghoz a 0.10-es csomagot fogjuk használni. Az előző szakaszban említett függőség csak erre utal.
5. Adatvezeték fejlesztése
Létrehozunk egy egyszerű alkalmazást a Java-ban a Spark segítségével, amely integrálódik a korábban létrehozott Kafka témába. Az alkalmazás elolvassa az üzeneteket kiküldöttként, és minden üzenetben megszámolja a szavak gyakoriságát. Ezt majd a korábban létrehozott Cassandra táblában frissítjük.
Gyorsan szemléljük az adatok áramlását:
5.1. Szerzés JavaStreamingContext
Először a. Inicializálásával kezdjük JavaStreamingContext amely az összes Spark Streaming alkalmazás belépési pontja:
SparkConf sparkConf = új SparkConf (); sparkConf.setAppName ("WordCountingApp"); sparkConf.set ("spark.cassandra.connection.host", "127.0.0.1"); JavaStreamingContext streamingContext = új JavaStreamingContext (sparkConf, Durations.seconds (1));
5.2. Szerzés DStream Kafkától
Most kapcsolódhatunk a Kafka témához a JavaStreamingContext:
Térkép kafkaParams = new HashMap (); kafkaParams.put ("bootstrap.servers", "localhost: 9092"); kafkaParams.put ("key.deserializer", StringDeserializer.class); kafkaParams.put ("value.deserializer", StringDeserializer.class); kafkaParams.put ("group.id", "use_a_separate_group_id_for_each_stream"); kafkaParams.put ("auto.offset.reset", "legújabb"); kafkaParams.put ("enable.auto.commit", hamis); Gyűjtemény témák = Arrays.asList ("üzenetek"); JavaInputDStream üzenetek = KafkaUtils.createDirectStream (streamingContext, LocationStrategies.PreferConsistent (), ConsumerStrategies. Feliratkozás (témák, kafkaParams));
Felhívjuk figyelmét, hogy itt meg kell adnunk a kulcs és az érték leeresztését. Az általános adattípusokhoz, mint például Húr, a deserializer alapértelmezés szerint elérhető. Ha azonban egyedi adattípusokat szeretnénk beolvasni, akkor egyedi deserializátorokat kell megadnunk.
Itt megszereztük JavaInputDStream amely a diszkretizált folyamok vagy A DStreams, a Spark Streaming által biztosított alapvető absztrakció. Belsőleg a DStreams nem más, mint az RDD-k folyamatos sorozata.
5.3. Feldolgozás megszerezve DStream
Most egy sor műveletet hajtunk végre a JavaInputDStream a szavak frekvenciájának megszerzéséhez az üzenetekben:
JavaPairDStream eredmények = üzenetek .mapToPair (rekord -> új Tuple2 (rekord.kulcs (), rekord.érték ())); JavaDStream vonalak = results .map (tuple2 -> tuple2._2 ()); JavaDStream szavak = vonalak .flatMap (x -> Tömbök.asList (x.split ("\ s +")). Iterator ()); JavaPairDStream wordCounts = szavak .mapToPair (s -> új Tuple2 (s, 1)) .reduceByKey ((i1, i2) -> i1 + i2);
5.4. Folyamatosan feldolgozott DStream be Cassandra
Végül iterálhatunk a feldolgozottakon JavaPairDStream hogy beillessze őket a Cassandra asztalunkba:
wordCounts.foreachRDD (javaRdd -> {Térkép wordCountMap = javaRdd.collectAsMap (); for (String kulcs: wordCountMap.keySet ()) {List wordList = Arrays.asList (új Word (kulcs, wordCountMap.get (kulcs))); JavaRDD rdd = streamingContext.sparkContext (). Parallelize (wordList); javaFunctions (rdd) .writerBuilder ("szókincs", "szavak", mapToRow (Word.class)). SaveToCassandra ();}});
5.5. Az alkalmazás futtatása
Mivel ez egy adatfolyam-feldolgozó alkalmazás, ezt szeretnénk folyamatosan működtetni:
streamingContext.start (); streamingContext.awaitTermination ();
6. Az ellenőrzőpontok kihasználása
Egy adatfolyam-feldolgozó alkalmazásban gyakran hasznos megőrizni az állapotot a feldolgozott adatcsomagok között.
Például előző kísérletünkben csak a szavak aktuális gyakoriságát tudjuk tárolni. Mi van, ha inkább a kumulatív frekvenciát akarjuk tárolni? A Spark Streaming egy ellenőrzőpont nevű koncepció révén teszi lehetővé.
Most módosítjuk a korábban létrehozott folyamatot az ellenőrzési pontok kihasználása érdekében:
Felhívjuk figyelmét, hogy ellenőrzőpontokat csak az adatfeldolgozás munkamenetéhez használunk. Ez nem nyújt hibatűrést. Az ellenőrző pontozás azonban használható a hibatűrésre is.
Néhány változtatást kell végrehajtanunk alkalmazásunkban az ellenőrzési pontok kiaknázására. Ez magában foglalja a JavaStreamingContext ellenőrzőpont helyével:
streamingContext.checkpoint ("./. ellenőrzőpont");
Itt a helyi fájlrendszert használjuk az ellenőrzőpontok tárolására. A robusztusság érdekében azonban ezt olyan helyen kell tárolni, mint a HDFS, az S3 vagy a Kafka. További információ a hivatalos dokumentációban található.
Ezután be kell töltenünk az ellenőrző pontot, és létre kell hoznunk a szavak összesített számát, miközben minden partíciót leképező függvény segítségével feldolgozunk:
JavaMapWithStateDStream cumulativeWordCounts = wordCounts .mapWithState (StateSpec.function ((word, one, state) -> {int sum = one.orElse (0) + (state.exists ()? state.get (): 0); Tuple2 output = new Tuple2 (szó, összeg); állapot.frissítés (összeg); visszatérő kimenet;}));
Amint megkapjuk a kumulatív szószámokat, folytathatjuk az ismétlést és elmenthetjük őket a Cassandra-ba, mint korábban.
Felhívjuk figyelmét, hogy míg Az adatellenőrzés az állapotfeldolgozáshoz hasznos, késleltetési költséggel jár. Ezért ezt okosan kell használni, az optimális ellenőrzési intervallum mellett.
7. Az ellentételezések megértése
Ha felidézzük a korábban beállított Kafka-paramétereket:
kafkaParams.put ("auto.offset.reset", "legújabb"); kafkaParams.put ("enable.auto.commit", hamis);
Ezek alapvetően ezt jelentik nem akarunk automatikusan elvégezni az eltolást, és minden fogyasztói csoport inicializálásakor szeretnénk kiválasztani a legújabb eltolást. Következésképpen alkalmazásunk csak az általa futtatott időszakban küldhet üzeneteket.
Ha az összes elküldött üzenetet el akarjuk fogyasztani, függetlenül attól, hogy az alkalmazás futott-e vagy sem, és nyomon akarjuk tartani a már elküldött üzeneteket is, megfelelően kell beállítanunk az eltolást az eltolás állapotának mentésével együtt, bár ez kissé kívül esik ennek az oktatóanyagnak.
Ez a Spark Streaming egyfajta garanciát is kínál, például „pontosan egyszer”. Ez alapvetően azt jelenti, hogy a Kafka témában közzétett minden üzenetet csak egyszer dolgozza fel a Spark Streaming.
8. Alkalmazás telepítése
Tudunk telepítse az alkalmazásunkat a Spark-submit szkript segítségével amely a Spark telepítéssel előre van csomagolva:
$ SPARK_HOME $ \ bin \ spark-submit \ --class com.baeldung.data.pipeline.WordCountingAppWithCheckpoint \ --master local [2] \ target \ spark-streaming-app-0.0.1-SNAPSHOT-jar-with-dependencies .befőttes üveg
Felhívjuk figyelmét, hogy a Maven használatával létrehozott edénynek tartalmaznia kell azokat a függőségeket, amelyek nincsenek megjelölve biztosítani terjedelmében.
Miután elküldtük ezt a pályázatot és elküldtünk néhány üzenetet a korábban létrehozott Kafka témában, látnunk kell, hogy a kumulatív szószámok felkerülnek-e a korábban létrehozott Cassandra táblába.
9. Következtetés
Összefoglalva, ebben az oktatóanyagban megtanultuk, hogyan hozhatunk létre egy egyszerű adatvezetéket a Kafka, a Spark Streaming és a Cassandra segítségével. Megtanultuk azt is, hogyan lehet az ellenőrzőpontokat kihasználni a Spark Streaming szolgáltatásban a kötegek közötti állapot fenntartása érdekében.
Mint mindig, a példák kódja elérhető a GitHub oldalon.