Working with RDDs

Some RDD functions

  • reduceByKey: combine all values that share the same key, using a function you supply
  • groupByKey: group all values with the same key into an iterable
  • sortByKey: sort the RDD by its keys
  • keys() / values(): split a pair RDD into two separate RDDs, one of keys and one of values (see the sketch below)
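A minimal sketch of these on a toy pair RDD, assuming a SparkContext named sc already exists (key order in the collected results can vary):

pairs = sc.parallelize([("a", 1), ("b", 2), ("a", 3)])
pairs.reduceByKey(lambda x, y: x + y).collect()     # [('a', 4), ('b', 2)]
pairs.groupByKey().mapValues(list).collect()        # [('a', [1, 3]), ('b', [2])]
pairs.sortByKey().collect()                         # [('a', 1), ('a', 3), ('b', 2)]
pairs.keys().collect()                              # ['a', 'b', 'a']
pairs.values().collect()                            # [1, 2, 3]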
In [41]:
import findspark
findspark.init()
import pyspark

#Create (or reuse) the SparkContext, which gives us sc
sc = pyspark.SparkContext.getOrCreate()

#UserID | Name | Age | Num_Friends
#The r prefix makes the file path a raw string, so the backslashes aren't treated as escape characters
lines = sc.textFile(r"c:\Users\kiera\Downloads\fakefriends.csv") 

#For each line in the file, split it at the commas
#fields[2] is the age
#fields[3] is the number of friends
def splitlines(line):
    fields = line.split(',')
    age = int(fields[2])
    numFriends = int(fields[3])
    return (age, numFriends)
    
#rdd2 is just structured as below
#Age | Friends
rdd2 = lines.map(splitlines)

#Lambda functions:
#-----------------
#rdd2.mapValues(lambda x: (x, 1)) takes each value x (the friend count; the key, age, is untouched)
#and turns it into the tuple (x, 1), where the 1 counts one user for that row. From this we can later
#sum the friend counts and divide by the summed user counts to get the average per age.
#.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])) then combines the tuples for each key:
#it sums the friend counts (x[0] + y[0]) and sums the user counts (x[1] + y[1])
totalsByAge = rdd2.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

#Calculate averages where x[0] is number of friends and x[1] is number of users
AvgByAge = totalsByAge.mapValues(lambda x: x[0] / x[1])
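
To make the two steps above concrete, here is a minimal sketch on two made-up rows for age 33 (the friend counts are invented for illustration):

sample = sc.parallelize([(33, 385), (33, 2)])
sample.mapValues(lambda x: (x, 1)).collect()
# [(33, (385, 1)), (33, (2, 1))]
sample.mapValues(lambda x: (x, 1)).reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1])).collect()
# [(33, (387, 2))]  -> 387 friends across 2 users, so the average is 193.5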
In [43]:
AvgByAge.take(5)
Out[43]:
[(26, 242.05882352941177),
 (40, 250.8235294117647),
 (68, 269.6),
 (54, 278.0769230769231),
 (38, 193.53333333333333)]
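
Note that the result isn't ordered; if you want it sorted by age, sortByKey from the list at the top does the job:

AvgByAge.sortByKey().take(5)   # same (age, average) pairs, with ages in ascending order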

This is a headache... why don't we use a DataFrame?

RDDs are painful to work with, which is why Spark now also offers DataFrames. The code below produces the same result as the RDD version above, but I think it is much more readable.

In [36]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import *
import pyspark.sql.functions as sf

# Create my_spark session
# getOrCreate() returns the existing SparkSession if there is one, otherwise it creates a new one
my_spark = SparkSession.builder.appName('myapp').getOrCreate()

df = my_spark.read.format("csv").option("header", "false").load(r"c:\Users\kiera\Downloads\fakefriends.csv")

#Rename columns to make them easier to work with
df = df.withColumnRenamed('_c0', 'UserID')\
        .withColumnRenamed('_c1', 'Name')\
        .withColumnRenamed('_c2', 'Age')\
        .withColumnRenamed('_c3', 'Friends')

#Groupby age; sum the number of friends & count the number of unique user IDs
totalsByAge = df.groupBy(['Age']).agg(
    sf.sum('Friends').alias('NumberFriends'),
    sf.countDistinct('UserID').alias('NumberUsers')
)

#Calculate the average by dividing the number of friends by the number of users with those friends
averages = totalsByAge.withColumn("Avg", totalsByAge.NumberFriends/totalsByAge.NumberUsers)
In [38]:
averages.show(5)
+---+-------------+-----------+------------------+
|Age|NumberFriends|NumberUsers|               Avg|
+---+-------------+-----------+------------------+
| 51|       2115.0|          7|302.14285714285717|
| 54|       3615.0|         13| 278.0769230769231|
| 29|       2591.0|         12|215.91666666666666|
| 69|       2352.0|         10|             235.2|
| 42|       1821.0|          6|             303.5|
+---+-------------+-----------+------------------+
only showing top 5 rows
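
As a side note, the same averages can be computed more directly by letting Spark infer the schema on read and using sf.avg; a rough sketch:

df2 = my_spark.read.csv(r"c:\Users\kiera\Downloads\fakefriends.csv", header=False, inferSchema=True)\
              .toDF('UserID', 'Name', 'Age', 'Friends')
df2.groupBy('Age').agg(sf.avg('Friends').alias('Avg')).show(5)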
