Apache Spark: Különbségek az adatkeretek, az adatkészletek és az RDD között

1. Áttekintés

Az Apache Spark egy gyors, elosztott adatfeldolgozó rendszer. A memóriában történő adatfeldolgozást végzi, a memória gyorsítótárát és az optimalizált végrehajtást használja, amely gyors teljesítményt eredményez. Magas szintű API-kat biztosít olyan népszerű programnyelvekhez, mint a Scala, Python, Java és R.

Ebben a gyors oktatóanyagban a Spark három alapkoncepcióján megyünk keresztül: adatkeretek, adatkészletek és RDD-k.

2. DataFrame

A Spark SQL a Spark 1.3 óta DataFrame nevű táblázatos adatkivonatot vezetett be. Azóta a Spark egyik legfontosabb jellemzőjévé vált. Ez az API akkor hasznos, ha strukturált és félig strukturált, elosztott adatokat akarunk kezelni.

A 3. szakaszban tárgyaljuk a Rugalmas elosztott adatkészleteket (RDD). A DataFrames hatékonyabban tárolja az adatokat, mint az RDD-k, mert ezek az RDD-k megváltoztathatatlan, memóriában lévő, rugalmas, elosztott és párhuzamos képességeit használják, de sémát is alkalmaznak az adatokra. A DataFrames az SQL-kódot optimalizált alacsony szintű RDD-műveletekké is fordítja.

Három módon hozhatunk létre DataFrame-eket:

  • Meglévő RDD-k konvertálása
  • SQL lekérdezések futtatása
  • Külső adatok betöltése

Spark csapat bemutatta SparkSession a 2.0 verzióban egyesíti az összes különböző kontextust, biztosítva, hogy a fejlesztőknek ne kelljen aggódniuk különböző kontextusok létrehozása miatt:

SparkSession munkamenet = SparkSession.builder () .appName ("TouristDataFrameExample") .master ("local [*]") .getOrCreate (); DataFrameReader dataFrameReader = session.read ();

Elemezzük a Tourist.csv fájl:

Adatkészlet adat = dataFrameReader.option ("header", "true") .csv ("data / Tourist.csv");

Amióta a Spark 2.0 DataFrame a Adatkészlet típusú Sor, így a DataFrame-et használhatjuk álnévként a Adatkészlet.

Kiválaszthatunk konkrét oszlopokat, amelyek érdekelnek. Szűrhetünk és csoportosíthatunk is egy adott oszlop szerint:

data.select (col ("ország"), col ("év"), col ("érték")) .show (); data.filter (col ("ország"). equTo ("Mexikó")) .show (); data.groupBy (col ("ország")) .szám () .show ();

3. Adatkészletek

Az adatkészlet erősen tipizált, strukturált adatok összessége. Biztosítják az ismert objektum-orientált programozási stílust, valamint a típusbiztonság előnyeit, mivel az adatkészletek fordítási időben ellenőrizhetik a szintaxist és a hibákat.

Adatkészlet a DataFrame kiterjesztése, így a DataFrame-et egy adatkészlet be nem írt nézetének tekinthetjük.

A Spark csapata kiadta a Adatkészlet API a Spark 1.6-ban és mint említettük: „a Spark Datasets célja egy olyan API biztosítása, amely lehetővé teszi a felhasználók számára, hogy egyszerűen kifejezzék az átalakításokat az objektumtartományokban, ugyanakkor a Spark SQL végrehajtó motor teljesítményének és robusztusságának előnyeit is biztosítják”.

Először létre kell hoznunk egy típusú osztályt TouristData:

public class TouristData {privát String régió; privát String ország; magán String év; privát húr sorozat; magán Dupla érték; saját karakterlánc lábjegyzetek; privát String forrás; // ... getterek és beállítók}

Az egyes rekordjainknak a megadott típushoz való hozzárendeléséhez kódolót kell használnunk. A kódolók fordítanak a Java objektumok és a Spark belső bináris formátuma között:

// SparkSession inicializálása és adatbetöltése Adatkészlet responseWithSelectedColumns = data.select (col ("régió"), col ("ország"), col ("év"), col ("sorozat"), col ("érték") cast. ("kettős"), col ("lábjegyzetek"), col ("forrás")); Dataset typedDataset = responseWithSelectedColumns .as (Encoders.bean (TouristData.class));

A DataFrame-hez hasonlóan szűrhetünk és csoportosíthatunk egyes oszlopok szerint:

typedDataset.filter ((FilterFunction) rekord -> record.getCountry () .equals ("Norvégia")) .show (); typedDataset.groupBy (typedDataset.col ("ország")) .count () .show ();

Olyan műveleteket is végezhetünk, mint például egy adott tartománynak megfelelő oszlop szerinti szűrés vagy egy adott oszlop összegének kiszámítása, hogy megkapjuk annak teljes értékét:

typedDataset.filter ([FilterFunction] record -> record.getYear ()! = null && (Long.valueOf (record.getYear ())> 2010 && Long.valueOf (record.getYear ()) record.getValue ()! = null && record.getSeries () .contains ("ráfordítás")) .groupBy ("ország") .agg (összeg ("érték")) .show ();

4. RDD-k

A Rugalmas elosztott adatkészlet vagy az RDD a Spark elsődleges programozási absztrakciója. Olyan elemek gyűjteményét jelenti, amelyek: változhatatlan, rugalmas és elosztott.

Az RDD egy nagy adatkészletet foglal magában, a Spark automatikusan elosztja az RDD-kben található adatokat a fürtünkön, és párhuzamosan végzi a velük végzett műveleteket..

RDD-ket csak stabil adattárban lévő adatok vagy más RDD-k műveletei révén hozhatunk létre.

A hibatűrés elengedhetetlen, ha nagy adathalmazokkal foglalkozunk, és az adatokat fürtgépeken osztják szét. Az RDD-k a Spark beépített hibahelyezési mechanikája miatt rugalmasak. A Spark arra a tényre támaszkodik, hogy az RDD-k megjegyzik létrehozásuk módját, hogy a partíció helyreállítása érdekében könnyen visszavezethessük a származást..

Kétféle műveletet tudunk elvégezni az RDD-ken: Átalakítások és cselekvések.

4.1. Átalakulások

Alkalmazhatunk transzformációkat egy RDD-re annak adatainak manipulálására. A manipuláció végrehajtása után egy teljesen új RDD-t kapunk, mivel az RDD-k megváltoztathatatlan objektumok.

Megvizsgáljuk a Map and Filter megvalósítását, amely a két leggyakoribb átalakítás.

Először létre kell hoznunk a JavaSparkContext és töltse be az adatokat RDD-ként a Tourist.csv fájl:

SparkConf conf = new SparkConf (). SetAppName ("uppercaseCountries") .setMaster ("local [*]"); JavaSparkContext sc = új JavaSparkContext (conf); JavaRDD turisták = sc.textFile ("data / Tourist.csv");

Ezután alkalmazzuk a térképfunkciót, hogy minden rekordból megkapjuk az ország nevét, és alakítsuk át a nevet nagybetűvé. Ezt az újonnan létrehozott adatkészletet szöveges fájlként menthetjük a lemezre:

JavaRDD upperCaseCountries = turisták.map (sor -> {String [] oszlopok = line.split (COMMA_DELIMITER); visszatérő oszlopok [1] .toUpperCase ();}). Külön (); upperCaseCountries.saveAsTextFile ("adatok / kimenet / nagybetűs.txt");

Ha csak egy adott országot akarunk kiválasztani, akkor alkalmazhatjuk a szűrő funkciót az eredeti turistáink RDD-jére:

JavaRDD turistákInMexico = turisták .filter (vonal -> line.split (COMMA_DELIMITER) [1] .egyenlő ("Mexikó")); turistákInMexico.saveAsTextFile ("adatok / output / touristInMexico.txt");

4.2. Műveletek

A műveletek visszaadják a végső értéket, vagy az eredményeket lemezre mentik, miután valamilyen számítást végeztek az adatokon.

A Sparkban a visszatérően használt műveletek közül kettő a Count és a Reduce.

Számoljuk meg az összes országot a CSV-fájlunkban:

// Spark Context inicializálása és adatterhelés JavaRDD országok = turisták.map (sor -> {String [] oszlopok = line.split (COMMA_DELIMITER); visszatérő oszlopok [1];}). Long numberOfCountries = országok.szám ();

Most országonként kiszámoljuk a teljes kiadást. Szűrnünk kell a kiadásokat tartalmazó leírásokat.

Ahelyett, hogy a JavaRDD, használjuk a JavaPairRDD. Az RDD pár egy olyan RDD típus, amely kulcs-érték párokat képes tárolni. Ellenőrizzük a következőt:

JavaRDD turistákKiadások = turisták .szűrő (sor -> sor.split (COMMA_DELIMITER) [3] .tartalmaz ("kiadások")); JavaPairRDD kiadásokPairRdd = turistákKiadások .mapToPair (sor -> {String [] oszlopok = line.split (COMMA_DELIMITER); adja vissza az új Tuple2-t (oszlopok [1], Double.valueOf (oszlopok [6]);}); Lista totalByCountry = kiadásPárRdd .reduceByKey ((x, y) -> x + y) .collect ();

5. Következtetés

Összefoglalva: ha tartományspecifikus API-ra van szükségünk, akkor DataFrame-eket vagy adatkészleteket kell használnunk, olyan magas szintű kifejezésekre van szükségünk, mint például összesítés, összeg vagy SQL-lekérdezések. Vagy amikor a típusbiztonságot akarjuk a fordítás idején.

Másrészt RDD-ket kell használnunk, ha az adatok strukturálatlanok, és nincs szükségünk egy adott séma megvalósítására, vagy amikor alacsony szintű átalakításokra és műveletekre van szükségünk.

Mint mindig, az összes kódminta elérhető a GitHubon.


$config[zx-auto] not found$config[zx-overlay] not found