Part of the blog series https://www.shi-gmbh.com/herausforderung-customer-centricity/
Author: Dr. Eduardo Torres Schumann, SHI GmbH
Customers are to be described by their purchases in different product categories. To this end, all transactions of a customer are examined and the total purchase value per category is determined. In addition to the spending per category, the average number of products and value of the shopping basket as well as the average time in days until a repeat purchase are used to characterize a customer.
We consider all customers who made purchases at the end of 2016.
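The goal is one feature row per customer, roughly with the following columns (a sketch with invented numbers, only to illustrate the layout that is built step by step below):
# hypothetical example of the customer feature row assembled below (all values are invented)
example_row = {
    "fullVisitorId": "1234567890",
    "avg_revenue": 25.0e6, "sum_revenue": 50.0e6,    # GA revenues are exported in micros
    "total_transactions": 2, "avg_time": 14.0,       # avg_time = days until repurchase
    "Apparel": 30.0e6, "Bags": 0.0, "Brand": 0.0, "Drinkware": 20.0e6,
    "Electronics": 0.0, "Lifestyle": 0.0, "Office": 0.0
}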
import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
sc = SparkSession.builder.appName("TestGA").getOrCreate()
# before ga_sessions_201611* the product category is mostly "(not set)"
# PySpark sometimes misbehaves with Java > 8
ga_data = sc.read.json('/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_2016*.json.gz')
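Since the GA session export is deeply nested, it can help to inspect the schema before picking fields (an optional check):
# optional: inspect the nested structure of the GA export before selecting fields
ga_data.printSchema()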
Filtering the 'hits' that represent a transaction:
# explode the hits array first; the alias 'hit' can only be referenced in a subsequent select
transactionsInfo = ga_data.select('fullVisitorId', 'visitId', 'visitNumber',
                                  expr("totals.transactions").cast('integer'),
                                  expr("totals.transactionRevenue").cast('integer'),
                                  explode('hits').alias('hit'))\
    .select('*',
            'hit.contentGroup',
            'hit.page',
            'hit.isExit',
            'hit.isInteraction',
            'hit.product',
            'hit.transaction',
            'hit.transaction.transactionId',
            'hit.product.productQuantity',
            'hit.type')\
    .fillna(0)\
    .filter("transactions > 0")\
    .filter(col('hit.transaction.transactionId').isNotNull())\
    .filter(col('hit.isExit').isNull())\
    .drop('hit')
transactionsInfo.show(3,truncate=False)
Product revenue in the transaction 'hits':
# explode the product array of each transaction hit, then extract the product fields
productTransactions = transactionsInfo.select('fullVisitorId', 'transactionRevenue',
                                              'transaction.transactionId',
                                              explode('product').alias('sproduct'))\
    .select('*',
            'sproduct.v2ProductName',
            'sproduct.v2ProductCategory',
            expr('sproduct.productPrice').cast('integer').alias('price'),
            expr('sproduct.productQuantity').cast('integer').alias('quantity'),
            (col("sproduct.productPrice") * col("sproduct.productQuantity")).alias('productSales'))\
    .filter(col('quantity').isNotNull())\
    .drop('sproduct')
productTransactions.show(truncate=False)
customerCategories = productTransactions.groupBy('fullVisitorId', 'v2ProductCategory').agg(sum('productSales').alias('categorySales'))
customerCategories.show(truncate=False)
customerCategories.groupBy("v2ProductCategory").count().show(30)
Grouping the categories into top-level categories
Top-level categories:
drinkware_regex = "Housewares|Drinkware|Bottles|Tumblers"
bags_regex = "Backpacks|Bags|More Bags"
office_regex = "Office|Notebooks & Journals|Gift Cards"
brand_regex = "Waze|Google|Android"
electronics_regex = "Electronics"
lifestyle_regex = "Accessories|Lifestyle|Fun"
apparel_regex = "Apparel|Headgear"
toplevelCategories = customerCategories.filter(~((col('v2ProductCategory') == '${productitem.product.origCatName}') |
                                                 (col('v2ProductCategory') == '(not set)')))\
    .withColumn("category",
                regexp_replace(
                    regexp_replace(
                        regexp_replace(
                            regexp_replace(
                                regexp_replace(
                                    regexp_replace(
                                        regexp_replace(col("v2ProductCategory"),
                                                       apparel_regex, "Apparel"),
                                        lifestyle_regex, "Lifestyle"),
                                    electronics_regex, "Electronics"),
                                brand_regex, "Brand"),
                            office_regex, "Office"),
                        bags_regex, "Bags"),
                    drinkware_regex, "Drinkware"))
toplevelCategories.show()
#groupBy("category").count().show(30)
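For readability, the same grouping could also be expressed with when/rlike instead of nested regexp_replace. Note that this variant assigns exactly one top-level label per row rather than replacing matching substrings (a sketch, not the code used above):
# sketch: map every category string to a single top-level label via regex matching
toplevelCategoriesAlt = customerCategories\
    .filter(~col('v2ProductCategory').isin('${productitem.product.origCatName}', '(not set)'))\
    .withColumn("category",
                when(col("v2ProductCategory").rlike(apparel_regex), "Apparel")
                .when(col("v2ProductCategory").rlike(lifestyle_regex), "Lifestyle")
                .when(col("v2ProductCategory").rlike(electronics_regex), "Electronics")
                .when(col("v2ProductCategory").rlike(brand_regex), "Brand")
                .when(col("v2ProductCategory").rlike(office_regex), "Office")
                .when(col("v2ProductCategory").rlike(bags_regex), "Bags")
                .when(col("v2ProductCategory").rlike(drinkware_regex), "Drinkware")
                .otherwise(col("v2ProductCategory")))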
Summing up the spending per category and pivoting the table
Pivoting DataFrames in Spark: https://databricks.com/blog/2016/02/09/reshaping-data-with-pivot-in-apache-spark.html
visitorRevenueByCategory = toplevelCategories.groupBy('fullVisitorId').pivot('category').agg(sum('categorySales')).fillna(0.0)
visitorRevenueByCategory.show()
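The pivot avoids an extra pass over the data and gets a deterministic column order if the distinct values are passed explicitly (a sketch, assuming only the seven top-level labels occur, which is what the VectorAssembler below expects anyway):
# sketch: pivot with an explicit list of category values
categories = ["Apparel", "Bags", "Brand", "Drinkware", "Electronics", "Lifestyle", "Office"]
visitorRevenueByCategoryExplicit = toplevelCategories.groupBy('fullVisitorId')\
    .pivot('category', categories).agg(sum('categorySales')).fillna(0.0)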
transactions = ga_data.select('fullVisitorId',
                              to_date("date", "yyyyMMdd").alias("pdate"),
                              expr("totals.transactions").cast('integer'),
                              expr("totals.totalTransactionRevenue").cast('float'))\
    .fillna(0).filter("transactions > 0")
#transactions.show(50)
# days since the customer's previous transaction (null for the first purchase, filled with 0)
timeintertrans = transactions.withColumn("time_intertrans",
                                         datediff(transactions.pdate,
                                                  lag(transactions.pdate, 1).over(Window.partitionBy("fullVisitorId").orderBy("pdate"))))\
    .fillna(0)
timeintertrans.show()
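To make the window function concrete, here is a toy example with made-up data (hypothetical values, not from the GA export); each row gets the gap in days to the same customer's previous purchase date, 0 for the first purchase:
# hypothetical mini example of lag() + datediff() over a per-customer window
demo = sc.createDataFrame([("A", "2016-11-01"), ("A", "2016-11-15"), ("B", "2016-12-02")],
                          ["fullVisitorId", "pdate"]).withColumn("pdate", to_date("pdate"))
w = Window.partitionBy("fullVisitorId").orderBy("pdate")
demo.withColumn("time_intertrans", datediff("pdate", lag("pdate", 1).over(w))).fillna(0).show()
# customer A: 0 and 14 days, customer B: 0 days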
visitorTransactionKPIs = timeintertrans.groupBy('fullVisitorId').agg(avg("totalTransactionRevenue").alias("avg_revenue"),
                                                                     sum("totalTransactionRevenue").alias("sum_revenue"),
                                                                     sum("transactions").alias("total_transactions"),
                                                                     avg("time_intertrans").alias("avg_time"))
customerMatrix = visitorTransactionKPIs.join(visitorRevenueByCategory,"fullVisitorId")
customerMatrix.limit(15).toPandas()
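The default join is an inner join, so customers that have transaction KPIs but no revenue in any of the seven top-level categories are dropped; if those customers should be kept, a left join plus fillna is one option (a sketch that deviates from the matrix used below):
# sketch: keep customers without matched category revenue as all-zero rows
customerMatrixAll = visitorTransactionKPIs.join(visitorRevenueByCategory, "fullVisitorId", "left").fillna(0)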
For the ML algorithm, the features must be encoded as a vector of numbers. Since the basket values and the category spendings lie in different ranges, they are scaled.
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
# scaling (withMean=False avoids centering, which would densify sparse vectors)
sScaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                         withStd=True, withMean=False)
# the input to the algorithm must be a single vector column
vectorAssembler = VectorAssembler()\
    .setInputCols(["avg_revenue", "sum_revenue", "total_transactions", "avg_time",
                   "Apparel", "Bags", "Brand", "Drinkware", "Electronics", "Lifestyle", "Office"])\
    .setOutputCol("features")
# assembling, scaling
transformationPipeline = Pipeline().setStages([vectorAssembler,sScaler])
# collect statistics for scaling
fittedPipeline = transformationPipeline.fit(customerMatrix)
# actually transform
transformedMatrix = fittedPipeline.transform(customerMatrix)
# put it into cache so it can be accessed many times efficiently
transformedMatrix.cache()
transformedMatrix.select("features", "scaledFeatures").show()
from pyspark.ml.clustering import KMeans
from pyspark.ml.evaluation import ClusteringEvaluator
# Trains a k-means model.
# Test with k = 10
kmeans = KMeans().setK(10).setSeed(1)
model = kmeans.fit(transformedMatrix)
# Make predictions
predictions = model.transform(transformedMatrix)
model.computeCost(transformedMatrix)
predictions
# Evaluate clustering by computing Silhouette score
# The Silhouette is a measure of the consistency within clusters.
# It ranges between -1 and 1, where a value close to 1 means that the points in a cluster are close to the other
# points in the same cluster and far from the points of the other clusters.
# https://scikit-learn.org/stable/auto_examples/cluster/plot_kmeans_silhouette_analysis.html
evaluator = ClusteringEvaluator()
silhouette = evaluator.evaluate(predictions)
print("Silhouette with squared euclidean distance = " + str(silhouette))
# Shows the result.
centers = model.clusterCenters()
print("Cluster Centers: ")
for center in centers:
print(center)
bestK = 0
bestSilhouette = 0.0
bestModel = None
evaluator = ClusteringEvaluator()
for k in range(5, 16):
    kmeans = KMeans().setK(k).setSeed(1)
    model = kmeans.fit(transformedMatrix)
    cost = model.computeCost(transformedMatrix)
    predictions = model.transform(transformedMatrix)
    silhouette = evaluator.evaluate(predictions)
    print("k: ", k, "silhouette: ", silhouette)
    if silhouette > bestSilhouette:
        bestK = k
        bestSilhouette = silhouette
        bestModel = model
print("best k: ", bestK, "silhouette: ", bestSilhouette)
bestModel.summary.clusterSizes
predictions = bestModel.transform(transformedMatrix)
predictions
# average cluster profile; revenue figures are converted from micros into currency units
clusterProperties = predictions.groupBy('prediction').agg(avg(expr('avg_revenue / 1.0E6')).alias('avgRev'),
                                                          avg(expr('sum_revenue / 1.0E6')).alias('sumRev'),
                                                          avg('total_transactions').alias('transactions'),
                                                          avg('avg_time').alias('days'),
                                                          avg(expr('Apparel / 1.0E6')).alias('Apparel'),
                                                          avg(expr('Bags / 1.0E6')).alias('Bags'),
                                                          avg(expr('Brand / 1.0E6')).alias('Brand'),
                                                          avg(expr('Drinkware / 1.0E6')).alias('Drinkware'),
                                                          avg(expr('Electronics / 1.0E6')).alias('Electronics'),
                                                          avg(expr('Lifestyle / 1.0E6')).alias('Lifestyle'),
                                                          avg(expr('Office / 1.0E6')).alias('Office'),
                                                          count('prediction').alias('size')).orderBy('prediction')
clusterProperties.show()
import numpy as np
import plotly.plotly as py
import plotly.graph_objs as go
import matplotlib.pyplot as plt
from plotly.offline import iplot, init_notebook_mode
init_notebook_mode()
clusterSizes = clusterProperties.select(col('prediction').cast('string'), 'size').toPandas()
iplot(go.Figure(layout=dict(title='Cluster Sizes', width=500, height=500),
                data=[go.Bar(y=clusterSizes["size"],
                             x=clusterSizes["prediction"],
                             text=clusterSizes["size"],
                             textposition='auto')]))
# column maxima, used to choose the axis ranges of the radar charts below
clusterProperties.select(max('avgRev'),
                         max('sumRev'),
                         max('transactions'),
                         max('days'),
                         max('Apparel'),
                         max('Bags'),
                         max('Brand'),
                         max('Drinkware'),
                         max('Electronics'),
                         max('Lifestyle'),
                         max('Office')).toPandas()
# modified from https://www.kaggle.com/fabiendaniel/customer-segmentation
def _scale_data(data, ranges):
    (x1, x2) = ranges[0]
    d = data[0]
    return [(d - y1) / (y2 - y1) * (x2 - x1) + x1 for d, (y1, y2) in zip(data, ranges)]
class RadarChart():
    def __init__(self, fig, location, sizes, variables, ranges, n_ordinate_levels=6):
        angles = np.arange(0, 360, 360. / len(variables))
        ix, iy = location[:]
        size_x, size_y = sizes[:]
        axes = [fig.add_axes([ix, iy, size_x, size_y], polar=True,
                             label="axes{}".format(i)) for i in range(len(variables))]
        _, text = axes[0].set_thetagrids(angles, labels=variables)
        for txt, angle in zip(text, angles):
            if angle > -1 and angle < 181:
                txt.set_rotation(angle - 90)
            else:
                txt.set_rotation(angle - 270)
        for ax in axes[1:]:
            ax.patch.set_visible(False)
            ax.xaxis.set_visible(False)
            ax.grid(False)
        for i, ax in enumerate(axes):
            grid = np.linspace(*ranges[i], num=n_ordinate_levels)
            grid_label = [""] + ["{:.0f}".format(x) for x in grid[1:-1]]
            ax.set_rgrids(grid, labels=grid_label, angle=angles[i])
            ax.set_ylim(*ranges[i])
        self.angle = np.deg2rad(np.r_[angles, angles[0]])
        self.ranges = ranges
        self.ax = axes[0]

    def plot(self, data, *args, **kw):
        sdata = _scale_data(data, self.ranges)
        self.ax.plot(self.angle, np.r_[sdata, sdata[0]], *args, **kw)

    def fill(self, data, *args, **kw):
        sdata = _scale_data(data, self.ranges)
        self.ax.fill(self.angle, np.r_[sdata, sdata[0]], *args, **kw)

    def legend(self, *args, **kw):
        self.ax.legend(*args, **kw)

    def title(self, title, *args, **kw):
        self.ax.text(0, 1.1, title, transform=self.ax.transAxes, *args, **kw)
merged_df = clusterProperties.toPandas()
n_clusters = 6
fig = plt.figure(figsize=(12,12))
attributes = ['avgRev', 'sumRev', 'transactions', 'days', 'Apparel', 'Bags', 'Brand', 'Drinkware', 'Electronics', 'Lifestyle', 'Office']
#to-do
ranges = [[0, 7100], [0, 7100], [0, 20], [0, 10], [0, 1500], [0, 1500], [0, 10], [0, 7100], [0, 1500], [0, 1500], [0, 1500]]
#7003.500032 7003.500032 18.5 9.873274 1072.115 1224.825 3.741908 6996.5 305.6405 225.35 1048.96
index = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
n_groups = n_clusters ; i_cols = 3
i_rows = n_groups//i_cols
size_x, size_y = (1/i_cols), (1/i_rows)
for ind in range(n_clusters):
    cluster_name = merged_df.loc[index[ind], 'prediction']
    cluster_size = merged_df.loc[index[ind], 'size']
    ix = ind % 3 ; iy = i_rows - ind // 3
    pos_x = ix * (size_x + 0.1) ; pos_y = iy * (size_y + 0.0)
    location = [pos_x, pos_y] ; sizes = [size_x, size_y]
    #______________________________________________________
    data = np.array(merged_df.loc[index[ind], attributes])
    radar = RadarChart(fig, location, sizes, attributes, ranges)
    radar.plot(data, color='b', linewidth=2.0)
    radar.fill(data, alpha=0.2, color='b')
    radar.title(title='cluster {}, size = {}'.format(cluster_name, cluster_size), color='r', size=14)
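Depending on the notebook setup, the figure may have to be rendered or saved explicitly (optional lines, assuming the usual matplotlib workflow):
# optional: display the radar charts, or write them to a file
plt.show()
# fig.savefig('cluster_profiles.png', bbox_inches='tight')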