Bevezetés a Java-ban található KafkaStreams-be
1. Áttekintés
Ebben a cikkben megnézzük a KafkaStreams könyvtár.
KafkaStreams az Apache Kafka alkotói tervezték. A szoftver elsődleges célja, hogy lehetővé tegye a programozók számára, hogy hatékony, valós idejű, streaming alkalmazásokat hozzanak létre, amelyek Microservice-ként működhetnek.
KafkaStreams lehetővé teszi számunkra, hogy a Kafka-témakörökből felhasználjuk, elemezzük vagy átalakítsuk az adatokat, és esetleg továbbítsuk azokat egy másik Kafka-témára.
Demonstrálni KafkaStreams, létrehozunk egy egyszerű alkalmazást, amely beolvassa a témából származó mondatokat, megszámolja a szavak előfordulását és kiírja a szavankénti számot.
Fontos megjegyezni, hogy a KafkaStreams a könyvtár nem reaktív, és nem támogatja az aszinkron műveleteket és az ellennyomás kezelését.
2. Maven-függőség
Az adatfolyam-feldolgozási logika használatának megkezdése KafkaStreams, hozzá kell adnunk egy függőséget kafka-patakok és kafka-kliensek:
org.apache.kafka kafka-streaming 1.0.0 org.apache.kafka kafka-clients 1.0.0
Szükségünk van az Apache Kafka telepítésére és elindítására is, mert Kafka témát fogunk használni. Ez a témakör lesz az adatfolyam adatfolyamunkhoz.
A Kafka és más szükséges függőségeket letölthetjük a hivatalos weboldalról.
3. A KafkaStreams bemenet konfigurálása
Az első dolog, amit meg fogunk tenni, az a bemeneti Kafka-téma meghatározása.
Használhatjuk a Összefolyó letöltött eszköz - egy Kafka szervert tartalmaz. Tartalmazza a kafka-konzol-producer amellyel üzeneteket tehetünk közzé a Kafka számára.
A kezdéshez futtassuk Kafka-fürtünket:
./folyó indítás
Miután a Kafka elindult, a segítségével meghatározhatjuk az adatforrásunkat és az alkalmazásunk nevét APPLICATION_ID_CONFIG:
String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");
Fontos konfigurációs paraméter a BOOTSTRAP_SERVER_CONFIG. Ez az URL a helyi Kafka példányunkhoz, amelyet most indítottunk el:
private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
Ezután át kell adnunk a kulcs típusát és az elfogyasztandó üzenetek értékét inputTopic:
streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());
Az adatfolyam-feldolgozás gyakran állapotszerű. Ha közbenső eredményeket akarunk menteni, meg kell adnunk a STATE_DIR_CONFIG paraméter.
Tesztünk során helyi fájlrendszert használunk:
streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ());
4. Streaming topológia felépítése
Miután meghatároztuk a bemeneti témánkat, létrehozhatunk egy Streaming topológiát - ez meghatározza, hogyan kell kezelni és átalakítani az eseményeket.
Példánkban egy szószámlálót szeretnénk megvalósítani. Minden elküldött mondatért inputTopic, szavakra akarjuk bontani és kiszámítani minden szó előfordulását.
Használhatjuk a KStreamsBuilder osztály, hogy elkezdje felépíteni topológiánkat:
KStreamBuilder builder = new KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Mintaminta = Pattern.compile ("\ W +", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues (érték -> Arrays.asList (pattern.split (value.toLowerCase ()))) .groupBy ((kulcs, szó) -> szó) .count ();
A szószám végrehajtásához először is fel kell osztanunk az értékeket a reguláris kifejezés segítségével.
A split módszer egy tömböt ad vissza. Használjuk a flatMapValues () hogy ellapítsa. Ellenkező esetben tömbök listájával állnánk elő, és kényelmetlen lenne kódot írni ilyen struktúra segítségével.
Végül összesítjük az egyes szavak értékeit, és felhívjuk a számol() amely kiszámítja egy adott szó előfordulását.
5. Az eredmények kezelése
Már kiszámoltuk a bemeneti üzeneteink szavak számát. Most nyomtassuk ki az eredményeket a standard kimenetre a az egyes() módszer:
wordCounts .foreach ((w, c) -> System.out.println ("szó:" + w + "->" + c));
A gyártás során gyakran előfordul, hogy egy ilyen streaming munka közzéteszi a kimenetet egy másik Kafka témában.
Ezt megtehetnénk a to () módszer:
String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);
A Serde osztály előre konfigurált sorosítókat ad nekünk a Java típusokhoz, amelyek az objektumok bájt tömbjéig történő sorosítására szolgálnak. A bájt tömböt ezután elküldjük a Kafka témára.
Használjuk Húr témánk kulcsként és Hosszú a tényleges számlálás értékeként. A nak nek() metódus menti a kapott adatokat a outputTopic.
6. A KafkaStream Job elindítása
Eddig a pontig felépítettünk egy végrehajtható topológiát. A munka azonban még nem kezdődött el.
Kifejezetten el kell kezdenünk munkánkat a Rajt() módszer a KafkaStreams példa:
KafkaStreams adatfolyamok = új KafkaStreams (készítő, streamsConfiguration); patakok.start (); Szál.alszik (30000); patakok.zár ();
Vegye figyelembe, hogy 30 másodpercet várunk a munka befejezésére. Valódi forgatókönyv szerint ez a munka folyamatosan futna, feldolgozva az eseményeket Kafka-ból, amikor megérkeznek.
Kipróbálhatjuk munkánkat, ha néhány eseményt közzéteszünk a Kafka témában.
Kezdjük a kafka-konzol-producer és kézzel küldjön néhány eseményt a mi oldalunkra inputTopic:
./kafka-console-producer --topic inputTopic - broker-list localhost: 9092> "ez egy póni"> "ez egy ló és póni"
Így két eseményt tettünk közzé a Kafka-nak. Alkalmazásunk felhasználja ezeket az eseményeket, és a következő kimenetet nyomtatja ki:
szó: -> 1 szó: ez -> 1 szó: van -> 1 szó: a -> 1 szó: póni -> 1 szó: -> 2 szó: ez -> 2 szó: az -> 2 szó: a - > 2 szó: ló -> 1 szó: és -> 1 szó: póni -> 2
Láthatjuk, hogy amikor megérkezett az első üzenet, a szó póniló csak egyszer fordult elő. De amikor elküldtük a második üzenetet, a szót póniló másodszor történt meg a nyomtatás:szó: póni -> 2 ″.
6. Következtetés
Ez a cikk azt tárgyalja, hogyan hozhat létre elsődleges adatfolyam-feldolgozó alkalmazást az Apache Kafka adatforrásként, és a KafkaStreams könyvtár, mint adatfolyam feldolgozó könyvtár.
Mindezek a példák és kódrészletek megtalálhatók a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.