Zur Blog-Serie: https://www.shi-gmbh.com/herausforderung-customer-centricity/
Author: Dr. Eduardo Torres Schumann, SHI Gmbh
Die Google Analytics-Daten werden von der Festplatte als DataFrame eingelesen, wir holen uns zunächst die Visits für einen Monat:
import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
sc = SparkSession.builder.appName("TestGA").getOrCreate()
ga_data = sc.read.json('/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_201707*.json.gz')
Die Information, ob es sich bei dem Visit um ein Bounce handelt, ist in der Spalte 'totals' kodiert. Wir behalten die entsprechende Unterspalte 'bounces' und behalten zusätzlich die Spalten, die uns als Feature dienen werden:
Damit decken wir unterschiedliche Aspekte eines Besuchs ab. Eine erste Datenexploration war der Auswahl der Features schon vorausgegangen.
Wir wissen daher, dass die Features unabhägig voneinander sind, was wichtig beim Trainieren des Models ist.
data = ga_data.select('fullVisitorId', 'visitId', 'device.browser', 'device.deviceCategory' ,'channelGrouping', 'geoNetwork.subcontinent', 'hit.page.pagePath', expr("totals.bounces").cast("integer").alias("bounces"), explode('hits').alias('hit')).where('hit.time ==0 ').fillna(0)
data = data.drop('fullVisitorId').drop('visitId').drop('hit')
data.show(5)
data.printSchema()
Im ausgewählten Datenset sind Bounces ('bounces' == 1) sowie "echte" Besuche ('bounces' == 0) vergelichbar oft repräsentiert:
data.groupBy('bounces').count().show()
Das stimmt damit überein, was in der ersten Datenexploration beobachtet wurde. Die Anwendung von Techniken wie stratified Sampling ist daher nicht notwendig, um ein ausgewogenes Datenset bzgl. der Ausprägungen der Zielklasse für das Training zu bekommen.
Die ausgewählten Features stellen im Prinzip Kategorien dar, wir haben keine numerischen Wertebereiche. Wir gucken uns die verschiedenen Ausprägungen je nach Feature genauer an: Deren Anzahl sollte im Vergleich zur Anzhal der Datensätze klein sein, Ausprägungen sollten zudem nicht nur mit einzelnen Datensätzen dünnbesetzt sein.
Die Implementierung von Entscheidungsbaum Algorithmus, die wir verwendent werden, steuert mit dem Parameter "maxBins", wie groß ein kategorisches Feature sein darf. Der Defaultwert ist 32, an dem wir uns orientieren, um unsere Kategorien zu bilden.
Bei 'browser' haben wir 29 Asuprägungen, die z.T. sehr dünn besiedelt sind.
Wir behalten die Browser, die eine gewisse Häufigkeit überschreiten, und fügen sie mittels "join" in den Daten ein. Alle anderen seltenen Browser werden als OTHER kodiert.
browser = data.select("browser").groupBy("browser").count().orderBy("browser")
browser.show()
selectedBrowser = browser.select(expr("browser").alias("normBrowser")).where(expr("count > 150"))
selectedBrowser.show()
data = data.join( selectedBrowser, data.browser == selectedBrowser.normBrowser,how='left').fillna({'normBrowser':'OTHER'})
data.printSchema()
Das Feature 'deviceCategory' eignet sich bereits gut als Kategorie:
data.select("deviceCategory").groupBy("deviceCategory").count().show()
'channelGrouping' ebenso:
data.select("channelGrouping").groupBy("channelGrouping").count().show()
Indem statt bspw. 'country' wir die geographischen Information mittels 'subcontinent' kodieren, erhalten wir ein Kategorienfeature einer verwendbaren Größe:
data.agg(countDistinct("subcontinent")).show()
Die Pfade, die Landing Page abbilden, sind sehr variabel:
data.agg(countDistinct("pagePath")).show()
data.select("pagePath").where( expr("pagePath") != "/home").show(20, truncate=False)
Um diese Vielfalt zu verkleinern, extrahieren die Produktkategorie nach dem zweiten Schrägstrich im Pfad und fassen die Landing Page daruch zusammen. Hierfür wenden wir die Funktionen 'substring_index' und 'regexp_replace' auf die Spalte 'pagePath' hintereinander an und fügen das Ergebnis als weitere Spalte 'category' an. Weitere Reguläre Ausdrücke normalisieren Sonderzeichen und Endungen.
data = data.withColumn("category", regexp_replace(substring_index(regexp_replace(regexp_replace('pagePath',r'(^/)|([-+\)])|(\.html)|([&?\.].*$)', ''),r'google ?redesign/', ''), '/', 1),r'((?<=accessorie|office|apparel|home).*)|s$' ,''))
data.filter(expr("pagePath") != "/home").show()
Damit können wir die Landing Page als 'category' als kategorial Feature nutzen:
data.select("category").groupBy("category").count().show(31)
Der Spark ML Package ist die neuere Spark Bibliothek für Maschinelles Lernen, die nicht mehr nur auf RRDs aufbaut, sondern auch mit DataFrames umgehen kann. Damit kann ein Pipeline aufgebaut werden, um die Daten in einem DataFrame in das richtige Format für den Algorithmus des Maschinellen Lernens zu transformieren, das mit dem Algorithums zu trainieren bzw. anzuwenden und die Ergebnisse zu evaluieren.
Zielformat für Anwendung vom Algorithmus: Kategorialdaten als Zahlen kodieren.
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
channelIndexer = StringIndexer(inputCol="channelGrouping", outputCol="channelIdx", handleInvalid='keep')
browserIndexer = StringIndexer(inputCol="normBrowser", outputCol="browserIdx", handleInvalid='keep')
deviceIndexer = StringIndexer(inputCol="deviceCategory", outputCol="deviceCategoryIdx", handleInvalid='keep')
geoIndexer = StringIndexer(inputCol="subcontinent", outputCol="subcontinentIdx", handleInvalid='keep')
categoryIndexer = StringIndexer(inputCol="category", outputCol="categoryIdx", handleInvalid='keep')
assembler = VectorAssembler(inputCols=["channelIdx", "browserIdx", "deviceCategoryIdx", "subcontinentIdx",
"categoryIdx"], outputCol="features")
dtree = DecisionTreeClassifier( labelCol="bounces", maxBins=40, maxDepth=5)
pipeline = Pipeline(stages=[channelIndexer, browserIndexer, deviceIndexer, geoIndexer, categoryIndexer, assembler, dtree])
split data and reserve for final test. run using cross validation.
develop, validation = data.randomSplit([0.8, 0.2], seed=12345)
develop.show()
model = pipeline.fit(develop)
treeModel = model.stages[6]
treeModel.toDebugString
predictions = model.transform(validation)
predictions.select('bounces','prediction').groupBy('bounces','prediction').count().show()
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='bounces')
evalF1 = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='bounces', metricName='f1')
evalRecall = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='bounces', metricName='weightedRecall' )
evalPrecision = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='bounces', metricName='weightedPrecision')
evalAccuracy = MulticlassClassificationEvaluator(predictionCol="prediction",labelCol='bounces', metricName='accuracy' )
accuracy = evaluator.evaluate(predictions)
accuracy
precision = evalPrecision.evaluate(predictions)
precision
recall = evalRecall.evaluate(predictions)
recall
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
paramGrid = ParamGridBuilder().addGrid(dtree.maxDepth, [3,4,5]).build()
crossval = CrossValidator(estimator=pipeline,
estimatorParamMaps=paramGrid,
evaluator=evaluator,
numFolds=3)
cvModel = crossval.fit(develop)
bestTreeModel = cvModel.bestModel.stages[6]
bestTreeModel
bestTreeModel.toDebugString
''' DecisionTreeClassificationModel (uid=DecisionTreeClassifier_4a9c9d361cc90ae20eaf) of depth 3 with 15 nodes If ('subcontinent' is not 'Northern America') If ('category' in {apparel,bag,electronic,signin,accessorie,drinkware,office,basket,asearch,lifestyle,store,storepolicie,giftcard,nest,brand,wearable,shop,storeitem,new2015logo,fun,eco,payment,register,revieworder,home2,madeinusa}) If ('deviceCategory' in {mobile,tablet}) Predict: 1.0 Else ('deviceCategory' is 'desktop') Predict: 0.0 Else ('category' in {home,shopbybrand,myaccount,registersucces,yourinfo}) If ('channel' in {Referral,Paid Search,Affiliates,Display}) Predict: 1.0 Else ('channel' in {Organic Search,Direct,Social,(Other)}) Predict: 1.0 Else ('subcontinent' is 'Northern America') If ('category' other than {home,fun,madeinusa}) If ('channel' in {Referral,Paid Search,Affiliates,Display}) Predict: 0.0 Else ('channel' in {Organic Search,Direct,Social,(Other)}) Predict: 1.0 Else ('category' in {home,fun,madeinusa}) If ('channel' is not 'Direct') Predict: 0.0 Else ('channel' is 'Direct') Predict: 1.0 '''
cvModel.avgMetrics
validationPreds = cvModel.transform(validation)
confusionMatrix = validationPreds.select('bounces','prediction').groupBy('bounces','prediction').count()
confusionMatrix.show()
valAccuracy = evaluator.evaluate(validationPreds)
valAccuracy
valPrecision = evalPrecision.evaluate(validationPreds)
valPrecision
debugPipeline = Pipeline(stages=[channelIndexer, browserIndexer, deviceIndexer, geoIndexer, categoryIndexer])
transformed = debugPipeline.fit(develop)
trdf = transformed.transform(develop)
# 0 channelIndexer = StringIndexer(inputCol="channelGrouping", outputCol="channelIdx", handleInvalid='keep')
# 1 browserIndexer = StringIndexer(inputCol="normBrowser", outputCol="browserIdx", handleInvalid='keep')
# 2 deviceIndexer = StringIndexer(inputCol="deviceCategory", outputCol="deviceCategoryIdx", handleInvalid='keep')
# 3 geoIndexer = StringIndexer(inputCol="subcontinent", outputCol="subcontinentIdx", handleInvalid='keep')
# 4 categoryIndexer = StringIndexer(inputCol="category", outputCol="categoryIdx", handleInvalid='keep')
# Feature 0
trdf.select('channelGrouping','channelIdx').distinct().orderBy('channelIdx').show(50)
# Feature 1
trdf.select('normBrowser','browserIdx').distinct().orderBy('browserIdx').show(50)
# Feature 2
trdf.select('deviceCategory','deviceCategoryIdx').distinct().orderBy('deviceCategoryIdx').show(50)
# Feature 3
trdf.select('subcontinent','subcontinentIdx').distinct().orderBy('subcontinentIdx').show(50)
# Feature 4
trdf.select('category','categoryIdx').distinct().orderBy('categoryIdx').show(50)