What are RDDs?

The RDD is the core abstraction behind how Spark works. It stands for Resilient Distributed Dataset and is, in effect, an abstraction over a large dataset. You call functions on the RDD and Spark distributes the processing of that dataset across the cluster. You don't need to worry about how the data is distributed or made resilient; Spark and the cluster manager handle that for you.

We frequently run the following functions on RDDs:

  • Map: take a set of data and transform it with a function (e.g. cube all the numbers in the RDD). It maps every entry in the original RDD to an entry in the new RDD.

  • Filter: keep only the entries that satisfy a condition, like a WHERE clause in SQL.

  • Distinct: return only the unique rows in the RDD.

  • Sample: take a random sample of the RDD to get a smaller dataset for testing purposes.

  • Union, intersection, subtraction and Cartesian product: combine two RDDs set-wise.

We can also use:

  • Collect: bring all the values in the RDD back to the driver
  • Count: count the number of entries in the RDD
  • CountByValue: a breakdown of how many times each unique value appears
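A rough sketch of how a few of these look in PySpark (this assumes a SparkContext named sc is already available, as in the cells that follow):

In [ ]:
#A sketch of some common RDD transformations and actions (assumes a SparkContext sc)
numbers = sc.parallelize([1, 2, 2, 3, 4, 4, 5])

evens = numbers.filter(lambda x: x % 2 == 0)          #Filter: keep only the even numbers
unique = numbers.distinct()                           #Distinct: drop duplicate values
sample = numbers.sample(False, 0.5)                   #Sample: roughly half the data, without replacement
combined = numbers.union(sc.parallelize([5, 6, 7]))   #Union: all entries from both RDDs

print(evens.collect())          #Collect: bring the values back to the driver
print(numbers.count())          #Count: number of entries in the RDD
print(numbers.countByValue())   #CountByValue: how many times each value appears
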
In [9]:
#An example RDD. Here, we input the data 1, 2, 3, 4, then use the map function to make a new RDD containing the square of each input
rdd = sc.parallelize([1, 2, 3, 4])
rdd2 = rdd.map(lambda x: x*x)
print(rdd2.collect())
[1, 4, 9, 16]

A lambda is a nameless (anonymous) function. It does exactly the same as the named function below, just in shorthand.

In [11]:
def squared(x):
    return x*x
    
rdd2 = rdd.map(squared)
print(rdd2.collect())
[1, 4, 9, 16]

A PySpark example

In [18]:
import findspark
findspark.init()
import pyspark
import collections
import os

#Make sure environment variables are set correctly
os.environ['SPARK_HOME'] = r"c:\spark"
os.environ['JAVA_HOME'] = r"C:\jdk2"
os.environ['PYSPARK_DRIVER_PYTHON'] = "jupyter"
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = "notebook"
 
'''
Since Spark 2.0 the SparkSession is the main entry point, replacing the separate SparkContext / SQLContext setup of earlier versions. The SparkSession allows DataFrames to be created from RDDs and gives access to Spark SQL. The underlying SparkContext is still available as spark.sparkContext.
'''
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
#The classic SparkContext is still available via the session; the RDD calls below use it as sc
sc = spark.sparkContext

'''
ABOUT THE DATASOURCE:
---------------------
u.data: 
The full u data set: 100,000 ratings by 943 users on 1,682 items.
Each user has rated at least 20 movies. Users and items are numbered
consecutively from 1, and the data is randomly ordered. Each line is a
tab-separated list of:
user id | item id | rating | timestamp
The timestamps are Unix seconds since 1/1/1970 UTC.
'''

#Load the .data file into RDD - source: https://grouplens.org/datasets/movielens/
#Each row of the RDD is the entire line of the text file
rdd = sc.textFile(r"c:\ml-100k/u.data") 

#Split each line on whitespace and keep field index 2, the rating (fields are: user id, item id, rating, timestamp)
ratings = rdd.map(lambda x: x.split()[2])

#Now we have the list of ratings and can count by value (i.e. how many 1 star, 2 star...)
result = ratings.countByValue()

#sort the results and put them into a dictionary
sortedResults = collections.OrderedDict(sorted(result.items()))

print('The Dictionary')
print(sortedResults)

print('The Output:')
#now, print each rating (key) and its count (value) from the dictionary
for key, value in sortedResults.items():
    print("%s %i" % (key, value))
The Dictionary
OrderedDict([('1', 6110), ('2', 11370), ('3', 27145), ('4', 34174), ('5', 21201)])
The Output:
1 6110
2 11370
3 27145
4 34174
5 21201
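
Because the SparkSession can build DataFrames from RDDs, the same breakdown can also be expressed with the DataFrame API. The cell below is only a sketch, assuming the spark session and the u.data path used above:

In [ ]:
#A sketch of the same rating breakdown using the DataFrame API (assumes spark and the file path above)
lines = spark.sparkContext.textFile(r"c:\ml-100k/u.data")
fields = lines.map(lambda line: line.split())
rows = fields.map(lambda f: (int(f[0]), int(f[1]), int(f[2])))

#Build a DataFrame from the RDD of (user_id, item_id, rating) tuples, then group and count per rating
df = spark.createDataFrame(rows, ["user_id", "item_id", "rating"])
df.groupBy("rating").count().orderBy("rating").show()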