Kafka és Spring Boot tesztelése

1. Áttekintés

Az Apache Kafka egy erőteljes, elosztott, hibatűrő adatfeldolgozó rendszer. Egy korábbi oktatóanyagban megtanultuk, hogyan kell együtt dolgozni Springvel és Kafkával.

Ebben az oktatóanyagban az előzőre építünk, és megtanuljuk, hogyan kell megbízható, önálló integrációs teszteket írni, amelyek nem támaszkodnak egy futó külső Kafka szerverre.

Először elkezdjük, de megnézzük, hogyan kell használni és konfigurálni a Kafka beágyazott példányát. Ezután meglátjuk, hogyan használhatjuk ki a tesztjeinkből a népszerű Testcontainers keretrendszert.

2. Függőségek

Természetesen hozzá kell adnunk a szabványt tavasz-kafka függőség a mi pom.xml:

 org.springframework.kafka spring-kafka 2.6.3.FELHASZNÁLÁS 

Akkor még két függőségre lesz szükségünk, kifejezetten a tesztjeinkhez. Először hozzáadjuk a tavasz-kafka-teszt műalkotás:

 org.springframework.kafka spring-kafka-test 2.6.3.KÖZLÍTÉSI teszt 

És végül hozzáadjuk a Testcontainers Kafka függőséget, amely a Maven Centralon is elérhető:

 org.testcontainers kafka 1.15.0 teszt 

Miután konfiguráltuk az összes szükséges függőséget, írhatunk egy egyszerű Spring Boot alkalmazást a Kafka segítségével.

3. Egyszerű Kafka Producer-Consumer alkalmazás

A bemutató során tesztjeink középpontjában az egyszerű termelő-fogyasztó Spring Boot Kafka alkalmazás áll.

Kezdjük azzal, hogy meghatározzuk az alkalmazás belépési pontját:

@SpringBootApplication @EnableAutoConfiguration public class KafkaProducerConsumerApplication {public static void main (String [] args) {SpringApplication.run (KafkaProducerConsumerApplication.class, args); }}

Mint láthatjuk, ez egy szokásos Spring Boot alkalmazás. Ahol lehetséges, ki akarjuk használni az alapértelmezett konfigurációs értékeket. Ezt szem előtt tartva felhasználjuk a @EnableAutoConfiguration kommentár az alkalmazás automatikus konfigurálásához.

3.1. Producer Setup

Vizsgáljuk meg egy termelői babot, amelyet felhasználva üzeneteket küldünk egy adott Kafka-témára:

@Component public class KafkaProducer {private static final Logger LOGGER = LoggerFactory.getLogger (KafkaProducer.class); @Autowired privát KafkaTemplate kafkaTemplate; public void send (String topic, String payload) {LOGGER.info ("hasznos terhelés =" {} "küldése a topic =" {} "", hasznos teher, téma); kafkaTemplate.send (téma, hasznos teher); }}

A mi KafkaProducer A fentiekben definiált bab csupán egy burkoló a KafkaTemplate osztály. Ez az osztály magas szintű szálbiztonságos műveleteket biztosít, például adatokat küld a megadott témakörbe, pontosan ezt tesszük a mi tevékenységünkben Küld módszer.

3.2. Fogyasztói beállítások

Hasonlóképpen, most meghatározunk egy egyszerű fogyasztói babot, amely hallgat egy Kafka témát és üzeneteket fogad:

@Component public class KafkaConsumer {private static final Logger LOGGER = LoggerFactory.getLogger (KafkaConsumer.class); privát CountDownLatch retesz = new CountDownLatch (1); privát karakterlánc hasznos teher = null; @KafkaListener (topic = "$ {test.topic}") public void vétel (ConsumerRecord consumerRecord) {LOGGER.info ("kapott hasznos terhelés =" {} "", consumerRecord.toString ()); setPayload (consumerRecord.toString ()); retesz.countDown (); } public CountDownLatch getLatch () {return retesz; } public String getPayload () {return hasznos teher; }}

Egyszerű fogyasztónk használja a @KafkaListener kommentár a kap módszer egy adott témával kapcsolatos üzenetek meghallgatására. Később meglátjuk, hogyan konfiguráljuk a teszt.téma tesztjeinkből.

Ezenkívül a fogadási módszer tárolja az üzenet tartalmát a babunkban, és csökkenti a retesz változó. Ez a változó egy egyszerű szálbiztos számlálómező, amelyet később a tesztjeink során használunk annak biztosítására, hogy sikeresen kaptunk egy üzenetet.

Most, hogy a Spring Boot használatával egyszerû Kafka alkalmazásunk van, nézzük meg, hogyan írhatunk integrációs teszteket.

4. Egy szó a tesztelésről

Általánosságban elmondható, hogy tiszta integrációs tesztek írásakor nem szabad olyan külső szolgáltatásokra támaszkodnunk, amelyeket esetleg nem tudunk ellenőrizni, vagy hirtelen leállhat. Ez káros hatással lehet vizsgálati eredményeinkre.

Hasonlóképpen, ha külső szolgáltatástól függünk, ebben az esetben egy futó Kafka brókertől, akkor valószínűleg nem leszünk képesek beállítani, irányítani és lebontani a tesztjeinktől elvárt módon.

4.1. Alkalmazás tulajdonságai

Tesztjeinkből nagyon könnyű alkalmazáskonfigurációs tulajdonságokat fogunk használni. Ezeket a tulajdonságokat a src / test / resources / application.yml fájl:

tavasz: kafka: fogyasztó: auto-offset-reset: legkorábbi csoport-azonosító: baeldung teszt: téma: beágyazott-teszt-téma

Ez a minimális tulajdonságkészlet, amelyre szükségünk van, ha a Kafka beágyazott példányával vagy egy helyi közvetítővel dolgozunk.

Ezek többsége magától értetődő, de külön kiemelnünk kell a fogyasztói tulajdont auto-offset-reset: legkorábban. Ez a tulajdonság biztosítja, hogy fogyasztói csoportunk megkapja az általunk küldött üzeneteket, mert a tároló elindulhat a küldés befejezése után.

Ezenkívül egy téma tulajdonságot konfigurálunk az értékkel embedded-test-topic, amelyet a tesztjeink során használunk.

5. Tesztelés beágyazott Kafka használatával

Ebben a részben megvizsgáljuk, hogyan lehet memóriában lévő Kafka példányt használni tesztjeink futtatásához. Ez Embedded Kafka néven is ismert.

A függőség tavasz-kafka-teszt korábban hozzáadtunk néhány hasznos segédprogramot, amelyek segítséget nyújtanak alkalmazásunk teszteléséhez. Legfőképpen a BeágyazottKafkaBroker osztály.

Ezt szem előtt tartva folytassuk és írjuk meg az első integrációs tesztünket:

@SpringBootTest @DtiesContext @EmbeddedKafka (partíciók = 1, brokerProperties = {"hallgatók = PLAINTEXT: // localhost: 9092", "port = 9092"}) osztály EmbeddedKafkaIntegrationTest {@Autowired privát KafkaConsumer fogyasztó; @ Hivatalos KafkaProducer privát producer; @Value ("$ {test.topic}") privát karakterlánc-téma; @Test public void givenEmbeddedKafkaBroker_whenSendingtoSimpleProducer_thenMessageReceived () dobja a (z) {producer.send (topic, "Küldés saját egyszerű KafkaProducerrel") kivételt; consumer.getLatch (). várjon (10000, TimeUnit.MILLISECONDS); assertThat (fogyasztó.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), tartalmazzaString ("beágyazott-teszt-téma")); }}

Menjünk végig a tesztünk legfontosabb részein. Először azzal kezdjük, hogy teszt osztályunkat két szép szokásos tavaszi kommentárral díszítjük:

  • A @SpringBootTest a kommentár biztosítja, hogy a tesztindítónk átfogja a Spring alkalmazás kontextust
  • Használjuk a @DtiesContext megjegyzés, amely biztosítja, hogy ezt a kontextust megtisztítsák és visszaállítsák a különböző tesztek között

Itt jön a döntő rész, használjuk a @EmbeddedKafka annotáció egy BeágyazottKafkaBroker tesztjeinkbe. Ezenkívül számos elérhető tulajdonság használható a beágyazott Kafka csomópont konfigurálásához:

  • partíciók - ez a témánként használt partíciók száma. Annak érdekében, hogy a dolgok szépek és egyszerűek legyenek, csak egyet akarunk használni tesztjeink során
  • brokerTulajdonságok - további tulajdonságok a Kafka brókernél. Ismét egyszerűvé tesszük a dolgokat, és megadunk egy egyszerű szöveges figyelőt és egy portszámot

Ezután automatikusan bekötjük a vezetékünket fogyasztó és termelő osztályokat, és konfiguráljon egy témát a mi értékünk felhasználására alkalmazás.tulajdonságok.

A kirakós játék utolsó darabjához egyszerűen üzenetet küldünk a teszt témánkra, és ellenőrizzük, hogy az üzenet megérkezett-e, és tartalmazza-e a teszt témánk nevét.

Amikor lefuttatjuk a tesztünket, a bő tavaszi kimenet között láthatjuk:

... 12: 45: 35.099 [main] INFO cbkafka.embedded.KafkaProducer - sendload = "Küldés saját egyszerű KafkaProducerünkkel" a topic = "embedded-test-topic" ... 12: 45: 35.103 [org .springframework.kafka.KafkaListenerEndpointContainer # 0-0-C-1] INFO cbkafka.embedded.KafkaConsumer - kapott hasznos terhelés = 'ConsumerRecord (topic = embedded-test-topic, partíció = 0, leaderEpoch = 0, offset = 1, CreateTime = 1605267935099, sorosított kulcs mérete = -1, sorosított érték mérete = 41, fejlécek = RecordHeaders (fejléc = [], isReadOnly = hamis),  kulcs = null, érték = Küldés saját egyszerű KafkaProducerünkkel) ' 

Ez megerősíti, hogy tesztünk megfelelően működik. Fantasztikus! Most már módunk van önálló, független integrációs teszteket írni a memóriában lévő Kafka közvetítő segítségével.

6. Kafka tesztelése TestContainers-rel

Néha kis különbségeket tapasztalhatunk egy valódi külső szolgáltatás és egy kifejezetten tesztelési célokra nyújtott szolgáltatás beágyazott memória-példánya között. Bár valószínűtlen, az is lehet, hogy a tesztünk során használt port foglalt, ami hibát okoz.

Ezt szem előtt tartva, ebben a részben a Testcontainers keretrendszer használatával történő korábbi tesztelési megközelítésünk variációját láthatjuk. Meglátjuk, hogyan lehet egy Docker-tárolóban elhelyezett külső Apache Kafka brókert példányosítani és kezelni az integrációs tesztünkből.

Határozzunk meg egy másik integrációs tesztet, amely nagyon hasonló lesz az előző szakaszban láthatóhoz:

@RunWith (SpringRunner.class) @import (com.baeldung.kafka.testcontainers.KafkaTestContainersLiveTest.KafkaTestContainersConfiguration.class) @SpringBootTest (osztályok = KafkaProducerConsumerApplication.class) @DirtiesContext public class KafkaTestContainersLiveTest {@ClassRule public static KafkaContainer Kafka = új KafkaContainer (DockerImageName .parse ("confluentinc / cp-kafka: 5.4.3")); @ Hivatalos KafkaFogyasztói fogyasztó; @ Hivatalos KafkaProducer privát producer; @Value ("$ {test.topic}") privát karakterlánc-téma; @Test public void givenKafkaDockerContainer_whenSendingtoSimpleProducer_thenMessageReceived () dobja a (z) {producer.send (topic, "Küldés saját vezérlővel") kivételt; consumer.getLatch (). várjon (10000, TimeUnit.MILLISECONDS); assertThat (fogyasztó.getLatch (). getCount (), equalTo (0L)); assertThat (consumer.getPayload (), tartalmazzaString ("beágyazott-teszt-téma")); }}

Vessünk egy pillantást a különbségekre ezúttal. Kijelentjük a kafka mező, amely egy szokásos JUnit @ClassRule. Ez a mező a KafkaContainer osztály, amely előkészíti és kezeli a Kafkát futtató konténerünk életciklusát.

A portkonfliktusok elkerülése érdekében a Testcontainers dinamikusan osztja ki a portszámot, amikor a dokkolótárolónk elindul. Ezért az osztály használatával egyedi fogyasztói és gyártói gyári konfigurációt biztosítunk KafkaTestContainersConfiguration:

@Bean public Map consumerConfigs () {Map props = new HashMap (); props.put (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); props.put (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "legkorábban"); props.put (ConsumerConfig.GROUP_ID_CONFIG, "baeldung"); // több szabványos konfiguráció visszatérő kellék; } @Bean public ProducerFactory producerFactory () {Map configProps = new HashMap (); configProps.put (ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers ()); // több szabványos konfiguráció adja vissza az új DefaultKafkaProducerFactory (configProps); }

Ezután hivatkozunk erre a konfigurációra a @Import jelölés a tesztünk elején.

Ennek oka az, hogy szükségünk van egy módszerre, amellyel a szerver címét beadhatjuk az alkalmazásunkba, amelyet, ahogyan korábban említettük, dinamikusan generáljuk. Ezt úgy érjük el, hogy felhívjuk a getBootstrapServers () metódus, amely visszaadja a bootstrap kiszolgáló helyét:

bootstrap.servers = [PLAINTEXT: // localhost: 32789]

Most, amikor lefuttatjuk a tesztünket, látnunk kell, hogy a Testcontainers több dolgot is végrehajt:

  • Ellenőrizze a helyi Docker beállításokat.
  • Húzza a confluentinc / cp-kafka: 5.4.3 dokkoló kép, ha szükséges
  • Új tárolót indít, és várja, amíg készen áll
  • Végül leállítja és törli a tartályt, miután a tesztünk befejeződött

Ezt megint megerősíti a teszt kimenetének ellenőrzése:

13: 33: 10.396 [main] INFO 🐳 [confluentinc / cp-kafka: 5.4.3] - Tároló létrehozása a képhez: confluentinc / cp-kafka: 5.4.3 13: 33: 10.454 [main] INFO 🐳 [confluentinc / cp -kafka: 5.4.3] - kiindulva konténer ID: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3 13: 33: 10,785 [fő] INFO 🐳 [confluentinc / CP-Kafka: 5.4.3] - Container confluentinc / CP-Kafka: 5.4.3 kezd: b22b752cee2e9e9e6ade38e46d0c6d881ad941d17223bda073afe4d2fe0559c3

Gyors! Működő integrációs teszt Kafka dokkoló tároló használatával.

7. Következtetés

Ebben a cikkben megismerkedtünk a Kafka alkalmazások Spring Boot alkalmazással történő tesztelésének néhány megközelítésével. Az első megközelítésben azt láttuk, hogyan kell konfigurálni és használni a helyi memóriában lévő Kafka brókert.

Aztán láttuk, hogy a Testcontainers segítségével hogyan állíthatunk be egy külső Kafka brókert, amely a dokkoló tartály belsejében fut.

Mint mindig, a cikk teljes forráskódja elérhető a GitHubon.