Bevezetés a Java-ban található KafkaStreams-be

1. Áttekintés

Ebben a cikkben megnézzük a KafkaStreams könyvtár.

KafkaStreams az Apache Kafka alkotói tervezték. A szoftver elsődleges célja, hogy lehetővé tegye a programozók számára, hogy hatékony, valós idejű, streaming alkalmazásokat hozzanak létre, amelyek Microservice-ként működhetnek.

KafkaStreams lehetővé teszi számunkra, hogy a Kafka-témakörökből felhasználjuk, elemezzük vagy átalakítsuk az adatokat, és esetleg továbbítsuk azokat egy másik Kafka-témára.

Demonstrálni KafkaStreams, létrehozunk egy egyszerű alkalmazást, amely beolvassa a témából származó mondatokat, megszámolja a szavak előfordulását és kiírja a szavankénti számot.

Fontos megjegyezni, hogy a KafkaStreams a könyvtár nem reaktív, és nem támogatja az aszinkron műveleteket és az ellennyomás kezelését.

2. Maven-függőség

Az adatfolyam-feldolgozási logika használatának megkezdése KafkaStreams, hozzá kell adnunk egy függőséget kafka-patakok és kafka-kliensek:

 org.apache.kafka kafka-streaming 1.0.0 org.apache.kafka kafka-clients 1.0.0 

Szükségünk van az Apache Kafka telepítésére és elindítására is, mert Kafka témát fogunk használni. Ez a témakör lesz az adatfolyam adatfolyamunkhoz.

A Kafka és más szükséges függőségeket letölthetjük a hivatalos weboldalról.

3. A KafkaStreams bemenet konfigurálása

Az első dolog, amit meg fogunk tenni, az a bemeneti Kafka-téma meghatározása.

Használhatjuk a Összefolyó letöltött eszköz - egy Kafka szervert tartalmaz. Tartalmazza a kafka-konzol-producer amellyel üzeneteket tehetünk közzé a Kafka számára.

A kezdéshez futtassuk Kafka-fürtünket:

./folyó indítás

Miután a Kafka elindult, a segítségével meghatározhatjuk az adatforrásunkat és az alkalmazásunk nevét APPLICATION_ID_CONFIG:

String inputTopic = "inputTopic";
Properties streamsConfiguration = new Properties (); streamsConfiguration.put (StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-live-test");

Fontos konfigurációs paraméter a BOOTSTRAP_SERVER_CONFIG. Ez az URL a helyi Kafka példányunkhoz, amelyet most indítottunk el:

private String bootstrapServers = "localhost: 9092"; streamsConfiguration.put (StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

Ezután át kell adnunk a kulcs típusát és az elfogyasztandó üzenetek értékét inputTopic:

streamsConfiguration.put (StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ()); streamsConfiguration.put (StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String (). getClass (). getName ());

Az adatfolyam-feldolgozás gyakran állapotszerű. Ha közbenső eredményeket akarunk menteni, meg kell adnunk a STATE_DIR_CONFIG paraméter.

Tesztünk során helyi fájlrendszert használunk:

streamsConfiguration.put (StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory (). getAbsolutePath ()); 

4. Streaming topológia felépítése

Miután meghatároztuk a bemeneti témánkat, létrehozhatunk egy Streaming topológiát - ez meghatározza, hogyan kell kezelni és átalakítani az eseményeket.

Példánkban egy szószámlálót szeretnénk megvalósítani. Minden elküldött mondatért inputTopic, szavakra akarjuk bontani és kiszámítani minden szó előfordulását.

Használhatjuk a KStreamsBuilder osztály, hogy elkezdje felépíteni topológiánkat:

KStreamBuilder builder = new KStreamBuilder (); KStream textLines = builder.stream (inputTopic); Mintaminta = Pattern.compile ("\ W +", Pattern.UNICODE_CHARACTER_CLASS); KTable wordCounts = textLines .flatMapValues ​​(érték -> Arrays.asList (pattern.split (value.toLowerCase ()))) .groupBy ((kulcs, szó) -> szó) .count ();

A szószám végrehajtásához először is fel kell osztanunk az értékeket a reguláris kifejezés segítségével.

A split módszer egy tömböt ad vissza. Használjuk a flatMapValues ​​() hogy ellapítsa. Ellenkező esetben tömbök listájával állnánk elő, és kényelmetlen lenne kódot írni ilyen struktúra segítségével.

Végül összesítjük az egyes szavak értékeit, és felhívjuk a számol() amely kiszámítja egy adott szó előfordulását.

5. Az eredmények kezelése

Már kiszámoltuk a bemeneti üzeneteink szavak számát. Most nyomtassuk ki az eredményeket a standard kimenetre a az egyes() módszer:

wordCounts .foreach ((w, c) -> System.out.println ("szó:" + w + "->" + c));

A gyártás során gyakran előfordul, hogy egy ilyen streaming munka közzéteszi a kimenetet egy másik Kafka témában.

Ezt megtehetnénk a to () módszer:

String outputTopic = "outputTopic"; Serde stringSerde = Serdes.String (); Serde longSerde = Serdes.Long (); wordCounts.to (stringSerde, longSerde, outputTopic);

A Serde osztály előre konfigurált sorosítókat ad nekünk a Java típusokhoz, amelyek az objektumok bájt tömbjéig történő sorosítására szolgálnak. A bájt tömböt ezután elküldjük a Kafka témára.

Használjuk Húr témánk kulcsként és Hosszú a tényleges számlálás értékeként. A nak nek() metódus menti a kapott adatokat a outputTopic.

6. A KafkaStream Job elindítása

Eddig a pontig felépítettünk egy végrehajtható topológiát. A munka azonban még nem kezdődött el.

Kifejezetten el kell kezdenünk munkánkat a Rajt() módszer a KafkaStreams példa:

KafkaStreams adatfolyamok = új KafkaStreams (készítő, streamsConfiguration); patakok.start (); Szál.alszik (30000); patakok.zár ();

Vegye figyelembe, hogy 30 másodpercet várunk a munka befejezésére. Valódi forgatókönyv szerint ez a munka folyamatosan futna, feldolgozva az eseményeket Kafka-ból, amikor megérkeznek.

Kipróbálhatjuk munkánkat, ha néhány eseményt közzéteszünk a Kafka témában.

Kezdjük a kafka-konzol-producer és kézzel küldjön néhány eseményt a mi oldalunkra inputTopic:

./kafka-console-producer --topic inputTopic - broker-list localhost: 9092> "ez egy póni"> "ez egy ló és póni" 

Így két eseményt tettünk közzé a Kafka-nak. Alkalmazásunk felhasználja ezeket az eseményeket, és a következő kimenetet nyomtatja ki:

szó: -> 1 szó: ez -> 1 szó: van -> 1 szó: a -> 1 szó: póni -> 1 szó: -> 2 szó: ez -> 2 szó: az -> 2 szó: a - > 2 szó: ló -> 1 szó: és -> 1 szó: póni -> 2

Láthatjuk, hogy amikor megérkezett az első üzenet, a szó póniló csak egyszer fordult elő. De amikor elküldtük a második üzenetet, a szót póniló másodszor történt meg a nyomtatás:szó: póni -> 2 ″.

6. Következtetés

Ez a cikk azt tárgyalja, hogyan hozhat létre elsődleges adatfolyam-feldolgozó alkalmazást az Apache Kafka adatforrásként, és a KafkaStreams könyvtár, mint adatfolyam feldolgozó könyvtár.

Mindezek a példák és kódrészletek megtalálhatók a GitHub projektben - ez egy Maven projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.


$config[zx-auto] not found$config[zx-overlay] not found