Az adatfolyam-feldolgozás kezdete a Spring Cloud Data Flow segítségével

1. Bemutatkozás

Tavaszi felhő adatfolyam egy felhő-natív programozási és működési modell a komponálható adatmikroszolgáltatásokhoz.

Val vel Tavaszi felhő adatfolyam, a fejlesztők létrehozhatnak és összehangolhatnak adatcsatornákat olyan általános felhasználási esetekhez, mint például az adatok bevitele, a valós idejű elemzések és az adatok importálása / exportálása.

Ez az adatvezeték kétféle változatban érkezik, streaming és kötegelt adatcsatornák.

Az első esetben korlátlan mennyiségű adat kerül elfogyasztásra vagy előállításra az üzenetküldő köztes szoftver segítségével. Míg a második esetben a rövid életű feladat véges adatsort dolgoz fel, majd leáll.

Ez a cikk a streaming feldolgozására összpontosít.

2. Építészeti áttekintés

Az ilyen típusú architektúra kulcsfontosságú elemei Alkalmazások, a Data Flow Server, és a cél futási idő.

Ezen kulcsfontosságú elemek mellett általában van még egy Data Flow Shell és a üzenetközvetítő az építészeten belül.

Nézzük meg mindezeket az összetevőket részletesebben.

2.1. Alkalmazások

A streaming adatcsatorna általában magában foglalja a külső rendszerekből származó események fogyasztását, az adatfeldolgozást és a poliglot perzisztenciát. Ezeket a fázisokat általában úgy nevezik Forrás, Processzor, és Mosogató ban ben Tavaszi felhő terminológia:

  • Forrás: az eseményeket fogyasztó alkalmazás
  • Processzor: adatokat fogyaszt a Forrás, végez némi feldolgozást rajta, és a feldolgozott adatokat kiadja a folyamatban lévő következő alkalmazásnak
  • Mosogató: akár egyből fogyaszt Forrás vagy Processzor és az adatokat a kívánt perzisztencia rétegbe írja

Ezeket az alkalmazásokat kétféleképpen lehet csomagolni:

  • Spring Boot uber-jar, amelyet maven tárban, fájlban, http-ben vagy bármilyen más tavaszi erőforrás-megvalósításban tárolnak (ezt a módszert ebben a cikkben fogjuk használni)
  • Dokkmunkás

Számos forrás, processzor és mosogató alkalmazás általános használatra (pl. Jdbc, hdfs, http, router) már rendelkezésre áll és használatra kész Tavaszi felhő adatfolyam csapat.

2.2. Futásidő

Futási időre van szükség ezen alkalmazások futtatásához. A támogatott futási idők a következők:

  • Felhő öntöde
  • Apache FONAL
  • Kubernetes
  • Apache Mesos
  • Helyi kiszolgáló fejlesztéshez (amelyet ebben a cikkben használunk)

2.3. Data Flow Server

Az a komponens, amely felelős az alkalmazások futásidejű telepítéséért, a Data Flow Server. Van egy Data Flow Server futtatható jar minden cél futáshoz.

A Data Flow Server a tolmácsolásért felelős:

  • DSL adatfolyam, amely leírja az adatok logikai áramlását több alkalmazáson keresztül.
  • Telepítési jegyzék, amely leírja az alkalmazások futásidejű hozzárendelését.

2.4. Data Flow Shell

A Data Flow Shell kliens a Data Flow Server számára. A shell lehetővé teszi számunkra a kiszolgálóval való együttműködéshez szükséges DSL parancs végrehajtását.

Például a DSL-t, amely leírja az adatfolyamot a http forrásból a jdbc mosogatóba, „http | jdbc ”. A DSL-ben ezeket a neveket a Data Flow Server és feltérképezheti a Maven vagy a Docker adattárakban tárolható alkalmazás-leletekre.

A Spring egy grafikus felületet is kínál, név szerint Flo, adatfolyam-adatcsatornák létrehozására és felügyeletére. Használata azonban kívül esik a cikk tárgyalásán.

2.5. Message Broker

Amint az előző szakasz példáján láthattuk, a cső szimbólumot használtuk az adatfolyam meghatározásához. A csőszimbólum a két alkalmazás közötti kommunikációt jelenti az üzenetküldő köztes programokon keresztül.

Ez azt jelenti, hogy szükségünk van egy üzenetközvetítőre, amely a célkörnyezetben működik és fut.

A két támogatott üzenetküldő köztes szoftverközvetítő:

  • Apache Kafka
  • RabbitMQ

És most, miután áttekintettük az építészeti elemeket - itt az ideje, hogy megépítsük az első adatfolyam-feldolgozó csővezetékünket.

3. Telepítsen egy üzenetközvetítőt

Mint láttuk, a folyamatban lévő alkalmazásoknak kommunikációhoz üzenetküldő köztes szoftverre van szükségük. E cikk alkalmazásában folytatjuk RabbitMQ.

A telepítés teljes részleteiért kövesse az utasításokat a hivatalos oldalon.

4. A Helyi adatfolyam szerver

Alkalmazásaink előállításának felgyorsítása érdekében a Spring Initializr programot fogjuk használni; segítségével megszerezhetjük Tavaszi csizma alkalmazások néhány perc alatt.

A weboldal elérése után egyszerűen válassza a Csoport és egy Műalkotás név.

Ha ez megtörtént, kattintson a gombra Projekt létrehozása hogy elindítsa a Maven-műtárgy letöltését.

A letöltés befejezése után csomagolja ki a projektet, és importálja azt Maven-projektként a választott IDE-be.

Adjunk hozzá egy Maven-függőséget a projekthez. Ahogy szükségünk lesz rá Dataflow helyi kiszolgáló könyvtárak, adjuk hozzá a spring-cloud-starter-dataflow-server-local függőséget:

 org.springframework.cloud spring-cloud-starter-dataflow-server-local 

Most meg kell jegyeznünk a Tavaszi csizma főosztály azzal @EnableDataFlowServer kommentár:

@EnableDataFlowServer @SpringBootApplication public class SpringDataFlowServerApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowServerApplication.class, args); }} 

Ez minden. A mi Helyi adatfolyam-kiszolgáló készen áll a végrehajtásra:

mvn spring-boot: fut

Az alkalmazás a 9393-as porton indul.

5. Az adatfolyam-héj

Ismét lépjen a Tavaszi Initializr oldalra, és válassza a Csoport és Műalkotás név.

Miután letöltöttük és importáltuk a projektet, adjunk hozzá egy spring-cloud-dataflow-shell függőséget:

 org.springframework.cloud spring-cloud-dataflow-shell 

Most hozzá kell adnunk a @EnableDataFlowShell jegyzet a Tavaszi csizma főosztály:

@EnableDataFlowShell @SpringBootApplication public class SpringDataFlowShellApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowShellApplication.class, args); }} 

Most futtathatjuk a héjat:

mvn spring-boot: fut

Miután a héj fut, beírhatjuk a Segítség parancsot a parancssorban, hogy megtekinthessük a végrehajtandó parancsok teljes listáját.

6. A Forrás alkalmazás

Hasonlóképpen, az Initializr-en most létrehozunk egy egyszerű alkalmazást és hozzáadunk egy Patak Nyúl függőség, amelyet tavaszi felhő-kezdő-patak-nyúlnak hívnak:

 org.springframework.cloud tavasz-felhő-indító-patak-nyúl 

Ezután hozzáadjuk a @EnableBinding (Source.class) jegyzet a Tavaszi csizma főosztály:

@EnableBinding (Source.class) @SpringBootApplication public class SpringDataFlowTimeSourceApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeSourceApplication.class, args); }}

Most meg kell határoznunk a feldolgozandó adatok forrását. Ez a forrás bármilyen potenciálisan végtelen munkaterhelés lehet (a tárgyak internetének érzékelői adatai, a hét minden napján, a hét minden napján, az események feldolgozása, az online tranzakciós adatok bevitele).

Mintaprogramunkban 10 másodpercenként egy eseményt (az egyszerűség kedvéért egy új időbélyeget) állítunk elő a-val Poller.

A @InboundChannelAdapter az annotáció üzenetet küld a forrás kimeneti csatornájára, a visszatérési értéket használva az üzenet hasznosságaként:

@Bean @InboundChannelAdapter (érték = Source.OUTPUT, poller = @Poller (fixedDelay = "10000", maxMessagesPerPoll = "1")) public MessageSource timeMessageSource () {return () -> MessageBuilder.withPayload (új dátum (). GetTime ()).épít(); } 

Adatforrásunk készen áll.

7. A processzor alkalmazása

Ezután létrehozunk egy alkalmazást, és hozzáadunk egy Patak Nyúl függőség.

Ezután hozzáadjuk a @EnableBinding (Processor.class) jegyzet a Tavaszi csizma főosztály:

@EnableBinding (Processor.class) @SpringBootApplication public class SpringDataFlowTimeProcessorApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowTimeProcessorApplication.class, args); }}

Ezután meg kell határoznunk egy módszert a forrásalkalmazásból származó adatok feldolgozására.

A transzformátor definiálásához meg kell jegyeznünk ezt a módszert @Transzformátor kommentár:

@Transformer (inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform (Long timestamp) {DateFormat dateFormat = new SimpleDateFormat ("yyyy / MM / dd hh: mm: yy"); Karakterlánc dátum = dateFormat.format (időbélyeg); visszatérítési dátum; }

Átalakítja az időbélyeget a „input” csatornáról egy formázott dátumra, amelyet elküld a „output” csatornára.

8. A mosogató alkalmazás

Az utolsó létrehozandó alkalmazás a Sink alkalmazás.

Ismét lépjen a Tavaszi Initializr oldalra, és válassza a Csoport, an Műalkotás név. A projekt letöltése után adjunk hozzá egy Patak Nyúl függőség.

Ezután adja hozzá a @EnableBinding (Sink.osztály) jegyzet a Tavaszi csizma főosztály:

@EnableBinding (Sink.class) @SpringBootApplication public class SpringDataFlowLoggingSinkApplication {public static void main (String [] args) {SpringApplication.run (SpringDataFlowLoggingSinkApplication.class, args); }}

Most szükségünk van egy módszerre a processzor alkalmazásból érkező üzenetek elfogására.

Ehhez hozzá kell adnunk a @StreamListener (Sink.INPUT) módszerünk kommentárja:

@StreamListener (Sink.INPUT) public void loggerSink (String date) {logger.info ("Fogadott:" + dátum); }

A módszer egyszerűen kinyomtatja a formázott dátumban átalakított időbélyeget egy naplófájlba.

9. Regisztráljon egy Stream alkalmazást

A Spring Cloud Data Flow Shell lehetővé teszi számunkra, hogy regisztráljunk egy Stream alkalmazást az App Registry alkalmazással a alkalmazás regisztráció parancs.

Meg kell adnunk egy egyedi nevet, alkalmazástípust és egy URI-t, amely feloldható az alkalmazás műtermékének. A típushoz adja meg a „forrás“, “processzorVagymosogató“.

Amikor a Maven sémával URI-t ad meg, a formátumnak meg kell felelnie a következőknek:

maven: //: [: [:]]:

A Forrás, Processzor és Mosogató korábban létrehozott alkalmazások, menjen a Tavaszi felhő adatfolyam héj és adja ki a következő parancsokat a parancssorból:

alkalmazás-regisztráció - név időforrás - típus forrás --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-source: jar: 0.0.1-SNAPSHOT alkalmazásregisztráció --name time -processzor --type processzor --uri maven: //com.baeldung.spring.cloud: spring-data-flow-time-processzor: jar: 0.0.1-SNAPSHOT alkalmazásregiszter - névnaplózó-mosogató --típusú mosogató --uri maven: //com.baeldung.spring.cloud: spring-data-flow-logging-sink: jar: 0.0.1-SNAPSHOT 

10. Hozza létre és telepítse az adatfolyamot

Új adatfolyam-definíció létrehozásához lépjen a Tavaszi felhő adatfolyam héj és hajtsa végre a következő shell parancsot:

stream létrehozása - név naplózása - definiálás 'időforrás | időfeldolgozó | fakitermelés

Ez meghatározza a nevű folyamot naplózási idő a DSL kifejezés alapján ’Időforrás | időfeldolgozó | fakitermelés.

Ezután a stream telepítéséhez hajtsa végre a következő shell parancsot:

adatfolyam telepítése --name time-to-log

A Data Flow Server megoldja időforrás, időfeldolgozó, és fakitermelés-süllyesztés koordinátákat ásni, és azokat használja a időforrás, időfeldolgozó és fakitermelés-süllyesztés a patak alkalmazásai.

Ha az adatfolyam megfelelően van telepítve, akkor a Data Flow Server naplózza a modulok elindítását és összekapcsolását:

2016-08-24 12: 29: 10.516 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: app time-to-log.logging-sink példány telepítése 0 A naplók a PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink 2016-08-24 12: 29: 17.600 INFO 8096 --- [io-9393-exec-10] oscd spi.local.LocalAppDeployer: az alkalmazás time-to-log.time-processzor példány telepítése 0 A naplók a PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034556862 / time-to-log.time-processzor lesznek 2016-08-24 12: 29: 23.280 INFO 8096 --- [io-9393-exec-10] oscdspi.local.LocalAppDeployer: az alkalmazás naplózási idő telepítése. Idő-forrás példány 0 naplók a PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034562861 / time-to-log.time-source

11. Az eredmény áttekintése

Ebben a példában a forrás egyszerűen másodpercenként üzenetként küldi az aktuális időbélyeget, a processzor formázza, és a naplófülke a naplózási keretrendszer segítségével adja ki a formázott időbélyeget.

A naplófájlok a. Könyvtárban található könyvtárban találhatók Data Flow ServerLog kimenetét, a fentiek szerint. Az eredmény megtekintéséhez faraghatjuk a naplót:

tail -f PATH_TO_LOG / spring-cloud-dataflow-1276836171391672089 / time-to-log-1472034549734 / time-to-log.logging-sink / stdout_0.log 2016-08-24 12: 40: 42.029 INFO 9488 --- [ r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Beérkezett: 2016/08/24 11:40:01 2016-08-24 12: 40: 52.035 INFO 9488 --- [r.time-to-log-1 ] scSpringDataFlowLoggingSinkApplication: Beérkezett: 2016/08/24 11:40:11 2016-08-24 12: 41: 02.030 INFO 9488 --- [r.time-to-log-1] scSpringDataFlowLoggingSinkApplication: Beérkezett: 2016/08 / 24 11:40:21

12. Következtetés

Ebben a cikkben azt láttuk, hogyan lehet adatfolyamot felépíteni a folyam feldolgozásához a Tavaszi felhő adatfolyam.

Továbbá láttuk a szerepét Forrás, Processzor és Mosogató alkalmazások a patak belsejében, valamint a modul csatlakoztatása és összekapcsolása a Data Flow Server használatával Data Flow Shell.

A példakód a GitHub projektben található.