Bevezetés a Netflix Mantis-ba

1. Áttekintés

Ebben a cikkben a Netflix által kifejlesztett Mantis platformot vesszük szemügyre.

Felfedezzük a Mantis fő fogalmait egy adatfolyam-feldolgozási feladat létrehozásával, futtatásával és kivizsgálásával.

2. Mi a sáska?

A Mantis egy platform az adatfolyam-feldolgozó alkalmazások kiépítésére (munkahelyek). Ez egy egyszerű módja annak a munkák telepítésének és életciklusának kezelése. Sőt, azt is megkönnyíti az erőforrások elosztását, felfedezését és a kommunikációt e munkák között.

Ezért a fejlesztők a tényleges üzleti logikára koncentrálhatnak, miközben a robusztus és méretezhető platform nagy volumenű, alacsony késleltetésű, nem blokkoló alkalmazások futtatásához.

A Mantis feladat három különálló részből áll:

  • a forrás, felelős az adatok külső forrásból történő lekéréséért
  • egy vagy több szakasz, felelős a bejövő eseményfolyamok feldolgozásáért
  • és a mosogató amely összegyűjti a feldolgozott adatokat

Most vizsgáljuk meg mindegyiket.

3. Beállítás és függőségek

Kezdjük azzal, hogy hozzáadjuk a sáska-futásidő és jackson-databind függőségek:

 io.mantisrx mantis-runtime com.fasterxml.jackson.core jackson-databaseind 

A munkánk adatforrásának beállításához valósítsuk meg a Mantis-t Forrás felület:

public class RandomLogSource implementálja a Forrás {@Override public Observable call (Context context, Index index) {return Observable.just (Observable .interval (250, TimeUnit.MILLISECONDS) .map (this :: createRandomLogEvent)); } private String createRandomLogEvent (hosszú pipa) {// véletlenszerű naplóbejegyzés-karakterláncot generál ...}}

Mint láthatjuk, egyszerűen véletlenszerű naplóbejegyzéseket generál másodpercenként többször.

4. Első munkánk

Hozzunk létre most egy Mantis-feladatot, amely egyszerűen összegyűjti a naplóeseményeket a mi adatbázisunkból RandomLogSource. Később a komplexebb és érdekesebb eredmény érdekében hozzáadunk csoport- és összesítési transzformációkat.

Először hozzuk létre a LogEvent entitás:

public class LogEvent megvalósítja a JsonType {private Long index; privát húr szint; privát karakterlánc üzenet; // ...}

Ezután tegyük hozzá a sajátunkat TransformLogStage.

Ez egy egyszerű szakasz, amely megvalósítja a ScalarComputation felületet, és felosztja a naplóbejegyzést a LogEvent. Ezenkívül kiszűri a hibásan formázott karakterláncokat:

public class TransformLogStage implementálja a ScalarComputation {@Override public Observable call (Context context, Observable logEntry) {return logEntry .map (log -> log.split ("#")) .filter (parts -> parts.length == 3). térkép (LogEvent :: új); }}

4.1. A munka futtatása

Ezen a ponton elegendő építőelem áll rendelkezésünkre Mantis munkánk összeállításához:

a public class LogCollectingJob kiterjeszti a MantisJobProvider {@Orride public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), new ScalarToScalar.Config ()) .sink (Sinks.eagerSubscribe LogEvent :: toJsonString))) .metadata (új Metadata.Builder (). Build ()) .create (); }}

Vizsgáljuk meg közelebbről a munkánkat.

Mint láthatjuk, kiterjed MantisJobProvider. Eleinte adatokat gyűjt le a mi RandomLogSource és alkalmazza a TransformLogStage a behozott adatokra. Végül a feldolgozott adatokat elküldi a beépített mosogatónak, amely lelkesen feliratkozik és adatokat továbbít SSE-n keresztül.

Most konfiguráljuk a munkánkat az indításkor helyi végrehajtásra:

@SpringBootApplication nyilvános osztály A MantisApplication végrehajtja a CommandLineRunner {// ... @Orride public void run (String ... args) {LocalJobExecutorNetworked.execute (new LogCollectingJob (). GetJobInstance ()); }}

Futtassuk az alkalmazást. Látni fogunk egy naplóüzenetet, például:

... A modern HTTP SSE szerver mosogató kiszolgálása a 86XX porton

Most csatlakozzunk a mosogatóhoz becsavar:

$ curl localhost: 86XX adatok: {"index": 86, "level": "WARN", "message": "bejelentkezési kísérlet"} data: {"index": 87, "level": "ERROR", "üzenet ":" felhasználó létrehozta "} adatok: {" index ": 88," szint ":" INFO "," üzenet ":" felhasználó létrehozta "} adatok: {" index ": 89," szint ":" INFO ", "message": "bejelentkezési kísérlet"} data: {"index": 90, "level": "INFO", "message": "user created"} adatok: {"index": 91, "level": "HIBA" "," message ":" felhasználó létrehozta "} data: {" index ": 92," level ":" WARN "," message ":" bejelentkezési kísérlet "} data: {" index ": 93," level ": "INFO", "message": "felhasználó létrehozta"} ...

4.2. A mosogató konfigurálása

Eddig a beépített mosogatót használtuk feldolgozott adataink gyűjtéséhez. Lássuk, ha nagyobb rugalmasságot adhatunk a forgatókönyvünkhöz egy egyedi mosogató biztosításával.

Mi lenne, ha például a naplókat szeretnénk szűrni üzenet?

Hozzunk létre egy LogSink hogy megvalósítja a Mosogató felület:

public class LogSink megvalósítja a Sink {@Orride public void call (Context context, PortRequest portRequest, Observable logEventObservable) {SelfDocumentingSink sink = new ServerSentEventsSink.Builder () .withEncoder (LogEvent :: toJsonString) .withPredicate (filterBild). ; logEventObservable.subscribe (); sink.call (context, portRequest, logEventObservable); } private Predicate filterByLogMessage () {return new Predicate ("szűrés üzenet szerint", paraméterek -> {if (paraméterek! = null && parameters.containsKey ("szűrő")) {return logEvent -> logEvent.getMessage (). tartalmaz ( paraméterek.get ("szűrő"). get (0));} return logEvent -> true;}); }}

Ebben a mosogató megvalósításban konfiguráltunk egy predikátumot, amely a szűrő paraméter csak azoknak a naplóknak a lekérésére, amelyek tartalmazzák a szűrő paraméter:

$ curl localhost: 8874? filter = bejelentkezési adatok: {"index": 93, "level": "ERROR", "message": "bejelentkezési kísérlet"} data: {"index": 95, "level": "INFO "," message ":" bejelentkezési kísérlet "} data: {" index ": 97," level ":" HIBA "," message ":" bejelentkezési kísérlet "} ...

Megjegyzés: A Mantis egy hatékony lekérdező nyelvet, az MQL-t is kínál, amely használható adatfolyam lekérdezésére, átalakítására és elemzésére SQL módon.

5. Szakaszlánc

Tegyük fel most, hogy érdekel minket, hogy hány HIBA, FIGYELMEZTET, vagy INFO naplóbejegyzések egy adott időintervallumban. Ehhez hozzáadunk még két szakaszt a munkánkhoz, és összekapcsoljuk őket.

5.1. Csoportosítás

Először hozzuk létre a GroupLogStage.

Ez a szakasz a ToGroupComputation megvalósítás, amely megkapja a LogEvent adatfolyamot a meglévőből TransformLogStage. Ezt követően naplózási szint szerint csoportosítja a bejegyzéseket, és elküldi őket a következő szakaszba:

public class GroupLogStage megvalósítja a ToGroupComputation {@Override public Observable call (Context context, Observable logEvent) {return logEvent.map (log -> new MantisGroup (log.getLevel (), log)); } public static ScalarToGroup.Config config () {return new ScalarToGroup.Config () .description ("Group event data by level") .codec (JacksonCodecs.pojo (LogEvent.class)) .concurrentInput (); }}

Létrehoztunk egy egyéni szakaszkonfigurát is, megadva egy leírást, a kimenetet a kimenet sorosításához, és lehetővé tettük ennek a szakasznak a hívási metódusának egyidejű futtatását a concurrentInput ().

Egy dolgot meg kell jegyezni, hogy ez a szakasz vízszintesen méretezhető. Ez azt jelenti, hogy annyi példányt tudunk futtatni ebben a szakaszban, amennyire szükséges. Szintén említést érdemel, ha egy Mantis-fürtbe telepítve ez a szakasz adatokat küld a következő szakaszba, így egy adott csoporthoz tartozó összes esemény a következő szakasz ugyanazon munkavállalóján landol.

5.2. Összesítő

Mielőtt továbblépnénk és létrehoznánk a következő szakaszt, először tegyünk hozzá egy a-t LogAggregate entitás:

public class LogAggregate megvalósítja a JsonType {privát végső egész számot; magán végső String szint; }

Most hozzuk létre a lánc utolsó szakaszát.

Ez a szakasz megvalósítja GroupToScalarComputation és a naplócsoportok áramát skalárrá alakítja LogAggregate. Ezt úgy végzi, hogy megszámolja, hogy az egyes naplótípusok hányszor jelennek meg az adatfolyamban. Ezen kívül van még egy LogAggregationDuration paraméter, amely az összesítési ablak méretének szabályozására használható:

public class CountLogStage megvalósítja a GroupToScalarComputation {private int duration; @Orride public void init (Context context) {duration = (int) context.getParameters (). Get ("LogAggregationDuration", 1000); } @Orride public Observable call (Context context, Observable mantisGroup) {return mantisGroup .window (duration, TimeUnit.MILLISECONDS) .flatMap (o -> o.groupBy (MantisGroup :: getKeyValue) .flatMap (group -> group.reduce (0, (count, value) -> count = count + 1) .map ((count) -> new LogAggregate (count, group.getKey ())))); } public static GroupToScalar.Config config () {return new GroupToScalar.Config () .description ("naplószint eseményei") .codec (JacksonCodecs.pojo (LogAggregate.class)) .withParameters (getParameters ()); } public static List getParameters () {Lista params = new ArrayList (); params.add (új IntParameter () .name ("LogAggregationDuration") .description ("ablakméret az összesítéshez ezredmásodpercben") .validator (Validators.range (100, 10000)) .defaultValue (5000) .build ()); visszatérő paramek; }}

5.3. Konfigurálja és futtassa a munkát

Az egyetlen dolog, amit most meg kell tennie, a munkánk konfigurálása:

A public class LogAggregationJob kiterjeszti a MantisJobProvider {@Orride public Job getJobInstance () {return MantisJob .source (new RandomLogSource ()) .stage (new TransformLogStage (), TransformLogStage.stageConfig ()) .stage (new GroupLogStage (new GroupLogStage) (new GroupLogStage (new GroupLogStage) )) .stage (új CountLogStage (), CountLogStage.config ()) .sink (Sinks.eagerSubscribe (Sinks.sse (LogAggregate :: toJsonString))) .metadata (új Metadata.Builder (). build ()) .create (); }}

Amint futtatjuk az alkalmazást és végrehajtjuk az új feladatunkat, láthatjuk, hogy a naplószámok néhány másodpercenként lekérésre kerülnek:

$ curl localhost: 8133 adatok: {"count": 3, "level": "ERROR"} adatok: {"count": 13, "level": "INFO"} adatok: {"count": 4, "level" ":" WARN "} adatok: {" count ": 8," level ":" ERROR "} adatok: {" count ": 5," level ":" INFO "} adatok: {" count ": 7," szint ":" FIGYELEM "} ...

6. Következtetés

Összefoglalva, ebben a cikkben megnéztük, mi is az a Netflix Mantis, és mire használható. Ezenkívül megvizsgáltuk a fő fogalmakat, felhasználtuk őket munkahelyek felépítéséhez, és felfedeztük az egyedi konfigurációkat a különböző forgatókönyvekhez.

Mint mindig, a teljes kód elérhető a GitHubon.