Bevezetés Apache Kafkába Tavasszal

Kitartás felső

Most jelentettem be az újat Tanulj tavaszt tanfolyam, amelynek középpontjában az 5. tavasz és a tavaszi bakancs 2 alapjai állnak:

>> ELLENŐRIZZE A FOLYAMATOT

1. Áttekintés

Az Apache Kafka egy elosztott és hibatűrő adatfeldolgozó rendszer.

Ebben a cikkben a Kafka tavaszi támogatásával és az absztrakciók szintjével foglalkozunk a natív Kafka Java kliens API-k felett.

A Spring Kafka az egyszerű és tipikus Spring sablon programozási modellt hozza a KafkaTemplate és Üzenet-vezérelt POJO-k keresztül @KafkaListener annotáció.

2. Telepítés és beállítás

A Kafka letöltéséhez és telepítéséhez olvassa el az itt található hivatalos útmutatót.

Hozzá kell adnunk a tavasz-kafka függőség a mi pom.xml:

 org.springframework.kafka spring-kafka 2.3.7.FELHASZNÁLÁS 

A műtárgy legújabb verziója itt található.

Példaképünk a Spring Boot alkalmazás lesz.

Ez a cikk feltételezi, hogy a kiszolgálót az alapértelmezett konfigurációval indították el, és egyetlen kiszolgálóport sem változott.

3. Témák beállítása

Korábban parancssori eszközöket futtattunk olyan témák létrehozásához a Kafka-ban, mint például:

$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic

De a bevezetésével AdminClient Kafkában most programozottan hozhatunk létre témákat.

Hozzá kell adnunk a KafkaAdmin Tavaszi bab, amely automatikusan hozzáadja a témákat az összes típusú babhoz Új téma:

@Configuration public class KafkaTopicConfig {@Value (value = "$ {kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin (config); } @Bean public NewTopic topic1 () {return new NewTopic ("baeldung", 1, (rövid) 1); }}

4. Üzenetek előállítása

Üzenetek létrehozásához először konfigurálnunk kell a ProducerFactory amely meghatározza a Kafka létrehozásának stratégiáját Termelő példányok.

Akkor szükségünk van egy KafkaTemplate amely beburkolja a Termelő példányt, és kényelmi módszereket biztosít az üzenetek Kafka témákhoz történő elküldéséhez.

Termelő a példányok szálbiztosak, ezért egyetlen példány használata az alkalmazás egészében nagyobb teljesítményt nyújt. Következésképpen, KakfaTemplate példányok is szálbiztosak, és egy példány használata ajánlott.

4.1. Gyártói konfiguráció

@Configuration public class KafkaProducerConfig {@Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate kafkaTemplate () {return new KafkaTemplate (producerFactory ()); }}

4.2. Üzenetek közzététele

A. Segítségével üzeneteket küldhetünk KafkaTemplate osztály:

@Autowired privát KafkaTemplate kafkaTemplate; public void sendMessage (String msg) {kafkaTemplate.send (topicName, msg); }

A Küld Az API a Hallható jövő tárgy. Ha blokkolni akarjuk a küldő szálat, és megkapjuk az eredményt az elküldött üzenetről, akkor felhívhatjuk a kap API-ja Hallható jövő tárgy. A szál megvárja az eredményt, de ez lelassítja a producert.

A Kafka egy gyors adatfolyam-feldolgozó platform. Ezért jobb ötlet az eredményeket aszinkron módon kezelni, hogy a következő üzenetek ne várják meg az előző üzenet eredményét. Ezt visszahívással tehetjük meg:

public void sendMessage (String üzenet) {ListenableFuture jövő = kafkaTemplate.send (topicNév, üzenet); future.addCallback (új ListenableFutureCallback() {@Override public void onSuccess (SendResult result) {System.out.println ("Elküldött üzenet = [" + üzenet + "] eltolással = [" + result.getRecordMetadata (). Offset () + "]") ; } @Orride public void onFailure (Throwable ex) {System.out.println ("Nem lehet üzenetet küldeni = [" + üzenet + "] a következő miatt:" + ex.getMessage ()); }}); }

5. Üzenetek fogyasztása

5.1. Fogyasztói konfiguráció

Üzenetek fogyasztásához be kell állítanunk a ConsumerFactory és a KafkaListenerContainerFactory. Amint ezek a babok rendelkezésre állnak a tavaszi babgyárban, a POJO-alapú fogyasztókat a segítségével konfigurálhatjuk @KafkaListener annotáció.

@EnableKafka megjegyzés szükséges a konfigurációs osztályon a @KafkaListener felirat a tavaszi kezelt babokra:

@EnableKafka @Configuration nyilvános osztály KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory (kellékek); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); gyár.setConsumerFactory (consumerFactory ()); visszatérő gyár; }}

5.2. Üzenetek fogyasztása

@KafkaListener (topic = "topicName", groupId = "foo") public void listenGroupFoo (String üzenet) {System.out.println ("Fogadott üzenet a csoport foo-ban:" + üzenet); }

Több hallgató is megvalósítható egy témához, mindegyik más-más csoport azonosítóval rendelkezik. Ezenkívül egy fogyasztó különböző témákból származó üzeneteket hallgathat:

@KafkaListener (topic = "topic1, topic2", groupId = "foo")

A Spring egy vagy több üzenetfejléc letöltését is támogatja a @Fejléc annotáció a hallgatóban:

@KafkaListener (topic = "topicName") public void listenWithHeaders (@Payload String üzenet, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partíció) {System.out.println ("Fogadott üzenet:" + üzenet "+" a partícióból: "+ partíció);}

5.3. Üzenetek fogyasztása egy adott partícióról

Mint észrevette, mi hoztuk létre a témát baeldung csak egy partícióval. A több partícióval rendelkező téma esetében azonban a @KafkaListener kifejezetten feliratkozhat egy téma adott partíciójára kezdeti eltolással:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition (@Payload String üzenet, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partíció) {System.out.println ("Fogadott üzenet:" + üzenet "+" partíció: ;}

Mivel a initialOffset elküldték 0-nak ebben a figyelőben, a 0. és három partícióból az összes korábban elfogyasztott üzenet minden egyes felhasználásakor újra felhasználásra kerül. Ha az eltolás beállítása nem szükséges, akkor a partíciók tulajdona @TopicPartition jelölés csak a partíciók beállításához offset nélkül:

@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partíciók = {"0", "1"}))

5.4. Üzenetszűrő hozzáadása a hallgatók számára

A hallgatók egyedi szűrő hozzáadásával konfigurálhatók bizonyos típusú üzenetek fogyasztására. Ezt megtehetjük az a beállításával RecordFilterStrategy hoz KafkaListenerContainerFactory:

@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); gyár.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (rekord -> rekord.érték ((). tartalmazza ("Világ")); visszatérő gyár; }

Ezután a hallgató beállítható a tároló gyár használatára:

@KafkaListener (topic = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter (String üzenet) {System.out.println ("Fogadott üzenet szűrt hallgatóban:" + üzenet); }

Ebben a hallgatóban minden a szűrőnek megfelelő üzenetek eldobásra kerülnek.

6. Egyéni üzenetátalakítók

Eddig csak a Strings üzenetként küldését és fogadását ismertettük. Küldhetünk és fogadhatunk egyedi Java objektumokat is. Ehhez meg kell adni a megfelelő sorosítót ProducerFactory és deserializáló ConsumerFactory.

Nézzünk meg egy egyszerű babosztályt, amelyet üzenetként küldünk:

nyilvános osztály Üdvözlet {privát karakterlánc üzenet; privát karakterlánc neve; // szabványos mérőeszközök, beállítók és kivitelezők}

6.1. Egyéni üzenetek előállítása

Ebben a példában a következőket fogjuk használni JsonSerializer. Nézzük meg a kódot ProducerFactory és KafkaTemplate:

@Bean public ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate greetingKafkaTemplate () {return new KafkaTemplate (greetingProducerFactory ()); }

Ez az új KafkaTemplate felhasználható a Üdvözlet üzenet:

kafkaTemplate.send (topicNév, új üdvözlet ("Hello", "World"));

6.2. Egyéni üzenetek fogyasztása

Hasonlóképpen módosítsuk a ConsumerFactory és KafkaListenerContainerFactory az üdvözlő üzenet helyes deserializálásához:

@Bean public ConsumerFactory greetingConsumerFactory () {// ... return new DefaultKafkaConsumerFactory (props, new StringDeserializer (), new JsonDeserializer (Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (greetingConsumerFactory ()); visszatérő gyár; }

A spring-kafka JSON serializer és deserializer a Jackson könyvtárat használja, amely szintén választható maven függőség a spring-kafka projektnél. Tehát tegyük hozzá a sajátunkhoz pom.xml:

 com.fasterxml.jackson.core jackson-databaseind ​​2.9.7 

A Jackson legújabb verziójának használata helyett ajánlott a pom.xml a tavasz-kafka.

Végül meg kell írnunk egy hallgatót, amelyet el lehet fogyasztani Üdvözlet üzenetek:

@KafkaListener (topic = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener (Greeting greeting) {// üdvözlő üzenet feldolgozása}

7. Következtetés

Ebben a cikkben az Apache Kafka tavaszi támogatásának alapjait ismertettük. Röviden áttekintettük azokat az osztályokat, amelyeket üzenetek küldésére és fogadására használnak.

A cikk teljes forráskódja megtalálható a GitHub oldalon. A kód futtatása előtt ellenőrizze, hogy a Kafka szerver fut-e, és a témákat manuálisan hozzák-e létre.

Perzisztencia alsó

Most jelentettem be az újat Tanulj tavaszt tanfolyam, amelynek középpontjában az 5. tavasz és a tavaszi bakancs 2 alapjai állnak:

>> ELLENŐRIZZE A FOLYAMATOT

$config[zx-auto] not found$config[zx-overlay] not found