Create Spark session

In [1]:
# Import SparkSession plus the datetime and SQL functions modules used below
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import *
import pyspark.sql.functions as sf

# Create the my_spark session
# getOrCreate() returns the existing session if one is already running, otherwise it creates a new one
my_spark = SparkSession.builder.appName('myapp').getOrCreate()

# print session
print(my_spark)

<pyspark.sql.session.SparkSession object at 0x10b81c7f0>
In [81]:
#Superstore sample dataset published by Tableau; to view the loaded dataframe, call df.show()
df = my_spark.read.format("csv").option("header", "true").load("20191013/*.csv")

Check the inferred schema

In [4]:
#Check the dataframe schema (without inferSchema, every column is read in as a string)
df.printSchema()
root
 |-- Row ID: string (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: string (nullable = true)
 |-- Quantity: string (nullable = true)
 |-- Discount: string (nullable = true)
 |-- Profit: string (nullable = true)

In [5]:
#Python list of column names
df.columns
Out[5]:
['Row ID',
 'Order ID',
 'Order Date',
 'Ship Date',
 'Ship Mode',
 'Customer ID',
 'Customer Name',
 'Segment',
 'Country',
 'City',
 'State',
 'Postal Code',
 'Region',
 'Product ID',
 'Category',
 'Sub-Category',
 'Product Name',
 'Sales',
 'Quantity',
 'Discount',
 'Profit']
In [31]:
#Statistical summary of the dataframe (count, mean, stddev, min, max); non-numeric columns only get a count
df.describe().head(2)
Out[31]:
[Row(summary='count', Row ID='9994', Order ID='9994', Order Date='9994', Ship Date='9994', Ship Mode='9994', Customer ID='9994', Customer Name='9994', Segment='9994', Country='9994', City='9994', State='9994', Postal Code='9994', Region='9994', Product ID='9994', Category='9994', Sub-Category='9994', Product Name='9994', Sales='9994', Quantity='9994', Discount='9994', Profit='9994'),
 Row(summary='mean', Row ID='4997.5', Order ID=None, Order Date=None, Ship Date=None, Ship Mode=None, Customer ID=None, Customer Name=None, Segment=None, Country=None, City=None, State=None, Postal Code='55190.3794276566', Region=None, Product ID=None, Category=None, Sub-Category=None, Product Name=None, Sales='234.41818199917006', Quantity='5.828590535392018', Discount='0.3155949113492862', Profit='28.587912967780834')]

What if the schema isn't correct automatically?
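One lightweight option (a sketch, not run in this notebook) is to let Spark sample the file and guess the types with the inferSchema option; IDs and dates can still come back wrong, which is why a hand-built schema is used below.

#Ask Spark to infer column types by sampling the data
df = my_spark.read.format("csv").option("header", "true").option("inferSchema", "true").load("20191013/*.csv")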

In [54]:
from pyspark.sql.types import StructField, StringType, IntegerType, StructType, FloatType

#For each column, give its name and type, and mark it as nullable (the ID codes are alphanumeric, so they stay as strings)
data_schema = [StructField('Row ID', IntegerType(), True),
               StructField('Order ID', StringType(), True),
               StructField('Order Date', StringType(), True),
               StructField('Ship Date', StringType(), True),
               StructField('Ship Mode', StringType(), True),
               StructField('Customer ID', StringType(), True),
               StructField('Customer Name', StringType(), True),
               StructField('Segment', StringType(), True),
               StructField('Country', StringType(), True),
               StructField('City', StringType(), True),
               StructField('State', StringType(), True),
               StructField('Postal Code', StringType(), True),
               StructField('Region', StringType(), True),
               StructField('Product ID', StringType(), True),
               StructField('Category', StringType(), True),
               StructField('Sub-Category', StringType(), True),
               StructField('Product Name', StringType(), True),
               StructField('Sales', FloatType(), True),
               StructField('Quantity', FloatType(), True),
               StructField('Discount', FloatType(), True),
               StructField('Profit', FloatType(), True)]
In [57]:
#We now put together the final schema
final_structure = StructType(fields=data_schema)

#And re-read the dataframe with that schema applied
df = my_spark.read.format("csv").option("header", "true").load("20191013/*.csv", schema=final_structure)
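Equivalently (a small variation, not run here), the schema can be attached to the reader with its schema() method before loading:

#Same result via the reader's schema() method
df = my_spark.read.format("csv").option("header", "true").schema(final_structure).load("20191013/*.csv")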
In [10]:
#Check that it is correct
df.printSchema()
root
 |-- Row ID: integer (nullable = true)
 |-- Order ID: string (nullable = true)
 |-- Order Date: string (nullable = true)
 |-- Ship Date: string (nullable = true)
 |-- Ship Mode: string (nullable = true)
 |-- Customer ID: string (nullable = true)
 |-- Customer Name: string (nullable = true)
 |-- Segment: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Postal Code: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Product ID: string (nullable = true)
 |-- Category: string (nullable = true)
 |-- Sub-Category: string (nullable = true)
 |-- Product Name: string (nullable = true)
 |-- Sales: float (nullable = true)
 |-- Quantity: float (nullable = true)
 |-- Discount: float (nullable = true)
 |-- Profit: float (nullable = true)
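Order Date and Ship Date are still strings. If real date columns are needed, they could be parsed with to_date; a sketch, assuming a day/month/year format (check the file before relying on the format string):

#Parse the date strings into date columns (format assumed to be dd/MM/yyyy)
df = df.withColumn('Order Date', sf.to_date('Order Date', 'dd/MM/yyyy')).withColumn('Ship Date', sf.to_date('Ship Date', 'dd/MM/yyyy'))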

Select method

In [32]:
df.select(['Region', 'Sales']).head(2)
Out[32]:
[Row(Region='South', Sales='261.96'), Row(Region='South', Sales='731.94')]

Adding & Renaming Columns

In [30]:
#Add a new column
df.withColumn('new_column', df['sales']/df['profit']).head(2)
Out[30]:
[Row(Row ID='1', Order ID='CA-2016-152156', Order Date='08/11/2016', Ship Date='11/11/2016', Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code='42420', Region='South', Product ID='FUR-BO-10001798', Category='Furniture', Sub-Category='Bookcases', Product Name='Bush Somerset Collection Bookcase', Sales='261.96', Quantity='2', Discount='0', Profit='41.9136', new_column=6.249999999999999),
 Row(Row ID='2', Order ID='CA-2016-152156', Order Date='08/11/2016', Ship Date='11/11/2016', Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code='42420', Region='South', Product ID='FUR-CH-10000454', Category='Furniture', Sub-Category='Chairs', Product Name='Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back', Sales='731.94', Quantity='3', Discount='0', Profit='219.582', new_column=3.3333333333333335)]
In [29]:
#Rename a column
df.withColumnRenamed('sales', 'new_sales').head(2)
Out[29]:
[Row(Row ID='1', Order ID='CA-2016-152156', Order Date='08/11/2016', Ship Date='11/11/2016', Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code='42420', Region='South', Product ID='FUR-BO-10001798', Category='Furniture', Sub-Category='Bookcases', Product Name='Bush Somerset Collection Bookcase', new_sales='261.96', Quantity='2', Discount='0', Profit='41.9136'),
 Row(Row ID='2', Order ID='CA-2016-152156', Order Date='08/11/2016', Ship Date='11/11/2016', Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code='42420', Region='South', Product ID='FUR-CH-10000454', Category='Furniture', Sub-Category='Chairs', Product Name='Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back', new_sales='731.94', Quantity='3', Discount='0', Profit='219.582')]

Filtering data

In [33]:
#Use SQL syntax
df.filter("State = 'Kentucky'").head(2)
Out[33]:
[Row(Row ID='1', Order ID='CA-2016-152156', Order Date='08/11/2016', Ship Date='11/11/2016', Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code='42420', Region='South', Product ID='FUR-BO-10001798', Category='Furniture', Sub-Category='Bookcases', Product Name='Bush Somerset Collection Bookcase', Sales='261.96', Quantity='2', Discount='0', Profit='41.9136'),
 Row(Row ID='2', Order ID='CA-2016-152156', Order Date='08/11/2016', Ship Date='11/11/2016', Ship Mode='Second Class', Customer ID='CG-12520', Customer Name='Claire Gute', Segment='Consumer', Country='United States', City='Henderson', State='Kentucky', Postal Code='42420', Region='South', Product ID='FUR-CH-10000454', Category='Furniture', Sub-Category='Chairs', Product Name='Hon Deluxe Fabric Upholstered Stacking Chairs, Rounded Back', Sales='731.94', Quantity='3', Discount='0', Profit='219.582')]
In [35]:
#Use SQL syntax to filter & select specific columns
df.filter("State = 'Kentucky'").select(['State', 'City']).head(2)
Out[35]:
[Row(State='Kentucky', City='Henderson'),
 Row(State='Kentucky', City='Henderson')]
In [36]:
#Do the same with dataframe operations
df.filter(df['State'] == 'Kentucky').select(['State', 'City']).head(2)
Out[36]:
[Row(State='Kentucky', City='Henderson'),
 Row(State='Kentucky', City='Henderson')]
In [37]:
#Use multiple where conditions (OR)
df.filter( (df['State'] == 'Kentucky') | (df['State'] == 'Florida') ).select(['State', 'City']).head(2)
Out[37]:
[Row(State='Kentucky', City='Henderson'),
 Row(State='Kentucky', City='Henderson')]
In [38]:
#Use multiple where conditions (AND NOT)
df.filter( (df['State'] == 'Kentucky') & ~(df['State'] == 'Florida') ).select(['State', 'City']).head(2)
Out[38]:
[Row(State='Kentucky', City='Henderson'),
 Row(State='Kentucky', City='Henderson')]
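The same multi-value condition can also be written with isin(), which is easier to read than chaining OR terms (a sketch, not part of the original run):

#Filter on a list of values instead of chaining conditions
df.filter(df['State'].isin('Kentucky', 'Florida')).select(['State', 'City']).head(2)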

Collect method

In [42]:
#To get Row objects back that you can store and work with, use .collect() rather than .show()
result = df.filter( (df['State'] == 'Kentucky') & ~(df['State'] == 'Florida') ).select(['State', 'City']).collect()
In [44]:
#get first result in collected data
row = result[0]
In [47]:
#Turn row into python dict
row.asDict()
Out[47]:
{'State': 'Kentucky', 'City': 'Henderson'}
In [48]:
#Select specific field from dictionary
row.asDict()['City']
Out[48]:
'Henderson'
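Because collect() returns a plain Python list of Row objects, ordinary Python works from here on, for example pulling one field out of every row (a small sketch):

#Build a Python list of all the cities in the collected result
cities = [r['City'] for r in result]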

Data aggregations

In [71]:
#Group by state and city, sum sales and profit, and alias the profit column
df.groupBy(['State', 'City']).agg( sf.sum('Sales'), sf.sum('Profit').alias('TotalProfit') ).head(10)
Out[71]:
[Row(State='Ohio', City='Newark', sum(Sales)=8128.069067239761, TotalProfit=-2292.4127426743507),
 Row(State='New York', City='Yonkers', sum(Sales)=7657.666157722473, TotalProfit=2767.755807876587),
 Row(State='Louisiana', City='Lafayette', sum(Sales)=5405.750039815903, TotalProfit=1042.2903164625168),
 Row(State='Connecticut', City='Waterbury', sum(Sales)=1326.0599646568298, TotalProfit=338.9354909658432),
 Row(State='Illinois', City='Saint Charles', sum(Sales)=1358.3660316467285, TotalProfit=248.50100135803223),
 Row(State='Illinois', City='Chicago', sum(Sales)=47767.33099740744, TotalProfit=-6460.284273296595),
 Row(State='Ohio', City='Lorain', sum(Sales)=1966.2149403095245, TotalProfit=-641.4797307252884),
 Row(State='Michigan', City='Ann Arbor', sum(Sales)=889.2730140686035, TotalProfit=228.91580438613892),
 Row(State='California', City='Lodi', sum(Sales)=39.68000030517578, TotalProfit=16.268800735473633),
 Row(State='Illinois', City='Naperville', sum(Sales)=1288.3029999732971, TotalProfit=-146.53240871429443)]
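The aggregated rows come back in no particular order; they can be sorted, for example by total profit, with orderBy (a sketch reusing the functions already imported):

#Sort the aggregation by total profit, lowest (most negative) first
df.groupBy(['State', 'City']).agg( sf.sum('Sales').alias('TotalSales'), sf.sum('Profit').alias('TotalProfit') ).orderBy('TotalProfit').head(10)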
In [65]:
#Sum all the sales in the dataframe
df.agg({'Sales': 'Sum'}).show()
+------------------+
|        sum(Sales)|
+------------------+
|2272449.8518802524|
+------------------+
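The same total can be expressed with the functions API, which also lets us alias the resulting column (an equivalent sketch):

#Functions-API version of the dictionary aggregation above
df.agg(sf.sum('Sales').alias('TotalSales')).show()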

In [70]:
#Count distinct cities within the file
df.select(countDistinct('City')).head(5)
Out[70]:
[Row(count(DISTINCT City)=531)]
In [76]:
#Calculate standard deviation and format to 2 decimal places
sales_stdev = df.select( stddev('Sales').alias('Stdev') )
sales_stdev.select(format_number('Stdev', 2).alias('Stdev') ).show()
+------+
| Stdev|
+------+
|631.79|
+------+

Handling missing data

When data is missing there are three common options: 1) keep the nulls, 2) drop the rows with missing values, or 3) fill them with another value.

In [82]:
df = my_spark.read.format("csv").option("header", "true").load("missing.csv")
In [83]:
#Spark automatically reads missing values in as null
df.show()
+----+----+
| age|Name|
+----+----+
|  51| bob|
|null|carl|
|  21|john|
|  33|null|
+----+----+

In [84]:
#Drop any row that contains any missing data
df.na.drop().show()
+---+----+
|age|Name|
+---+----+
| 51| bob|
| 21|john|
+---+----+

In [86]:
#Keep any row that has at least 1 non-null value (thresh=1)
df.na.drop(thresh=1).show()
+----+----+
| age|Name|
+----+----+
|  51| bob|
|null|carl|
|  21|john|
|  33|null|
+----+----+

In [87]:
#Drop a row only if all of its values are null
df.na.drop(how='all').show()
+----+----+
| age|Name|
+----+----+
|  51| bob|
|null|carl|
|  21|john|
|  33|null|
+----+----+

In [88]:
#Only drop rows where a column in the subset is null
df.na.drop(subset=['Name']).show()
+----+----+
| age|Name|
+----+----+
|  51| bob|
|null|carl|
|  21|john|
+----+----+

In [89]:
#Fill missing values with a predefined string (this only applies to string columns)
df.na.fill('FILL VALUE').show()
+----------+----------+
|       age|      Name|
+----------+----------+
|        51|       bob|
|FILL VALUE|      carl|
|        21|      john|
|        33|FILL VALUE|
+----------+----------+

In [90]:
#We can pass a number instead, but it only fills numeric columns (ours are all strings, so nothing changes)
df.na.fill(0).show()
+----+----+
| age|Name|
+----+----+
|  51| bob|
|null|carl|
|  21|john|
|  33|null|
+----+----+

In [91]:
#Fill specific columns with a value by passing a subset
df.na.fill('Missing Name', subset=['Name']).show()
+----+------------+
| age|        Name|
+----+------------+
|  51|         bob|
|null|        carl|
|  21|        john|
|  33|Missing Name|
+----+------------+

In [98]:
#Fill missing ages with the mean value (hard-coded here for illustration)
mean = 10
df.na.fill(mean, subset=['age'])
Out[98]:
DataFrame[age: string, Name: string]
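Hard-coding the mean is only for illustration; it would normally be computed from the data first. Note also that age was read in as a string, so a numeric fill only takes effect once the column is cast to a numeric type. A sketch under those assumptions:

#Cast age to a number, compute its mean, then fill the nulls with it
df_num = df.withColumn('age', df['age'].cast('double'))
mean_age = df_num.select(sf.mean('age')).collect()[0][0]
df_num.na.fill(mean_age, subset=['age']).show()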

Case Statements in PySpark

In [101]:
#Select the Name column, then apply when() conditions across it to produce a new column (note the comparisons are case-sensitive, so 'Carl' never matches 'carl')
df2 = df.select(df.Name, sf.when(df.Name == 'bob', 'YES').when(df.Name == 'Carl', 'NO').otherwise('Who Knows'))
df2.show()
+----+-----------------------------------------------------------------------------+
|Name|CASE WHEN (Name = bob) THEN YES WHEN (Name = Carl) THEN NO ELSE Who Knows END|
+----+-----------------------------------------------------------------------------+
| bob|                                                                          YES|
|carl|                                                                    Who Knows|
|john|                                                                    Who Knows|
|null|                                                                    Who Knows|
+----+-----------------------------------------------------------------------------+

In [102]:
#The generated column name is unwieldy, so rename it
df2.withColumnRenamed('CASE WHEN (Name = bob) THEN YES WHEN (Name = Carl) THEN NO ELSE Who Knows END', 'Flag').show()
+----+---------+
|Name|     Flag|
+----+---------+
| bob|      YES|
|carl|Who Knows|
|john|Who Knows|
|null|Who Knows|
+----+---------+
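An alternative to renaming afterwards is to alias the when() expression at the point it is built (a sketch using the same expression):

#Give the case expression a friendly name up front
df.select(df.Name, sf.when(df.Name == 'bob', 'YES').when(df.Name == 'Carl', 'NO').otherwise('Who Knows').alias('Flag')).show()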

String functions

In [106]:
#Like function
df.select('Name','age', df.Name.like('bo%')).show()
+----+----+-------------+
|Name| age|Name LIKE bo%|
+----+----+-------------+
| bob|  51|         true|
|carl|null|        false|
|john|  21|        false|
|null|  33|         null|
+----+----+-------------+

In [107]:
#Substring function
df.select(df.Name.substr(1,3).alias('Name2'), df.age).show()
+-----+----+
|Name2| age|
+-----+----+
|  bob|  51|
|  car|null|
|  joh|  21|
| null|  33|
+-----+----+

In [110]:
#Starts with function
df.select(df.Name, df.Name.startswith('bo').alias('Name2'), df.age).show()
+----+-----+----+
|Name|Name2| age|
+----+-----+----+
| bob| true|  51|
|carl|false|null|
|john|false|  21|
|null| null|  33|
+----+-----+----+

In [111]:
#Ends with function
df.select(df.Name, df.Name.endswith('hn').alias('Name2'), df.age).show()
+----+-----+----+
|Name|Name2| age|
+----+-----+----+
| bob|false|  51|
|carl|false|null|
|john| true|  21|
|null| null|  33|
+----+-----+----+

Dropping columns

In [112]:
#Drop one or more columns, comma separated (e.g. df.drop("Name", "Field2", "Field3"))
df = df.drop("Name")
In [113]:
df.show()
+----+
| age|
+----+
|  51|
|null|
|  21|
|  33|
+----+

Ranking DF

In [135]:
from pyspark.sql.window import Window
df = my_spark.read.format("csv").option("header", "true").load("salestest.csv")
In [127]:
#Group by customer and sum the sales
df = df.groupBy(['Customer']).agg( sf.sum('sales').alias('sales') )
df.show()
+--------+---------+
|Customer|    sales|
+--------+---------+
|     hhh|4691191.0|
|     fff| 334546.0|
|     ggg|  34455.0|
|     eee|   6487.0|
+--------+---------+

In [134]:
#Rank customers by total sales, highest first
ranked = df.withColumn("rank", rank().over(Window.orderBy(desc("sales"))))
ranked.show()
+--------+---------+----+
|Customer|    sales|rank|
+--------+---------+----+
|     hhh|4691191.0|   1|
|     fff| 334546.0|   2|
|     ggg|  34455.0|   3|
|     eee|   6487.0|   4|
+--------+---------+----+

In [137]:
#Or we could rank each customer's individual transactions (this uses the raw, ungrouped transaction data rather than the grouped df above)
ranked = df.withColumn(
  "rank", dense_rank().over(Window.partitionBy("Customer").orderBy(desc("Sales"))))
In [138]:
ranked.show()
+--------+-------+----+
|Customer|  Sales|rank|
+--------+-------+----+
|     hhh|4343534|   1|
|     hhh| 324234|   2|
|     hhh|  23423|   3|
|     fff| 334546|   1|
|     ggg|  34455|   1|
|     eee|   5253|   1|
|     eee|   1234|   2|
+--------+-------+----+
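The rank column can then be used like any other, for example to keep only each customer's largest transaction (a sketch):

#Keep only the top-ranked transaction per customer
ranked.filter(ranked['rank'] == 1).show()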

Joining dataframes

In [ ]:
#Generic pattern: left join dataframe2 onto dataframe1 on a matching field
left_join = dataframe1.join(dataframe2, dataframe1.fieldname == dataframe2.fieldname, how='left')
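A minimal self-contained sketch with made-up data (the customers/orders names and columns are illustrative only, not from the dataset above):

#Hypothetical example dataframes
customers = my_spark.createDataFrame([(1, 'bob'), (2, 'carl')], ['customer_id', 'name'])
orders = my_spark.createDataFrame([(1, 100.0), (1, 50.0), (3, 75.0)], ['customer_id', 'amount'])

#A left join keeps every customer, with nulls where no order matches
customers.join(orders, customers.customer_id == orders.customer_id, how='left').show()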