Predspracovanie dát v prostredí Apache Spark¶
Cieľom cvičenia je naučiť sa predspracovať dáta do vhodnej reprezentácie tak, aby mohli byť použité ako trénovacie, alebo testovacie dáta pri učení modelov.
!pip install findspark
!pip install pyspark # nainštalujeme a naimportujeme knižnice potrebne pre Apache spark
Dátové rámce¶
Dátové rámce (Data Frames) sú distribuovanou kolekciou dát, ktoré sú usporiadané do pomenovaných stĺpcov. Dátové rámce v Sparku môžu byť vytvorené z rôznych zdrojov napr. zo súborov, externých databáz, alebo transformovaním z existujúcich RDD kolekcií. Základným rozhraním pre prácu nad dátovými rámcami je objekt SparkSession, ktorý umožňuje aj dopytovanie pomocou jazyka SQL. V prostredí interpretra PySpark je objekt SparkSession dostupný priamo v premennej spark. V skripte je potrebné ho inicializovať a zadať aspoň názov aplikácie.
# vytvorenie spark aplikacie
import findspark
findspark.init()
import pyspark
# do skriptu si naimportujeme typ SparkSession z modulu ‘pyspark.sql‘
from pyspark.sql import SparkSession
# vytvoríme objekt ‘spark‘ a ako parameter ‘appName‘ nastavíme názov aplikácie (v distribuovanom prostredí môže naraz bežať
# viacero aplikácií, ktoré je potrebné pomenovať aby sme ich vedeli rozlíšiť)
spark = SparkSession.builder.appName("example28").getOrCreate()
sc = spark.sparkContext
# ďalej už môžeme používať objekt rozhrania ‘spark‘ na vytvorenie a spracovanie dátových rámcov
V nasledujúcom príklade aplikujeme operácie nad dátovým rámcom vytvoreným z reálnej dátovej množiny KDD Cup. Nasledujúce príkazy predspracujú dáta o sieťovej komunikácii medzi zariadeniami z predchádzajúceho cvičenia.
# stiahneme si dáta z minulého cvičenia z internetu a uložíme ich do pracovného adresára
import urllib.request
urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
# ‘raw_data’ je RDD kolekcia riadkov (reťazcov) načítaných z textového súboru
raw_data = sc.textFile("./kddcup.data_10_percent.gz")
# dáta sú v dátových rámcoch reprezentované po riadkoch ako objekty typu ‘Row’
from pyspark.sql import Row
# načítame si textový súbor po riadkoch a každý riadok rozdelíme na jednotlivé hodnoty (reťazce oddelené čiarkou)
csv_data = raw_data.map(lambda line: line.split(","))
# pre každý riadok (zoznam oddelených reťazcov) vytvoríme objekt typu ‘Row’, dáta zadáme ako zoznam pomenovaných parametrov, pričom
# názov parametra bude zodpovedať názvu dátového atribútu, pre číselné atribúty skonvertujeme hodnoty z reťazca na desatinné číslo
df_data = csv_data.map(lambda line: Row(
duration = float(line[0]),
protocol_type = line[1],
service = line[2],
flag = line[3],
src_bytes = float(line[4]),
dst_bytes = float(line[5]),
land = float(line[6]),
wrong_fragment = float(line[7]),
urgent = float(line[8]),
hot = float(line[9]),
num_failed_logins = float(line[10]),
logged_in = float(line[11]),
num_compromised = float(line[12]),
root_shell = float(line[13]),
su_attempted = float(line[14]),
num_root = float(line[15]),
num_file_creations = float(line[16]),
num_shells = float(line[17]),
num_access_files = float(line[18]),
num_outbound_cmds = float(line[19]),
is_host_login = float(line[20]),
is_guest_login = float(line[21]),
count = float(line[22]),
srv_count = float(line[23]),
serror_rate = float(line[24]),
srv_serror_rate = float(line[25]),
rerror_rate = float(line[26]),
srv_rerror_rate = float(line[27]),
same_srv_rate = float(line[28]),
diff_srv_rate = float(line[29]),
srv_diff_host_rate = float(line[30]),
dst_host_count = float(line[31]),
dst_host_srv_count = float(line[32]),
dst_host_same_srv_rate = float(line[33]),
dst_host_diff_srv_rate = float(line[34]),
dst_host_same_src_port_rate = float(line[35]),
dst_host_srv_diff_host_rate = float(line[36]),
dst_host_serror_rate = float(line[37]),
dst_host_srv_serror_rate = float(line[38]),
dst_host_rerror_rate = float(line[39]),
dst_host_srv_rerror_rate = float(line[40]),
attack_type = line[41]))
# dátový rámec potom môžeme vytvoriť z RDD kolekcie objektov typu ‘Row’
df = spark.createDataFrame(df_data)
# dátový rámec potom môžeme vytvoriť z RDD kolekcie objektov typu ‘Row’
df = spark.createDataFrame(df_data)
# nad objektom dátového rámca môžeme používať rôzne operácie podobne ako pri práci s kolekciami RDD
# operácia ‘take(n)’ vráti prvých n záznamov
df.take(5)
# operácia ‘show()’ vypíše dáta ako textovú tabuľku, voliteľne je možné zadať počet prvých zobrazovaných riadkov
df.show(5)
# dáta môžeme jednoducho filtrovať po riadkoch podľa zadanej podmienky, napr. odfiltrujeme záznamy s normálnou komunikáciou a zobrazíme
# ich počet, k jednotlivým dátovým atribútom pristupujeme podobne ako pri mapách cez ich názov v hranatých zátvorkách
normal_records = df.filter(df["attack_type"] == "normal.")
# výstupom operácie je nový dátový rámec, na ktorý môžeme aplikovať ďalšie operácie (volanie viacerých operácií môžeme zreťaziť)
# napr. metóda ‘count’ vráti celkový počet záznamov v dátovom rámci
normal_records.count()
# podobne môžete do nového rámca vybrať iba niektoré atribúty (stĺpce), napr. nasledujúci príkaz vyberie do nového rámca iba
# atribúty ‘duration’ a ‘attack_type’
duration_and_type = df.select("duration", "attack_type")
# dáta môžete zoskupiť podľa zadaného atribútu, napr. nasledujúci príkaz zoskupí dáta podľa typu útoku a pre každý typ vypočíta
# počet záznamov, výsledok je rámec s dvoma stĺpcami: ‘attack_type’ a ‘count’
type_counts = df.groupBy("attack_type").count()
type_counts.show()
# dáta môžete usporiadať príkazom ‘orderBy’, nasledujúci príkaz usporiada agregované hodnoty najprv zostupne podľa počtu a potom
# vzostupne abecedne podľa názvu typu
type_counts.orderBy(["count", "attack_type"], ascending=[0, 1]).show()
# pomocou operácie ‘describe’ môžete vypočítať základné štatistiky pre číselné atribúty
stats_df = df.describe(["duration", "dst_bytes"])
# vlastnosť objektu ‘dtypes’ obsahuje pole s názvami atribútov a ich typom (tzn. dátový rámec ‘stats_df’ sa skladá z troch
# atribútov: ‘summary’, ‘duration’ a ‘dst_type’ a všetky sú typu string
stats_df.dtypes
# metóda ‘collect’ prevedie všetky riadky dátového rámca na zoznam typu ‘Row’ (túto metódu používajte iba pre malé dátové
# množiny, napr. iba pre agregované výsledky)
stats_list = stats_df.collect()
# k objektom typu ‘Row’ môžeme pristupovať podobne ako k mape, nasledujúci cyklus vypíše podobný výstup ako metóda ‘show’
for row in stats_list:
print("{0:<20}\t{1:<20}\t{2:<20}".format(row["summary"], row["duration"], row["dst_bytes"]))
# operácie ‘stat.cov’ a ‘stat.corr’ vypočítajú kovarianciu, resp. koreláciu (Pearsonov korelačný koeficient) medzi dvoma
# číselnými atribútmi
print(df.stat.cov("src_bytes", "dst_bytes"))
print(df.stat.corr("src_bytes", "dst_bytes"))
# pre nominálne atribúty môžete vypočítať kontigenčnú tabuľku
df.stat.crosstab("attack_type", "protocol_type").show()
# nad dátovými rámcami sa môžete priamo dopytovať pomocou jazyka SQL
# najprv je potrebné zaregistrovať dátový rámec ako dočasnú tabuľku pod zvoleným názvom
df.registerTempTable("attacks")
# SQL príkaz môžete nad dátami vyhodnotiť pomocou objektu ‘SparkSession’, výsledok dopytu je nový dátový rámec, na ktorý môžete
# aplikovať ďalšie operácie
selected_records = spark.sql('SELECT * FROM attacks WHERE duration > 0 AND attack_type = "normal."')
selected_records.show(5)
# naraz si môžete zaregistrovať viacero tabuliek, ktoré môžete spájať cez SQL JOIN
# dátové rámce si môžete uložiť napr. do parquet súboru, ktorý potom môžete znovu efektívne načítať
# ‘write’ rozhranie podporuje aj ďalšie formáty (napr. CSV, JSON, alebo ORC)
# nasledujúci príklad uloží dátový rámec ‘selected_records’ do súboru ‘selected_records.parquet’ v aktuálnom pracovnom adresáry
# (keďže dáta môžu byť spracované paralelne, v pracovnom adresáry sa vytvorí podadresár s názvom ‘selected_records.parquet’ v ktorom
# budú uložené jednotlivé časti dátového rámca v samostatných súboroch)
selected_records.write.parquet("selected_records.parquet")
# dáta môžete znovu načítať pomocou ‘SparkSession.read’ rozhrania
selected_records2 = spark.read.parquet("selected_records.parquet")
selected_records2.show(5)
Viac informácií o dátových rámcoch môžete nájsť na stránke - https://spark.apache.org/docs/3.3.0/sql-programming-guide.html (v angličtine).
Úloha 8.2¶
Naštudujte si dokumentáciu k Bucketizer z MLlib, ktorý sa používa na transformáciu numerických (spojitých) atribútov na kategorické (nominálne). Numerické hodnoty zvoleného atribútu rozdelí do intervalov (buckets), ktorým priradí kategorické hodnoty. Zvoľte si ľubovoľný numerický atribút z datasetu z cvičenia a transformujte ho použitím Bucketizeru na nominálny.
Úloha 8.3¶
Naštudujte si dokumentáciu k StandardScaler. StandardScaler sa používa pre normalizáciu hodnôt zvoleného numerického atribútu. Vyberte si ľubovoľný numerický atribút z datasetu z cvičenia a normalizujte ho pomocou StandardScaleru.
Úloha 8.4¶
Naštudujte si dokumentáciu k OneHotEncoder a StringIndexer. Obe sa používajú na indexovanie kategorických (nominálnych) atribútu, tzn. na ich transformáciu na číselné hodnoty. Okrem potreby transformácie niektorých atribútov je indexovanie potrebné pre trénovanie modelov strojového učenia (ktoré musia pracovať s numerickými vektormi). Použite obe metódy na vybraný kategorický atribút z datasetu z cvičnia. Porovnajte výsledky indexovania zvoleného atribútu oboma spôsobmi.