Bevezetés az Apache Flink Java-val

1. Áttekintés

Az Apache Flink egy nagy adatfeldolgozó keretrendszer, amely lehetővé teszi a programozók számára, hogy a hatalmas mennyiségű adatot nagyon hatékonyan és skálázható módon dolgozzák fel.

Ebben a cikkben bemutatjuk a az API - n belüli alapvető koncepciók és szabványos adatátalakítások Apache Flink Java API. Ennek az API-nak a kifinomult stílusa megkönnyíti a Flink központi konstrukciójával - az elosztott kollekcióval - való munkát.

Először megnézzük Flinkét DataSet API-átalakításokat, és felhasználja őket szószámláló program megvalósítására. Ezután rövid pillantást vetünk Flink-re Adatfolyam API, amely lehetővé teszi az eseményfolyamok valós idejű feldolgozását.

2. Maven-függőség

A kezdéshez hozzá kell adnunk a Maven-függőségeket flink-java és flink-test-utils könyvtárak:

 org.apache.flink flink-java 1.2.0 org.apache.flink flink-test-utils_2.10 1.2.0 test 

3. Core API koncepciók

A Flink-kel való együttműködés során néhány dolgot tudnunk kell az API-jához kapcsolódóan:

  • Minden Flink program átalakításokat hajt végre az elosztott adatgyűjteményeken. Az adatok átalakításához különféle funkciók állnak rendelkezésre, beleértve a szűrést, leképezést, egyesítést, csoportosítást és összesítést
  • A mosogató a Flink művelet kiváltja egy adatfolyam végrehajtását a program kívánt eredményének elérése érdekében, például elmentheti az eredményt a fájlrendszerbe, vagy kinyomtathatja a standard kimenetre
  • A flink transzformációk lusták, ami azt jelenti, hogy csak a mosogató művelet hívódik meg
  • Az Apache Flink API kétféle működési módot támogat - kötegelt és valós idejű. Ha korlátozott adatforrással van dolga, amelyet kötegelt módban lehet feldolgozni, akkor a DataSet API. Ha korlátlan adatfolyamokat szeretne valós időben feldolgozni, akkor a Adatfolyam API

4. DataSet API transzformációk

A Flink program belépési pontja a VégrehajtásKörnyezet osztály - ez határozza meg azt a kontextust, amelyben a program futtatásra kerül.

Hozzunk létre egy VégrehajtásKörnyezet a feldolgozás megkezdéséhez:

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment ();

Vegye figyelembe, hogy amikor az alkalmazást elindítja a helyi gépen, akkor az a helyi JVM-en fogja végrehajtani a feldolgozást. Ha el akarja kezdeni a feldolgozást egy gépcsoporton, telepítenie kell az Apache Flink alkalmazást ezekre a gépekre, és konfigurálnia kell a VégrehajtásKörnyezet Eszerint.

4.1. Adathalmaz létrehozása

Az adatátalakítások megkezdéséhez el kell látnunk a programunkat az adatokkal.

Hozzunk létre egy példányt a DataSet osztály használja a mi ExecutionEnvironment:

Adatkészlet összegei = környezetek elemei (1, 29, 40, 50);

Létrehozhat egy DataSet több forrásból, például Apache Kafka, CSV, fájl vagy gyakorlatilag bármely más adatforrás.

4.2. Szűrés és csökkentés

Miután létrehozta a DataSet osztályban transzformációkat alkalmazhatsz rá.

Tegyük fel, hogy egy bizonyos küszöbérték feletti számokat szeretne szűrni, és ezután összesíti azokat. Használhatja a szűrő() és csökkenteni () átalakítások ennek elérése érdekében:

int küszöb = 30; Lista gyűjt = összegek .szűrő (a -> a> küszöb) .csökkent ((egész, t1) -> egész + t1) .collect (); assertThat (collect.get (0)). isEqualTo (90); 

Vegye figyelembe, hogy a gyűjt() módszer a mosogató a tényleges adatátalakításokat kiváltó művelet.

4.3. Térkép

Tegyük fel, hogy van egy DataSet nak,-nek Személy tárgyak:

privát statikus osztály Személy {privát int kor; privát karakterlánc neve; // szabványos kivitelezők / szerelők / beállítók}

Ezután hozzunk létre egy DataSet ezen objektumok közül:

DataSet personDataSource = env.fromCollection (Arrays.asList (új személy (23, "Tom"), új személy (75, "Michael")));

Tegyük fel, hogy csak a kor mezőt a gyűjtemény minden tárgyából. Használhatja a térkép() átalakítás, hogy csak a Személy osztály:

Az életkorok listája = personDataSource .map (p -> p.age) .collect (); assertThat (korok) .hasSize (2); assertThat (korosztály). tartalmaz (23, 75);

4.4. Csatlakozik

Ha két adatkészlete van, érdemes egyesíteni őket id terület. Ehhez használhatja a csatlakozik() átalakítás.

Hozzunk létre egy felhasználó tranzakcióit és címeit:

Tuple3 cím = új Tuple3 (1, "5th Avenue", "London"); DataSet címek = env.fromElements (cím); Tuple2 firstTransaction = új Tuple2 (1, "Tranzakció_1"); DataSet tranzakciók = env.fromElements (elsőTranzakció, új Tuple2 (12, "Tranzakció_2")); 

Az első mező mindkét sorban egy Egész szám típusú, és ez egy id mező, amelyen mindkét adathalmazhoz csatlakozni akarunk.

A tényleges csatlakozási logika végrehajtásához végre kell hajtanunk a KeySelector interfész a címhez és a tranzakcióhoz:

privát statikus osztály, az IdKeySelectorTransaction valósítja meg a KeySelector-ot {@Orride public Integer getKey (Tuple2 value) {return value.f0; }} private static class Az IdKeySelectorAddress megvalósítja a KeySelector-ot {@Orride public Integer getKey (Tuple3 value) {return value.f0; }}

Minden választó csak azt a mezőt adja vissza, amelyen az összekapcsolást végre kell hajtani.

Sajnos itt nem lehet lambda kifejezéseket használni, mert a Flinknek általános típusú információkra van szüksége.

Ezután hajtsuk végre az egyesítő logikát az említett szelektorok segítségével:

Lista<>> join = tranzakciók.join (címek) .where (új IdKeySelectorTransaction ()) .equalTo (új IdKeySelectorAddress ()) .collect (); assertThat (csatlakozott) .hasSize (1); assertThat (csatlakozott). tartalmaz (új Tuple2 (elsőTranzakció, cím)); 

4.5. Fajta

Tegyük fel, hogy a következő gyűjteményed van Tuple2:

Tuple2 secondPerson = új Tuple2 (4, "Tom"); Tuple2 thirdPerson = új Tuple2 (5, "Scott"); Tuple2 negyedik személy = új Tuple2 (200, "Michael"); Tuple2 firstPerson = új Tuple2 (1, "Jack"); DataSet tranzakciók = env.fromElements (negyedik személy, második személy, harmadik személy, első személy); 

Ha a gyűjtemény első mezője szerint szeretné rendezni ezt a gyűjteményt, használhatja a sortPartitions () átalakítás:

Lista rendezve = tranzakciók .sortPartition (új IdKeySelectorTransaction (), Order.ASCENDING) .collect (); assertThat (rendezve) .containsExactly (firstPerson, secondPerson, thirdPerson, negyedikPerson);

5. Szószám

A szószámolási probléma az, amelyet általában a Big Data feldolgozó keretrendszer képességeinek bemutatására használnak. Az alapvető megoldás magában foglalja a szavak előfordulásának számlálását egy szövegbevitelben. Használjuk a Flink megoldást a probléma megoldására.

Megoldásunk első lépéseként létrehozzuk a LineSplitter osztály, amely a bemenetünket tokenekre (szavakra) osztja, minden egyes token számára gyűjtve a Tuple2 kulcsérték-párokból. Ezen kulcsok mindegyikében a kulcs a szövegben található szó, és az érték az egész (1).

Ez az osztály hajtja végre a FlatMapFunction kezelőfelület Húr bemenetként és előállítja a Tuple2:

nyilvános osztályú LineSplitter valósítja meg a FlatMapFunction funkciót {@Orride public void flatMap (Karakterlánc értéke, Gyűjtő out) {Stream.of (value.toLowerCase (). split ("\ W +")) .filter (t -> t.length ()> 0) .forEach (token -> out.collect (új Tuple2 (token) (1))); }}

Hívjuk a gyűjt() módszer a Gyűjtő osztály az adatok továbbadásához a feldolgozási folyamatban.

A következő és egyben utolsó lépésünk az, hogy az egyes elemeket első elemek (szavak) szerint csoportosítsuk, majd elvégezzük a összeg összesítse a második elemeket a szó előfordulások számának előállításához:

nyilvános statikus DataSet startWordCount (ExecutionEnvironment env, Listlist) dobja a {DataSet text = env.fromCollection (vonalak) kivételt; return text.flatMap (új LineSplitter ()) .groupBy (0) .aggregate (Aggregations.SUM, 1); }

A Flink transzformációk három típusát használjuk: flatMap (), csoportosít(), és összesítés ().

Írjunk egy tesztet annak igazolására, hogy a szószám-megvalósítás a várt módon működik:

Sorok felsorolása = Arrays.asList ("Ez egy első mondat", "Ez egy második mondat egy szóval"); DataSet eredmény = WordCount.startWordCount (env, sorok); Lista gyűjt = eredmény.collect (); assertThat (gyűjt) .containExactlyInAnyOrder (új Tuple2 ("a", 3), új Tuple2 ("mondat", 2), új Tuple2 ("szó", 1), új Tuple2 ("is", 2), új Tuple2 ( "ez", 2), új Tuple2 ("második", 1), új Tuple2 ("első", 1), új Tuple2 ("együtt", 1), új Tuple2 ("egy", 1));

6. DataStream API

6.1. DataStream létrehozása

Az Apache Flink az adatfolyamok feldolgozását is támogatja a DataStream API-n keresztül. Ha el akarjuk kezdeni az események fogyasztását, akkor először a StreamExecutionEnvironment osztály:

StreamExecutionEnvironment végrehajtásEnvironment = StreamExecutionEnvironment.getExecutionEnvironment ();

Ezután létrehozhatunk egy eseményfolyamot a végrehajtásKörnyezet különféle forrásokból. Lehet valami üzenetbusz, mint Apache Kafka, de ebben a példában egyszerűen létrehozunk egy forrást pár string elemből:

DataStream dataStream = végrehajtásEnvironment.fromElements ("Ez egy első mondat", "Ez egy második szó, egy szóval");

Alkalmazhatunk transzformációkat a Adatfolyam mint a normálisban DataSet osztály:

SingleOutputStreamOperator upperCase = text.map (String :: toUpperCase);

A végrehajtás elindításához meg kell hívnunk egy mosogató műveletet, mint pl nyomtatás() amely csak az átalakítások eredményét nyomtatja ki a standard kimenetre, a végrehajtani () módszer a StreamExecutionEnvironment osztály:

upperCase.print (); env.execute ();

A következő kimenetet fogja produkálni:

1> Ez az első mondat 2> Ez a második mondat egyetlen szóval

6.2. Az események felszámolása

Az események valós idejű feldolgozásakor néha előfordulhat, hogy össze kell csoportosítania az eseményeket, és bizonyos számításokat kell végrehajtania az események ablakán.

Tegyük fel, hogy van egy eseményfolyamunk, ahol minden esemény egy pár, amely az esemény számából és az időbélyegből áll, amikor az eseményt elküldtük a rendszerünknek, és hogy elviselhetjük a nem rendezett eseményeket, de csak akkor, ha nem több mint húsz másodperc késéssel.

Ehhez a példához először hozzunk létre egy folyamot, amely két percre van egymástól, és meghatározzunk egy időbélyeg-kivonót, amely meghatározza a késési küszöbünket:

SingleOutputStreamOperator windowed = env.fromElements (új Tuple2 (16, ZonedDateTime.now (). plusMinutes (25) .toInstant (). getEpochSecond ()), új Tuple2 (15, ZonedDateTime.now (). plusMinutes (2) .toInstant () .getEpochSecond ())) .assignTimestampsAndWatermarks (új BoundedOutOfOrdernessTimestampExtractor (Time.seconds (20)) {@Orride public long extractTimestamp (Tuple2 element) {return element.f1 * 1000; }});

Ezután definiáljunk egy ablakműveletet az eseményeink öt másodperces ablakokba csoportosítására, és alkalmazzunk transzformációt ezekre az eseményekre:

SingleOutputStreamOperator csökkentett = ablakos .windowAll (TumblingEventTimeWindows.of (Time.seconds (5))) .maxBy (0, true); csökkentett nyomat ();

Minden öt másodperces ablak utolsó elemét megkapja, így kiírja:

1> (15,1491221519)

Vegye figyelembe, hogy a második eseményt nem látjuk, mert később érkezett, mint a megadott késési küszöb.

7. Következtetés

Ebben a cikkben bemutattuk az Apache Flink keretrendszert, és megvizsgáltuk az API-jához mellékelt néhány átalakítást.

Szószámláló programot valósítottunk meg a Flink folyékony és funkcionális DataSet API segítségével. Ezután megnéztük a DataStream API-t, és egyszerű valós idejű átalakítást valósítottunk meg az események folyamán.

Ezeknek a példáknak és kódrészleteknek a megvalósítása megtalálható a GitHub-on - ez egy Maven-projekt, ezért könnyen importálhatónak és futtathatónak kell lennie.