Adatvezeték építése Flink és Kafka segítségével

1. Áttekintés

Az Apache Flink egy adatfolyam-feldolgozó keretrendszer, amely könnyen használható a Java-val. Az Apache Kafka egy elosztott adatfolyam-feldolgozó rendszer, amely nagy hibatűrést támogat.

Ebben az oktatóanyagban áttekintjük, hogyan lehet adatcsatornát építeni e két technológia felhasználásával.

2. Telepítés

Az Apache Kafka telepítéséhez és konfigurálásához olvassa el a hivatalos útmutatót. A telepítés után a következő parancsokkal hozhatjuk létre az új nevű témákat flink_input és flink_output:

 bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replication-factor 1 - partitions 1 \ --topic flink_output bin / kafka-topics.sh --create \ --zookeeper localhost: 2181 \ --replication-factor 1 - partitions 1 \ --top flink_input

Az oktatóanyag érdekében az Apache Kafka alapértelmezett konfigurációját és alapértelmezett portjait használjuk.

3. Pislogás használata

Az Apache Flink valós idejű adatfolyam-feldolgozási technológiát tesz lehetővé. A keretrendszer lehetővé teszi több harmadik féltől származó rendszer használatát adatforrásként vagy befogóként.

A Flink-ben különféle csatlakozók állnak rendelkezésre:

  • Apache Kafka (forrás / mosogató)
  • Apache Cassandra (mosogató)
  • Amazon Kinesis adatfolyamok (forrás / mosogató)
  • Elasticsearch (mosogató)
  • Hadoop FileSystem (mosogató)
  • RabbitMQ (forrás / mosogató)
  • Apache NiFi (forrás / mosogató)
  • Twitter Streaming API (forrás)

Flink hozzáadásához a projektünkhöz a következő Maven-függőségeket kell felvennünk:

 org.apache.flink flink-core 1.5.0 org.apache.flink flink-connector-kafka-0.11_2.11 1.5.0 

Ezen függőségek hozzáadása lehetővé teszi számunkra a Kafka-témák fogyasztását és előállítását. A Flink aktuális verzióját a Maven Central oldalon találja meg.

4. Kafka húrfogyasztó

A Kafka adatainak a Flink használatával meg kell adnunk egy témát és egy Kafka címet. Meg kell adnunk egy csoportazonosítót is, amelyet az eltolások megtartására használunk, hogy ne mindig a teljes adatokat olvassuk el a kezdetektől.

Hozzunk létre egy statikus módszert, amely megalkotja FlinkKafkaFogyasztó könnyebb:

public static FlinkKafkaConsumer011 createStringConsumerForTopic (String topic, String kafkaAddress, String kafkaGroup) {Properties props = new Properties (); props.setProperty ("bootstrap.servers", kafkaAddress); props.setProperty ("group.id", kafkaGroup); FlinkKafkaConsumer011 fogyasztó = new FlinkKafkaConsumer011 (téma, új SimpleStringSchema (), kellékek); visszatérő fogyasztó; }

Ez a módszer a topic, kafkaAddress, és kafkaGroup és létrehozza a FlinkKafkaFogyasztó amely a megadott témakör adatait fogja felhasználni a-ként Húr mivel használtuk SimpleStringSchema az adatok dekódolásához.

A szám 011 osztály nevében a Kafka verzióra utal.

5. Kafka vonósproducer

Az adatok előállításához a Kafka számára meg kell adnunk a használni kívánt Kafka címet és témát. Ismét létrehozhatunk egy statikus módszert, amely elősegíti a különböző témák producereinek létrehozását:

public static FlinkKafkaProducer011 createStringProducer (String topic, String kafkaAddress) {return new FlinkKafkaProducer011 (kafkaAddress, topic, new SimpleStringSchema ()); }

Ez a módszer csak addig tart téma és kafkaCím érvként, mivel nem szükséges megadni a csoportazonosítót, amikor Kafka témához dolgozunk.

6. String Stream feldolgozása

Ha teljesen működő fogyasztónk és termelőnk van, megpróbálhatjuk feldolgozni a Kafka adatait, majd elmenteni az eredményeket a Kafkába. Az adatfolyam-feldolgozáshoz használható funkciók teljes listája itt található.

Ebben a példában nagybetűket írunk minden Kafka-bejegyzésben, majd visszaírjuk Kafka-nak.

Erre a célra létre kell hoznunk egy szokást MapFunction:

public class WordsCapitalizer megvalósítja a MapFunction {@Orride public String map (String s) {return s.toUpperCase (); }}

A függvény létrehozása után felhasználhatjuk az adatfolyam feldolgozásában:

public static void nagybetűs () {String inputTopic = "flink_input"; String outputTopic = "flink_output"; Karakterlánc consumerGroup = "baeldung"; Karakterlánc címe = "localhost: 9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment .getExecutionEnvironment (); FlinkKafkaConsumer011 flinkKafkaConsumer = createStringConsumerForTopic (inputTopic, address, consumerGroup); DataStream stringInputStream = környezet .addSource (flinkKafkaConsumer); FlinkKafkaProducer011 flinkKafkaProducer = createStringProducer (outputTopic, cím); stringInputStream .map (új WordsCapitalizer ()) .addSink (flinkKafkaProducer); }

Az alkalmazás beolvassa az adatokat a flink_input témakörben hajtsa végre a műveleteket a folyamon, majd mentse az eredményeket a flink_output téma Kafkában.

Láttuk, hogyan kell kezelni a húrokat a Flink és a Kafka használatával. De gyakran szükség van az egyedi objektumok műveleteinek végrehajtására. A következő fejezetekben megtudjuk, hogyan kell ezt megtenni.

7. Egyedi objektum deserializáció

A következő osztály egy egyszerű üzenetet képvisel, amely tartalmazza a feladó és a címzett adatait:

@JsonSerialize public class InputMessage {String sender; String címzett; LocalDateTime sentAt; Karakterlánc üzenet; }

Korábban mi használtuk SimpleStringSchema hogy Kaerka üzeneteit deserializálja, de most az adatokat deszerializálni szeretnénk közvetlenül az egyedi objektumokhoz.

Ehhez szükségünk van egy szokásra Deserializáció Séma:

public class InputMessageDeserializationSchema implementálja a DeserializationSchema {static ObjectMapper objectMapper = new ObjectMapper () .registerModule (új JavaTimeModule ()); @Orride public InputMessage deserialize (byte [] bytes) IOException dobja {return objectMapper.readValue (byte, InputMessage.class); } @Orride public boolean isEndOfStream (InputMessage inputMessage) {return false; } @Orride public TypeInformation getProducedType () {return TypeInformation.of (InputMessage.class); }}

Itt feltételezzük, hogy az üzeneteket JSON néven tartják Kafkában.

Mivel van egy típusú mezőnk LocalDateTime, meg kell adnunk a JavaTimeModule, amely gondoskodik a feltérképezésről LocalDateTime kifogásolja a JSON-t.

A Flink sémák nem rendelkezhetnek olyan mezőkkel, amelyek nem sorosíthatók mert az összes operátor (mint például a sémák vagy a függvények) a munka kezdetekor sorosítva van.

Hasonló problémák vannak az Apache Sparkban is. A probléma egyik ismert javítása a mezők inicializálása statikus, ahogy mi tettük ObjectMapper felett. Ez nem a legszebb megoldás, de viszonylag egyszerű és elvégzi a munkát.

A módszer, a metódus isEndOfStream használható arra a speciális esetre, amikor az adatfolyamot csak addig kell feldolgozni, amíg bizonyos adatokat meg nem kap. De a mi esetünkben nincs rá szükség.

8. Egyedi objektumok sorosítása

Tegyük fel, hogy azt akarjuk, hogy a rendszerünknek lehetősége legyen az üzenetek biztonsági másolatának létrehozására. Azt akarjuk, hogy a folyamat automatikus legyen, és minden egyes biztonsági másolatnak egy egész nap alatt küldött üzenetekből kell állnia.

Ezenkívül egy biztonsági mentési üzenetnek egyedi azonosítót kell rendelnie.

Erre a célra létrehozhatjuk a következő osztályt:

public class Backup {@JsonProperty ("inputMessages") List inputMessages; @JsonProperty ("backupTimestamp") LocalDateTime backupTimestamp; @JsonProperty ("uuid") UUID uuid; public Backup (List inputMessages, LocalDateTime backupTimestamp) {this.inputMessages = inputMessages; this.backupTimestamp = backupTimestamp; this.uuid = UUID.randomUUID (); }}

Felhívjuk figyelmét, hogy az UUID generáló mechanizmus nem tökéletes, mivel lehetővé teszi a duplikátumokat. Ez azonban elegendő e példa terjedelméhez.

Meg akarjuk menteni a sajátunkat biztonsági mentés objektum JSON néven Kafka felé, ezért létre kell hoznunk a Serializáció Séma:

public class BackupSerializationSchema implementálja a SerializationSchema {ObjectMapper objectMapper; Naplózó naplózó = LoggerFactory.getLogger (BackupSerializationSchema.class); @Orride public byte [] serialize (Backup backupMessage) {if (objectMapper == null) {objectMapper = new ObjectMapper () .registerModule (new JavaTimeModule ()); } próbálkozzon {return objectMapper.writeValueAsString (backupMessage) .getBytes (); } catch (com.fasterxml.jackson.core.JsonProcessingException e) {logger.error ("A JSON elemzése sikertelen", e); } return new byte [0]; }}

9. Időbélyegző üzenetek

Mivel minden nap minden üzenetéhez biztonsági másolatot akarunk létrehozni, az üzenetekhez időbélyegzőre van szükség.

A Flink három különböző időjellemzőt nyújt EventTime, ProcessingTime, és IngestionTime.

Esetünkben azt az időpontot kell használnunk, amikor az üzenetet elküldtük, ezért használni fogjuk EventTime.

Használni EventTimeszükségünk van egy TimestampAssigner amely az időbélyegeket bontja ki a bemeneti adatokból:

public class InputMessageTimestampAssigner implementálja az AssignerWithPunctuatedWatermarks {@Orride public long extractTimestamp (InputMessage elem, régebbi előzőElementTimestamp) {ZoneId zoneId = ZoneId.systemDefault (); return elem.getSentAt (). atZone (zoneId) .toEpochSecond () * 1000; } @ Nullable @Override public Watermark checkAndGetNextWatermark (InputMessage lastElement, long extractedTimestamp) {return new Watermark (extractTTestestamp - 1500); }}

Át kell alakítanunk a sajátunkat LocalDateTime nak nek KorszakMásodik mivel ezt a formát várja a Flink. Az időbélyegek kiosztása után az összes időalapú művelet felhasználja az időt sentAt mező működni.

Mivel Flink arra számít, hogy az időbélyegek milliszekundumokban és toEpochSecond () az időt másodpercben adja vissza, amelyet meg kellett szorozni 1000-tel, így a Flink helyesen fogja létrehozni az ablakokat.

Flink meghatározza az a fogalmát Vízjel. A vízjelek hasznosak olyan adatok esetén, amelyek nem a küldési sorrendben érkeznek. A vízjel meghatározza az elemek feldolgozásához engedélyezett maximális késést.

A vízjelnél alacsonyabb időbélyegzővel rendelkező elemeket egyáltalán nem dolgozzuk fel.

10. A Time Windows létrehozása

Annak biztosítására, hogy a biztonsági másolatunk csak egy nap alatt küldött üzeneteket gyűjtsön össze, használhatjuk a timeWindowAll metódus a folyamon, amely az üzeneteket ablakokra osztja fel.

Azonban továbbra is összesítenünk kell az üzeneteket az egyes ablakokból, és vissza kell küldenünk őket biztonsági mentés.

Ehhez szükségünk lesz egy szokásra AggregateFunction:

public class BackupAggregator megvalósítja az AggregateFunction funkciót {@Orride public list createAccumulator () {return new ArrayList (); } @Orride public List add (InputMessage inputMessage, List inputMessages) {inputMessages.add (inputMessage); return inputMessages; } @Orride public Backup getResult (List inputMessages) {return new Backup (inputMessages, LocalDateTime.now ()); } @Orride public list merge (List inputMessages, List acc1) {inputMessages.addAll (acc1); return inputMessages; }}

11. Biztonsági mentések összesítése

Megfelelő időbélyegek kijelölése és a mi megvalósítása után AggregateFunction, végre felvehetjük Kafka-inputunkat és feldolgozhatjuk:

public static void createBackup () dobja a {String inputTopic = "flink_input" kivételt; String outputTopic = "flink_output"; Karakterlánc consumerGroup = "baeldung"; Karakterlánc kafkaAddress = "192.168.99.100:9092"; StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment (); environment.setStreamTimeCharacteristic (TimeCharacteristic.EventTime); FlinkKafkaConsumer011 flinkKafkaConsumer = createInputMessageConsumer (inputTopic, kafkaAddress, consumerGroup); flinkKafkaConsumer.setStartFromEarliest (); flinkKafkaConsumer.assignTimestampsAndWatermarks (új InputMessageTimestampAssigner ()); FlinkKafkaProducer011 flinkKafkaProducer = createBackupProducer (outputTopic, kafkaAddress); DataStream inputMessagesStream = environment.addSource (flinkKafkaConsumer); inputMessagesStream .timeWindowAll (Time.hours (24)) .aggregate (új BackupAggregator ()) .addSink (flinkKafkaProducer); környezet.execute (); }

12. Következtetés

Ebben a cikkben bemutattuk, hogyan lehet létrehozni egy egyszerű adatvezetéket az Apache Flink és az Apache Kafka használatával.

Mint mindig, a kód megtalálható a Githubon.