Bevezetés Apache Kafkába Tavasszal
Most jelentettem be az újat Tanulj tavaszt tanfolyam, amelynek középpontjában az 5. tavasz és a tavaszi bakancs 2 alapjai állnak:
>> ELLENŐRIZZE A FOLYAMATOT1. Áttekintés
Az Apache Kafka egy elosztott és hibatűrő adatfeldolgozó rendszer.
Ebben a cikkben a Kafka tavaszi támogatásával és az absztrakciók szintjével foglalkozunk a natív Kafka Java kliens API-k felett.
A Spring Kafka az egyszerű és tipikus Spring sablon programozási modellt hozza a KafkaTemplate és Üzenet-vezérelt POJO-k keresztül @KafkaListener annotáció.
2. Telepítés és beállítás
A Kafka letöltéséhez és telepítéséhez olvassa el az itt található hivatalos útmutatót.
Hozzá kell adnunk a tavasz-kafka függőség a mi pom.xml:
org.springframework.kafka spring-kafka 2.3.7.FELHASZNÁLÁS
A műtárgy legújabb verziója itt található.
Példaképünk a Spring Boot alkalmazás lesz.
Ez a cikk feltételezi, hogy a kiszolgálót az alapértelmezett konfigurációval indították el, és egyetlen kiszolgálóport sem változott.
3. Témák beállítása
Korábban parancssori eszközöket futtattunk olyan témák létrehozásához a Kafka-ban, mint például:
$ bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic
De a bevezetésével AdminClient Kafkában most programozottan hozhatunk létre témákat.
Hozzá kell adnunk a KafkaAdmin Tavaszi bab, amely automatikusan hozzáadja a témákat az összes típusú babhoz Új téma:
@Configuration public class KafkaTopicConfig {@Value (value = "$ {kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin () {Map configs = new HashMap (); configs.put (AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin (config); } @Bean public NewTopic topic1 () {return new NewTopic ("baeldung", 1, (rövid) 1); }}
4. Üzenetek előállítása
Üzenetek létrehozásához először konfigurálnunk kell a ProducerFactory amely meghatározza a Kafka létrehozásának stratégiáját Termelő példányok.
Akkor szükségünk van egy KafkaTemplate amely beburkolja a Termelő példányt, és kényelmi módszereket biztosít az üzenetek Kafka témákhoz történő elküldéséhez.
Termelő a példányok szálbiztosak, ezért egyetlen példány használata az alkalmazás egészében nagyobb teljesítményt nyújt. Következésképpen, KakfaTemplate példányok is szálbiztosak, és egy példány használata ajánlott.
4.1. Gyártói konfiguráció
@Configuration public class KafkaProducerConfig {@Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate kafkaTemplate () {return new KafkaTemplate (producerFactory ()); }}
4.2. Üzenetek közzététele
A. Segítségével üzeneteket küldhetünk KafkaTemplate osztály:
@Autowired privát KafkaTemplate kafkaTemplate; public void sendMessage (String msg) {kafkaTemplate.send (topicName, msg); }
A Küld Az API a Hallható jövő tárgy. Ha blokkolni akarjuk a küldő szálat, és megkapjuk az eredményt az elküldött üzenetről, akkor felhívhatjuk a kap API-ja Hallható jövő tárgy. A szál megvárja az eredményt, de ez lelassítja a producert.
A Kafka egy gyors adatfolyam-feldolgozó platform. Ezért jobb ötlet az eredményeket aszinkron módon kezelni, hogy a következő üzenetek ne várják meg az előző üzenet eredményét. Ezt visszahívással tehetjük meg:
public void sendMessage (String üzenet) {ListenableFuture jövő = kafkaTemplate.send (topicNév, üzenet); future.addCallback (új ListenableFutureCallback() {@Override public void onSuccess (SendResult result) {System.out.println ("Elküldött üzenet = [" + üzenet + "] eltolással = [" + result.getRecordMetadata (). Offset () + "]") ; } @Orride public void onFailure (Throwable ex) {System.out.println ("Nem lehet üzenetet küldeni = [" + üzenet + "] a következő miatt:" + ex.getMessage ()); }}); }
5. Üzenetek fogyasztása
5.1. Fogyasztói konfiguráció
Üzenetek fogyasztásához be kell állítanunk a ConsumerFactory és a KafkaListenerContainerFactory. Amint ezek a babok rendelkezésre állnak a tavaszi babgyárban, a POJO-alapú fogyasztókat a segítségével konfigurálhatjuk @KafkaListener annotáció.
@EnableKafka megjegyzés szükséges a konfigurációs osztályon a @KafkaListener felirat a tavaszi kezelt babokra:
@EnableKafka @Configuration nyilvános osztály KafkaConsumerConfig {@Bean public ConsumerFactory consumerFactory () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put (ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory (kellékek); } @Bean public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); gyár.setConsumerFactory (consumerFactory ()); visszatérő gyár; }}
5.2. Üzenetek fogyasztása
@KafkaListener (topic = "topicName", groupId = "foo") public void listenGroupFoo (String üzenet) {System.out.println ("Fogadott üzenet a csoport foo-ban:" + üzenet); }
Több hallgató is megvalósítható egy témához, mindegyik más-más csoport azonosítóval rendelkezik. Ezenkívül egy fogyasztó különböző témákból származó üzeneteket hallgathat:
@KafkaListener (topic = "topic1, topic2", groupId = "foo")
A Spring egy vagy több üzenetfejléc letöltését is támogatja a @Fejléc annotáció a hallgatóban:
@KafkaListener (topic = "topicName") public void listenWithHeaders (@Payload String üzenet, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partíció) {System.out.println ("Fogadott üzenet:" + üzenet "+" a partícióból: "+ partíció);}
5.3. Üzenetek fogyasztása egy adott partícióról
Mint észrevette, mi hoztuk létre a témát baeldung csak egy partícióval. A több partícióval rendelkező téma esetében azonban a @KafkaListener kifejezetten feliratkozhat egy téma adott partíciójára kezdeti eltolással:
@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partitionOffsets = {@PartitionOffset (partition = "0", initialOffset = "0"), @PartitionOffset (partition = "3", initialOffset = "0")}) , containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition (@Payload String üzenet, @Header (KafkaHeaders.RECEIVED_PARTITION_ID) int partíció) {System.out.println ("Fogadott üzenet:" + üzenet "+" partíció: ;}
Mivel a initialOffset elküldték 0-nak ebben a figyelőben, a 0. és három partícióból az összes korábban elfogyasztott üzenet minden egyes felhasználásakor újra felhasználásra kerül. Ha az eltolás beállítása nem szükséges, akkor a partíciók tulajdona @TopicPartition jelölés csak a partíciók beállításához offset nélkül:
@KafkaListener (topicPartitions = @TopicPartition (topic = "topicName", partíciók = {"0", "1"}))
5.4. Üzenetszűrő hozzáadása a hallgatók számára
A hallgatók egyedi szűrő hozzáadásával konfigurálhatók bizonyos típusú üzenetek fogyasztására. Ezt megtehetjük az a beállításával RecordFilterStrategy hoz KafkaListenerContainerFactory:
@Bean public ConcurrentKafkaListenerContainerFactory filterKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); gyár.setConsumerFactory (consumerFactory ()); factory.setRecordFilterStrategy (rekord -> rekord.érték ((). tartalmazza ("Világ")); visszatérő gyár; }
Ezután a hallgató beállítható a tároló gyár használatára:
@KafkaListener (topic = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter (String üzenet) {System.out.println ("Fogadott üzenet szűrt hallgatóban:" + üzenet); }
Ebben a hallgatóban minden a szűrőnek megfelelő üzenetek eldobásra kerülnek.
6. Egyéni üzenetátalakítók
Eddig csak a Strings üzenetként küldését és fogadását ismertettük. Küldhetünk és fogadhatunk egyedi Java objektumokat is. Ehhez meg kell adni a megfelelő sorosítót ProducerFactory és deserializáló ConsumerFactory.
Nézzünk meg egy egyszerű babosztályt, amelyet üzenetként küldünk:
nyilvános osztály Üdvözlet {privát karakterlánc üzenet; privát karakterlánc neve; // szabványos mérőeszközök, beállítók és kivitelezők}
6.1. Egyéni üzenetek előállítása
Ebben a példában a következőket fogjuk használni JsonSerializer. Nézzük meg a kódot ProducerFactory és KafkaTemplate:
@Bean public ProducerFactory greetingProducerFactory () {// ... configProps.put (ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory (configProps); } @Bean public KafkaTemplate greetingKafkaTemplate () {return new KafkaTemplate (greetingProducerFactory ()); }
Ez az új KafkaTemplate felhasználható a Üdvözlet üzenet:
kafkaTemplate.send (topicNév, új üdvözlet ("Hello", "World"));
6.2. Egyéni üzenetek fogyasztása
Hasonlóképpen módosítsuk a ConsumerFactory és KafkaListenerContainerFactory az üdvözlő üzenet helyes deserializálásához:
@Bean public ConsumerFactory greetingConsumerFactory () {// ... return new DefaultKafkaConsumerFactory (props, new StringDeserializer (), new JsonDeserializer (Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory greetingKafkaListenerContainerFactory () {ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory (); factory.setConsumerFactory (greetingConsumerFactory ()); visszatérő gyár; }
A spring-kafka JSON serializer és deserializer a Jackson könyvtárat használja, amely szintén választható maven függőség a spring-kafka projektnél. Tehát tegyük hozzá a sajátunkhoz pom.xml:
com.fasterxml.jackson.core jackson-databaseind 2.9.7
A Jackson legújabb verziójának használata helyett ajánlott a pom.xml a tavasz-kafka.
Végül meg kell írnunk egy hallgatót, amelyet el lehet fogyasztani Üdvözlet üzenetek:
@KafkaListener (topic = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener (Greeting greeting) {// üdvözlő üzenet feldolgozása}
7. Következtetés
Ebben a cikkben az Apache Kafka tavaszi támogatásának alapjait ismertettük. Röviden áttekintettük azokat az osztályokat, amelyeket üzenetek küldésére és fogadására használnak.
A cikk teljes forráskódja megtalálható a GitHub oldalon. A kód futtatása előtt ellenőrizze, hogy a Kafka szerver fut-e, és a témákat manuálisan hozzák-e létre.
Perzisztencia alsó