Zur Blog-Serie https://www.shi-gmbh.com/herausforderung-customer-centricity/
import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import plotly.plotly as py
import plotly.graph_objs as go
import pandas as pd
import requests
from plotly.offline import iplot, init_notebook_mode
from plotly import tools
init_notebook_mode()
# Create (or reuse) the Spark session and load every gzipped Google Analytics
# session file matching the glob below.
sc = SparkSession.builder.appName("TestGA").getOrCreate()
ga_data = sc.read.json(
    '/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_*.json.gz'
)

# Keep one row per session that recorded at least one transaction:
# visitor id, parsed session date and the integer transaction count.
# fillna(0) maps a missing transaction count to 0 so the filter drops it.
transactions = (
    ga_data
    .select(
        'fullVisitorId',
        to_date("date", "yyyyMMdd").alias("pdate"),
        expr("totals.transactions").cast('integer'),
    )
    .fillna(0)
    .filter("transactions > 0")
)

# Average number of purchasing sessions per visitor.
(
    transactions
    .groupBy('fullVisitorId')
    .count()
    .agg(avg(col("count")))
    .show()
)
Hierfür nutzen wir eine Window-Funktion, um alle Besuche eines Nutzers zu betrachten und die Anzahl der Tage zu berechnen, die zwischen zwei aufeinanderfolgenden Käufen vergehen. Dafür setzen wir die Funktion lag ein.
# Days between consecutive purchase dates of the same visitor.
# lag() has no previous row for a visitor's first purchase, so the date
# difference is null there; fillna(0) turns it into 0 and the filter then
# removes it, together with repeat purchases made on the same day.
_purchasesPerVisitor = Window.partitionBy("fullVisitorId").orderBy("pdate")
_previousPurchaseDate = lag(transactions.pdate, 1).over(_purchasesPerVisitor)
timeintertrans = (
    transactions
    .withColumn("time_intertrans", datediff(transactions.pdate, _previousPurchaseDate))
    .fillna(0)
    .filter("time_intertrans > 0")
)
timeintertrans.show()

# Average gap, in days, between two purchases.
timeintertrans.agg(avg(col("time_intertrans"))).show()
Die Abwanderungsrate gibt an, wie viele Kunden am Ende eines Zeitintervalls, bspw. der Vertragslaufzeit oder eines Zeitfensters, verloren gehen. Da bei dem Shop keine Verträge mit fester Laufzeit abgeschlossen werden, betrachten wir Kalendermonate für die Berechnung der Abwanderungsrate.
Mathematische Definition:
Churn(Zeitintervall) = 1 - ( Kunden(Ende) / Kunden(Beginn) )
Kunden(Beginn): Anzahl der Kunden, die im letzten Zeitintervall etwas gekauft haben
Kunden(Ende): Anzahl der Kunden, die zu Kunden(Beginn) zählten und im aktuellen Zeitintervall etwas gekauft haben (keine reinen Neukunden)
# Monthly churn rate: share of a month's buyers who do NOT buy again in the
# following month.

# One row per (visitor, period). "period" is a running month index with
# January 2016 = 1; "previousPeriod" is the month before it.
buyersInPeriod = (
    transactions
    .withColumn("month", month("pdate"))
    .withColumn("year", year("pdate"))
    .select("fullVisitorId", expr("month + ((year - 2016)*12)").alias("period"))
    .distinct()
    .withColumn("previousPeriod", expr("period - 1"))
)
buyersInPeriod.show()

# Number of distinct buyers per period.
totBuyersPeriod = buyersInPeriod.groupBy("period").count().orderBy("period")
totBuyersPeriod.show()

# Both frames expose their key as "current" so they can be joined on it:
# `current` carries each purchase period shifted back by one month, `previous`
# carries the purchase period itself. A match on (visitor, P) therefore means
# the visitor bought in period P and again in period P + 1.
current = buyersInPeriod.select("fullVisitorId", expr("previousPeriod").alias("current"))
previous = buyersInPeriod.select("fullVisitorId", expr("period").alias("current"))
comparePeriods = previous.join(current, ["fullVisitorId", "current"])
comparePeriods.show()

# Buyers of each period who came back in the following period.
totRemainingBuyersPeriod = (
    comparePeriods
    .groupBy("current")
    .count()
    .orderBy("current")
    .withColumnRenamed("current", "period")
    .withColumnRenamed("count", "remaining")
)
totRemainingBuyersPeriod.show()

# churnRate = 1 - remaining / buyers. The join is an inner join, so periods
# without any returning buyer (including the last period in the data) do not
# appear in the result.
# A join does not guarantee the row order of its inputs, so the result is
# sorted explicitly to list the periods chronologically.
(
    totBuyersPeriod
    .join(totRemainingBuyersPeriod, "period")
    .select(
        expr("period").alias("afterPeriod"),
        expr("1 - (remaining / count)").alias("churnRate"),
    )
    .orderBy("afterPeriod")
    .show()
)
# Channel cohort analysis: for buyers whose first purchase came through a
# given channel, through which channels did their later purchases come?

# Purchasing sessions with the attributes needed to order them per visitor.
transactionsInfo = (
    ga_data
    .select(
        'fullVisitorId',
        'channelGrouping',
        'visitId',
        'visitNumber',
        to_date("date", "yyyyMMdd").alias("pdate"),
        expr("totals.transactions").cast('integer'),
    )
    .fillna(0)
    .filter("transactions > 0")
)
transactionsInfo.show()

# Number each visitor's purchases chronologically (date first, then visit
# number); rank 1 is the visitor's first purchase.
userTransactionWindow = Window.partitionBy(transactionsInfo['fullVisitorId']).orderBy(
    transactionsInfo['pdate'].asc(),
    transactionsInfo['visitNumber'].asc(),
)
transactionOrder = row_number().over(userTransactionWindow)
orderedTransactions = transactionsInfo.select(
    'fullVisitorId', 'channelGrouping', 'pdate', 'visitId', transactionOrder.alias('rank')
)
orderedTransactions.show()

# Channel of each visitor's first purchase. The rank filter is applied before
# the projection so it does not refer to a column that was already dropped.
firstChannel = orderedTransactions.where("rank = 1").select('fullVisitorId', 'channelGrouping')
firstChannel.groupBy('channelGrouping').count().show()

# Per visitor: how many follow-up purchases came through each channel.
secondChannel = (
    orderedTransactions
    .where("rank > 1")
    .groupBy('fullVisitorId', 'channelGrouping')
    .count()
    .withColumnRenamed('channelGrouping', 'followChannel')
)
secondChannel.groupBy('followChannel').count().show()

# Pair every visitor's first channel with their follow-up channels; visitors
# with a single purchase drop out of this inner join.
channelsByUser = firstChannel.join(secondChannel, 'fullVisitorId')
channelsByUser.show()

# Follow-up purchases per (first channel, follow-up channel) combination.
channelMatrix = (
    channelsByUser
    .groupBy('channelGrouping', 'followChannel')
    .agg(sum('count').alias('refs'))
    .orderBy('channelGrouping', 'followChannel')
)
channelMatrix.show()

# Percentage of each first channel's follow-up purchases per follow-up channel.
inChannelW = Window.partitionBy(channelMatrix['channelGrouping'])
tottr = (channelMatrix['refs'] / sum('refs').over(inChannelW))*100
# The window aggregation repartitions the rows, so the earlier orderBy is not
# guaranteed to survive; sort again before collecting to pandas.
kohChannel = (
    channelMatrix
    .select('channelGrouping', 'followChannel', 'refs', tottr.alias('quote'))
    .orderBy('channelGrouping', 'followChannel')
    .toPandas()
)
kohChannel

# The x axis shows the re-engaging (follow-up) channel, the y axis the channel
# of the first purchase.
fig = go.Figure(data=[go.Heatmap(
    z=kohChannel['quote'],
    x=kohChannel['followChannel'],
    y=kohChannel['channelGrouping'])])
iplot(fig)
# Month cohort analysis: for buyers whose first purchase fell in a given
# month, in which months did their later purchases happen?

# "period" is a running month index with January 2016 = 1.
_transactionsWithDateParts = (
    orderedTransactions
    .withColumn("month", month("pdate"))
    .withColumn("year", year("pdate"))
)
_periodIndex = expr("month + ((year - 2016)*12)")

# Period of each visitor's first purchase. The rank filter is applied before
# the projection so it does not refer to a column that was already dropped.
firstMonth = (
    _transactionsWithDateParts
    .where("rank = 1")
    .select('fullVisitorId', _periodIndex.alias("period"))
)

# Per visitor: number of follow-up purchases in each period.
followMonth = (
    _transactionsWithDateParts
    .where("rank > 1")
    .select('fullVisitorId', _periodIndex.alias("follow_period"))
    .groupBy('fullVisitorId', "follow_period")
    .count()
)

# Follow-up purchases per (first-purchase period, follow-up period).
joinedm = firstMonth.join(followMonth, 'fullVisitorId')
monthMatrix = (
    joinedm
    .groupBy('period', 'follow_period')
    .agg(sum('count').alias('refs'))
    .orderBy('period', 'follow_period')
)

# Percentage of each cohort's follow-up purchases per follow-up period.
monthW = Window.partitionBy(monthMatrix['period'])
totm = (monthMatrix['refs'] / sum('refs').over(monthW))*100
# The window aggregation repartitions the rows, so the earlier orderBy is not
# guaranteed to survive; sort again before collecting to pandas.
kohMonat = (
    monthMatrix
    .select('period', 'follow_period', 'refs', totm.alias('quote'))
    .orderBy('period', 'follow_period')
    .toPandas()
)
kohMonat

# x axis: period of the follow-up purchase, y axis: period of the first purchase.
iplot(go.Figure(data=[go.Heatmap(
    z=kohMonat['quote'],
    x=kohMonat['follow_period'],
    y=kohMonat['period'])]))
# Products bought in first purchases versus follow-up purchases.

# One row per hit of a purchasing session, restricted to hits that carry a
# positive transaction revenue; "product" is the hit's product array.
transactionHits = (
    ga_data
    .select(
        'fullVisitorId',
        'visitId',
        expr("totals.transactions").cast('integer'),
        explode("hits").alias("hit"),
        expr("hit.product"),
        expr("hit.transaction.transactionRevenue"),
    )
    .fillna(0)
    .filter("transactions > 0 AND transactionRevenue > 0")
)

# One row per product of those hits, with the product name pulled out.
productVisit = transactionHits.select(
    'visitId',
    explode("product").alias('productext'),
    expr("productext.v2ProductName"),
)


def _productCountsForRank(rankCondition):
    """Count purchased products, most frequent first, for the purchases
    in orderedTransactions that satisfy the SQL condition on "rank"."""
    matchingVisits = orderedTransactions.select('visitId').where(rankCondition)
    return (
        matchingVisits
        .join(productVisit, 'visitId')
        .groupBy('v2ProductName')
        .count()
        .orderBy(expr('count').desc())
    )


# Products of each visitor's first purchase.
firstProds = _productCountsForRank("rank == 1")
firstProds.show(truncate=False)

# Products of all later purchases.
followProds = _productCountsForRank("rank > 1")
followProds.show(truncate=False)
# Device, browser and country breakdown of first purchases (rank 1) versus
# follow-up purchases (rank > 1).

# Purchasing sessions together with the nested feature structs.
transactionFeatures = (
    ga_data
    .select(
        'fullVisitorId',
        "device",
        "geoNetwork",
        "socialEngagementType",
        "trafficSource",
        'visitId',
        'visitNumber',
        to_date("date", "yyyyMMdd").alias("pdate"),
        expr("totals.transactions").cast('integer'),
    )
    .fillna(0)
    .filter("transactions > 0")
)

# Number each visitor's purchases chronologically (date, then visit number).
transactionOrder = row_number().over(
    Window.partitionBy(transactionFeatures['fullVisitorId']).orderBy(
        transactionFeatures['pdate'].asc(),
        transactionFeatures['visitNumber'].asc(),
    )
)
transactionFeatures = transactionFeatures.withColumn("rank", transactionOrder)


def _purchaseCountsBy(rankCondition, field):
    """Count the purchases matching the SQL condition on "rank" per value
    of the (possibly nested) field, largest group first."""
    return (
        transactionFeatures
        .where(rankCondition)
        .groupBy(field)
        .count()
        .orderBy(desc("count"))
    )


def _showPie(title, frame, labelColumn):
    """Render a pie chart of frame["count"] labelled by frame[labelColumn]."""
    pie = go.Pie(labels=frame[labelColumn], values=frame["count"])
    iplot(go.Figure(layout=dict(title=title), data=[pie]))


# Device category: first purchases versus follow-up purchases.
firstDev = _purchaseCountsBy("rank = 1", "device.deviceCategory").toPandas()
_showPie('Erstkäufer: Geräteart', firstDev, "deviceCategory")
secondDev = _purchaseCountsBy("rank > 1", "device.deviceCategory").toPandas()
_showPie('Zweitkäufer: Geräteart', secondDev, "deviceCategory")

# Browser: first purchases versus follow-up purchases.
frstBrowser = _purchaseCountsBy("rank = 1", "device.browser").toPandas()
_showPie('Erstkäufer: Browser', frstBrowser, "browser")
ndBrowser = _purchaseCountsBy("rank > 1", "device.browser").toPandas()
_showPie('Zweitkäufer: Browser', ndBrowser, "browser")

# Country: printed as tables instead of charts.
_purchaseCountsBy("rank == 1", "geoNetwork.country").show()
_purchaseCountsBy("rank > 1", "geoNetwork.country").show()