Zur Blog-Serie https://www.shi-gmbh.com/herausforderung-customer-centricity/
Damit jupyter den lokal installierten Spark mit Python nutzen kann, benutzen wir das Modul findspark. Ausserdem werden die pySpark-Module importiert, die für die Analyse benötigt werden.
import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
Wir starten die Spark Applikation indem wir eine SparkSession erzeugen
sc = SparkSession.builder.appName("TestGA").getOrCreate()
Wir werden die Daten in einem sog. DataFrame reinladen, der im wesentlichen einer Tabelle mit benannten Spalten entspricht und SQL-ähnliche Abfragen der Daten emöglicht. DataFrames lassen sich für parallele Berechnungen ähnlich wie mit der grundlegenden Datenstruktur in Spark, sog. RDDs, auf einem Cluster gut verteilen, bieten aber eben den Vorteil einer Schnittstelle, die jedem, der mit einer klassischen Datenbank gearbeitet hat, bekannt vorkommen dürfte. Dank eines Query-Optimizer im Hintergrund, Catalyst, lassen sich Dataframes sehr effizient abfragen und transformieren.
Die mit gzip kompirmierten JSON-Daten lassen sich direkt einlesen, mit dem Stern-Operator im Pfad lassen sich mehrere Dateien in den selben Dataframe reinladen
dataF = sc.read.json('/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_2016*.json.gz')
dataF.count(), len(dataF.columns)
Wir haben grob die Hälfte der Daten reingeladen, der gesamte Datensatz hat (903653, 13) Zeilen / Spalten. Jede Zeile stellt eine Sitzung dar, in der ein Benutzer den Store besucht. Wir können uns das Schema anzeigen lassen, um mehr über die Spalten zu erfahren.
dataF.printSchema()
Als erstes fällt auf, dass die meisten der dreizehn verschiedenen Spalten, die Eigenschaften des Besuchs erfassen, ihrerseits eingebettete Strukturen mit einem eigenen Unterschema haben. Außerdem werden alle Werte als String eingelesen.
Einfache Spalten mit einzelnen Werten lassen sich leicht von Namen her interpretieren: fullVisitorId, date, visitId, visitStartTime, visitNumber
Die meisten Spalten haben jedoch Unterspalten (Typ struct), um verwandte Merkmale zusammenzufassen, so z.B. die Spalte device mit Unterspalten für Merkmale des für den Besuch verwendenten Geräts. Verschiedene geographischen Informationen werden unter geoNetwork gesammelt, Details zur Herkunft des Besuchs unter trafficSource.
Die Folge der im Prinzip beliebig vielen Interaktionen innerhalb eines Besuchs werden in der Spalte hits gesammelt, die dafür von Typ array ist. Jedes Element in dieser Liste von hits ist wiederum ein Objekt mit einem festen, umfangreichen Schema, um damit zusammenhängende Informationen zu erfassen, etwa die besuchte Seite (page), vordefinierte Content-Gruppen (content), angezeigte Produkte (product), getätigte Transaktionen bzw. Käufe (transaction), usw.
Die Spalte totals fasst verschiedene Statistiken über die hits des Besuchs zusammen: Anzahl, die Gesamteinnahmen aller Transaktionen (totalTransactionRevenue), Anzahl der besuchten Seiten, usw.
Spark bietet verschiedene Funktionen, um den Datentyp einer Spalte anzupassen und mit den verschiedenen Arten von Verschachtelungen in einem DataFrame umzugehen. Damit können Abfragen ausgewertet, ohne dass dafür die Daten vorher denormalisiert werden müssen, wie wir bei der weiteren Erkundung sehen werden.
Spark stellt eine funktionellen Sprache zu Verfügung, die sich an SQL orientiert, um ein DataFrame abzufragen. Histogramme bzw. Frequenzen für die Werte in einer Spalte lassen sich damit analog zu SQL berechnen.
dataF.groupBy('channelGrouping').count().orderBy("count").show()
Eingebaute Funktionen wie describe() berechnen grundlegende Verteilungs- / Streuungsparameter. Im Beispiel sieht man wie man auf die Unterspalten von totals zugreifen kann.
dataF.select('totals.hits', 'totals.timeOnSite', 'totals.transactions').describe().show()
Zeilen können mit show ausgegeben werden.
dataF.select('trafficSource').show(5, truncate=False)
dataF.show(10)
Auf die Elemente aus dem Array aus der Spalte Hits kann über einen Index zugegriffen werden.
dataF.select(expr('hits[1]')).show(3,truncate=False)
dataF.groupBy("device.browser").count().orderBy(desc("count")).show()
dataF.agg(countDistinct("device.browser")).show()
... es gibt 42 browser...
dataF.groupBy("device.deviceCategory").count().orderBy(desc("count")).show()
dataF.groupBy("device.operatingSystem").count().orderBy(desc("count")).show()
Die Spalten eines DataFrame werden als String eingelesen. Um das Durschnittsrevenue zu berechnen bei der nächsten Queries casten wir die Spalte zu einem double und ersetzen null entsprechend
dataF.select("device.browser",expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("browser").avg("revenue").orderBy(desc("avg(revenue)")).show()
DataFrames können mit Hilfe der Funktion toPandas() zu einem Pandas-Objekt konvertiert werden, das als Input für eine der weitverbreiteten python Graphik-Bibliotheken wie matplolib oder plotly dient. Aber Vorsicht: mit Spark kann man prinzipiell sehr große Datenmengen verarbeiten, dieser Weg der Visualisierung eignet sich jedoch nur für kleinere Datenmengen, da ansonsten sehr große Pandas-Objekte erzeugt werden, die in Spark nicht parallelisiert verarbeitet werden können und einen Flaschenhals in der Berechnung darstellen.
import plotly.plotly as py
import plotly.graph_objs as go
import pandas as pd
import requests
from plotly.offline import iplot, init_notebook_mode
from plotly import tools
init_notebook_mode()
devCat = dataF.groupBy("device.deviceCategory").count().orderBy(desc("count")).toPandas()
iplot(go.Figure(layout=dict(title='Besuche nach Geräteart') , data= [go.Pie(labels=devCat["deviceCategory"], values=devCat["count"])]))
avgRev = dataF.select("device.browser",expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("browser").agg(avg("revenue").alias("avg_revenue")).where(expr("avg_revenue > 0")).orderBy(desc("avg_revenue")).toPandas()
iplot(go.Figure(layout=dict(title='Durchschnittl. Einnahmen nach Browser') , data= [go.Bar( y = avgRev["avg_revenue"], x = avgRev["browser"])]))
dataF.groupBy("geoNetwork.continent").count().orderBy(desc("count")).show()
dataF.groupBy("geoNetwork.country").count().orderBy(desc("count")).show()
dataF.select("geoNetwork.continent",expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("continent").avg("revenue").orderBy(desc("avg(revenue)")).show()
dataF.select("geoNetwork.country",expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("country").avg("revenue").orderBy(desc("avg(revenue)")).show()
contDate = dataF.groupBy("geoNetwork.continent").count().orderBy(desc("count")).toPandas()
iplot(go.Figure(layout=dict(title='Besuche nach Kontinent') , data= [go.Pie(labels=contDate["continent"], values=contDate["count"])]))
Kampagne können über Google Analytics definiert werden. Die Quelle ist die Seite, aus der der Besuch des Stores kommt.
dataF.groupBy("trafficSource.campaign").count().orderBy(desc("count")).show()
dataF.groupBy("trafficSource.source").count().orderBy(desc("count")).show()
dataF.groupBy("channelGrouping").count().orderBy(desc("count")).show()
channel = dataF.groupBy("channelGrouping").count().orderBy(desc("count")).toPandas()
iplot(go.Figure(layout=dict(title='Besuche nach Kanälen') , data= [go.Pie(labels=channel["channelGrouping"], values=channel["count"])]))
VisitNumber ist ein Zähler, der die Besuche eines einzelnen Besuchers (einezlne fullVisitorId) hochzählt
dataF.describe("visitNumber").show()
Berechnung Frequenz Nice To Have Binned Histogram Freqenzbereiche (1, 2-5, 5-10, 10-50, 50 -100)
dataF.groupBy("visitNumber").count().orderBy(desc("count")).show()
dataF.select(dayofweek(to_date("date", "yyyyMMdd")).alias("wday")).groupBy("wday").count().orderBy("wday").show()
dataF.select(dayofweek(to_date("date", "yyyyMMdd")).alias("wday"), expr("totals.transactionRevenue").cast("double").alias("revenue")).fillna(0.0).groupBy("wday").sum("revenue").orderBy("wday").show()
revenue = dataF.select(dayofweek(to_date("date", "yyyyMMdd")).alias("wday"), expr("totals.transactionRevenue /100").cast("double").alias("revenue")).fillna(0.0).groupBy("wday").agg(avg("revenue").alias("avg_revenue")).orderBy("wday").toPandas()
visits = dataF.select(dayofweek(to_date("date", "yyyyMMdd")).alias("wday")).groupBy("wday").count().orderBy("wday").toPandas()
tr1= go.Bar( name ="Anzahl Besuche", y = visits["count"][::-1], x = visits["wday"][::-1], marker=dict(opacity=0.5, color="green"))
tr2 = go.Bar( name= "Durchschnitt. Einnahmen", y = revenue["avg_revenue"][::-1], x = revenue["wday"][::-1], marker=dict(opacity=0.5, color="blue"))
fig = tools.make_subplots(rows=1, cols=2, subplot_titles=["Anzahl Besuche ", "Durchschnitt. Einnahmen"], print_grid=False)
fig.append_trace(tr1, 1, 1)
fig.append_trace(tr2, 1, 2)
layout=dict(title='Anzahl der Besuche und Einnahmen nach Wochentag',showlegend=False)
fig['layout'].update(layout)
iplot(fig)
revenueM = dataF.select(to_date("date", "yyyyMMdd").alias("mday"), expr("totals.transactionRevenue /100").cast("double").alias("revenue")).fillna(0.0).groupBy("mday").agg(sum("revenue").alias("total_revenue")).orderBy("mday").toPandas()
visitsM = dataF.select(to_date("date", "yyyyMMdd").alias("mday")).fillna(0.0).groupBy("mday").count().orderBy("mday").toPandas()
tr1 = go.Scatter(mode="lines", x = visitsM["mday"].astype(str), y = visitsM["count"])
layout = go.Layout(title="Besuche nach Datum", height=400)
fig = go.Figure(data = [tr1], layout = layout)
iplot(fig)
tr2 = go.Scatter(mode="lines", x = revenueM["mday"].astype(str), y = revenueM["total_revenue"])
layout = go.Layout(title="Einnahmen nach Datum", height=400)
fig = go.Figure(data = [tr2], layout = layout)
iplot(fig)
dataF.select("fullvisitorId","totals").fillna(0.0).filter(expr("totals.transactionRevenue > 0")).distinct().count()
dataF.select("fullVisitorId").distinct().count()
Wir haben 4562 Besucher die Revenue generiert haben von den insg. 354810
Wir nutzen agg um zusätzliche Statistiken zu deren Besuch zu erhalten
dataF.select("fullvisitorId","totals").fillna(0.0).filter(expr("totals.transactionRevenue > 0")).groupBy("fullVisitorId").agg(sum("totals.bounces").alias("total_bounces"),
sum("totals.hits").alias("total_hits"), sum("totals.newVisits").alias("new_visit_count"), sum("totals.pageviews").alias("total_pageviews"), sum("totals.transactionRevenue").alias("total_revenue"), sum("totals.transactions").alias("transaction_count"), avg("totals.transactions").alias("transaction_avg")
).orderBy(desc("total_revenue")).show(5,truncate=False)
Wir betrachten hier die Verwendung der Seite und berechnen typische Metriken im Bereich Webanalytics.
Um die beliebtesten Seiten für den Start des Besuchs im Store zu berechnen müssen wir auf die Elemente des Array in der Spalte hits zugreifen.
Ein Hit ist eine Interaktion, die dazu führt, dass Daten an Google Analytics gesendet werden. Es gibt verschiedene Hits oder Treffer-Arten:
Pageview Hit (Seitenaufruf Treffer) Screenview Hit (Bildschirmaufruf Treffer) Event Hit (Ereignis Treffer) Social Interaction Hit (Soziale Interaktion Treffer) E-Commerce Hit (E-Commerce Treffer) User Timing Treffer (Nutzer Timing Treffer) Exception Hit (Ausnahme Treffer)
Da in jeder Zeile die Anzahl der Hits im Prinzip unterschiedlich lang ist, verwenden wir die Funktion expolode() um für jedes Element in Hits in einer nene Spalte eine neue Zeile in der Tabelle einzuführen, sodass dann mit den üblichen Aggregationsfunktionen Statistiken berechnet werden könenn.
dataF.select(explode("hits").alias("hit"), expr("hit.page.pagePath").alias("path")).where("hit.type == 'PAGE' AND hit.hitNumber == 1").groupBy("path").count().orderBy(desc("count")).show(truncate=False)
The real bounce rate is defined as the percentage of visits with a single pageview. What was the real bounce rate per traffic source?
also müssen wir die visits zählen wenn sie ein hit haben und ein bounce
dataF.select("trafficSource.source", expr("totals.bounces").cast("integer").alias("bounces"),expr("totals.pageviews").cast("integer").alias("pageviews"), expr("totals.hits").cast("integer").alias("hits") ).fillna(0).groupBy("source").agg(count("source").alias("total_visits"), count(when((expr("hits") == 1) & (expr("bounces") == 1) , True)).alias("total_bounces")).orderBy(desc("total_visits")).select("source","total_visits","total_bounces", expr("(total_bounces / total_visits) * 100 ").alias("rate")).show()
bounces = dataF.select("trafficSource.source", expr("totals.bounces").cast("integer").alias("bounces"),expr("totals.pageviews").cast("integer").alias("pageviews"), expr("totals.hits").cast("integer").alias("hits") ).fillna(0).groupBy("source").agg(count("source").alias("total_visits"), count(when((expr("hits") == 1) & (expr("bounces") == 1) , True)).alias("total_bounces")).select("source","total_visits","total_bounces", expr("(total_bounces / total_visits) * 100 ").alias("bounce_rate")).orderBy(desc("total_visits")).limit(10).toPandas()
tr1= go.Bar( name ="Anzahl Besuche", x = bounces["total_visits"][::-1], y = bounces["source"][::-1], orientation="h", marker=dict(opacity=0.5, color="green"))
tr2 = go.Bar( name= "Bounce Rate", x = bounces["bounce_rate"][::-1], y = bounces["source"][::-1], orientation="h", marker=dict(opacity=0.5, color="blue"))
fig = tools.make_subplots(rows=1, cols=2, subplot_titles=["Anzahl Besuche ", "Bounce Rate"], print_grid=False)
fig.append_trace(tr1, 1, 1)
fig.append_trace(tr2, 1, 2)
layout=dict(title='Top 10 Quellen nach Anzahl der Besuche',showlegend=False)
fig['layout'].update(layout)
iplot(fig)
Churn bezeichnet "Abwanderung". Uns interessiert wie lange die User dem Store treu bleiben. Dafür interessiert uns ihr verhalten über der Zeit.
dataF.select(month(to_date("date", "yyyyMMdd")).alias("wday")).groupBy("wday").count().orderBy("wday").show()
dataF.select(month(to_date("date", "yyyyMMdd")).alias("wday"), "fullVisitorId").groupBy("wday").agg(countDistinct("fullVisitorId")).orderBy("wday").show()
Für August und September bauen wir eine Tabelle auf mit den Besucher die was gekauft haben und ihr revenue.
firstMonthsVisitors = dataF.select("fullVisitorId", expr("totals.transactionRevenue").cast("float").alias("revenue"), month(to_date("date", "yyyyMMdd")).alias("month")).where(((expr("month") == 8) | (expr("month") == 9)) & (expr("revenue") > 0)).groupBy("fullVisitorId").agg(sum("revenue").alias("first_revenue"))
firstMonthsVisitors.show(3)
Analog bauen eine Tabelle für die zwei darauffolgenden Monate auf und tragen das Revenue aus den Vormonaten ein, falls der Besucher aus diesem Zeitraum bereits bekannt ist. Dafür machen wir einen left Join mit firstMonthsVisitors
nextMonths= dataF.select("fullVisitorId", expr("totals.transactionRevenue").cast("float").alias("revenue"), month(to_date("date", "yyyyMMdd")).alias("month")).where(((expr("month") == 10) | (expr("month") == 11)) & (expr("revenue") > 0)).groupBy("fullVisitorId").agg(sum("revenue").alias("current_revenue")).join(firstMonthsVisitors, "fullVisitorId", "left")
nextMonths.show()
Wie viele Besucher kennen wir aus der Vorperiode? Wieviele gibt es in der aktuellen?
nextMonths.filter(col("first_revenue").isNotNull()).count(), nextMonths.count()
Durschnitt der Einnahmen bereits bekannter Besucher aus der Vorperiode.
nextMonths.filter(col("first_revenue").isNotNull()).agg(avg("current_revenue")).show()
Durschnitt der Einnahmen neuer Besucher:
nextMonths.filter(col("first_revenue").isNull()).agg(avg("current_revenue")).show()
dataF.select("trafficSource.source","device.operatingSystem","device.browser","device.deviceCategory", "geoNetwork.country","totals.transactionRevenue").groupBy("source","operatingSystem","browser","deviceCategory","country").agg(sum("transactionRevenue").alias("revenue")).orderBy(desc("revenue")).show()