Bevezetés az Apache Storm-ba

1. Áttekintés

Ez az oktatóanyag az Apache Storm bevezetője lesz, elosztott valós idejű számítási rendszer.

Összpontosítunk és kitérünk:

  • Mi is pontosan az Apache Storm és milyen problémákat old meg
  • Építészete, és
  • Hogyan lehet használni egy projektben

2. Mi az Apache Storm?

Az Apache Storm ingyenes és nyílt forráskódú elosztott rendszer valós idejű számításokhoz.

Hibatűrést, skálázhatóságot biztosít és garantálja az adatfeldolgozást, és különösen jó a korlátlan adatfolyamok feldolgozásában.

A Storm néhány jó felhasználási esete lehet a hitelkártya-műveletek feldolgozása csalás észlelésére vagy az intelligens otthonokból származó adatok feldolgozása a hibás érzékelők felderítésére.

A Storm lehetővé teszi az integrációt a piacon elérhető különféle adatbázisokkal és sorban álló rendszerekkel.

3. Maven-függőség

Az Apache Storm használata előtt be kell építenünk a vihar-magfüggőséget projektünkbe:

 org.apache.storm storm-core 1.2.2 biztosított 

Csak a biztosított hatókört ha az alkalmazásunkat a Storm fürtön kívánjuk futtatni.

Az alkalmazás helyi futtatásához használhatunk egy úgynevezett helyi módot, amely a Storm fürtöt egy helyi folyamatban szimulálja, ebben az esetben el kell távolítanunk a biztosítani.

4. Adatmodell

Az Apache Storm adatmodellje két elemből áll: sorokból és adatfolyamokból.

4.1. Tuple

A Tuple a dinamikus típusú elnevezett mezők rendezett listája. Ez azt jelenti, hogy nem kell kifejezetten deklarálnunk a mezők típusait.

A Stormnak tudnia kell, hogyan kell sorosítani az összes értéket, amelyet egy párosban használnak. Alapértelmezés szerint már képes sorosítani a primitív típusokat, Húrok és byte tömbök.

És mivel a Storm a Kryo sorosítást használja, a sorosítót a következővel kell regisztrálnunk Konfig az egyedi típusok használatához. Ezt kétféleképpen tehetjük meg:

Először a teljes név használatával regisztrálhatjuk az osztályt, hogy sorosítsuk:

Config config = új Config (); config.registerSerialization (Felhasználó.osztály);

Ilyen esetben Kryo a sorozatot használja az osztály használatával FieldSerializer. Alapértelmezés szerint ez az osztály összes nem átmeneti mezőjét sorosítja, mind a magán, mind a nyilvános mezőt.

Vagy ehelyett megadhatjuk mind az osztályt a sorozatosításhoz, mind pedig azt a sorosítót, amelyet a Stormnak az adott osztályhoz használni kívánunk:

Config config = új Config (); config.registerSerialization (User.class, UserSerializer.class);

Az egyéni sorosító létrehozásához ki kell terjesztenünk az általános osztályt Serializer ennek két módszere van ír és olvas.

4.2. Folyam

A Folyam a Storm ökoszisztéma alapvető absztrakciója. A Folyam a sorok korlátlan sorozata.

A Storms lehetővé teszi több adatfolyam párhuzamos feldolgozását.

Minden adatfolyamnak van egy azonosítója, amelyet megadnak és hozzárendelnek a deklaráció során.

5. Topológia

A valós idejű Storm alkalmazás logikája a topológiába van csomagolva. A topológia áll kifolyók és csavarok.

5.1. Kilövell

A kifolyók a patakok forrásai. Tollokat bocsátanak ki a topológiára.

A tömbök különféle külső rendszerekből olvashatók, mint a Kafka, a Kestrel vagy az ActiveMQ.

Kiöntő lehet megbízható vagy megbízhatatlan. Megbízható azt jelenti, hogy a kiöntő válaszolhat arra, hogy azt a duplát, amelyet a Storm nem dolgozott fel. Megbízhatatlan azt jelenti, hogy a kiöntő nem válaszol, mivel tűz-felejtés mechanizmust fog használni a hullámok kibocsátására.

Az egyedi kifolyó létrehozásához végre kell hajtanunk a IRichSpout interfészt, vagy kiterjeszthet bármely olyan osztályt, amely már megvalósítja a felületet, például egy absztrakt BaseRichSpout osztály.

Hozzunk létre egy megbízhatatlan kilövell:

a RandomIntSpout nyilvános osztály kiterjeszti a BaseRichSpout {private Random random; privát SpoutOutputCollector outputCollector; @Orride public void open (Térképtérkép, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); outputCollector = spoutOutputCollector; } @Orride public void nextTuple () {Utils.sleep (1000); outputCollector.emit (új értékek (random.nextInt (), System.currentTimeMillis ())); } @Orride public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (új mezők ("randomInt", "időbélyegző")); }}

A mi szokásunk RandomIntSpout másodpercenként véletlenszerű egész számot és időbélyeget generál.

5.2. Csavar

A csavarok feldolgozzák a patakokban lévő hullámokat. Különböző műveleteket hajthatnak végre, például szűrést, összesítést vagy egyedi funkciókat.

Egyes műveletek több lépést igényelnek, ezért ilyen esetekben több csavart kell használnunk.

Az egyedi létrehozásához Csavar, végre kell hajtanunk IRichBolt vagy egyszerűbb műveletekhez IBasicBolt felület.

A megvalósításhoz több segítő osztály is rendelkezésre áll Csavar. Ebben az esetben használni fogjuk BaseBasicBolt:

public class PrintingBolt kiterjeszti a BaseBasicBolt {@Override public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {System.out.println (tuple); } @Orride public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {}}

Ez a szokás PrintingBolt egyszerűen kinyomtatja az összes sorrendet a konzolra.

6. Egyszerű topológia létrehozása

Tegyük ezeket az ötleteket egy egyszerű topológiává. Topológiánkban egy kifolyó és három csavar lesz.

6.1. RandomNumberSpout

Az elején megbízhatatlan kiöntőt hozunk létre. Minden másodpercben véletlenszerű egész számokat generál a tartományból (0,100):

a RandomNumberSpout nyilvános osztály kiterjeszti a BaseRichSpout {private Random random; privát SpoutOutputCollector gyűjtő; @Orride public void open (Térképtérkép, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {random = new Random (); gyűjtő = spoutOutputCollector; } @Orride public void nextTuple () {Utils.sleep (1000); int művelet = random.nextInt (101); hosszú időbélyeg = System.currentTimeMillis (); Értékek = új értékek (művelet, időbélyeg); gyűjtő.emit (értékek); } @Orride public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (új mezők ("művelet", "időbélyegző")); }}

6.2. FilteringBolt

Ezután létrehozunk egy csavart, amely kiszűri az összes elemet művelet 0-val egyenlő:

public class FilteringBolt kiterjeszti a BaseBasicBolt {@Orride public void execute (Tuple tuple, BasicOutputCollector basicOutputCollector) {int művelet = tuple.getIntegerByField ("művelet"); if (művelet> 0) {basicOutputCollector.emit (tuple.getValues ​​()); }} @Orride public void declareOutputFields (OutputFieldsDeclarer outputFieldsDeclarer) {outputFieldsDeclarer.declare (új mezők ("művelet", "időbélyegző")); }}

6.3. ÖsszegzésBolt

Ezután hozzunk létre egy bonyolultabbat Csavar amely összesíti az összes pozitív műveletet az egyes napokból.

Erre a célra egy speciális osztályt fogunk használni, amelyet kifejezetten az ablakokkal működő csavarok megvalósításához használunk, ahelyett, hogy egyetlen sorrendben működnénk: BaseWindowedBolt.

ablakok a folyam feldolgozásának elengedhetetlen fogalma, a végtelen folyamokat véges darabokra osztva. Ezután számításokat alkalmazhatunk minden egyes darabra. Általában kétféle ablak van:

Az időablakokat egy adott időtartam elemeinek időbélyegzőkkel történő csoportosítására használják. Az időablakok eltérő számú elemet tartalmazhatnak.

A számláló ablakokkal meghatározott méretű ablakokat lehet létrehozni. Ilyen esetben az összes ablak azonos méretű lesz, és az ablak is nem bocsáthatók ki, ha a megadott méretnél kevesebb elem van.

A mi ÖsszegzésBolt az összes pozitív művelet összegét az a-ból generálja idő ablak kezdete és vége időbélyegzővel együtt:

public class AggregatingBolt kiterjeszti a BaseWindowedBolt {private OutputCollector outputCollector; @Orride public void prepar (Map stormConf, TopologyContext context, OutputCollector collector) {this.outputCollector = gyűjtő; } @Orride public void declareOutputFields (OutputFieldsDeclarer deklarer) {deklarer.declare (új mezők ("sumOfOperations", "kezdőTimestamp", "endTimestamp")); } @Orride public void execute (TupleWindow tupleWindow) {Sorszámok = tupleWindow.get (); tuples.sort (Összehasonlító.összehasonlítás (ez :: getTimestamp)); int sumOfOperations = tuples.stream () .mapToInt (tuple -> tuple.getIntegerByField ("művelet")) .sum (); Hosszú kezdeti időbélyeg = getTimestamp (tuples.get (0)); Long endTimestamp = getTimestamp (tuples.get (tuples.size () - 1)); Értékértékek = új értékek (sumOfOperations, kezdőTimestamp, endTimestamp); outputCollector.emit (értékek); } privát Long getTimestamp (Tuple tuple) {return tuple.getLongByField ("időbélyeg"); }}

Vegye figyelembe, hogy ebben az esetben a lista első elemének közvetlen megszerzése biztonságos. Ez azért van, mert minden ablak kiszámítása a időbélyeg mező Tuple, így lennie kell minden ablakban legalább egy elem.

6.4. FileWritingBolt

Végül létrehozunk egy csavart, amely minden elemet magával ragad sumOfOperations nagyobb, mint 2000, sorosítsa őket, és írja be a fájlba:

public class FileWritingBolt kiterjeszti a BaseRichBolt {public static Logger logger = LoggerFactory.getLogger (FileWritingBolt.class); privát BufferedWriter író; privát String filePath; private ObjectMapper objectMapper; @Orride public void cleanup () {try {író.close (); } catch (IOException e) {logger.error ("Nem sikerült bezárni az írót!"); }} @Orride public void prepar (Térképtérkép, TopologyContext topologyContext, OutputCollector outputCollector) {objectMapper = new ObjectMapper (); objectMapper.setVisibility (PropertyAccessor.FIELD, JsonAutoDetect.Visibility.ANY); próbáld ki: {író = új BufferedWriter (új FileWriter (filePath)); } catch (IOException e) {logger.error ("Nem sikerült megnyitni egy fájlt írásra.", e); }} @Orride public void execute (Tuple tuple) {int sumOfOperations = tuple.getIntegerByField ("sumOfOperations"); hosszú kezdési időbélyeg = tuple.getLongByField ("kezdő időbélyeg"); long endTimestamp = tuple.getLongByField ("endTimestamp"); if (sumOfOperations> 2000) {AggregatedWind aggregatedWindow = new AggregatedWindow (sumOfOperations, beginTimestamp, endTimestamp); próbáld ki az {író.írási (objectMapper.writeValueAsString (aggregatedWindow)) parancsot; író.új sor (); író.öblítés (); } catch (IOException e) {logger.error ("Nem sikerült adatokat fájlba írni.", e); }}} // nyilvános konstruktor és más módszerek}

Ne feledje, hogy nem kell deklarálnunk a kimenetet, mivel ez lesz az utolsó csavar topológiánkban

6.5. A topológia futtatása

Végül mindent összeszedhetünk és futtathatjuk topológiánkat:

public static void runTopology () {TopologyBuilder builder = new TopologyBuilder (); Spout random = new RandomNumberSpout (); builder.setSpout ("randomNumberSpout"); Csavarszűrés = új FilteringBolt (); builder.setBolt ("filteringBolt", szűrés) .shuffleGrouping ("randomNumberSpout"); Csavar összesítése = new AggregatingBolt () .withTimestampField ("időbélyeg") .withLag (BaseWindowedBolt.Duration.seconds (1)) .Window (BaseWindowedBolt.Duration.seconds (5)); builder.setBolt ("aggregatingBolt", aggregating) .shuffleGrouping ("filteringBolt"); String filePath = "./src/main/resources/data.txt"; Bolt fájl = új FileWritingBolt (filePath); builder.setBolt ("fileBolt", fájl) .shuffleGrouping ("aggregatingBolt"); Config config = új Config (); config.setDebug (hamis); LocalCluster fürt = new LocalCluster (); cluster.submitTopology ("Test", config, builder.createTopology ()); }

Ahhoz, hogy az adatáramlás a topológia egyes részein keresztül történjen, meg kell jelölnünk, hogyan lehet ezeket összekapcsolni. shuffleGroup lehetővé teszi számunkra, hogy kijelentsük ezeket az adatokat filteringBolt származni fog randomNumberSpout.

Az egyes Csavar, hozzá kell tennünk shuffleGroup amely meghatározza ennek a csavarnak az elemek forrását. Az elemek forrása lehet a Kilövell vagy egy másik Csavar. És ha ugyanazt a forrást több csavarra állítjuk be, a forrás mindegyik elemet kibocsátja.

Ebben az esetben topológiánk a LocalCluster hogy a munkát helyben vezesse.

7. Következtetés

Ebben az oktatóanyagban bemutattuk az Apache Storm elosztott valós idejű számítási rendszert. Hoztunk létre egy kifolyót, néhány csavart, és összehúztuk őket egy teljes topológiává.

És mint mindig, az összes kódminta megtalálható a GitHubon.