Útmutató az Apache Crunchhoz
1. Bemutatkozás
Ebben az oktatóanyagban bemutatjuk az Apache Crunch alkalmazást egy példa adatfeldolgozó alkalmazással. Ezt az alkalmazást a MapReduce keretrendszer segítségével fogjuk futtatni.
Kezdjük azzal, hogy röviden bemutatjuk az Apache Crunch néhány fogalmát. Ezután beugrunk egy minta alkalmazásba. Ebben az alkalmazásban szöveges feldolgozást fogunk végezni:
- Először elolvassuk a szöveges fájl sorait
- Később szavakra bontjuk őket, és eltávolítunk néhány általános szót
- Ezután csoportosítjuk a fennmaradó szavakat, hogy listát kapjunk az egyedi szavakról és azok számáról
- Végül ezt a listát egy szöveges fájlba írjuk
2. Mi a Crunch?
A MapReduce egy elosztott, párhuzamos programozási keretrendszer nagy mennyiségű adat feldolgozásához egy kiszolgálófürtön. Az olyan szoftveres keretrendszerek, mint a Hadoop és a Spark, megvalósítják a MapReduce programot.
A Crunch keretet nyújt a MapReduce csővezetékek írásához, teszteléséhez és futtatásához Java-ban. Itt nem a MapReduce munkákat írjuk közvetlenül. Inkább meghatározzuk az adatvezetéket (azaz a bemeneti, feldolgozási és kimeneti lépések végrehajtásának műveleteit) a Crunch API-k segítségével. A Crunch Planner leképezi őket a MapReduce feladatokra, és szükség esetén végrehajtja őket.
Ezért minden Crunch adatcsatornát a Csővezeték felület. Ez az interfész meghatározza az adatok csővezetékbe történő beolvasásának módszereit is Forrás példányok és adatok kiírása egy csővezetékről a Cél példányok.
3 felületünk van az adatok ábrázolásához:
- PCgyűjtés - változhatatlan, elosztott elemgyűjtemény
- PTable<>, V> - megváltoztathatatlan, elosztott, rendezetlen kulcs- és értéktérkép
- PGroupedTable<>, V> - a K típusú kulcsok elosztott, rendezett térképe az an Iterálható V, amelyet pontosan egyszer lehet ismételni
DoFn az összes adatfeldolgozási funkció alaposztálya. Megfelel Mapper, Csökkentő és Kombinátor osztályok a MapReduce-ban. A fejlesztési idő nagy részét a logikai számítások megírásával és tesztelésével töltjük.
Most, hogy jobban ismerjük a Crunch-ot, használjuk fel a példa alkalmazás felépítésére.
3. Crunch projekt felállítása
Először hozzunk létre egy Crunch projektet Maven-lel. Kétféleképpen tehetjük meg:
- Adja hozzá a szükséges függőségeket a pom.xml meglévő projekt fájlja
- Használjon archetípust indító projekt előállításához
Vessünk egy gyors pillantást mindkét megközelítésre.
3.1. Maven-függőségek
A Crunch hozzáadásához egy meglévő projekthez adjuk hozzá a szükséges függőségeket a pom.xml fájl.
Először tegyük hozzá a ropogós mag könyvtár:
org.apache.crunch crunch-core 0.15.0
Ezután tegyük hozzá a hadoop-kliens könyvtárban kommunikálni Hadoop-tal. A Hadoop telepítésnek megfelelő verziót használjuk:
org.apache.hadoop hadoop-client 2.2.0 biztosított
Ellenőrizhetjük a Maven Centralon a crunch-core és a hadoop-kliens könyvtárak legújabb verzióit.
3.2. Maven archetípus
Egy másik megközelítés egy indító projekt gyors generálása a Crunch által biztosított Maven archetípus felhasználásával:
mvn archetípus: generál -Dfilter = org.apache.crunch: crunch-archetípus
Amikor a fenti parancs kéri, megadjuk a Crunch verziót és a projekt műtárgy részleteit.
4. Crunch Pipeline Setup
A projekt beállítása után létre kell hoznunk a Csővezeték tárgy. A Crunchnak 3 van Csővezeték megvalósítások:
- MRPipeline - a Hadoop MapReduce programon belül hajt végre
- SparkPipeline - a Spark csővezetékek sorozataként hajtja végre
- MemPipeline - memóriában hajtja végre a klienst, és hasznos az egység teszteléséhez
Általában a. Példányának felhasználásával fejlesztünk és tesztelünk MemPipeline. Később használjuk a MRPipeline vagy SparkPipeline a tényleges végrehajtáshoz.
Ha memóriában lévő vezetékre lenne szükségünk, használhatnánk a statikus módszert getInstance hogy megkapja a MemPipeline példa:
Csővezeték = MemPipeline.getInstance ();
De most hozzunk létre egy példányt MRPipeline hogy az alkalmazást Hadoop-tal hajtsa végre:
Pipeline pipeline = új MRPipeline (WordCount.class, getConf ());
5. Olvassa el a bemeneti adatokat
A folyamatobjektum létrehozása után be akarjuk olvasni a bemeneti adatokat. A Csővezeték Az interfész kényelmes módszert kínál a szövegfájlból származó bemenetek olvasására, readTextFile (elérési út).
Hívjuk ezt a módszert a bemeneti szövegfájl olvasására:
PCollection vonalak = pipeline.readTextFile (inputPath);
A fenti kód a szövegfájlt a fájl gyűjteményeként olvassa fel Húr.
Következő lépésként írjunk egy tesztesetet a bemenet olvasására:
@Test public void givenPipeLine_whenTextFileRead_thenExpectedNumberOfRecordsRead () {Pipeline pipeline = MemPipeline.getInstance (); PCollection vonalak = pipeline.readTextFile (INPUT_FILE_PATH); assertEquals (21, sorok.asCollection () .getValue () .size ()); }
Ebben a tesztben ellenőrizzük, hogy megkapjuk-e a várt sorszámot egy szöveges fájl olvasása közben.
6. Adatfeldolgozási lépések
A bemeneti adatok elolvasása után fel kell dolgoznunk azokat. A Crunch API számos alosztályt tartalmaz DoFn az általános adatfeldolgozási forgatókönyvek kezelésére:
- FilterFn - szűri a gyűjtemény tagjait logikai feltétel alapján
- MapFn - minden bemeneti rekordot pontosan egy kimeneti rekordhoz térképez
- CombineFn - számos értéket egyetlen értékbe egyesít
- JoinFn - olyan csatlakozásokat hajt végre, mint a belső illesztés, a bal külső illesztés, a jobb külső illesztés és a teljes külső illesztés
A következő adatfeldolgozási logikát valósítsuk meg az alábbi osztályok használatával:
- Bontsa szavakra a bemeneti fájl minden sorát
- Távolítsa el a leállító szavakat
- Számolja meg az egyedi szavakat
6.1. Szétválaszt egy sort a szavakra
Először hozzuk létre a Tokenizer osztályban egy sort szavakra bontani.
Meghosszabbítjuk a DoFn osztály. Ennek az osztálynak van egy absztrakt módszere folyamat. Ez a módszer a bemeneti rekordokat dolgozza fel a PCgyűjtés és elküldi a kimenetet egy Kibocsátó.
Meg kell valósítanunk a felosztási logikát ebben a módszerben:
a public class Tokenizer kiterjeszti a DoFn {private static final Splitter SPLITTER = Splitter .onPattern ("\ s +") .omitEmptyStrings (); @Orride public void process (String line, Emitter emitter) {for (String word: SPLITTER.split (line)) {emitter.emit (word); }}}
A fenti megvalósításban a Hasító osztály a guavai könyvtárból, hogy szavakat vonjon ki egy sorból.
Ezután írjunk egy egység tesztet a Tokenizer osztály:
@RunWith (MockitoJUnitRunner.class) nyilvános osztályú TokenizerUnitTest {@Mock private Emitter emitter; @Test public void givenTokenizer_whenLineProcessed_thenOnlyExpectedWordsEissions () {Tokenizer splitter = new Tokenizer (); splitter.process ("hello world", emitter); verify (emitter) .emit ("hello"); verify (emitter) .emit ("világ"); VerifyNoMoreInteractions (emitter); }}
A fenti teszt igazolja, hogy a helyes szavakat adják vissza.
Végül osszuk szét a bemeneti szövegfájlból olvasott sorokat ezzel az osztállyal.
A párhuzamos Do a metódusa PCgyűjtés interfész alkalmazza az adott DoFn minden elemre, és újat ad vissza PCgyűjtés.
Hívjuk ezt a módszert a vonalak gyűjteményén, és adjunk át egy példányt Tokenizer:
PCollection words = vonalak.parallelDo (új Tokenizer (), Writables.strings ());
Ennek eredményeként megkapjuk a beviteli szövegfájl szavainak listáját. A következő lépésben eltávolítjuk a leállító szavakat.
6.2. Távolítsa el a Stop szavakat
Az előző lépéshez hasonlóan hozzunk létre egy StopWordFilter osztály a stop szavak kiszűrésére.
Mi azonban meghosszabbítjuk FilterFn ahelyett DoFn. FilterFn nevű elvont módszerrel rendelkezik elfogad. A szűrési logikát ebben a módszerben kell megvalósítanunk:
a StopWordFilter nyilvános osztály kiterjeszti a FilterFn {// angol stop szavakat, amelyeket Lucene-től kölcsönzött. privát statikus végleges készlet STOP_WORDS = ImmutableSet .copyOf (új String [] {"a", "és", "are", "as", "at", "be", "but", "by", "for" , "if", "in", "into", "is", "it", "no", "not", "of", "on", "vagy", "s", "such", " t "," hogy "," a "," azok "," akkor "," ott "," ezek "," ők "," ez "," hogy "," volt "," lesz "," a " }); @Orride public boolean accept (String word) {return! STOP_WORDS.contains (word); }}
Ezután írjuk meg az egység tesztet StopWordFilter osztály:
public class StopWordFilterUnitTest {@Test public void givenFilter_whenStopWordPassed_thenFalseReturned () {FilterFn filter = new StopWordFilter (); assertFalse (filter.accept ("a")); assertFalse (filter.accept ("a")); } @Test public void givenFilter_whenNonStopWordPassed_thenTrueReturned () {FilterFn filter = new StopWordFilter (); assertTrue (filter.accept ("Hello")); assertTrue (filter.accept ("Világ")); } @Test public void givenWordCollection_whenFiltered_thenStopWordsRemoved () {PCollection words = MemPipeline .collectionOf ("Ez", "is", "a", "teszt", "mondat"); PCollection noStopWords = szavak.szűrő (új StopWordFilter ()); assertEquals (ImmutableList.of ("Ez", "teszt", "mondat"), Lists.newArrayList (noStopWords.materialize ())); }}
Ez a teszt ellenőrzi, hogy a szűrési logika megfelelően van-e végrehajtva.
Végül használjuk StopWordFilter az előző lépésben létrehozott szavak listájának szűrése. A szűrő a metódusa PCgyűjtés interfész alkalmazza az adott FilterFn minden elemre, és újat ad vissza PCgyűjtés.
Hívjuk ezt a módszert a gyûjtemény szavakra, és adjunk át egy példányt StopWordFilter:
PCollection noStopWords = szavak.szűrő (új StopWordFilter ());
Ennek eredményeként megkapjuk a szűrt szógyűjteményt.
6.3. Számoljon egyedi szavakat
A szűrt szógyűjtemény megszerzése után meg akarjuk számolni, hogy az egyes szavak milyen gyakran fordulnak elő. PCgyűjtés Az interfésznek számos módszere van a közös összesítések végrehajtására:
- min - a gyűjtemény minimális elemét adja vissza
- max - a gyűjtemény maximális elemét adja vissza
- hossz - visszaadja a gyűjtemény elemeinek számát
- számol - adja vissza a PTable amely tartalmazza a gyűjtemény egyes egyedi elemeinek számát
Használjuk a számol módszer az egyedi szavak és számuk megszerzésére:
// A számlálási módszer a Crunch primitívek sorozatát alkalmazza, és // visszaadja a bemeneti PCollection egyedi szavainak térképét a számlálásukra. PTable count = noStopWords.count ();
7. Adja meg a kimenetet
Az előző lépések eredményeként egy táblázatot készítünk a szavakról és azok számáról. Ezt az eredményt egy szöveges fájlba akarjuk írni. A Csővezeték Az interfész kényelmi módszereket kínál a kimenet írásához:
void write (PCollection gyűjtemény, Target target); void write (PCollection gyűjtemény, Target target, Target.WriteMode writeMode); void writeTextFile (PCollection gyűjtemény, String pathName);
Ezért hívjuk a writeTextFile módszer:
pipeline.writeTextFile (számlál, outputPath);
8. A csővezeték-végrehajtás kezelése
Az eddigi összes lépés pontosan meghatározta az adatvezetéket. Egyetlen bemenet sem olvasható, sem feldolgozva. Ez azért van, mert A Crunch lusta végrehajtási modellt használ.
Addig nem futtatja a MapReduce jobokat, amíg a Pipeline felületen meg nem hívják a job tervezését és végrehajtását vezérlő módszert:
- fuss - végrehajtási tervet készít a szükséges kimenetek létrehozásához, majd szinkron módon végrehajtja
- Kész - futtatja a kimenetek létrehozásához szükséges fennmaradó feladatokat, majd megtisztítja a létrehozott köztes adatfájlokat
- runAsync - hasonló a futási módszerhez, de nem blokkoló módon hajt végre
Ezért hívjuk a Kész módszer a folyamat végrehajtására MapReduce feladatokként:
PipelineResult result = pipeline.done ();
A fenti utasítás futtatja a MapReduce feladatokat a bemenet olvasására, feldolgozására és az eredmény beírására a kimeneti könyvtárba.
9. A csővezeték összekapcsolása
Eddig fejlesztettük és egységesen teszteltük a logikát a bemeneti adatok olvasására, feldolgozására és a kimeneti fájlba történő írásra.
Ezután állítsuk össze őket a teljes adatvezeték megépítéséhez:
public int run (String [] args) dobja a {String inputPath = args [0] kivételt; String outputPath = args [1]; // Hozzon létre egy objektumot a folyamat létrehozásának és végrehajtásának összehangolásához. Pipeline pipeline = új MRPipeline (WordCount.class, getConf ()); // Hivatkozás egy adott szövegfájlra Strings gyűjteményeként. PCollection vonalak = pipeline.readTextFile (inputPath); // Határozzon meg egy függvényt, amely a PCollection of Strings minden egyes sorát felosztja // a fájl egyes szavaiból álló PCollection-re. // A második argumentum beállítja a sorosítási formátumot. PCollection szavak = vonalak.parallelDo (új Tokenizer (), Writables.strings ()); // Vegyük a szavak gyűjteményét, és távolítsuk el az ismert leállítási szavakat. PCollection noStopWords = words.filter (új StopWordFilter ()); // A számlálási módszer a Crunch primitívek sorozatát alkalmazza, és // visszaadja a bemeneti PCollection egyedi szavainak térképét a számlálásukra. PTable count = noStopWords.count (); // utasítsa a folyamatot, hogy írja le a kapott számokat egy szövegfájlba. pipeline.writeTextFile (számlál, outputPath); // Végezze el a folyamatot MapReduce fájlként. PipelineResult result = pipeline.done (); visszatérési eredmény.sikerült ()? 0: 1; }
10. Hadoop indítás konfigurálása
Az adatvezeték így készen áll.
Az indításhoz azonban szükségünk van a kódra. Ezért írjuk meg a fő- módszer az alkalmazás elindítására:
public class A WordCount kiterjeszti a Configured implements eszközt {public static void main (String [] args) dobja a Kivételt {ToolRunner.run (new Configuration (), new WordCount (), args); }
ToolRunner.run elemzi a Hadoop konfigurációt a parancssorból, és végrehajtja a MapReduce feladatot.
11. Futtassa az alkalmazást
A teljes alkalmazás készen áll. Futtassuk a következő parancsot a felépítéséhez:
mvn csomag
A fenti parancs eredményeként megkapjuk a csomagolt alkalmazást és egy speciális job jarot a célkönyvtárban.
Használjuk ezt a jobot az alkalmazás Hadoopon történő futtatásához:
hadoop jar cél / crunch-1.0-SNAPSHOT-job.jar
Az alkalmazás elolvassa a bemeneti fájlt, és az eredményt a kimeneti fájlba írja. A kimeneti fájl egyedi szavakat és azok számát tartalmazza, hasonlóan a következőkhöz:
[Hozzáadás, 1] [Hozzáadva, 1] [Csodálat, 1] [Elismerés, 1] [Engedély, 1]
A Hadoop mellett futtathatjuk az alkalmazást az IDE-n belül, önálló alkalmazásként vagy egységtesztként.
12. Következtetés
Ebben az oktatóanyagban létrehoztunk egy MapReduce-on futó adatfeldolgozó alkalmazást. Az Apache Crunch megkönnyíti a MapReduce csővezetékek írását, tesztelését és végrehajtását Java-ban.
Szokás szerint a teljes forráskód megtalálható a Github oldalon.