Create Spark Session

In [1]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from datetime import datetime, timedelta
from pyspark.sql.functions import *
import pyspark.sql.functions as sf

# Create my_spark_session
# if a session already exists, getOrCreate() returns it; otherwise it creates a new one
my_spark = SparkSession.builder.getOrCreate()

# print session
print(my_spark)

# (later on, to display a DataFrame we can call df.show())
<pyspark.sql.session.SparkSession object at 0x10c95c710>
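The builder can also take an app name, master URL and config settings, which only apply when the session is first created. A minimal sketch with illustrative values (the app name, master and shuffle-partitions setting are assumptions, not something the rest of this notebook needs):

# Minimal sketch: builder options only take effect when the session is first created
my_spark = (SparkSession.builder
            .appName("superstore-analysis")   # illustrative app name
            .master("local[*]")               # run locally using all cores
            .config("spark.sql.shuffle.partitions", "8")
            .getOrCreate())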

Import data

Here we calculate yesterday's date in YYYYMMDD format and read the files from that directory.

In [2]:
yesterday = (datetime.now() - timedelta(days=1)).strftime('%Y%m%d')
In [3]:
# Superstore sample dataset published by Tableau
df = my_spark.read.format("csv").option("header", "true").load(yesterday + "/*.csv")
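By default every column comes back as a string. If you'd rather have Spark infer numeric types at load time (at the cost of an extra pass over the files), a sketch using the same path:

# Same read, but let Spark infer column types (extra pass over the data)
df = my_spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(yesterday + "/*.csv")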

Use the Spark DataFrame API

In [4]:
# Select specific fields, apply aliases, and cast column types
df2 = df.select(col("Customer Name").alias("customer"), col("sales"), col("Profit").cast('float'), col("Region"), col("Ship Mode").alias("shipmode"))
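To confirm the aliases and the Profit cast took effect, you can inspect the schema:

# Profit should now show up as float, the remaining columns as string
df2.printSchema()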
In [5]:
df3 = df2.filter((df2.shipmode == "Standard Class") & (df2.Region == 'South')).limit(5)
df3.show()
+--------------+--------+---------+------+--------------+
|      customer|   sales|   Profit|Region|      shipmode|
+--------------+--------+---------+------+--------------+
|Sean O'Donnell|957.5775| -383.031| South|Standard Class|
|Sean O'Donnell|  22.368|   2.5164| South|Standard Class|
|  Andrew Allen|  15.552|   5.4432| South|Standard Class|
|    Erin Smith|  95.616|   9.5616| South|Standard Class|
|    Joel Eaton| 831.936|-114.3912| South|Standard Class|
+--------------+--------+---------+------+--------------+
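filter() also accepts a SQL expression string, which can read more naturally for compound conditions; the same filter as a sketch:

# Equivalent filter expressed as a SQL string
df2.filter("shipmode = 'Standard Class' AND Region = 'South'").limit(5).show()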

In [6]:
#Calculate the most profitable customers
df2 = df2.groupBy("customer").agg(sf.sum('sales').alias('sales'), sf.sum('Profit').alias('profit'))
df2 = df2.withColumn("Margin", bround((df2.profit/df2.sales)*100,2)).sort(col("profit").desc()).limit(5)
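The aggregated DataFrame isn't displayed until the Pandas section below; to peek at it here you could simply call show():

# Display the five most profitable customers (output omitted here)
df2.show(truncate=False)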

What about SQL against the files?

In [7]:
df.createOrReplaceTempView("superstore")

finaldf = my_spark.sql(''' select * 
                        from (
                            select customer, sum(sales) as sales, sum(profit) as profit, (sum(profit)/sum(sales))*100 as margin
                            from(
                                select `Customer Name` as customer, sum(sales) as sales, sum(profit) as profit
                                from superstore 
                                where `Ship Mode` = 'Standard Class' and region = 'South'
                                group by customer)t
                            group by customer)s
                        order by margin desc
                        limit 5''')
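The SQL goes through the same Catalyst optimizer as the DataFrame API; if you're curious what plan Spark generates, you can ask for it:

# Print the physical plan for the SQL query
finaldf.explain()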
In [8]:
finaldf.show()
+--------------------+------------------+------------------+------------------+
|            customer|             sales|            profit|            margin|
+--------------------+------------------+------------------+------------------+
|       Theresa Swint|            300.93|          679.3527|225.75107167779885|
|   Anthony Garverick|             113.1|             56.55|              50.0|
|   Dorothy Dickinson|             25.06|             12.53|              50.0|
|Christopher Martinez|6412.7699999999995|3192.0681999999997| 49.77674546256922|
|          Ben Ferrer|           1396.35|          694.4142| 49.73066924481685|
+--------------------+------------------+------------------+------------------+

And what about doing the same in Pandas?

In [9]:
import pandas as pd

#Spark has done the heavy lifting and aggregation. We can just convert the dataframe
panda = df2.toPandas()
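toPandas() collects the entire result to the driver, so it's best kept for small, already-aggregated data like this. On Spark 2.3+ you can also turn on Arrow to speed up the conversion; the config key below is the Spark 2.x name (on 3.x it's spark.sql.execution.arrow.pyspark.enabled), so treat it as an assumption about your version:

# Arrow-accelerated conversion between Spark and Pandas (Spark 2.3+)
my_spark.conf.set("spark.sql.execution.arrow.enabled", "true")
panda = df2.toPandas()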
In [10]:
panda
Out[10]:
        customer      sales       profit  Margin
0   Tamara Chand  19017.848  8964.482192   47.14
1   Raymond Buch  15117.339  6976.096050   46.15
2   Sanjit Chand  14142.334  5757.411976   40.71
3   Hunter Lopez  12873.298  5622.429434   43.68
4  Adrian Barton  14355.611  5438.907616   37.89
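From here the usual Pandas API applies; for example, a quick bar chart of profit per customer (assumes matplotlib is installed):

# Quick visual check of profit per customer (needs matplotlib)
panda.plot.bar(x='customer', y='profit')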