Zur Blog-Serie https://www.shi-gmbh.com/herausforderung-customer-centricity/
Damit jupyter den lokal installierten Spark mit Python nutzen kann, benutzen wir das Modul findspark. Ausserdem werden die pySpark-Module importiert, die für die Analyse benötigt werden.
import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
Wir starten die Spark Applikation indem wir eine SparkSession erzeugen
sc = SparkSession.builder.appName("TestGA").getOrCreate()
Wir werden die Daten in einem sog. DataFrame reinladen, der im wesentlichen einer Tabelle mit benannten Spalten entspricht und SQL-ähnliche Abfragen der Daten emöglicht. DataFrames lassen sich für parallele Berechnungen ähnlich wie mit der grundlegenden Datenstruktur in Spark, sog. RDDs, auf einem Cluster gut verteilen, bieten aber eben den Vorteil einer Schnittstelle, die jedem, der mit einer klassischen Datenbank gearbeitet hat, bekannt vorkommen dürfte. Dank eines Query-Optimizer im Hintergrund, Catalyst, lassen sich Dataframes sehr effizient abfragen und transformieren.
Die mit gzip kompirmierten JSON-Daten lassen sich direkt einlesen, mit dem Stern-Operator im Pfad lassen sich mehrere Dateien in den selben Dataframe reinladen
dataF = sc.read.json('/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_2016*.json.gz')
dataF.count(), len(dataF.columns)
Wir haben grob die Hälfte der Daten reingeladen, der gesamte Datensatz hat (903653, 13) Zeilen / Spalten. Jede Zeile stellt eine Sitzung dar, in der ein Benutzer den Store besucht. Wir können uns das Schema anzeigen lassen, um mehr über die Spalten zu erfahren.
dataF.printSchema()
Als erstes fällt auf, dass die meisten der dreizehn verschiedenen Spalten, die Eigenschaften des Besuchs erfassen, ihrerseits eingebettete Strukturen mit einem eigenen Unterschema haben. Außerdem werden alle Werte als String eingelesen.
Einfache Spalten mit einzelnen Werten lassen sich leicht von Namen her interpretieren: fullVisitorId, date, visitId, visitStartTime, visitNumber
Die meisten Spalten haben jedoch Unterspalten (Typ struct), um verwandte Merkmale zusammenzufassen, so z.B. die Spalte device mit Unterspalten für Merkmale des für den Besuch verwendenten Geräts. Verschiedene geographischen Informationen werden unter geoNetwork gesammelt, Details zur Herkunft des Besuchs unter trafficSource.
Die Folge der im Prinzip beliebig vielen Interaktionen innerhalb eines Besuchs werden in der Spalte hits gesammelt, die dafür von Typ array ist. Jedes Element in dieser Liste von hits ist wiederum ein Objekt mit einem festen, umfangreichen Schema, um damit zusammenhängende Informationen zu erfassen, etwa die besuchte Seite (page), vordefinierte Content-Gruppen (content), angezeigte Produkte (product), getätigte Transaktionen bzw. Käufe (transaction), usw.
Die Spalte totals fasst verschiedene Statistiken über die hits des Besuchs zusammen: Anzahl, die Gesamteinnahmen aller Transaktionen (totalTransactionRevenue), Anzahl der besuchten Seiten, usw.
Spark bietet verschiedene Funktionen, um den Datentyp einer Spalte anzupassen und mit den verschiedenen Arten von Verschachtelungen in einem DataFrame umzugehen. Damit können Abfragen ausgewertet, ohne dass dafür die Daten vorher denormalisiert werden müssen, wie wir bei der weiteren Erkundung sehen werden.
Spark stellt eine funktionellen Sprache zu Verfügung, die sich an SQL orientiert, um ein DataFrame abzufragen. Histogramme bzw. Frequenzen für die Werte in einer Spalte lassen sich damit analog zu SQL berechnen.
dataF.groupBy('channelGrouping').count().orderBy("count").show()
Eingebaute Funktionen wie describe() berechnen grundlegende Verteilungs- / Streuungsparameter. Im Beispiel sieht man wie man auf die Unterspalten von totals zugreifen kann.
dataF.select('totals.hits', 'totals.timeOnSite', 'totals.transactions').describe().show()
Zeilen können mit show ausgegeben werden.
dataF.select('trafficSource').show(5, truncate=False)
dataF.show(10)
Auf die Elemente aus dem Array aus der Spalte Hits kann über einen Index zugegriffen werden.
dataF.select(expr('hits[1]')).show(3,truncate=False)
dataF.groupBy("device.browser").count().orderBy(desc("count")).show()
dataF.agg(countDistinct("device.browser")).show()
... es gibt 42 browser...
dataF.groupBy("device.deviceCategory").count().orderBy(desc("count")).show()
dataF.groupBy("device.operatingSystem").count().orderBy(desc("count")).show()
Die Spalten eines DataFrame werden als String eingelesen. Um das Durschnittsrevenue zu berechnen bei der nächsten Queries casten wir die Spalte zu einem double und ersetzen null entsprechend
dataF.select("device.browser",expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("browser").avg("revenue").orderBy(desc("avg(revenue)")).show()
DataFrames können mit Hilfe der Funktion toPandas() zu einem Pandas-Objekt konvertiert werden, das als Input für eine der weitverbreiteten python Graphik-Bibliotheken wie matplolib oder plotly dient. Aber Vorsicht: mit Spark kann man prinzipiell sehr große Datenmengen verarbeiten, dieser Weg der Visualisierung eignet sich jedoch nur für kleinere Datenmengen, da ansonsten sehr große Pandas-Objekte erzeugt werden, die in Spark nicht parallelisiert verarbeitet werden können und einen Flaschenhals in der Berechnung darstellen.
import plotly.plotly as py
import plotly.graph_objs as go
import pandas as pd
import requests
from plotly.offline import iplot, init_notebook_mode
from plotly import tools
init_notebook_mode()
devCat = dataF.groupBy("device.deviceCategory").count().orderBy(desc("count")).toPandas()
iplot(go.Figure(layout=dict(title='Besuche nach Geräteart') , data= [go.Pie(labels=devCat["deviceCategory"], values=devCat["count"])]))
avgRev = dataF.select("device.browser",expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("browser").agg(avg("revenue").alias("avg_revenue")).where(expr("avg_revenue > 0")).orderBy(desc("avg_revenue")).toPandas()
iplot(go.Figure(layout=dict(title='Durchschnittl. Einnahmen nach Browser') , data= [go.Bar( y = avgRev["avg_revenue"], x = avgRev["browser"])]))