Pontosan egyszer feldolgozás Kafkában Java-val

1. Áttekintés

Ebben az oktatóanyagban megnézzük, hogyan A Kafka pontosan egyszeri szállítást biztosít a gyártói és a fogyasztói alkalmazások között az újonnan bevezetett Transactional API-n keresztül.

Ezenkívül ezt az API-t használjuk tranzakciós gyártók és fogyasztók megvalósításához, hogy a WordCount példában a végponttól végpontig pontosan egyszeri teljesítést érjünk el.

2. Üzenet kézbesítése Kafkában

Különböző hibák miatt az üzenetkezelő rendszerek nem tudják garantálni az üzenet kézbesítését a gyártó és a fogyasztó közötti alkalmazások között. Attól függően, hogy az ügyfélalkalmazások hogyan lépnek kapcsolatba az ilyen rendszerekkel, a következő üzenetszemantika lehetséges:

  • Ha egy üzenetkezelő rendszer soha nem másol egy üzenetet, de előfordulhat, hogy hiányzik az alkalmi üzenet, akkor ezt hívjuk legfeljebb egyszer
  • Vagy ha soha nem fog hiányozni egy üzenet, de megismétli az alkalmi üzenetet, akkor hívjuk legalább egyszer
  • De ha mindig minden üzenetet eljuttat duplikáció nélkül, akkor pontosan-egyszer

Kezdetben a Kafka csak a legfeljebb egyszeri és a legalább egyszeri üzenetküldést támogatta.

Azonban, a Kafka brókerek és az ügyfélalkalmazások közötti tranzakciók bevezetése biztosítja az egyszeri szállítást a Kafkában. Hogy jobban megértsük, nézzük át gyorsan a tranzakciós kliens API-t.

3. Maven-függőségek

A tranzakciós API használatához a Kafka Java kliensre lesz szükségünk:

 org.apache.kafka kafka-kliensek 2.0.0 

4. A tranzakció fogyasztani-átalakítani-termelni Hurok

Például egy bemeneti témától fogunk üzeneteket fogyasztani, mondatok.

Ezután minden egyes mondathoz minden szót megszámlálunk, és az egyes szavak számát elküldjük egy kimeneti témához, számít.

A példában feltételezzük, hogy már vannak tranzakciós adatok a mondatok téma.

4.1. Tranzakciótudatos producer

Tehát először tegyünk hozzá egy tipikus Kafka-gyártót.

Properties producerProps = new Properties (); producerProps.put ("bootstrap.servers", "localhost: 9092");

Ezenkívül meg kell adnunk a ügyleti.id és engedélyezze idempotencia:

producerProps.put ("enable.idempotence", "true"); producerProps.put ("tranzakciós.id", "prod-1"); KafkaProducer producer = új KafkaProducer (producerProps);

Mivel engedélyeztük az idempotenciát, a Kafka ezt a tranzakcióazonosítót fogja használni algoritmusának részeként deduplikálja a gyártó minden üzenetétküld, biztosítva az idegességet.

Egyszerűen fogalmazva, ha a producer véletlenül többször küldi ugyanazt az üzenetet a Kafkának, ezek a beállítások lehetővé teszik számára, hogy észrevegye.

Csak annyit kell tennünk győződjön meg arról, hogy a tranzakcióazonosító minden termelő számára megkülönböztetett, bár az újraindítás során következetes.

4.2. A gyártó engedélyezése tranzakciókhoz

Ha készen állunk, akkor telefonálnunk is kell initTransaction felkészíteni a termelőt az ügyletek felhasználására:

producer.initTransactions ();

Ez nyilvántartásba veszi a termelőt a brókernél, amely tranzakciókat használhat, azzal azonosítva ügyleti.id és egy sorszámot vagy korszakot. Viszont a bróker ezeket felhasználja a tranzakciós napló minden műveletének előreírására.

Következésképpen a bróker eltávolítja a naplóból az összes olyan műveletet, amely ugyanazon tranzakcióazonosítóval rendelkező és korábbi gyártókékorszak, feltételezve, hogy a nem megszűnt ügyletekből származnak.

4.3. Tranzakciótudatos fogyasztó

Fogyasztáskor sorrendben elolvashatjuk a témakör partíciójának összes üzenetét. Bár, -val jelezhetjük elszigeteltség.szint várni kell a tranzakciós üzenetek elolvasására, amíg a társított tranzakciót el nem végzik:

Tulajdonságok consumerProps = új Tulajdonságok (); consumerProps.put ("bootstrap.servers", "localhost: 9092"); consumerProps.put ("group.id", "my-group-id"); consumerProps.put ("enable.auto.commit", "hamis"); consumerProps.put ("isolation.level", "read_committed"); KafkaConsumer fogyasztó = új KafkaConsumer (consumerProps); fogyasztó.feliratkozás (singleton („mondatok”));

Értékének használata read_committed biztosítja, hogy a tranzakció befejezése előtt ne olvassunk el tranzakciós üzeneteket.

A. Alapértelmezett értéke elszigeteltség.szint van read_uncommitted.

4.4. Fogyasztás és átalakítás tranzakcióval

Most, hogy a gyártó és a fogyasztó egyaránt konfigurálva van az ügyletek írására és olvasására, felhasználhatjuk a bemeneti témánk rekordjait, és minden egyes rekordot megszámolhatunk:

ConsumerRecords records = consumer.poll (ofSeconds (60)); Map wordCountMap = records.records (új TopicPartition ("input", 0)) .stream () .flatMap (rekord -> Stream.of (record.value (). Split (""))) .map (word -> Tuple.of (word, 1)) .collect (Collectors.toMap (tuple -> tuple.getKey (), t1 -> t1.getValue (), (v1, v2) -> v1 + v2));

Ne feledje, hogy a fenti kódban nincs semmi tranzakció. De, mivel használtuk read_committed, ez azt jelenti, hogy egyetlen felhasználó sem olvassa el azokat az üzeneteket, amelyeket ugyanabban a tranzakcióban írtak a bemeneti témakörbe, mindaddig, amíg meg nem írták őket.

Most elküldhetjük a kiszámított szószámot a kimeneti témához.

Lássuk, hogyan állíthatjuk elő eredményeinket, tranzakciós úton is.

4.5. API küldése

Ha új üzenetként szeretnénk elküldeni a számlálásunkat, de ugyanabban a tranzakcióban hívjuk beginTransaction:

producer.beginTransaction ();

Ezután mindegyiket beírhatjuk a „számít” témánkba úgy, hogy a kulcs a szó, a szám pedig az érték:

wordCountMap.forEach ((kulcs, érték) -> producer.send (új ProducerRecord ("számít", kulcs, érték.toString ()));

Vegye figyelembe, hogy mivel a gyártó a kulcs által particionálhatja az adatokat, ez azt jelenti a tranzakciós üzenetek több partícióra is kiterjedhetnek, mindegyiket külön fogyasztók olvashatják el. Ezért a Kafka bróker tárolja a tranzakció összes frissített partíciójának listáját.

Vegye figyelembe azt is, egy tranzakción belül a gyártó több szálat is használhat rekordok párhuzamos elküldéséhez.

4.6. Elszámolások végrehajtása

És végül el kell kötnünk az ellentételezésünket, amelyet éppen most fogyasztottunk el. A tranzakciókkal visszavezetjük az ellentételezéseket arra a bemeneti témára, ahonnan olvastuk, a szokásos módon. Bár, mi küldje el őket a termelő ügyletébe.

Mindezt egyetlen hívásban tehetjük meg, de először ki kell számolnunk az egyes témapartíciók eltolását:

Map offsetsToCommit = new HashMap (); for (TopicPartition partíció: records.partitions ()) {Lista partitionedRecords = records.records (partíció); hosszú eltolás = partitionedRecords.get (partitionedRecords.size () - 1). offset (); offsetsToCommit.put (partíció, új OffsetAndMetadata (offset + 1)); }

Ne feledje, hogy amit a tranzakcióra vállalunk, az a közelgő ellentételezés, vagyis hozzá kell adnunk 1-et.

Ezután elküldhetjük a tranzakcióra számított ellentételezésünket:

producer.sendOffsetsToTransaction (offsetsToCommit, "my-group-id");

4.7. A tranzakció végrehajtása vagy megszakítása

És végül el tudjuk kötni a tranzakciót, amely atomszerűen írja le az ellentételezéseket a consumer_offsets téma, valamint maga a tranzakció:

producer.commitTransaction ();

Ez minden pufferelt üzenetet áthúz a megfelelő partíciókra. Ezenkívül a Kafka bróker az ügylet összes üzenetét elérhetővé teszi a fogyasztók számára.

Természetesen, ha bármi elromlik a feldolgozás közben, például ha elkapunk egy kivételt, akkor hívhatunk abortTransaction:

próbáld meg {// ... olvasni a bemeneti témától // ... átalakítani // ... írni a kimeneti témához producer.commitTransaction (); } catch (e kivétel) {producer.abortTransaction (); }

Dobja el az összes pufferelt üzenetet, és távolítsa el a tranzakciót a brókertől.

Ha nem követünk el és nem is állunk meg a bróker konfigurálása előtt max.transaction.timeout.ms, a Kafka bróker maga fogja megszakítani a tranzakciót. A tulajdonság alapértelmezett értéke 900 000 ezredmásodperc vagy 15 perc.

5. Egyéb fogyasztani-átalakítani-termelni Hurkok

Amit most láttunk, az alapvető fogyasztani-átalakítani-termelni hurok, amely ugyanahhoz a Kafka-fürthöz olvas és ír.

Fordítva, Azoknak az alkalmazásoknak, amelyeknek különböző Kafka-fürtöknek kell írniuk és írniuk, a régebbi verziót kell használniuk kötelezettségSync és kötelezettségAsync API. Jellemzően az alkalmazások a fogyasztói ellentételezéseket külső állapotú tárhelyükre tárolják a tranzakció megtartása érdekében.

6. Következtetés

Adatkritikus alkalmazások esetében gyakran elengedhetetlen a végpontok közötti egyszeri feldolgozás.

Ebben az oktatóanyagban láttuk, hogyan használjuk a Kafkát pontosan erre, tranzakciók segítségével, és egy tranzakció alapú szószámlálási példát valósítottunk meg az elv szemléltetésére.

Nézze meg nyugodtan az összes kódmintát a GitHubon.