Bevezetés a Kafka csatlakozókba
1. Áttekintés
Az Apache Kafka® egy elosztott streaming platform. Egy korábbi oktatóanyagban megvitattuk, hogyan lehet a Kafka fogyasztókat és gyártókat megvalósítani a Spring használatával.
Ebben az oktatóanyagban megtanuljuk a Kafka csatlakozók használatát.
Megnézzük:
- Különböző típusú Kafka csatlakozók
- A Kafka Connect jellemzői és módjai
- Csatlakozók konfigurálása tulajdonságfájlok, valamint a REST API segítségével
2. A Kafka Connect és a Kafka Connectors alapjai
A Kafka Connect egy keretrendszer a Kafka és külső rendszerek összekapcsolására mint például adatbázisok, kulcsérték-tárolók, keresési indexek és fájlrendszerek, ún Csatlakozók.
A Kafka csatlakozók használatra kész alkatrészek, amelyek segítségünkre lehetnek adatok importálása külső rendszerekből a Kafka témákba és exportálja az adatokat a Kafka témákból külső rendszerekbe. Használhatjuk a meglévő csatlakozó megvalósításokat általános adatforrásokhoz és mosogatókhoz, vagy megvalósíthatjuk saját csatlakozóinkat.
A forrás csatlakozó adatokat gyűjt egy rendszerből. A forrásrendszerek lehetnek teljes adatbázisok, adatfolyam-táblák vagy üzenetközvetítők. Egy forráscsatlakozó metrikákat gyűjthet az alkalmazáskiszolgálókról a Kafka témákba is, így az adatok alacsony késleltetéssel elérhetővé válhatnak a folyam feldolgozásához.
A mosogató csatlakozó adatokat szolgáltat a Kafka témákból más rendszerekbe, amelyek lehetnek indexek, például Elasticsearch, kötegelt rendszerek, például Hadoop, vagy bármilyen adatbázis.
Egyes csatlakozókat a közösség fenntart, míg másokat a Confluent vagy partnerei támogatnak. Valójában a legnépszerűbb rendszerek, például az S3, a JDBC és a Cassandra csatlakozóit megtalálhatjuk, csak néhányat említve.
3. Jellemzők
A Kafka Connect szolgáltatásai:
- Keret a külső rendszerek és a Kafka összekapcsolására - leegyszerűsíti a csatlakozók fejlesztését, telepítését és kezelését
- Elosztott és önálló módok - ez segít a nagy klaszterek telepítésében a Kafka elosztott jellegének kihasználásával, valamint fejlesztési, tesztelési és kicsi gyártási telepítések beállításával
- REST interfész - a csatlakozókat egy REST API segítségével kezelhetjük
- Automatikus ofszetkezelés - A Kafka Connect segít nekünk a ellentételezési eljárás, ami megspórolja a problémát a csatlakozók fejlesztésének ezen hibára hajlamos részének manuális végrehajtásával
- Alapértelmezés szerint terjesztett és méretezhető - A Kafka Connect a meglévő csoportkezelési protokollt használja; további munkatársakat vehetünk fel a Kafka Connect fürt bővítéséhez
- Streaming és kötegelt integráció - A Kafka Connect ideális megoldás a streaming és kötegelt adatrendszerek áthidalására a Kafka meglévő képességeivel összefüggésben
- Transzformációk - ezek lehetővé teszik az egyszerű üzenetek egyszerű és könnyű módosítását
4. Beállítás
A sima Kafka disztribúció helyett letöltjük a Confluent Platform nevű Kafka disztribúciót, amelyet a Kafka mögött álló Confluent, Inc. szolgáltat. A Confluent Platform néhány további eszközt és klienst tartalmaz, a sima Kafkához képest, valamint néhány további, előre beépített csatlakozót.
Esetünkben elegendő az Open Source kiadás, amely megtalálható a Confluent webhelyén.
5. Gyors indítás Kafka Connect
Kezdetnek megvitatjuk a Kafka Connect elvét, a legalapvetőbb Csatlakozók használatával, amelyek a fájlok forrás csatlakozó és a fájl mosogató csatlakozó.
Kényelmesen a Confluent Platform mindkét csatlakozóval, valamint referencia konfigurációkkal együtt érkezik.
5.1. Forráscsatlakozó konfigurálása
A forráscsatlakozóhoz a referencia konfiguráció a következő címen érhető el $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties:
név = local-file-source connector.class = FileStreamSource feladatok.max = 1 topic = connect-test fájl = test.txt
Ez a konfiguráció néhány tulajdonsággal rendelkezik, amelyek minden forráscsatlakozónál közösek:
- név a felhasználó által megadott név a csatlakozási példányhoz
- csatlakozó.osztály meghatározza a megvalósítási osztályt, alapvetően a csatlakozó fajtáját
- feladatok.max meghatározza, hogy a forráscsatlakozónk hány példányának fusson párhuzamosan, és
- téma meghatározza azt a témát, amelyre a csatlakozónak el kell küldenie a kimenetet
Ebben az esetben van egy csatlakozóspecifikus attribútumunk is:
- fájl meghatározza azt a fájlt, amelyből a csatlakozónak be kell olvasnia a bemenetet
Ahhoz, hogy ez működjön, hozzunk létre egy alapfájlt néhány tartalommal:
echo -e "foo \ nbar \ n"> $ CONFLUENT_HOME / test.txt
Vegye figyelembe, hogy a munkakönyvtár $ CONFLUENT_HOME.
5.2. Mosogató csatlakozó konfigurálása
A mosogató csatlakozónkhoz a hivatkozási konfigurációt használjuk $ CONFLUENT_HOME / etc / kafka / connect-file-sink.properties:
name = local-file-sink connector.class = FileStreamSink task.max = 1 fájl = test.sink.txt témák = connect-test
Logikailag pontosan ugyanazokat a paramétereket tartalmazza, bár ezúttal csatlakozó.osztály meghatározza a mosogató csatlakozó megvalósítását, és fájl az a hely, ahol az összekötőnek meg kell írnia a tartalmat.
5.3. Worker Config
Végül konfigurálnunk kell a Connect dolgozót, amely integrálja a két csatlakozónkat, és elolvassa a forrás csatlakozóját, és írja az írót a mosogató csatlakozójába.
Ehhez használhatjuk $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties:
bootstrap.servers = localhost: 9092 key.converter = org.apache.kafka.connect.json.JsonConverter value.converter = org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable = false value.converter. schemas.enable = false offset.storage.file.filename = / tmp / connect.offsets offset.flush.interval.ms = 10000 plugin.path = / share / java
Vegye figyelembe, hogy plugin.path tárolhatja az elérési utak listáját, ahol elérhetőek a csatlakozó megvalósításai
Mivel a Kafkához mellékelt csatlakozókat használjuk, beállíthatjuk plugin.path nak nek $ CONFLUENT_HOME / share / java. A Windows rendszerrel való együttműködés során szükség lehet abszolút elérési út megadására.
A többi paraméterhez megadhatjuk az alapértelmezett értékeket:
- bootstrap.szerverek tartalmazza a Kafka brókerek címét
- kulcs.konverter és érték.konverter határozzon meg átalakító osztályokat, amelyek sorosítják és deserializálják az adatokat, amikor azok a forrásból a Kafkába, majd a Kafkából a mosogatóba áramlanak
- key.converter.schemas.enable és value.converter.schemas.enable konverter-specifikus beállítások
- offset.storage.file.filename a legfontosabb beállítás a Connect önálló módban történő futtatásakor: meghatározza, hogy a Connect hol tárolja az eltolás adatait
- offset.flush.interval.ms meghatározza azt az intervallumot, amikor a munkavállaló megpróbálja ellensúlyozni a feladatokat
És a paraméterek listája meglehetősen kiforrott, ezért nézze meg a teljes dokumentációt a hivatalos dokumentációban.
5.4. A Kafka Connect önálló módban
És ezzel megkezdhetjük az első csatlakozó beállítását:
$ CONFLUENT_HOME / bin / connect-standalone \ $ CONFLUENT_HOME / etc / kafka / connect-standalone.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-source.properties \ $ CONFLUENT_HOME / etc / kafka / connect-file-sink. tulajdonságait
Először a parancssor segítségével ellenőrizhetjük a téma tartalmát:
$ CONFLUENT_HOME / bin / kafka-console-consumer - bootstrap-server localhost: 9092 - topic connect-test - from-elejétől
Mint láthatjuk, a forráscsatlakozó az adatokat a test.txt fájlt, átalakította JSON-vá és elküldte Kafkának:
{"schema": {"type": "string", "optional": false}, "payload": "foo"} {"schema": {"type": "string", "választható": false}, "hasznos teher": "bár"}
És ha megnézzük a mappát $ CONFLUENT_HOME, láthatjuk, hogy egy fájl test.sink.txt itt jött létre:
macska $ CONFLUENT_HOME / test.sink.txt foo bar
Ahogy a mosogató csatlakozója kivonja az értéket a hasznos teher attribútumot, és beírja a célfájlba, az adatokat test.sink.txt rendelkezik az eredeti tartalmával test.txt fájl.
Most adjunk hozzá további sorokat test.txt.
Amikor ezt megtesszük, látjuk, hogy a forráscsatlakozó automatikusan észleli ezeket a változásokat.
Csak egy új sort kell beillesztenünk a végére, különben a forráscsatlakozó nem veszi figyelembe az utolsó sort.
Ezen a ponton állítsuk le a Csatlakozás folyamatot, mivel elkezdjük a Csatlakozást elosztott mód néhány sorban.
6. Csatlakoztassa a REST API-t
Eddig minden konfigurációt úgy hajtottunk végre, hogy a tulajdonságfájlokat a parancssoron keresztül továbbítottuk. Mivel azonban a Connect szolgáltatást kívánja futtatni, rendelkezésre áll egy REST API is.
Alapértelmezés szerint a következő címen érhető el: // localhost: 8083. Néhány végpont:
- GET / csatlakozók - visszaküldi az összes használt csatlakozó listáját
- GET / csatlakozók / {név} - egy adott csatlakozó részleteit adja vissza
- POST / csatlakozók - létrehoz egy új csatlakozót; a kérelem törzsének egy JSON objektumnak kell lennie, amely tartalmaz egy string nevű mezőt és egy objektum konfigurációs mezőt a csatlakozó konfigurációs paramétereivel
- GET / csatlakozók / {név} / állapot - visszaadja a csatlakozó aktuális állapotát - beleértve, ha fut, sikertelen vagy szüneteltetve - melyik dolgozóhoz van rendelve, hibainformációk, ha nem sikerült, és az összes feladatának állapota
- DELETE / csatlakozók / {név} - töröl egy csatlakozót, kecsesen leállítja az összes feladatot és törli annak konfigurációját
- GET / csatlakozó-bővítmények - visszaadja a Kafka Connect fürtbe telepített csatlakozó beépülő modulok listáját
A hivatalos dokumentáció felsorolja az összes végpontot.
A következő részben a REST API-t fogjuk használni új csatlakozók létrehozásához.
7. A Kafka Connect elosztott módban
Az önálló mód tökéletesen működik fejlesztéshez és teszteléshez, valamint kisebb beállításokhoz. Ha azonban teljes mértékben ki akarjuk használni a Kafka elosztott természetét, akkor el kell indítanunk a Connectet elosztott módban.
Ezzel a csatlakozási beállításokat és a metaadatokat a fájlrendszer helyett a Kafka témák tárolják. Ennek eredményeként a munkavállalói csomópontok valóban hontalanok.
7.1. A Connect elindítása
Az elosztott mód referencia konfigurációja megtalálható a $ CONFLUENT_HOME oldalon/etc/kafka/connect-distributed.properties.
A paraméterek többnyire megegyeznek az önálló móddal. Csak néhány különbség van:
- csoport.id meghatározza a Connect fürtcsoport nevét. Az értéknek különböznie kell a fogyasztói csoport azonosítójától
- offset.storage.topic, config.storage.topic és status.storage.topic adjon meg témákat ezekhez a beállításokhoz. Minden témához meghatározhatunk egy replikációs tényezőt is
Ismét a hivatalos dokumentáció tartalmazza az összes paraméter listáját.
A Connectet elosztott módban a következőképpen tudjuk elindítani:
$ CONFLUENT_HOME / bin / connect-distribution $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties
7.2. Csatlakozók hozzáadása a REST API használatával
Most, az önálló indítási paranccsal összehasonlítva, nem adtunk át csatlakozókonfigurációkat argumentumként. Ehelyett a REST API segítségével kell létrehoznunk a csatlakozókat.
Az előző példánk felállításához két POST kérést kell küldenünk a címre // localhost: 8083 / connectors amely a következő JSON-struktúrákat tartalmazza.
Először létre kell hoznunk a POST forráscsatlakozó törzsét JSON fájlként. Itt hívjuk connect-file-source.json fájl:
{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-spread.txt", "téma ":" connect-distribution "}}
Vegye figyelembe, hogy ez elég hasonlít az első alkalommal használt referencia konfigurációs fájlhoz.
És akkor POSTOLJUK:
curl -d @ "$ CONFLUENT_HOME / connect-file-source.json" \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors
Ezután ugyanezt tesszük a mosogató csatlakozónál is, meghívjuk a fájlt connect-file-sink.json:
{"name": "local-file-sink", "config": {"connector.class": "FileStreamSink", "task.max": 1, "file": "test-spread.sink.txt", "témakörök": "csatlakozás-terjesztés"}}
És hajtsa végre a POST-ot, mint korábban:
curl -d @ $ CONFLUENT_HOME / connect-file-sink.json \ -H "Tartalomtípus: alkalmazás / json" \ -X POST // localhost: 8083 / csatlakozók
Szükség esetén ellenőrizhetjük, hogy ez a beállítás megfelelően működik-e:
$ CONFLUENT_HOME / bin / kafka-console-consumer - bootstrap-server localhost: 9092 - topic connect-distribution - from-elejétől {"schema": {"type": "string", "opcionális": false}, "hasznos teher": "foo"} {"séma": {"type": "string", "opcionális": false}, "payload": "bar"}
És ha megnézzük a mappát $ CONFLUENT_HOME, láthatjuk, hogy egy fájl test-spread.sink.txt itt jött létre:
macska $ CONFLUENT_HOME / test-distribution.sink.txt foo bar
Miután teszteltük az elosztott telepítést, tisztítsuk meg a két csatlakozó eltávolításával:
curl -X DELETE // localhost: 8083 / connectors / local-file-source curl -X DELETE // localhost: 8083 / connectors / local-file-sink
8. Adatok átalakítása
8.1. Támogatott transzformációk
Az átalakítások lehetővé teszik az egyszerű üzenetek egyszerű és könnyű módosítását.
A Kafka Connect a következő beépített átalakításokat támogatja:
- InsertField - Adjon hozzá egy mezőt statikus adatok vagy metaadatok rögzítésével
- ReplaceField - Mezők szűrése vagy átnevezése
- MaskField - Cseréljen egy mezőt a típus érvénytelen null értékére (például nulla vagy üres karakterlánc)
- HoistField - Csomagolja be az egész eseményt egy mezővé egy struktúra vagy egy térkép belsejében
- ExtractField - Kivonat egy adott mezőt a struktúrából és a térképből, és csak ezt a mezőt vegye fel az eredményekbe
- SetSchemaMetadata - Módosítsa a séma nevét vagy verzióját
- TimestampRouter - Módosítsa a rekord témáját az eredeti téma és az időbélyeg alapján
- RegexRouter - Módosítsa a rekord témáját az eredeti téma, a helyettesítő karakterlánc és a reguláris kifejezés alapján
Az átalakítás a következő paraméterek segítségével konfigurálható:
- átalakul - vesszővel elválasztott álnevek listája a transzformációkhoz
- átalakul. $ alias.type - Az átalakítás osztályneve
- átalakítja. $ alias. $ transformSpecificConfig - Konfiguráció az adott transzformációhoz
8.2. Transzformátor alkalmazása
Néhány transzformációs tulajdonság teszteléséhez állítsuk be a következő két transzformációt:
- Először tekerjük be a teljes üzenetet JSON struktúraként
- Ezután adjunk hozzá egy mezőt ehhez a struktúrához
Transzformációink alkalmazása előtt be kell állítanunk a Csatlakozást a sémátlan JSON használatához, a connect-distributed.tulajdonságok:
key.converter.schemas.enable = hamis érték.converter.schemas.enable = hamis
Ezt követően újra kell indítanunk a Connectet, ismét elosztott módban:
$ CONFLUENT_HOME / bin / connect-distribution $ CONFLUENT_HOME / etc / kafka / connect-distributed.properties
Ismét létre kell hoznunk a POST forráscsatlakozó törzsét JSON fájlként. Itt fogjuk hívni connect-file-source-transform.json fájl.
A már ismert paraméterek mellett hozzáadunk néhány sort a két szükséges transzformációhoz:
{"name": "local-file-source", "config": {"connector.class": "FileStreamSource", "task.max": 1, "file": "test-transform.txt", "téma ":" connect-transformation "," transforms ":" MakeMap, InsertSource "," transforms.MakeMap.type ":" org.apache.kafka.connect.transforms.HoistField $ Value "," transforms.MakeMap.field ": "line", "transforms.InsertSource.type": "org.apache.kafka.connect.transforms.InsertField $ Value", "transforms.InsertSource.static.field": "data_source", "transforms.InsertSource.static.value ":" test-file-source "}}
Ezt követően hajtsuk végre a POST-ot:
curl -d @ $ CONFLUENT_HOME / connect-file-source-transform.json \ -H "Content-Type: application / json" \ -X POST // localhost: 8083 / connectors
Írjunk néhány sort a sajátunkhoz teszt-transzformáció.txt:
Foo bár
Ha most megvizsgáljuk a összekapcsolódás-transzformáció témában a következő sorokat kell kapnunk:
{"line": "Foo", "data_source": "test-file-source"} {"line": "Bar", "data_source": "test-file-source"}
9. A Ready csatlakozók használata
Miután ezeket az egyszerű csatlakozókat használta, nézzük meg a fejlettebb használatra kész csatlakozókat és azok telepítésének módját.
9.1. Hol találhatók a csatlakozók
Az előre épített csatlakozók különböző forrásokból állnak rendelkezésre:
- Néhány csatlakozó sima Apache Kafka-val van ellátva (fájlok és konzolok forrása és mosogatója)
- Néhány további csatlakozó a Confluent Platformhoz van kötve (ElasticSearch, HDFS, JDBC és AWS S3)
- Nézze meg a Confluent Hub alkalmazást is, amely egyfajta alkalmazásbolt a Kafka csatlakozókhoz. A felajánlott csatlakozók száma folyamatosan növekszik:
- Összefolyó csatlakozók (a Confluent fejlesztette, tesztelte, dokumentálta és teljes mértékben támogatja őket)
- Hitelesített csatlakozók (egy harmadik fél által megvalósított és a Confluent tanúsított)
- A közösség által kifejlesztett és támogatott csatlakozók
- Ezen túlmenően a Confluent egy Connectors oldalt is kínál, néhány csatlakozóval, amelyek szintén elérhetők a Confluent Hub-ban, de további közösségi csatlakozókkal is
- És végül vannak olyan gyártók is, akik termékeik részeként biztosítják a csatlakozókat. Például a Landoop biztosítja a Lenses nevű streaming könyvtárat, amely ~ 25 nyílt forráskódú csatlakozót is tartalmaz (sokukat más helyeken is keresztbe sorolják)
9.2. Csatlakozók telepítése a Confluent Hubról
A Confluent vállalati verziója parancsfájlt biztosít a csatlakozók és más összetevők telepítéséhez a Confluent Hubból (a parancsfájl nem szerepel az Open Source verzióban). Ha a vállalati verziót használjuk, akkor a következő paranccsal telepíthetünk egy csatlakozót:
$ CONFLUENT_HOME / bin / confluent-hub install confluentinc / kafka-connect-mqtt: 1.0.0-preview
9.3. Csatlakozók kézi telepítése
Ha olyan csatlakozóra van szükségünk, amely nem áll rendelkezésre a Confluent Hubon, vagy ha a Confluent nyílt forráskódú verziójával rendelkezünk, manuálisan telepíthetjük a szükséges csatlakozókat. Ehhez le kell töltenünk és kibontanunk a csatlakozót, valamint áthelyezzük a mellékelt lib-eket a következőként megadott mappába plugin.path.
Minden egyes csatlakozó esetében az archívumnak tartalmaznia kell két számunkra érdekes mappát:
- A lib mappa tartalmazza például a csatlakozó edényt, kafka-connect-mqtt-1.0.0-preview.jar, valamint még néhány üveg, amelyet a csatlakozó megkövetel
- A stb. mappa egy vagy több referencia konfigurációs fájlt tartalmaz
Meg kell mozgatnunk a lib mappa ide $ CONFLUENT_HOME / share / java, vagy akármelyik utat is megadtuk plugin.path ban ben connect- standalone.properties és connect-distributed.tulajdonságok. Ennek során értelme lehet a mappa átnevezése valami értelmesre.
A config fájlokat felhasználhatjuk a stb. vagy hivatkozással rájuk, miközben önálló módban indulunk, vagy egyszerűen megragadhatjuk a tulajdonságokat, és létrehozhatunk belőlük JSON fájlt.
10. Következtetés
Ebben az oktatóanyagban megvizsgáltuk a Kafka Connect telepítését és használatát.
Megvizsgáltuk a csatlakozók típusait, mind a forrást, mind a mosogatót. Megvizsgáltunk néhány olyan funkciót és módot, amelyekben a Connect futtatható. Ezután áttekintettük a transzformátorokat. És végül megtanultuk, hogy hol szerezzünk be és hogyan telepítsünk egyedi csatlakozókat.
Mint mindig, a konfigurációs fájlok megtalálhatók a GitHubon.