Spracovanie dát v prostredí Apache Spark¶
!pip install findspark
!pip install pyspark # nainštalujeme a naimportujeme knižnice potrebne pre Apache spark
# vytvorenie spark aplikacie
import findspark
findspark.init()
import pyspark
from pyspark import SparkContext
sc = SparkContext("local", "test app")
Práca s RDD - Resilient Distributed Dataset¶
Základným objektom programátorského rozhrania Apache Spark je tzv. SparkContext. V interpretri PySpark je kontext automaticky nastavený v premennej sc.
Základným dátovým typov v Sparku je RDD (Resilient Distributed Dataset) reprezentujúci rozsiahlu kolekciu prvkov, ktorú je možné
spracovať paralelne na viacerých procesoroch alebo distribuovane na viacerých výpočtových uzloch
programátor k RDD pristupuje ako k lokálnej premennej aj keď sú jednotlivé operácie nad dátami vykonávané distribuovane na
viacerých počítačoch
RDD kolekciu môžete vytvoriť napr. z lokálneho poľa prvkov metódou parallelize.
data = ["spark", "rdd", "example", "sample", "example"]
rdd = sc.parallelize(data)
Nad RDD je možné vykonávať dva typy príkazov:
- akcie vrátia výsledný výsledok spracovania (napr. vypočítané číslo a pod.)
- transformácie transformujú pôvodnú kolekciu RDD na novú kolekciu RDD, viacero transformácií je možné zreťaziť transformácie nie sú vykonané ihneď po uvedení príkazu v kóde, zreťazené transformácie musia byť ukončené niektorou z akcií a až pri volaní výslednej akcie sa nad dátami spätne aplikujú postupne všetky transformácie (tzv. lazy execution).
Základné Spark akcie¶
Akcia count vráti počet prvkov v kolekcii.
rdd.count() # = 5
Ak chceme získať prvky RDD kolekcie ako pole hodnôt, môžeme použiť akciu collect.
rdd.collect() # = ["spark", "rdd", "example", "sample", "example"]
first vráti iba prvý prvok RDD.
rdd.first() # = "spark"
take(n) vráti prvých n prvkov RDD ako pole hodnôt.
rdd.take(4) # = ["spark", "rdd", "example", "sample"]
Pole n náhodne vybraných prvkov môžeme získať akciou takeSample
prvý parameter udáva či ide o výber s opakovaním (hodnota True) alebo bez (False)
voliteľne môžeme zadať aj inicializáciu generátora náhodných čísel ako tretí parameter seed (inak môžete získať vždy iný výber)
rdd.takeSample(True, 3)
Pomocou akcie reduce na všetky hodnoty RDD kolekcie môžete postupne aplikovať zadanú funkciu, ktorá ich zredukuje na jednu hodnotu
Jednoduchú funkciu môžete zadať priamo ako lambda výraz.
Nasledujúce volanie spojí hodnoty operátorom + a keďže hodnoty sú reťazce, výsledok je reťazec, ktorý vznikne spojením všetkých prvkov.
print(rdd.reduce(lambda x, y: x + y))
Ak chceme postupne zavolať danú funkciu pre každý prvok RDD kolekcie, môžeme použiť akciu foreach.
# najprv si definujeme jednoduchú funkciu, ktorá prevedie reťazec na veľké písmená a zobrazí ho na obrazovke
def print_upper(x):
print(x.upper())
# zavoláme funkciu print_upper pre každý prvok RDD kolekcie
rdd.foreach(print_upper)
Ak RDD množina obsahuje dvojice kľúč:hodnota, môžeme spočítať počet hodnôt pre každý kľúč pomocou akcie countByKey.
# najprv si vytvoríme RDD s dvojicami kľúč:hodnota
kv_pairs = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("a", 1), ("b", 1), ("b", 1), ("b", 1), ("b", 1)])
# a vypočítame počet hodnôt pre každý kľúč, výsledok operácie je mapa
kv_pairs.countByKey().items()
Základné Spark transformácie¶
Transformácia map vytvorí RDD kolekciu prvkov, ktoré vzniknú postupným aplikovaním zadanej funkcie na prvky pôvodnej RDD kolekcie.
Nižšie uvedený príklad vytvorí RDD kolekciu dvojíc, kde prvá zložka dvojice bude pôvodný reťazec z kolekcie rdd a druhá zložka
jeho dĺžka.
rdd2 = rdd.map(lambda x: (x, len(x)))
# prvky transformovanej kolekcie získame akciou ‘collect’
rdd2.collect()
Transformácia flatMap funguje podobne ako map, ale pre jeden prvok pôvodnej RDD kolekcie sa vygeneruje niekoľko prvkov
transformovanej kolekcie (zadaná funkcia by mala vracať pole vygenerovaných hodnôt).
# porovnajte napr. nasledujúci príkaz ktorý vygeneruje RDD kolekciu s 3 prvkami a každý prvok je pole
sc.parallelize([1, 2, 3]).map(lambda x: [x, x, x]).collect()
# = [[1, 1, 1], [2, 2, 2], [3, 3, 3]]
# s príkazom ktorý vygeneruje pre každý prvok pôvodnej RDD kolekcie 3 (rovnaké) transformované prvky, tzn. výsledná kolekcia bude
# mať 3x3=9 prvkov
sc.parallelize([2, 3, 4]).flatMap(lambda x: [x, x, x]).collect()
# = [2, 2, 2, 3, 3, 3, 4, 4, 4]
Transformácia filter vytvorí novú kolekciu RDD tak, že z pôvodnej kolekcie vyberie iba prvky, ktoré spĺňajú podmienku zadanej funkcie.
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
filtered_rdd = rdd.filter(lambda x: x % 2 == 0) # vyberieme iba párne čísla
filtered_rdd.collect()
Transformácia sample vytvorí RDD kolekciu s náhodným výberom prvkov z pôvodnej kolekcie
sample má 3 argumenty – prvý špecifikuje, či je výber s opakovaním, alebo bez, druhý špecifikuje veľkosť výberu a posledný
inicializáciu generátora náhodných čísel.
rdd = sc.parallelize(range(1, 10)) # vygenerujeme si postupnosť čísel od 1 do 10
sample_rdd = rdd.sample(True, 0.2) # náhodne vyberieme 20 % prvkov s opakovaním
Nad RDD kolekciami môžete robiť množinové operácie ako je napr. zjednotenie alebo prienik pomocou transformácií
union alebo intersection.
rdd1 = sc.parallelize(range(1, 15))
rdd2 = sc.parallelize(range(10, 21))
rdd1.union(rdd2).collect()
# = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
rdd1.intersection(rdd2).collect()
# = [10, 11, 12, 13, 14, 15]
Ak máme RDD kolekciu s dvojicami kľúč:hodnota, prvky môžeme zoskupiť transformáciou groupByKey, výsledok je
RDD kolekcia dvojíc (kľúč, iterátor hodnôt), iterátor je špeciálny typ Sparku ktorý umožňuje postupne prechádzať všetky
zoskupené prvky, iterátor napr. môžete previesť na zoznam Pythonu funkciou list alebo iterovať v cykle.
# najprv si vygenerujeme RDD kolekciu dvojíc typu (prvé písmeno reťazca, reťazec)
rdd = sc.parallelize(["spark", "rdd", "example", "sample", "example"]).map(lambda word: (word[0], word))
# zoskupíme slová podľa kľúča (prvého písmena)
group_rdd = rdd.groupByKey()
# ‘group_rdd’ je kolekcia dvojíc (prvé písmeno, iterátor slov začínajúcich na dané písmeno)
# ak chceme previesť iterátor, tj. druhú zložku dvojice na zoznam, musíme naň aplikovať funkciu ‘list’, čo môžeme zapísať pomocou
# transformácie ‘mapValues’
group_list = group_rdd.mapValues(lambda x: list(x))
# výsledok je RDD kolekcia dvojíc typu (prvé písmeno, zoznam slov začínajúcich na dané písmeno)
group_list.collect()
Transformácia reduceByKey zagreguje všetky hodnoty s daným kľúčom zadanou funkciou, prvky výslednej kolekcie sú dvojice
(kľúč, agregovaná hodnota), napr.:
kv_pairs = sc.parallelize([("a", 4), ("b", 2), ("a", 7), ("a", 4), ("b", 3)])
kv_pairs_count = kv_pairs.reduceByKey(lambda x, y: x + y) # obsahuje prvky ("a", 15), ("b", 5)
kv_pairs_count.collect()
Ak chceme RDD kolekciu typu kľúč:hodnota usporiadať podľa kľúča, môžeme použiť transformáciu sortByKey.
kv_pairs.sortByKey().collect()
Viac informácií o operáciách nad RDD si môžete prečítať na nasledujúcom odkaze (v angličtine).
RDD operácie nad dátami z reálnej dátovej množiny KDD Cup.¶
Najprv si stiahneme priamo v Pythone súbor dátovej množiny z internetu a uložíme ho do pracovného adresára.
import urllib.request
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
Dátová množina obsahuje záznamy o sieťovej komunikácii medzi zariadeniami jednotlivé komunikačné spojenia sú charakterizované množinou atribútov (nominálnych aj numerických – pozri http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), posledným atribútom je typ komunikácie (‘normal.‘ popisuje štandardnú komunikáciu, ostatné rôzne typy útokov).
# dáta načítame ako RDD kolekciu zo súboru a zobrazíme prvých 5 záznamov
rawdata = sc.textFile("./kddcup.data_10_percent.gz")
rawdata.take(5)
# z výpisu je vidno, že dáta sú na začiatku reprezentované ako RDD kolekcia reťazcov načítaných zo súboru po riadkoch
# pomocou ‘count‘ možeme spočítať počet záznamov
rawdata.count()
# na rozdelenie riadkov na hodnoty môžeme použiť csv reader
import csv
rdd = sc.textFile("./kddcup.data_10_percent.gz")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))
# ak súbor obsahuje na prvom riadku hlavičku, môžeme ju odstrániť pomocou transformácie ‘filter‘
header = rdd.first()
rdd = rdd.filter(lambda x: x != header)
# nasledujúce príkazy spočítajú, koľko záznamov má cieľový atribút s hodnotou ‘normal‘
# najprv odfiltrujeme všetky riadky ktoré obsahujú reťazec ‘normal‘
normal_records = rawdata.filter(lambda x: "normal" in x)
# a zistíme ich počet
print(normal_records.count())
# naopak dáta, ktoré obsahujú iba údaje o neštandardnej komunikácii získame napr. takto:
attack_raw_data = rawdata.subtract(normal_records)
# pomocou transformácie ‘map‘ rozdelíme riadky na pole hodnôt s čiarkou ako oddeľovačom
csv_data = rawdata.map(lambda line: line.split(","))
# pomocou ‘map‘ si môžeme dáta aj preusporiadať, napr. si vygenerujeme RDD kolekciu typu kľúč:hodnota kde ako kľúč použijeme
# cieľový atribút (index atribútu 41) a ako hodnotu budeme mať pole hodnôt ostatných atribútov (indexy 0 až 40)
def create_kv(line):
elems = line.split(",") # rozdelíme riadok na podreťazce hodnôt
tag = elems[41] # tag je cieľový atribút
return (tag, elems[0:40]) # vrátime dvojicu t
# aplikujeme mapovaciu funkciu na RDD kolekciu
key_csv_data = rawdata.map(create_kv)
# pomocou ‘sample‘ si môžeme náhodne vybrať podmnožinu dát, vyberieme 10 % záznamov bez opakovania (1234 je inicializácia
# generátora náhodných čísel)
rawdata_sample = rawdata.sample(False, 0.1, 1234)
sample_size = rawdata_sample.count()
total_size = rawdata.count()
# vypíšeme počet vybraných záznamov a celkový počet záznamov
print("sample size is {0} of {1}".format(sample_size, total_size))
Úloha 7.2¶
Napíšte v Pythone pomocou Spark transformácií a akcií kód, ktorý spočíta počet výskytov slov vo vstupnom textovom súbore (ako vstupný súbor môžete použiť text z Úlohy 4.5).
Úloha 7.3¶
Pre dáta z dátovej množiny KDD Cup z príkladu na cvičení napíšte kód, ktorý vypíše pre nominálne atribúty (atribúty s indexmi 1,2,3) počet ich rôznych hodnôt.
Úloha 7.4¶
Pre dáta z dátovej množiny KDD Cup z príkladu na cvičení napíšte kód, ktorý vypočíta pre všetky záznamy s cieľovým atribútom (index 41) s hodnotou normal priemerný a celkový čas pripojenia (atribút s indexom 0).