Bevezetés a Kafka csatlakozókba

1. Áttekintés

Az Apache Kafka® egy elosztott streaming platform. Egy korábbi oktatóanyagban megvitattuk, hogyan lehet a Kafka fogyasztókat és gyártókat megvalósítani a Spring használatával.

Ebben az oktatóanyagban megtanuljuk a Kafka csatlakozók használatát.

Megnézzük:

  • Különböző típusú Kafka csatlakozók
  • A Kafka Connect jellemzői és módjai
  • Csatlakozók konfigurálása tulajdonságfájlok, valamint a REST API segítségével

2. A Kafka Connect és a Kafka Connectors alapjai

A Kafka Connect egy keretrendszer a Kafka és külső rendszerek összekapcsolására mint például adatbázisok, kulcsérték-tárolók, keresési indexek és fájlrendszerek, ún Csatlakozók.

A Kafka csatlakozók használatra kész alkatrészek, amelyek segítségünkre lehetnek adatok importálása külső rendszerekből a Kafka témákba és exportálja az adatokat a Kafka témákból külső rendszerekbe. Használhatjuk a meglévő csatlakozó megvalósításokat általános adatforrásokhoz és mosogatókhoz, vagy megvalósíthatjuk saját csatlakozóinkat.

A forrás csatlakozó adatokat gyűjt egy rendszerből. A forrásrendszerek lehetnek teljes adatbázisok, adatfolyam-táblák vagy üzenetközvetítők. Egy forráscsatlakozó metrikákat gyűjthet az alkalmazáskiszolgálókról a Kafka témákba is, így az adatok alacsony késleltetéssel elérhetővé válhatnak a folyam feldolgozásához.

A mosogató csatlakozó adatokat szolgáltat a Kafka témákból más rendszerekbe, amelyek lehetnek indexek, például Elasticsearch, kötegelt rendszerek, például Hadoop, vagy bármilyen adatbázis.

Egyes csatlakozókat a közösség fenntart, míg másokat a Confluent vagy partnerei támogatnak. Valójában a legnépszerűbb rendszerek, például az S3, a JDBC és a Cassandra csatlakozóit megtalálhatjuk, csak néhányat említve.

3. Jellemzők

A Kafka Connect szolgáltatásai:

  • Keret a külső rendszerek és a Kafka összekapcsolására - leegyszerűsíti a csatlakozók fejlesztését, telepítését és kezelését
  • Elosztott és önálló módok - ez segít a nagy klaszterek telepítésében a Kafka elosztott jellegének kihasználásával, valamint fejlesztési, tesztelési és kicsi gyártási telepítések beállításával
  • REST interfész - a csatlakozókat egy REST API segítségével kezelhetjük
  • Automatikus ofszetkezelés - A Kafka Connect segít nekünk a ellentételezési eljárás, ami megspórolja a problémát a csatlakozók fejlesztésének ezen hibára hajlamos részének manuális végrehajtásával
  • Alapértelmezés szerint terjesztett és méretezhető - A Kafka Connect a meglévő csoportkezelési protokollt használja; további munkatársakat vehetünk fel a Kafka Connect fürt bővítéséhez
  • Streaming és kötegelt integráció - A Kafka Connect ideális megoldás a streaming és kötegelt adatrendszerek áthidalására a Kafka meglévő képességeivel összefüggésben
  • Transzformációk - ezek lehetővé teszik az egyszerű üzenetek egyszerű és könnyű módosítását

4. Beállítás

A sima Kafka disztribúció helyett letöltjük a Confluent Platform nevű Kafka disztribúciót, amelyet a Kafka mögött álló Confluent, Inc. szolgáltat. A Confluent Platform néhány további eszközt és klienst tartalmaz, a sima Kafkához képest, valamint néhány további, előre beépített csatlakozót.

Esetünkben elegendő az Open Source kiadás, amely megtalálható a Confluent webhelyén.

5. Gyors indítás Kafka Connect

Kezdetnek megvitatjuk a Kafka Connect elvét, a legalapvetőbb Csatlakozók használatával, amelyek a fájlok forrás csatlakozó és a fájl mosogató csatlakozó.

Kényelmesen a Confluent Platform mindkét csatlakozóval, valamint referencia konfigurációkkal együtt érkezik.

5.1. Forráscsatlakozó konfigurálása

A forráscsatlakozóhoz a referencia konfiguráció a következő címen érhető el $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:

név = local-file-source connector.class = FileStreamSource feladatok.max = 1 topic = connect-test fájl = test.txt

Ez a konfiguráció néhány tulajdonsággal rendelkezik, amelyek minden forráscsatlakozónál közösek:

  • név a felhasználó által megadott név a csatlakozási példányhoz
  • csatlakozó.osztály meghatározza a megvalósítási osztályt, alapvetően a csatlakozó fajtáját
  • feladatok.max meghatározza, hogy a forráscsatlakozónk hány példányának fusson párhuzamosan, és
  • téma meghatározza azt a témát, amelyre a csatlakozónak el kell küldenie a kimenetet

Ebben az esetben van egy csatlakozóspecifikus attribútumunk is:

  • fájl meghatározza azt a fájlt, amelyből a csatlakozónak be kell olvasnia a bemenetet

Ahhoz, hogy ez működjön, hozzunk létre egy alapfájlt néhány tartalommal:

echo -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt

Vegye figyelembe, hogy a munkakönyvtár $ CONFLUENT_HOME.

5.2. Mosogató csatlakozó konfigurálása

A mosogató csatlakozónkhoz a hivatkozási konfigurációt használjuk $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:

name = local-file-sink connector.class = FileStreamSink task.max = 1 fájl = test.sink.txt témák = connect-test

Logikailag pontosan ugyanazokat a paramétereket tartalmazza, bár ezúttal csatlakozó.osztály meghatározza a mosogató csatlakozó megvalósítását, és fájl az a hely, ahol az összekötőnek meg kell írnia a tartalmat.

5.3. Worker Config

Végül konfigurálnunk kell a Connect dolgozót, amely integrálja a két csatlakozónkat, és elolvassa a forrás csatlakozóját, és írja az írót a mosogató csatlakozójába.

Ehhez használhatjuk $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:

bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java

Vegye figyelembe, hogy plugin.path tárolhatja az elérési utak listáját, ahol elérhetőek a csatlakozó megvalósításai

Mivel a Kafkához mellékelt csatlakozókat használjuk, beállíthatjuk plugin.path nak nek $ CONFLUENT_HOME / share / java. A Windows rendszerrel való együttműködés során szükség lehet abszolút elérési út megadására.

A többi paraméterhez megadhatjuk az alapértelmezett értékeket:

  • bootstrap.szerverek tartalmazza a Kafka brókerek címét
  • kulcs.konverter és érték.konverter határozzon meg átalakító osztályokat, amelyek sorosítják és deserializálják az adatokat, amikor azok a forrásból a Kafkába, majd a Kafkából a mosogatóba áramlanak
  • key.converter.schemas.enable és value.converter.schemas.enable konverter-specifikus beállítások
  • offset.storage.file.filename a legfontosabb beállítás a Connect önálló módban történő futtatásakor: meghatározza, hogy a Connect hol tárolja az eltolás adatait
  • offset.flush.interval.ms meghatározza azt az intervallumot, amikor a munkavállaló megpróbálja ellensúlyozni a feladatokat

És a paraméterek listája meglehetősen kiforrott, ezért nézze meg a teljes dokumentációt a hivatalos dokumentációban.

5.4. A Kafka Connect önálló módban

És ezzel megkezdhetjük az első csatlakozó beállítását:

$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. tulajdonságait

Először a parancssor segítségével ellenőrizhetjük a téma tartalmát:

$ CONFLUENT_HOME / bin / kafka-console-consumer - bootstrap-server localhost: 9092 - topic connect-test - from-elejétől

Mint láthatjuk, a forráscsatlakozó az adatokat a test.txt fájlt, átalakította JSON-vá és elküldte Kafkának:

{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "választható": false}, "hasznos teher": "bár"}

És ha megnézzük a mappát $ CONFLUENT_HOME, láthatjuk, hogy egy fájl test.sink.txt itt jött létre:

macska $ CONFLUENT_HOME / test.sink.txt foo bar

Ahogy a mosogató csatlakozója kivonja az értéket a hasznos teher attribútumot, és beírja a célfájlba, az adatokat test.sink.txt rendelkezik az eredeti tartalmával test.txt fájl.

Most adjunk hozzá további sorokat test.txt.

Amikor ezt megtesszük, látjuk, hogy a forráscsatlakozó automatikusan észleli ezeket a változásokat.

Csak egy új sort kell beillesztenünk a végére, különben a forráscsatlakozó nem veszi figyelembe az utolsó sort.

Ezen a ponton állítsuk le a Csatlakozás folyamatot, mivel elkezdjük a Csatlakozást elosztott mód néhány sorban.

6. Csatlakoztassa a REST API-t

Eddig minden konfigurációt úgy hajtottunk végre, hogy a tulajdonságfájlokat a parancssoron keresztül továbbítottuk. Mivel azonban a Connect szolgáltatást kívánja futtatni, rendelkezésre áll egy REST API is.

Alapértelmezés szerint a következő címen érhető el: // localhost: 8083. Néhány végpont:

  • GET / csatlakozók - visszaküldi az összes használt csatlakozó listáját
  • GET / csatlakozók / {név} - egy adott csatlakozó részleteit adja vissza
  • POST / csatlakozók - létrehoz egy új csatlakozót; a kérelem törzsének egy JSON objektumnak kell lennie, amely tartalmaz egy string nevű mezőt és egy objektum konfigurációs mezőt a csatlakozó konfigurációs paramétereivel
  • GET / csatlakozók / {név} / állapot - visszaadja a csatlakozó aktuális állapotát - beleértve, ha fut, sikertelen vagy szüneteltetve - melyik dolgozóhoz van rendelve, hibainformációk, ha nem sikerült, és az összes feladatának állapota
  • DELETE / csatlakozók / {név} - töröl egy csatlakozót, kecsesen leállítja az összes feladatot és törli annak konfigurációját
  • GET / csatlakozó-bővítmények - visszaadja a Kafka Connect fürtbe telepített csatlakozó beépülő modulok listáját

A hivatalos dokumentáció felsorolja az összes végpontot.

A következő részben a REST API-t fogjuk használni új csatlakozók létrehozásához.

7. A Kafka Connect elosztott módban

Az önálló mód tökéletesen működik fejlesztéshez és teszteléshez, valamint kisebb beállításokhoz. Ha azonban teljes mértékben ki akarjuk használni a Kafka elosztott természetét, akkor el kell indítanunk a Connectet elosztott módban.

Ezzel a csatlakozási beállításokat és a metaadatokat a fájlrendszer helyett a Kafka témák tárolják. Ennek eredményeként a munkavállalói csomópontok valóban hontalanok.

7.1. A Connect elindítása

Az elosztott mód referencia konfigurációja megtalálható a $ CONFLUENT_HOME oldalon/etc/kafka/connect-distributed.properties.

A paraméterek többnyire megegyeznek az önálló móddal. Csak néhány különbség van:

  • csoport.id meghatározza a Connect fürtcsoport nevét. Az értéknek különböznie kell a fogyasztói csoport azonosítójától
  • offset.storage.topic, config.storage.topic és status.storage.topic adjon meg témákat ezekhez a beállításokhoz. Minden témához meghatározhatunk egy replikációs tényezőt is

Ismét a hivatalos dokumentáció tartalmazza az összes paraméter listáját.

A Connectet elosztott módban a következőképpen tudjuk elindítani:

$ CONFLUENT_HOME / bin / connect-distribution $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

7.2. Csatlakozók hozzáadása a REST API használatával

Most, az önálló indítási paranccsal összehasonlítva, nem adtunk át csatlakozókonfigurációkat argumentumként. Ehelyett a REST API segítségével kell létrehoznunk a csatlakozókat.

Az előző példánk felállításához két POST kérést kell küldenünk a címre // localhost: 8083 / connectors amely a következő JSON-struktúrákat tartalmazza.

Először létre kell hoznunk a POST forráscsatlakozó törzsét JSON fájlként. Itt hívjuk connect-file-source.json fájl:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-spread.txt", "téma ":" connect-distribution "}}

Vegye figyelembe, hogy ez elég hasonlít az első alkalommal használt referencia konfigurációs fájlhoz.

És akkor POSTOLJUK:

curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Ezután ugyanezt tesszük a mosogató csatlakozónál is, meghívjuk a fájlt connect-file-sink.json:

{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "task.max": 1, "file": "test-spread.sink.txt", "témakörök": "csatlakozás-terjesztés"}}

És hajtsa végre a POST-ot, mint korábban:

curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Tartalomtípus: alkalmazás / json" \ -X POST // localhost: 8083 / csatlakozók

Szükség esetén ellenőrizhetjük, hogy ez a beállítás megfelelően működik-e:

$ CONFLUENT_HOME / bin / kafka-console-consumer - bootstrap-server localhost: 9092 - topic connect-distribution - from-elejétől {"schema": {"type": "string", "opcionális": false}, "hasznos teher": "foo"} {"séma": {"type": "string", "opcionális": false}, "payload": "bar"}

És ha megnézzük a mappát $ CONFLUENT_HOME, láthatjuk, hogy egy fájl test-spread.sink.txt itt jött létre:

macska $ CONFLUENT_HOME / test-distribution.sink.txt foo bar

Miután teszteltük az elosztott telepítést, tisztítsuk meg a két csatlakozó eltávolításával:

curl -X DELETE // localhost: 8083 / connectors / local-file-source curl -X DELETE // localhost: 8083 / connectors / local-file-sink

8. Adatok átalakítása

8.1. Támogatott transzformációk

Az átalakítások lehetővé teszik az egyszerű üzenetek egyszerű és könnyű módosítását.

A Kafka Connect a következő beépített átalakításokat támogatja:

  • InsertField - Adjon hozzá egy mezőt statikus adatok vagy metaadatok rögzítésével
  • ReplaceField - Mezők szűrése vagy átnevezése
  • MaskField - Cseréljen egy mezőt a típus érvénytelen null értékére (például nulla vagy üres karakterlánc)
  • HoistField - Csomagolja be az egész eseményt egy mezővé egy struktúra vagy egy térkép belsejében
  • ExtractField - Kivonat egy adott mezőt a struktúrából és a térképből, és csak ezt a mezőt vegye fel az eredményekbe
  • SetSchemaMetadata - Módosítsa a séma nevét vagy verzióját
  • TimestampRouter - Módosítsa a rekord témáját az eredeti téma és az időbélyeg alapján
  • RegexRouter - Módosítsa a rekord témáját az eredeti téma, a helyettesítő karakterlánc és a reguláris kifejezés alapján

Az átalakítás a következő paraméterek segítségével konfigurálható:

  • átalakul - vesszővel elválasztott álnevek listája a transzformációkhoz
  • átalakul. $ alias.type - Az átalakítás osztályneve
  • átalakítja. $ alias. $ transformSpecificConfig - Konfiguráció az adott transzformációhoz

8.2. Transzformátor alkalmazása

Néhány transzformációs tulajdonság teszteléséhez állítsuk be a következő két transzformációt:

  • Először tekerjük be a teljes üzenetet JSON struktúraként
  • Ezután adjunk hozzá egy mezőt ehhez a struktúrához

Transzformációink alkalmazása előtt be kell állítanunk a Csatlakozást a sémátlan JSON használatához, a connect-distributed.tulajdonságok:

key.converter.schemas.enable = hamis érték.converter.schemas.enable = hamis

Ezt követően újra kell indítanunk a Connectet, ismét elosztott módban:

$ CONFLUENT_HOME / bin / connect-distribution $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties

Ismét létre kell hoznunk a POST forráscsatlakozó törzsét JSON fájlként. Itt fogjuk hívni connect-file-source-transform.json fájl.

A már ismert paraméterek mellett hozzáadunk néhány sort a két szükséges transzformációhoz:

{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-transform.txt", "téma ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-file-source "}}

Ezt követően hajtsuk végre a POST-ot:

curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors

Írjunk néhány sort a sajátunkhoz teszt-transzformáció.txt:

Foo bár

Ha most megvizsgáljuk a összekapcsolódás-transzformáció témában a következő sorokat kell kapnunk:

{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}

9. A Ready csatlakozók használata

Miután ezeket az egyszerű csatlakozókat használta, nézzük meg a fejlettebb használatra kész csatlakozókat és azok telepítésének módját.

9.1. Hol találhatók a csatlakozók

Az előre épített csatlakozók különböző forrásokból állnak rendelkezésre:

  • Néhány csatlakozó sima Apache Kafka-val van ellátva (fájlok és konzolok forrása és mosogatója)
  • Néhány további csatlakozó a Confluent Platformhoz van kötve (ElasticSearch, HDFS, JDBC és AWS S3)
  • Nézze meg a Confluent Hub alkalmazást is, amely egyfajta alkalmazásbolt a Kafka csatlakozókhoz. A felajánlott csatlakozók száma folyamatosan növekszik:
    • Összefolyó csatlakozók (a Confluent fejlesztette, tesztelte, dokumentálta és teljes mértékben támogatja őket)
    • Hitelesített csatlakozók (egy harmadik fél által megvalósított és a Confluent tanúsított)
    • A közösség által kifejlesztett és támogatott csatlakozók
  • Ezen túlmenően a Confluent egy Connectors oldalt is kínál, néhány csatlakozóval, amelyek szintén elérhetők a Confluent Hub-ban, de további közösségi csatlakozókkal is
  • És végül vannak olyan gyártók is, akik termékeik részeként biztosítják a csatlakozókat. Például a Landoop biztosítja a Lenses nevű streaming könyvtárat, amely ~ 25 nyílt forráskódú csatlakozót is tartalmaz (sokukat más helyeken is keresztbe sorolják)

9.2. Csatlakozók telepítése a Confluent Hubról

A Confluent vállalati verziója parancsfájlt biztosít a csatlakozók és más összetevők telepítéséhez a Confluent Hubból (a parancsfájl nem szerepel az Open Source verzióban). Ha a vállalati verziót használjuk, akkor a következő paranccsal telepíthetünk egy csatlakozót:

$ CONFLUENT_HOME / bin / confluent-hub install confluentinc / kafka-connect-mqtt: 1.0.0-preview

9.3. Csatlakozók kézi telepítése

Ha olyan csatlakozóra van szükségünk, amely nem áll rendelkezésre a Confluent Hubon, vagy ha a Confluent nyílt forráskódú verziójával rendelkezünk, manuálisan telepíthetjük a szükséges csatlakozókat. Ehhez le kell töltenünk és kibontanunk a csatlakozót, valamint áthelyezzük a mellékelt lib-eket a következőként megadott mappába plugin.path.

Minden egyes csatlakozó esetében az archívumnak tartalmaznia kell két számunkra érdekes mappát:

  • A lib mappa tartalmazza például a csatlakozó edényt, kafka-connect-mqtt-1.0.0-preview.jar, valamint még néhány üveg, amelyet a csatlakozó megkövetel
  • A stb. mappa egy vagy több referencia konfigurációs fájlt tartalmaz

Meg kell mozgatnunk a lib mappa ide $ CONFLUENT_HOME / share / java, vagy akármelyik utat is megadtuk plugin.path ban ben connect- standalone.properties és connect-distributed.tulajdonságok. Ennek során értelme lehet a mappa átnevezése valami értelmesre.

A config fájlokat felhasználhatjuk a stb. vagy hivatkozással rájuk, miközben önálló módban indulunk, vagy egyszerűen megragadhatjuk a tulajdonságokat, és létrehozhatunk belőlük JSON fájlt.

10. Következtetés

Ebben az oktatóanyagban megvizsgáltuk a Kafka Connect telepítését és használatát.

Megvizsgáltuk a csatlakozók típusait, mind a forrást, mind a mosogatót. Megvizsgáltunk néhány olyan funkciót és módot, amelyekben a Connect futtatható. Ezután áttekintettük a transzformátorokat. És végül megtanultuk, hogy hol szerezzünk be és hogyan telepítsünk egyedi csatlakozókat.

Mint mindig, a konfigurációs fájlok megtalálhatók a GitHubon.


$config[zx-auto] not found$config[zx-overlay] not found