ETL a Spring Cloud Data Flow-val

1. Áttekintés

A Spring Cloud Data Flow egy felhő-natív eszközkészlet valós idejű adatcsatornák és kötegelt folyamatok kiépítésére. A Spring Cloud Data Flow készen áll számos adatfeldolgozási felhasználási eset felhasználására, például egyszerű importálás / exportálás, ETL-feldolgozás, esemény streaming és prediktív elemzés.

Ebben az oktatóanyagban megismerhetünk egy valós idejű átalakítás és betöltés (ETL) példát egy olyan adatfolyam segítségével, amely kivonja az adatokat egy JDBC adatbázisból, átalakítja egyszerű POJO-kká és betölt egy MongoDB-be.

2. ETL és Event-Stream feldolgozás

Az ETL-t - kivonat, átalakítás és betöltés - általában folyamatnak nevezték, amely több adatbázisból és rendszerből adatokat kötegelten tölt be egy közös adattárházba. Ebben az adattárházban nehéz elvégezni az adatok elemzését anélkül, hogy ez veszélyeztetné a rendszer általános teljesítményét.

Az új trendek azonban megváltoztatják ennek módját. Az ETL-nek továbbra is szerepe van az adatok adattárházakba és adattavakba történő továbbításában.

Manapság ezt meg lehet tenni folyamok eseményfolyam-architektúrában a Spring Cloud Data Flow segítségével.

3. Tavaszi felhő adatfolyam

A Spring Cloud Data Flow (SCDF) segítségével a fejlesztők kétféle ízt hozhatnak létre:

  • Hosszú élettartamú, valós idejű adatfolyam-alkalmazások a Spring Cloud Stream segítségével
  • Rövid életű kötegelt feladatalkalmazások a Spring Cloud Task használatával

Ebben a cikkben kitérünk az első, a Spring Cloud Stream-en alapuló, hosszú életű streaming alkalmazásra.

3.1. Tavaszi Cloud Stream alkalmazások

Az SCDF Stream csővezetékek lépésekből állnak, holminden lépés egy Spring Boot stílusban épített alkalmazás, a Spring Cloud Stream mikrokeret segítségével. Ezeket az alkalmazásokat egy olyan üzenetküldő köztes szoftver integrálja, mint az Apache Kafka vagy a RabbitMQ.

Ezeket az alkalmazásokat forrásokba, processzorokba és mosogatókba sorolják. Az ETL folyamattal összehasonlítva azt mondhatnánk, hogy a forrás a „kivonat”, a processzor a „transzformátor”, a mosogató pedig a „terhelés” rész.

Bizonyos esetekben alkalmazhatunk egy alkalmazásindítót a folyamat egy vagy több lépésében. Ez azt jelenti, hogy egy lépéshez nem kell új alkalmazást telepítenünk, hanem konfigurálnia kell egy már meglévő alkalmazásindítót.

Az alkalmazásindítók listája itt található.

3.2. Spring Cloud Data Flow kiszolgáló

Az architektúra utolsó része a Spring Cloud Data Flow Server. Az SCDF Server az alkalmazások és a folyamatfolyam telepítését a Spring Cloud Deployer Specification segítségével hajtja végre. Ez a specifikáció támogatja az SCDF felhőalapú ízét azáltal, hogy számos modern futási időre telepíti, mint például a Kubernetes, az Apache Mesos, a fonal és a Cloud Foundry.

Ezenkívül futtathatjuk az adatfolyamot helyi telepítésként.

További információ az SCDF architektúráról itt található.

4. Környezet beállítása

Mielőtt nekilátnánk, meg kell válassza ki ennek a komplex telepítésnek a darabjait. Az első meghatározandó darab az SCDF Server.

Teszteléshez az SCDF Server Local-ot fogjuk használni a helyi fejlesztéshez. A gyártási telepítéshez később választhatunk felhőalapú futást, például az SCDF Server Kubernetes szolgáltatást. A szerver futásideinek listáját itt találjuk.

Most ellenőrizzük a szerver futtatásához szükséges rendszerkövetelményeket.

4.1. rendszerkövetelmények

Az SCDF Server futtatásához két függőséget kell meghatároznunk és beállítanunk:

  • az üzenetküldő köztes szoftver, és
  • az RDBMS.

Az üzenetkezelő köztes szoftverhez dolgozni fogunk a RabbitMQ-val, és a PostgreSQL-t választjuk RDBMS-nek a csővezeték-áramlat-meghatározásaink tárolásához.

A RabbitMQ futtatásához töltse le itt a legújabb verziót, és indítson egy RabbitMQ példányt az alapértelmezett konfiguráció használatával, vagy futtassa a következő Docker parancsot:

docker run --name dataflow-rabbit -p 15672: 15672 -p 5672: 5672 -d rabbitmq: 3-management

Utolsó telepítési lépésként telepítse és futtassa a PostgreSQL RDBMS-t az alapértelmezett 5432-es portra. Ezután hozzon létre egy adatbázist, ahol az SCDF az alábbi szkript segítségével tárolhatja adatfolyam-definícióit:

CREATE DATABASE adatfolyam;

4.2. Spring Cloud Data Flow Server helyi

Az SCDF Server Local futtatásához választhatjuk, hogy a szervert a következő használatával indítjuk dokkoló-ír, vagy elindíthatjuk Java alkalmazásként.

Itt futtatjuk az SCDF Server Local programot Java alkalmazásként. Az alkalmazás konfigurálásához meg kell határoznunk a konfigurációt Java alkalmazás paramétereként. Szükségünk lesz a Java 8-ra a rendszer elérési útjában.

Az üvegek és függőségek tárolásához létre kell hoznunk egy otthoni mappát az SCDF szerverünk számára, és le kell töltenünk az SCDF Server helyi disztribúciót ebbe a mappába. Az SCDF Server Local legújabb terjesztését itt töltheti le.

Létre kell hoznunk egy lib mappát, és oda kell raknunk egy JDBC illesztőprogramot. A PostgreSQL illesztőprogram legújabb verziója itt érhető el.

Végül futtassuk az SCDF helyi szervert:

$ java -Dloader.path = lib -jar spring-cloud-dataflow-server-local-1.6.3.RELEASE.jar \ --spring.datasource.url = jdbc: postgresql: //127.0.0.1: 5432 / dataflow \ --spring.datasource.username = postgres_username \ --spring.datasource.password = postgres_password \ --spring.datasource.driver-class-name = org.postgresql.Driver \ --spring.rabbitmq.host = 127.0.0.1 \ --spring.rabbitmq.port = 5672 \ --spring.rabbitmq.username = vendég \ --spring.rabbitmq.password = vendég

A következő URL-címen ellenőrizhetjük, hogy fut-e:

// localhost: 9393 / műszerfal

4.3. Tavaszi felhő adatfolyam héj

Az SCDF Shell egy parancssori eszköz, amely megkönnyíti alkalmazások és csővezetékek összeállítását és telepítését. Ezek a Shell parancsok a Spring Cloud Data Flow Server REST API-n futnak.

Töltse le az üveg legújabb verzióját az SCDF otthoni mappájába, amely itt érhető el. Miután elkészült, futtassa a következő parancsot (frissítse a verziót, ha szükséges):

$ java -jar spring-cloud-dataflow-shell-1.6.3.RELEASE.jar ____ ____ _ __ / ___ | _ __ _ __ (_) _ __ __ _ / ___ | | ___ _ _ __ | | \ ___ \ | '_ \ | '__ | | "_ \ / _" | | | | | / _ \ | | | | / _` | ___) | | _) | | | | | | | (_ | | | | ___ | | (_) | | _ | | (_ | | | ____ / | .__ / | _ | | _ | _ | | _ | \ __, | \ ____ | _ | \ ___ / \ __, _ | \ __, _ | ____ | _ | _ __ | ___ / __________ | _ \ __ _ | | _ __ _ | ___ | | _____ __ \ \ \ \ \ \ | | | | / _` | __ / _` | | | _ | | / _ \ \ / \ / / \ \ \ \ \ \ | | _ | | (_ | | || (_ | | | _ | | | (_) \ VV / / / / / / / / | ____ / \ __, _ | \ __ \ __, _ | | _ | | _ | \ ___ / \ _ / \ _ / / _ / _ / _ / _ / _ / Üdvözöljük a Spring Cloud Data Flow héj. Segítségért nyomja meg a TAB billentyűt, vagy írja be a "help" elemet

Ha a „adatfolyam:> ” kapsz "szerver-ismeretlen:> ” az utolsó sorban nem az localhost-on futtatja az SCDF szervert. Ebben az esetben futtassa a következő parancsot egy másik gazdagéphez való csatlakozáshoz:

server-unknown:> adatfolyam konfigurációs kiszolgáló // {host}

Most a Shell csatlakozik az SCDF szerverhez, és futtathatjuk a parancsokat.

Az első dolog, amit meg kell tennünk a Shellben, az az alkalmazásindítók importálása. Itt megtalálja a RabbitMQ + Maven legújabb verzióját a Spring Boot 2.0.x alkalmazásban, és futtassa a következő parancsot (frissítse újra a verziót, itt “Darwin-SR1", szükség szerint):

$ dataflow:> alkalmazásimport --uri //bit.ly/Darwin-SR1-stream-applications-rabbit-maven

A telepített alkalmazások ellenőrzéséhez futtassa a következő Shell parancsot:

$ dataflow:> alkalmazáslista

Ennek eredményeként látnunk kell egy táblázatot, amely tartalmazza az összes telepített alkalmazást.

Az SCDF emellett egy grafikus felületet is kínál, név szerint Flo, amelyet ezen a címen érhetünk el: // localhost: 9393 / műszerfal. Használata azonban nem tartozik a cikk hatálya alá.

5. ETL csővezeték összeállítása

Most hozzuk létre az adatfolyam-vezetékünket. Ehhez a JDBC Source alkalmazásindítót használjuk fel információk kinyerésére a relációs adatbázisunkból.

Ezenkívül létrehozunk egy egyedi processzort az információs struktúra átalakításához, valamint egy egyedi mosogatót, amely az adatainkat egy MongoDB-be tölti be.

5.1. Kivonat - Relációs adatbázis előkészítése a kibontáshoz

Hozzunk létre egy adatbázist a nevével crm és egy táblázatot a nevével vevő:

CREATE DATABASE crm;
CREATE TABLE ügyfél (id bigint NULL, importált logikai DEFAULT hamis, változó ügyfél_neve (50), ELSŐKULCS (id))

Ne feledje, hogy zászlót használunk importált, amely tárolja, melyik rekordot már importálták. Ezeket az információkat szükség esetén egy másik táblázatban is tárolhatjuk.

Most illesszünk be néhány adatot:

INSERT INTO ügyfél (azonosító, ügyfél_neve, importált) ÉRTÉKEK (1, 'John Doe', hamis);

5.2. Átalakítás - leképezés JDBC Mezők a MongoDB Mezők felépítése

Az átalakítási lépéshez elvégezzük a mező egyszerű fordítását Ügyfél neve a forrás táblából egy új mezőbe név. Itt más átalakítások is elvégezhetők, de rövidítsük a példát.

Ehhez létrehozunk egy új projektet a névvel ügyfél-átalakítás. Ennek legegyszerűbb módja a Spring Initializr webhely használata a projekt létrehozásához. A weboldal elérése után válasszon egy csoportot és egy műnevet. Majd használjuk com.ügyfél és ügyfél-átalakítás, illetőleg.

Ha ez megtörtént, kattintson a „Projekt létrehozása” gombra a projekt letöltéséhez. Ezután csomagolja ki a projektet, és importálja a kedvenc IDE-be, és adja hozzá a következő függőséget a pom.xml:

 org.springframework.cloud tavasz-felhő-patak-kötőanyag-nyúl 

Most elkezdjük kódolni a mezőnév átalakítását. Ehhez létrehozzuk a Vevő osztályban adapterként működni. Ez az osztály megkapja a Ügyfél neve a setName () módszerrel, és értékét a getName módszer.

A @JsonProperty a kommentárok elvégzik az átalakítást, miközben a JSON-ról Java-ra deserializálnak:

public class Ügyfél {private Long id; privát karakterlánc neve; @JsonProperty ("ügyfél_neve") public void setName (karakterlánc neve) {this.name = név; } @JsonProperty ("név") public String getName () {return name; } // Getters and Setters}

A processzornak adatokat kell fogadnia egy bemenetről, meg kell tennie az átalakítást és az eredményt egy kimeneti csatornához kell kötnie. Hozzunk létre egy osztályt erre:

import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Processor; import org.springframework.integration.annotation.Transformer; @EnableBinding (Processor.class) public class CustomerProcessorConfiguration {@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Ügyfél convertToPojo (Customer payload) {return payload; }}

A fenti kódban megfigyelhetjük, hogy az átalakulás automatikusan bekövetkezik. A bemenet akkor fogadja az adatokat, amikor JSON és Jackson deserializálja a-ba Vevő objektum a készlet mód.

Az ellenkezője a kimenetnek, az adatok a JSON használatával sorosodnak kap mód.

5.3. Betöltés - Mosogató MongoDB-ben

A transzformációs lépéshez hasonlóan létrehozunk egy másik maven projektet, most a névvel vevő-mongodb-mosogató. Ismét nyissa meg a tavaszi Initializr alkalmazást, a Csoport választásával com.ügyfél, és az Artefact számára válassza ügyfél-mongodb-mosogató. Ezután írja be MongoDB a függőségek keresőmezőbe, és töltse le a projektet.

Ezután csomagolja ki és importálja kedvenc IDE-jébe.

Ezután adja hozzá ugyanazt az extra függőséget, mint a ügyfél-átalakítás projekt.

Most létrehozunk egy másikat Vevő osztály, a bemenet fogadásához ebben a lépésben:

import org.springframework.data.mongodb.core.mapping.Document; @Document (collection = "customer") public class Ügyfél {private Long id; privát karakterlánc neve; // Getters and Setters}

A süllyesztésért Vevő, létrehozunk egy Figyelő osztályt, amely menteni fogja az ügyfél entitást a CustomerRepository:

@EnableBinding (Sink.class) public class CustomerListener {@Autowired private CustomerRepository repository; @StreamListener (Sink.INPUT) public void save (Ügyfél ügyfél) {repository.save (ügyfél); }}

És a CustomerRepository, ebben az esetben a MongoRepository a tavaszi adatokból:

import org.springframework.data.mongodb.repository.MongoRepository; import org.springframework.stereotype.Repository; @Repository nyilvános felület Az CustomerRepository kiterjeszti a MongoRepository-t {} 

5.4. Adatfolyam meghatározása

Most, mindkét egyedi alkalmazás készen áll az SCDF Server regisztrálására. Ennek megvalósításához fordítsa le mindkét projektet a Maven paranccsal mvn install.

Ezután regisztráljuk őket a Spring Cloud Data Flow Shell segítségével:

alkalmazás-regisztráció - név ügyfél-átalakítás - típusú processzor --uri maven: //com.customer: ügyfél-transzformáció: 0.0.1-SNAPSHOT
alkalmazás-regisztráció --name customer-mongodb-sink - type sink --uri maven: //com.customer: customer-mongodb-sink: jar: 0.0.1-SNAPSHOT

Végül ellenőrizzük, hogy az alkalmazásokat az SCDF tárolja-e, futtassuk az alkalmazáslista parancsot a shellben:

alkalmazáslista

Ennek eredményeként mindkét eredményt látnunk kell a kapott táblázatban.

5.4.1. Stream csővezeték-tartományspecifikus nyelv - DSL

A DSL meghatározza az alkalmazások közötti konfigurációt és adatáramlást. Az SCDF DSL egyszerű. Az első szóban meghatározzuk az alkalmazás nevét, majd a konfigurációkat.

A szintaxis egy Unix által ihletett Pipeline szintaxis, amely függőleges sávokat, más néven „csöveket” használ több alkalmazás összekapcsolására:

http --port = 8181 | napló

Ez létrehoz egy HTTP alkalmazást, amelyet a 8181-es port szolgál ki, amely a fogadott test hasznos terhelését naplóba küldi.

Most nézzük meg, hogyan lehet létrehozni a JDBC forrás DSL adatfolyam-definícióját.

5.4.2. JDBC forrásfolyam meghatározása

A JDBC forrás legfontosabb konfigurációi a következők: lekérdezés és frissítés.lekérdezés közben olvasatlan rekordokat választ ki frissítés megváltoztat egy zászlót, hogy megakadályozza az aktuális rekordok újbóli olvasását.

Ezenkívül meghatározzuk a JDBC forrást, hogy rögzített 30 másodperces késleltetéssel és legfeljebb 1000 sor lekérdezésével végezzen lekérdezést. Végül meghatározzuk a kapcsolat konfigurációit, például az illesztőprogramot, a felhasználónevet, a jelszót és a kapcsolat URL-jét:

jdbc --query = 'SELECT id, customer_name FROM public.customer WHERE import = false' --update = 'UPDATE public.customer SET importált = true WHERE id itt: (id)' --max-rows-per-poll = 1000 --fixed-delay = 30 --time-unit = SECONDS --driver-class-name = org.postgresql. Driver --url = jdbc: postgresql: // localhost: 5432 / crm --username = postgres - jelszó = postgres

További JDBC forráskonfigurációs tulajdonságok itt találhatók.

5.4.3. Ügyfél MongoDB Sink Stream Definition

Mivel nem definiáltuk a kapcsolati konfigurációkat a alkalmazás.tulajdonságok nak,-nek ügyfél-mongodb-mosogató, a DSL paramétereken keresztül konfigurálunk.

Alkalmazásunk teljes mértékben a MongoDataAutoConfiguration. A többi lehetséges konfigurációt itt tekintheti meg. Alapvetően meghatározzuk a tavasz.adatok.mongodb.uri:

customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main

5.4.4. A Stream létrehozása és telepítése

Először is, a végső adatdefiníció létrehozásához térjen vissza a Shellhez, és hajtsa végre a következő parancsot (sortörés nélkül az imént beillesztették az olvashatóság érdekében):

adatfolyam létrehozása --name jdbc-to-mongodb --definition "jdbc --query = 'SELECT id, customer_name FROM public.customer WHERE import = false' - fix-delay = 30 --max-rows-per-poll = 1000 --update = 'UPDATE customer SET import = true WHERE id in (: id)' --time-unit = SECONDS - jelszó = postgres --driver-class-name = org.postgresql.Driver - felhasználónév = postgres --url = jdbc: postgresql: // localhost: 5432 / crm | customer-transform | customer-mongodb-sink --spring.data.mongodb.uri = mongodb: // localhost / main " 

Ez a DSL adatfolyam meghatározza a jdbc nevű adatfolyamot-nak nek-mongodb. Következő, a streamet a neve szerint telepítjük:

stream telepítés --name jdbc-to-mongodb 

Végül meg kell látnunk az összes elérhető napló helyét a napló kimenetében:

A naplók a következőben lesznek: {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.customer-mongodb-sink A naplók a következőben lesznek: {PATH_TO_LOG} / spring-cloud-deployer / jdbc-to-mongod A /jdbc-to-mongodb.customer-transform naplók itt lesznek: {PATH_TO_LOG} /spring-cloud-deployer/jdbc-to-mongodb/jdbc-to-mongodb.jdbc

6. Következtetés

Ebben a cikkben egy teljes példát láthattunk egy ETL adatvezetékre a Spring Cloud Data Flow segítségével.

A legemlékezetesebb, hogy láttuk az alkalmazásindító konfigurációit, létrehoztunk egy ETL-adatfolyamot a Spring Cloud Data Flow Shell segítségével, és egyedi alkalmazásokat valósítottunk meg az adatok olvasására, átalakítására és írására.

Mint mindig, a példakód megtalálható a GitHub projektben.