Útmutató az Apache Crunchhoz

1. Bemutatkozás

Ebben az oktatóanyagban bemutatjuk az Apache Crunch alkalmazást egy példa adatfeldolgozó alkalmazással. Ezt az alkalmazást a MapReduce keretrendszer segítségével fogjuk futtatni.

Kezdjük azzal, hogy röviden bemutatjuk az Apache Crunch néhány fogalmát. Ezután beugrunk egy minta alkalmazásba. Ebben az alkalmazásban szöveges feldolgozást fogunk végezni:

  • Először elolvassuk a szöveges fájl sorait
  • Később szavakra bontjuk őket, és eltávolítunk néhány általános szót
  • Ezután csoportosítjuk a fennmaradó szavakat, hogy listát kapjunk az egyedi szavakról és azok számáról
  • Végül ezt a listát egy szöveges fájlba írjuk

2. Mi a Crunch?

A MapReduce egy elosztott, párhuzamos programozási keretrendszer nagy mennyiségű adat feldolgozásához egy kiszolgálófürtön. Az olyan szoftveres keretrendszerek, mint a Hadoop és a Spark, megvalósítják a MapReduce programot.

A Crunch keretet nyújt a MapReduce csővezetékek írásához, teszteléséhez és futtatásához Java-ban. Itt nem a MapReduce munkákat írjuk közvetlenül. Inkább meghatározzuk az adatvezetéket (azaz a bemeneti, feldolgozási és kimeneti lépések végrehajtásának műveleteit) a Crunch API-k segítségével. A Crunch Planner leképezi őket a MapReduce feladatokra, és szükség esetén végrehajtja őket.

Ezért minden Crunch adatcsatornát a Csővezeték felület. Ez az interfész meghatározza az adatok csővezetékbe történő beolvasásának módszereit is Forrás példányok és adatok kiírása egy csővezetékről a Cél példányok.

3 felületünk van az adatok ábrázolásához:

  1. PCgyűjtés - változhatatlan, elosztott elemgyűjtemény
  2. PTable<>, V> - megváltoztathatatlan, elosztott, rendezetlen kulcs- és értéktérkép
  3. PGroupedTable<>, V> - a K típusú kulcsok elosztott, rendezett térképe az an Iterálható V, amelyet pontosan egyszer lehet ismételni

DoFn az összes adatfeldolgozási funkció alaposztálya. Megfelel Mapper, Csökkentő és Kombinátor osztályok a MapReduce-ban. A fejlesztési idő nagy részét a logikai számítások megírásával és tesztelésével töltjük.

Most, hogy jobban ismerjük a Crunch-ot, használjuk fel a példa alkalmazás felépítésére.

3. Crunch projekt felállítása

Először hozzunk létre egy Crunch projektet Maven-lel. Kétféleképpen tehetjük meg:

  1. Adja hozzá a szükséges függőségeket a pom.xml meglévő projekt fájlja
  2. Használjon archetípust indító projekt előállításához

Vessünk egy gyors pillantást mindkét megközelítésre.

3.1. Maven-függőségek

A Crunch hozzáadásához egy meglévő projekthez adjuk hozzá a szükséges függőségeket a pom.xml fájl.

Először tegyük hozzá a ropogós mag könyvtár:

 org.apache.crunch crunch-core 0.15.0 

Ezután tegyük hozzá a hadoop-kliens könyvtárban kommunikálni Hadoop-tal. A Hadoop telepítésnek megfelelő verziót használjuk:

 org.apache.hadoop hadoop-client 2.2.0 biztosított 

Ellenőrizhetjük a Maven Centralon a crunch-core és a hadoop-kliens könyvtárak legújabb verzióit.

3.2. Maven archetípus

Egy másik megközelítés egy indító projekt gyors generálása a Crunch által biztosított Maven archetípus felhasználásával:

mvn archetípus: generál -Dfilter = org.apache.crunch: crunch-archetípus 

Amikor a fenti parancs kéri, megadjuk a Crunch verziót és a projekt műtárgy részleteit.

4. Crunch Pipeline Setup

A projekt beállítása után létre kell hoznunk a Csővezeték tárgy. A Crunchnak 3 van Csővezeték megvalósítások:

  • MRPipeline - a Hadoop MapReduce programon belül hajt végre
  • SparkPipeline - a Spark csővezetékek sorozataként hajtja végre
  • MemPipeline - memóriában hajtja végre a klienst, és hasznos az egység teszteléséhez

Általában a. Példányának felhasználásával fejlesztünk és tesztelünk MemPipeline. Később használjuk a MRPipeline vagy SparkPipeline a tényleges végrehajtáshoz.

Ha memóriában lévő vezetékre lenne szükségünk, használhatnánk a statikus módszert getInstance hogy megkapja a MemPipeline példa:

Csővezeték = MemPipeline.getInstance ();

De most hozzunk létre egy példányt MRPipeline hogy az alkalmazást Hadoop-tal hajtsa végre:

Pipeline pipeline = új MRPipeline (WordCount.class, getConf ());

5. Olvassa el a bemeneti adatokat

A folyamatobjektum létrehozása után be akarjuk olvasni a bemeneti adatokat. A Csővezeték Az interfész kényelmes módszert kínál a szövegfájlból származó bemenetek olvasására, readTextFile (elérési út).

Hívjuk ezt a módszert a bemeneti szövegfájl olvasására:

PCollection vonalak = pipeline.readTextFile (inputPath);

A fenti kód a szövegfájlt a fájl gyűjteményeként olvassa fel Húr.

Következő lépésként írjunk egy tesztesetet a bemenet olvasására:

@Test public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Pipeline pipeline = MemPipeline.getInstance (); PCollection vonalak = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, sorok.asCollection () .getValue () .size ()); }

Ebben a tesztben ellenőrizzük, hogy megkapjuk-e a várt sorszámot egy szöveges fájl olvasása közben.

6. Adatfeldolgozási lépések

A bemeneti adatok elolvasása után fel kell dolgoznunk azokat. A Crunch API számos alosztályt tartalmaz DoFn az általános adatfeldolgozási forgatókönyvek kezelésére:

  • FilterFn - szűri a gyűjtemény tagjait logikai feltétel alapján
  • MapFn - minden bemeneti rekordot pontosan egy kimeneti rekordhoz térképez
  • CombineFn - számos értéket egyetlen értékbe egyesít
  • JoinFn - olyan csatlakozásokat hajt végre, mint a belső illesztés, a bal külső illesztés, a jobb külső illesztés és a teljes külső illesztés

A következő adatfeldolgozási logikát valósítsuk meg az alábbi osztályok használatával:

  1. Bontsa szavakra a bemeneti fájl minden sorát
  2. Távolítsa el a leállító szavakat
  3. Számolja meg az egyedi szavakat

6.1. Szétválaszt egy sort a szavakra

Először hozzuk létre a Tokenizer osztályban egy sort szavakra bontani.

Meghosszabbítjuk a DoFn osztály. Ennek az osztálynak van egy absztrakt módszere folyamat. Ez a módszer a bemeneti rekordokat dolgozza fel a PCgyűjtés és elküldi a kimenetet egy Kibocsátó.

Meg kell valósítanunk a felosztási logikát ebben a módszerben:

a public class Tokenizer kiterjeszti a DoFn {private static final Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @Orride public void process (String line, Emitter emitter) {for (String word: SPLITTER.split (line)) {emitter.emit (word); }}} 

A fenti megvalósításban a Hasító osztály a guavai könyvtárból, hogy szavakat vonjon ki egy sorból.

Ezután írjunk egy egység tesztet a Tokenizer osztály:

@RunWith (MockitoJUnitRunner.class) nyilvános osztályú TokenizerUnitTest {@Mock private Emitter emitter; @Test public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEissions () {Tokenizer splitter = new Tokenizer (); splitter.process ("hello world", emitter); verify (emitter) .emit ("hello"); verify (emitter) .emit ("világ"); VerifyNoMoreInteractions (emitter); }}

A fenti teszt igazolja, hogy a helyes szavakat adják vissza.

Végül osszuk szét a bemeneti szövegfájlból olvasott sorokat ezzel az osztállyal.

A párhuzamos Do a metódusa PCgyűjtés interfész alkalmazza az adott DoFn minden elemre, és újat ad vissza PCgyűjtés.

Hívjuk ezt a módszert a vonalak gyűjteményén, és adjunk át egy példányt Tokenizer:

PCollection words = vonalak.parallelDo (új Tokenizer (), Writables.strings ()); 

Ennek eredményeként megkapjuk a beviteli szövegfájl szavainak listáját. A következő lépésben eltávolítjuk a leállító szavakat.

6.2. Távolítsa el a Stop szavakat

Az előző lépéshez hasonlóan hozzunk létre egy StopWordFilter osztály a stop szavak kiszűrésére.

Mi azonban meghosszabbítjuk FilterFn ahelyett DoFn. FilterFn nevű elvont módszerrel rendelkezik elfogad. A szűrési logikát ebben a módszerben kell megvalósítanunk:

a StopWordFilter nyilvános osztály kiterjeszti a FilterFn {// angol stop szavakat, amelyeket Lucene-től kölcsönzött. privát statikus végleges készlet STOP_WORDS = ImmutableSet .copyOf (új String [] {"a", "és", "are", "as", "at", "be", "but", "by", "for" , "if", "in", "into", "is", "it", "no", "not", "of", "on", "vagy", "s", "such", " t "," hogy "," a "," azok "," akkor "," ott "," ezek "," ők "," ez "," hogy "," volt "," lesz "," a " }); @Orride public boolean accept (String word) {return! STOP_WORDS.contains (word); }}

Ezután írjuk meg az egység tesztet StopWordFilter osztály:

public class StopWordFilterUnitTest {@Test public void givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("a")); assertFalse (filter.accept ("a")); } @Test public void givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = new StopWordFilter (); assertTrue (filter.accept ("Hello")); assertTrue (filter.accept ("Világ")); } @Test public void givenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("Ez", "is", "a", "teszt", "mondat"); PCollection noStopWords = szavak.szűrő (új StopWordFilter ()); assertEquals (ImmutableList.of ("Ez", "teszt", "mondat"), Lists.newArrayList (noStopWords.materialize ())); }}

Ez a teszt ellenőrzi, hogy a szűrési logika megfelelően van-e végrehajtva.

Végül használjuk StopWordFilter az előző lépésben létrehozott szavak listájának szűrése. A szűrő a metódusa PCgyűjtés interfész alkalmazza az adott FilterFn minden elemre, és újat ad vissza PCgyűjtés.

Hívjuk ezt a módszert a gyûjtemény szavakra, és adjunk át egy példányt StopWordFilter:

PCollection noStopWords = szavak.szűrő (új StopWordFilter ());

Ennek eredményeként megkapjuk a szűrt szógyűjteményt.

6.3. Számoljon egyedi szavakat

A szűrt szógyűjtemény megszerzése után meg akarjuk számolni, hogy az egyes szavak milyen gyakran fordulnak elő. PCgyűjtés Az interfésznek számos módszere van a közös összesítések végrehajtására:

  • min - a gyűjtemény minimális elemét adja vissza
  • max - a gyűjtemény maximális elemét adja vissza
  • hossz - visszaadja a gyűjtemény elemeinek számát
  • számol - adja vissza a PTable amely tartalmazza a gyűjtemény egyes egyedi elemeinek számát

Használjuk a számol módszer az egyedi szavak és számuk megszerzésére:

// A számlálási módszer a Crunch primitívek sorozatát alkalmazza, és // visszaadja a bemeneti PCollection egyedi szavainak térképét a számlálásukra. PTable count = noStopWords.count ();

7. Adja meg a kimenetet

Az előző lépések eredményeként egy táblázatot készítünk a szavakról és azok számáról. Ezt az eredményt egy szöveges fájlba akarjuk írni. A Csővezeték Az interfész kényelmi módszereket kínál a kimenet írásához:

void write (PCollection gyűjtemény, Target target); void write (PCollection gyűjtemény, Target target, Target.WriteMode writeMode); void writeTextFile (PCollection gyűjtemény, String pathName);

Ezért hívjuk a writeTextFile módszer:

pipeline.writeTextFile (számlál, outputPath); 

8. A csővezeték-végrehajtás kezelése

Az eddigi összes lépés pontosan meghatározta az adatvezetéket. Egyetlen bemenet sem olvasható, sem feldolgozva. Ez azért van, mert A Crunch lusta végrehajtási modellt használ.

Addig nem futtatja a MapReduce jobokat, amíg a Pipeline felületen meg nem hívják a job tervezését és végrehajtását vezérlő módszert:

  • fuss - végrehajtási tervet készít a szükséges kimenetek létrehozásához, majd szinkron módon végrehajtja
  • Kész - futtatja a kimenetek létrehozásához szükséges fennmaradó feladatokat, majd megtisztítja a létrehozott köztes adatfájlokat
  • runAsync - hasonló a futási módszerhez, de nem blokkoló módon hajt végre

Ezért hívjuk a Kész módszer a folyamat végrehajtására MapReduce feladatokként:

PipelineResult result = pipeline.done (); 

A fenti utasítás futtatja a MapReduce feladatokat a bemenet olvasására, feldolgozására és az eredmény beírására a kimeneti könyvtárba.

9. A csővezeték összekapcsolása

Eddig fejlesztettük és egységesen teszteltük a logikát a bemeneti adatok olvasására, feldolgozására és a kimeneti fájlba történő írásra.

Ezután állítsuk össze őket a teljes adatvezeték megépítéséhez:

public int run (String [] args) dobja a {String inputPath = args [0] kivételt; String outputPath = args [1]; // Hozzon létre egy objektumot a folyamat létrehozásának és végrehajtásának összehangolásához. Pipeline pipeline = új MRPipeline (WordCount.class, getConf ()); // Hivatkozás egy adott szövegfájlra Strings gyűjteményeként. PCollection vonalak = pipeline.readTextFile (inputPath); // Határozzon meg egy függvényt, amely a PCollection of Strings minden egyes sorát felosztja // a fájl egyes szavaiból álló PCollection-re. // A második argumentum beállítja a sorosítási formátumot. PCollection szavak = vonalak.parallelDo (új Tokenizer (), Writables.strings ()); // Vegyük a szavak gyűjteményét, és távolítsuk el az ismert leállítási szavakat. PCollection noStopWords = words.filter (új StopWordFilter ()); // A számlálási módszer a Crunch primitívek sorozatát alkalmazza, és // visszaadja a bemeneti PCollection egyedi szavainak térképét a számlálásukra. PTable count = noStopWords.count (); // utasítsa a folyamatot, hogy írja le a kapott számokat egy szövegfájlba. pipeline.writeTextFile (számlál, outputPath); // Végezze el a folyamatot MapReduce fájlként. PipelineResult result = pipeline.done (); visszatérési eredmény.sikerült ()? 0: 1; }

10. Hadoop indítás konfigurálása

Az adatvezeték így készen áll.

Az indításhoz azonban szükségünk van a kódra. Ezért írjuk meg a fő- módszer az alkalmazás elindítására:

public class A WordCount kiterjeszti a Configured implements eszközt {public static void main (String [] args) dobja a Kivételt {ToolRunner.run (new Configuration (), new WordCount (), args); }

ToolRunner.run elemzi a Hadoop konfigurációt a parancssorból, és végrehajtja a MapReduce feladatot.

11. Futtassa az alkalmazást

A teljes alkalmazás készen áll. Futtassuk a következő parancsot a felépítéséhez:

mvn csomag 

A fenti parancs eredményeként megkapjuk a csomagolt alkalmazást és egy speciális job jarot a célkönyvtárban.

Használjuk ezt a jobot az alkalmazás Hadoopon történő futtatásához:

hadoop jar cél / crunch-1.0-SNAPSHOT-job.jar 

Az alkalmazás elolvassa a bemeneti fájlt, és az eredményt a kimeneti fájlba írja. A kimeneti fájl egyedi szavakat és azok számát tartalmazza, hasonlóan a következőkhöz:

[Hozzáadás, 1] [Hozzáadva, 1] [Csodálat, 1] [Elismerés, 1] [Engedély, 1]

A Hadoop mellett futtathatjuk az alkalmazást az IDE-n belül, önálló alkalmazásként vagy egységtesztként.

12. Következtetés

Ebben az oktatóanyagban létrehoztunk egy MapReduce-on futó adatfeldolgozó alkalmazást. Az Apache Crunch megkönnyíti a MapReduce csővezetékek írását, tesztelését és végrehajtását Java-ban.

Szokás szerint a teljes forráskód megtalálható a Github oldalon.