import findspark
findspark.init('/usr/local/Cellar/apache-spark/2.3.2/libexec/')
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
spark = SparkSession.builder.appName("TestGA").getOrCreate()
# before ga_sessions_201611*, v2ProductCategory is largely "(not set)"
# PySpark can misbehave with Java > 8 (Spark 2.x requires Java 8)
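If the Java version is the problem, one workaround is to pin JAVA_HOME to a Java 8 install before the SparkSession is created. A minimal sketch; the JDK path below is an assumption for a typical macOS setup and must be adjusted to the local install:

import os
# must run before getOrCreate() launches the JVM;
# the path is an assumed example location, not a verified install
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_202.jdk/Contents/Home'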
ga_data = spark.read.json('/Users/eduardoschumann/Documents/Projekte/E-CommerceAnalytics/ga_data/ga_sessions_2016*.json.gz')
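A quick way to check the "(not set)" issue noted above is to count the product categories across all hits. A sketch using the same nested fields the pipeline below relies on:

# sketch: category distribution across all exploded hits/products
(ga_data
    .select(explode('hits').alias('hit'))
    .select(explode('hit.product').alias('p'))
    .groupBy('p.v2ProductCategory')
    .count()
    .orderBy(desc('count'))
    .show(5, truncate=False))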
Filter the hits that represent transactions.
# explode first, then select the nested hit fields: an alias created by
# explode() cannot be referenced within the same select()
transactionsInfo = (ga_data
    .select('fullVisitorId', 'visitId', 'visitNumber',
            expr('totals.transactions').cast('integer').alias('transactions'),
            expr('totals.transactionRevenue').cast('integer').alias('transactionRevenue'),
            explode('hits').alias('hit'))
    .select('*', 'hit.isExit', 'hit.isInteraction', 'hit.product')
    .fillna(0)
    .filter('transactions > 0')
    .filter(col('hit.transaction.transactionId').isNotNull())
    .filter(col('hit.isExit').isNull())
    .drop('hit'))
transactionsInfo.printSchema()
# same pattern: explode the product array first, then pull out its fields
productInfo = (transactionsInfo
    .select('fullVisitorId', explode('product').alias('sproduct'))
    .select('fullVisitorId',
            expr('sproduct.productQuantity').cast('integer').alias('productQuantity'),
            expr('sproduct.productRevenue').cast('integer').alias('productRevenue'),
            'sproduct.productSKU',
            'sproduct.v2ProductCategory',
            'sproduct.v2ProductName')
    # drop the un-rendered template value and unset categories
    .filter(~((col('v2ProductCategory') == '${productitem.product.origCatName}') |
              (col('v2ProductCategory') == '(not set)')))
    .fillna({'productQuantity': 0, 'productRevenue': 0})
    .filter('productQuantity > 1'))
productInfo.show(3, truncate=False)
# first() keeps one (arbitrary) value per visitor/SKU group
productbyVisitor = (productInfo
    .groupBy('fullVisitorId', 'productSKU')
    .agg(sum('productQuantity').alias('quantity'),
         first(col('v2ProductCategory')).alias('category'),
         first(col('productRevenue')).alias('revenue'),
         first(col('v2ProductName')).alias('product')))
collected = productbyVisitor.groupBy('fullVisitorId').agg(collect_list("product").alias("products"))
Make a struct out of the product columns and collect that per user, so the list keeps more than just the product names; a sketch of that variant follows.
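A minimal sketch of that step, using struct() from pyspark.sql.functions and the column names created in productbyVisitor above; the variable name collectedStructs is just for illustration:

# bundle the per-product columns into one struct per list element
collectedStructs = (productbyVisitor
    .groupBy('fullVisitorId')
    .agg(collect_list(struct('product', 'quantity', 'category', 'revenue'))
         .alias('products')))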
collected.write.json('productCollected.json')
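Note that write.json() produces a directory of JSON part files rather than a single file, so the result is read back the same way it was written:

# sanity check: read the written directory back and inspect a few rows
spark.read.json('productCollected.json').show(3, truncate=False)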