Cvičenie 9 - Učenie modelov pomocou knižnice MLlib¶
Cieľom cvičenia je naučiť sa pracovať s knižnicou MLlib pre vytváranie a validovanie modelov strojového učenia.
MLlib je knižnica distribuovaných algoritmov, ktorá podporuje učenie a validovanie prediktívnych a popisných modelov na rozsiahlych dátach. MLLib poskytuje algoritmy pre učenie klasifikačných a predikčných modelov, algoritmy zhlukovania, generovania asociačných pravidiel a frekventovaných n-tíc a modely kolaboratívneho filtrovania pre odporúčacie systémy. Okrem algoritmov učenia knižnica poskytuje aj funkcie a objekty pre validovanie modelov a pre export modelov v jazyku PMML (Predictive Model Markup Language).
!pip install findspark
!pip install pyspark
# vytvorenie spark aplikacie
import findspark
findspark.init()
import pyspark
# do skriptu si naimportujeme typ SparkSession z modulu ‘pyspark.sql‘
from pyspark.sql import SparkSession
# vytvoríme objekt ‘spark‘ a ako parameter ‘appName‘ nastavíme názov aplikácie (v distribuovanom prostredí môže naraz bežať
# viacero aplikácií, ktoré je potrebné pomenovať aby sme ich vedeli rozlíšiť)
spark = SparkSession.builder.appName("mllib_example").getOrCreate()
sc = spark.sparkContext
# ďalej už môžeme používať objekt rozhrania ‘spark‘ na vytvorenie a spracovanie dátových rámcov
V prvej ukážkovej úlohe budeme pracovať s datasetom Iris. Iris je jednoduchá dátová množina pre klasifikáciu do troch tried, príklady popisujú tri druhy rastlín (kosatcov) podľa rozmerov ich kvetov (štyry vstupné číselné atribúty).
# naimportujeme si potrebné typy
from pyspark.sql import Row
import urllib
# stiahneme si dáta z internetu a uložíme ich do pracovného adresára
urllib.request.urlretrieve("http://people.tuke.sk/martin.sarnovsky/tsvd/files/iris.csv", "iris.csv")
# načítame dáta a premapujeme ich na objekty typu ‘Row’
raw_data = sc.textFile("iris.csv")
csv_data = raw_data.map(lambda x: x.split(","))
Pomocou operácie take() sa môžeme pozrieť na niekoľko prvých záznamov v datasete.
csv_data.take(5)
V nasledujúcom príklade naučíme lineárny model SVM (Support Vector Machine). Tento model je možné použiť iba na klasifikáciu do dvoch tried, pre Iris dataset naučíme klasifikátor, ktorý odlíši druh iris-versicolor od ostatných druhov.
Najprv si teda budeme musieť transformovať cieľový atribút na binárny.
# ako triedu 1 označíme príklady druhu iris-versicolor a ako triedu 0 označíme všetky ostatné príklady
csv_data = csv_data.map(lambda line: [line[0], line[1], line[2], line[3],
1.0 if line[4] == "iris-versicolor" else 0.0])
Pomocou operácie take() sa môžeme opäť pozrieť na to, ako vyzerá niekoľko príkladov z transformovaného RDD.
csv_data.take(5)
Z načítaného RDD potom vytvoríme pomocou operácie createDataFrame() dátový rámec. Pri konverzii z RDD do dátového rámca môžeme v metóde map() špecifikovať lambda výraz, kde si jednotlivé atribúty pomenujeme a natypujeme.
df_data = csv_data.map(lambda line: Row(
petal_length = float(line[0]),
petal_width = float(line[1]),
sepal_length = float(line[2]),
sepal_width = float(line[3]),
label = line[4]))
df = spark.createDataFrame(df_data)
Na dátový rámec df potom už môžeme aplikovať všetky operácie pre Spark dátové rámce. Pomocou head() môžeme vypísať niekoľko prvých záznamov.
df.head(5)
Jednoducho sa môžeme pozrieť napr. na distribúciu hodnôt cieľového atribútu.
df.groupBy('label').count().show()
Pre učenie modelov je potrebné dáta transformovať do vektorov. Je potrebné spojiť všetky vstupné (predikujúce) atribúty do jedného číselného vektora pomocou objektu ‘VectorAssembler’. V dátovom rámci vytvorí stĺpec features, ktorý bude predstavovať vektor hodnôt predikujúcich atribútov.
# importujeme Vector Assembler
from pyspark.ml.feature import VectorAssembler
# nasledujci príkaz spojí všetky vstupné atribúty do číselného vektora, ktorý uloží do nového stĺpca ‘features’
vector_data = VectorAssembler(inputCols=["petal_length", "petal_width", "sepal_length", "sepal_width"],
outputCol="features").transform(df)
vector_data.head()
Teraz, keď máme dáta vo vektorovej podobe, tak môžeme vstupný dataset rozdeliť na trénovaciu a testovaciu množinu. Rozdelenie spravíme pomocou operácie randomSplit(), ktorej ako parameter špecifikujeme pomer rozdelenia.
# dáta rozdelíme na trénovaciu (70%) a testovaciu (30%) množinu náhodným výberom
training_data, test_data = vector_data.randomSplit([0.7, 0.3], seed=123)
V 'training_data' a 'testing_data' máme uložené trénovacie a testovacie množiny. Teraz môžeme pokračovať s trénovaním klasifikačného modelu. V nasledujúcej ukážke natrénujeme lineárny Support Vector Classifier. Trénovanie modelov je podobné ako napr. pri knižnici scikit-learn. Najprv vytvoríme objekt daného modelu, kde špecifikujeme parametre modelu (vstupné dáta, cieľový atribút, hyperparametre modelu). Model potom aplikujeme na trénovacie dáta pomocou operácie fit(). Natrénovaný model môžeme použiť na klasifikáciu testovacích dát pomocou operácie transform().
# importujeme potrebné knižnice
from pyspark.ml.classification import LinearSVC
# najprv vytvoríme objekt ‘LinearSVC’ a nastavíme parametre algoritmu
svm_classifier = LinearSVC(
featuresCol="features", # parameter features - dátový stĺpec obsahujúci vektor vstupných atribútov
labelCol="label") # parameter labelCol - dátový stĺpec obsahujúci cieľový atribút (indexy tried)
# model naučíme funkciou ‘fit’, ktorej predáme trénovacie dáta
svm_model = svm_classifier.fit(training_data)
# presnosť klasifikácie vyhodnotíme na testovacích dátach pomocou funkcie ‘transform’
predictions = svm_model.transform(test_data)
Pozrite sa na dátový rámec predictions. Všimnite si, že po klasifikácii testovacej množiny sa do tohoto dátového rámca zapísali pôvodné trénovacie dáta a pribudli nové stĺpce, ktoré obsahujú predikovanú triedu a predikované pravdepodobnosti. Predikovaná trieda je uložená v stĺpci ‘prediction’. Môžeme potom spočítať počet nesprávne klasifikovaných príkladov, tak, že porovnáme, kde sa ‘prediction’ nerovná cieľovému atribútu ‘label’.
predictions.head()
test_error = predictions.filter(predictions["prediction"] != predictions["label"]).count() / float(test_data.count())
print("Testing error: {0:.4f}".format(test_error))
Viac informácií o algoritmoch a ich nastaveniach je možné nájsť na stránke - https://spark.apache.org/docs/latest/api/python/reference/pyspark.mllib.html (v angličtine).
V ďalšom príklade aplikujeme klasifikačné modely na dátach KDD Cup z predchádzajúceho cvičenia.
Ak dáta nemáte v pracovnom adresári z minulých cvičení, odkomentujte si riadok pre ich stiahnutie.
# naimportujeme potrebné moduly
from pyspark.sql import Row
import urllib
# stiahneme dáta a načítame ich ako zoznamy reťazcov
# urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "kddcup.data_10_percent.gz")
raw_data = sc.textFile("./kddcup.data_10_percent.gz")
csv_data = raw_data.map(lambda x: x.split(","))
Vytvoríme dátový rámec zo zvolených atribútov - pre prehľadnosť v rámci tejto úlohy si ponecháme len zvolených 5 predikujúcich atribútov. Vytvoríme z nich dátový rámec pomocou operácie createDataFrame().
df_data = csv_data.map(lambda line: Row(
duration= float(line[0]),
protocol_type = line[1],
src_bytes = float(line[4]),
dst_bytes = float(line[5]),
land = float(line[6]),
attack_type = line[41]))
df = spark.createDataFrame(df_data)
# pre kontrolu zobrazíme prvý záznam
df.first()
Ako môžeme vidieť, dátový rámec obsahuje okrem numerických, aj kategorické premenné. Tie je nutné pred samotnou vektorizáciou dát vhodným spôsobom transformovať. Pozrime sa na ich hodnoty.
df.groupBy('protocol_type').count().show()
df.groupBy('attack_type').count().show(40)
Pre transformáciu numerických atribútov na kategorické môžeme použiť niekoľko funkcií z MLlib (vyskúšať ste si ich mali v rámci úloh z predoľlých cvičení). Pre atribúty, kde neexistuje usporiadanie je vhodné použiť OneHotEncoder, ktorý kategorický atribút transformuje na množinu binárnych atribútov, ktoré zodpovedajú jeho hodnotám, alebo StringIndexer, ktorý realizuje jednoduché indexovanie kategorických premenných tak, že jednotlivým hodnotám priradí číselný index. Pozor je potrabné dať pri atribútoch, kde takáto transformácia môže zaviesť nechcené usporiadanie.
from pyspark.ml.feature import StringIndexer
# najprv vytvoríme index hodnôt volaním funkcie ‘fit’
attack_type_index = StringIndexer(inputCol="attack_type", outputCol="attack_type_index").fit(df)
# po aplikovaní transformácie sa do dátového rámca pridá nový číselný atribút ‘attack_type_index’
df = attack_type_index.transform(df)
df.head()
# zoznam nominálnych hodnôt usporiadaný podľa priradených indexov je možné získať z objektu indexu cez atribút ‘labels’
# napr. nasledujúci príkaz vypíše počet tried, t.j. počet hodnôt cieľového atribútu ‘attack_type’
print("Number of classes: {0}".format(len(attack_type_index.labels)))
Pri OneHotEncoderi ale musíme najprv atribút previesť StringIndexerom na numerický a potom pomocou OneHotEncodera zakódovať.
from pyspark.ml.feature import OneHotEncoder
protocol_type_index = StringIndexer(inputCol="protocol_type", outputCol="protocol_type_index").fit(df)
df = protocol_type_index.transform(df)
encoder = OneHotEncoder(inputCol="protocol_type_index", outputCol="protocol_encoded").fit(df)
df = encoder.transform(df)
Skontrolujeme výsledok operácie - uvidíme, že v dátovom rámci je okrem protocol_type_index (čo je len číselne zakódovaný atribút protocol_index), aj protocol_encoded, ktorý je už binárne zakódovaný atribút, vo forme riedkeho vektora.
df.head()
# odstránime pôvodné nominálne hodnoty (tzn. dátový rámec bude ďalej obsahovať iba číselné stĺpce)
# df = df.drop("protocol_type")
# df = df.drop("attack_type")
# df.show()
Transformované dáta opäť prevedieme do vektorovej podoby pomocou Vector Assembleru. Ako parametre mu musíme správne špecifikovať vstupné atribúty - tzn. numerické a transformované.
from pyspark.ml.feature import VectorAssembler
# nasledujci príkaz spojí všetky vstupné atribúty do číselného vektora, ktorý uloží do nového stĺpca ‘features’
vector_data = VectorAssembler(inputCols=["dst_bytes", "duration", "land", "src_bytes", "protocol_encoded"],
outputCol="features").transform(df)
vector_data.head()
Teraz rozdelíme dáta na na trénovaciu (80%) a testovaciu (20%) množinu náhodným výberom.
training_data, testing_data = vector_data.randomSplit([0.8, 0.2], seed=1234)
A teraz naučíme klasifikačný model - v tomto prípade rozhodovací strom.
Rovnako ako v predošlom príklade nastavíme predikujúce atribúty a cieľový atribút. Teraz vyskúšame nastaviť aj hyperparametre modelu - max. hĺbku a kritérium pre delenie.
from pyspark.ml.classification import DecisionTreeClassifier, DecisionTreeClassificationModel
tree_classifier = DecisionTreeClassifier(
featuresCol="features", # dátový stĺpec obsahujúci vektor vstupných atribútov
labelCol="attack_type_index", # dátový stĺpec obsahujúci cieľový atribút (indexy tried)
impurity="entropy", # pre výber atribútov pri delení sa použije kritérium informačného zisku
maxDepth=5) # ohraničíme maximálnu hĺbku generovaného stromu
# klasifikačný model vytvoríme volaním funkcie ‘fit’ na trénovacích dátach
tree_model = tree_classifier.fit(training_data)
# vytvorený model si môžeme uložiť do súboru pomocou funkcie ‘save’
tree_model.save("decision_tree_1.model")
# uložený model môžete spätne načítať zo súboru funkciou ‘DecisionTreeClassificationModel.load’
tree_model = DecisionTreeClassificationModel.load("decision_tree_1.model")
Natrénovaný model môžeme vizualizovať - resp. extrahovať z neho klasifikačné pravidlá, ktoré môžeme vypísať na obrazovku. Použijeme na to funkciu toDebugString().
print(tree_model.toDebugString)
Ak chceme model aj vyhodnotiť, potrebujeme ho evaluovať na testovacej množine. To urobíme rovnako ako v predošlom príklade pomocou transform() na testovacích dátach, odkiaľ získame predikcie.
Tie potom môžeme použiť na vypočítanie chyby (alebo iných metrík).
# presnosť klasifikácie vyhodnotíme na testovacích dátach pomocou funkcie ‘transform’
# po klasifikácii sa do dátového rámca pridajú nové stĺpce, ktoré obsahujú predikovanú triedu a pravdepodobnosti
predictions = tree_model.transform(testing_data)
# predikovaná trieda je uložená v stĺpci ‘prediction’, spočítame počet chybne klasifikovaných príkladov
# pri ktorých sa ‘prediction’ nerovná cieľovému atribútu ‘attack_type_index’
test_error = predictions.filter(predictions["prediction"] != predictions["attack_type_index"]).count() / float(testing_data.count())
print("Testing error: {0:.4f}".format(test_error))
Úloha 9.1¶
Transformujte si úlohu KDD Cup na binárnu klasifikáciu. Ako triedu 0.0 označte normálnu komunikáciu a ako triedu 1.0 všetky útoky. Na transformovaných dátach naučte model logistickej regresie a vyhodnoďte jeho presnosť.
Úloha 9.2¶
Naštudujte si dokumentáciu k objektu OneVsRest a vytvorte LR (Logistic Regression) klasifikátor do viacerých tried, ktorý otestujte na dátach KDD Cup.
Úloha 9.3¶
Naučte model rozhodovacieho stromu na dátach Iris. Pomocou operácie nad dátovým rámcom predictions.stat.crosstab("prediction", "label").collect() vypočítajte kontingenčnú tabuľku medzi predikovanou a skutočnou triedou. Z kontingenčnej tabuľky vypočítajte presnosť pre každú triedu.