Let's find the minimum temperature recorded at each weather station using a Spark RDD.

In [9]:
import findspark
import pyspark

# Each CSV row is StationID | <field 1> | EntryType | Value | ...
# (parseLine below only reads fields 0, 2 and 3).
# `sc` was never created in this notebook, so a fresh kernel failed here.
# getOrCreate() returns the already-active SparkContext if there is one
# (e.g. injected by a pyspark kernel), otherwise it starts a new one.
sc = pyspark.SparkContext.getOrCreate()
# The r prefix makes the path a raw string so the Windows backslashes are not escapes.
lines = sc.textFile(r"c:\Users\kiera\Downloads\1800.csv")

def parseLine(line):
    """Turn one comma-separated row into a (stationID, entryType, temperatureF) tuple.

    Only columns 0, 2 and 3 are read. Column 3 is converted with
    value * 0.1 * 9/5 + 32, i.e. it is treated as tenths of a degree Celsius
    and returned in Fahrenheit.
    """
    columns = line.split(',')
    station_id = columns[0]
    entry_type = columns[2]

    tenths_to_degrees = 0.1
    celsius_to_fahrenheit = 9.0 / 5.0
    # Same left-to-right evaluation order as value * 0.1 * (9/5) + 32.
    fahrenheit = float(columns[3]) * tenths_to_degrees * celsius_to_fahrenheit + 32.0

    return station_id, entry_type, fahrenheit

# parsedLines holds (StationID, EntryType, TempF) tuples.
parsedLines = lines.map(parseLine)

# Keep only minimum-temperature readings. An exact match on the entry type
# (rather than a substring test) so no other type containing "TMIN" slips in.
tminRecords = parsedLines.filter(lambda x: x[1] == "TMIN")

# Every remaining row is TMIN, so drop EntryType: (StationID, TempF).
stationTemps = tminRecords.map(lambda x: (x[0], x[2]))

# Reduce all readings for each station down to the lowest one.
minTemps = stationTemps.reduceByKey(min)

# collect() pulls every row back to the driver; after reduceByKey there is
# one row per station.
results = minTemps.collect()

# Print one "StationID<TAB>temperature" line per station.
for station, temp in results:
    print("{}\t{:.2f}F".format(station, temp))
ITE00100554	5.36F
EZE00100082	7.70F

Now let's do the same calculation with a Spark DataFrame.

In [15]:
# Import SparkSession from pyspark.sql
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import *
import pyspark.sql.functions as sf

# Create the SparkSession; getOrCreate() returns the active session if one
# already exists, otherwise it builds a new one.
my_spark = SparkSession.builder.appName('myapp').getOrCreate()

# header=false and no inferSchema: Spark names the columns _c0, _c1, ... and
# reads every column as a string.
df = my_spark.read.format("csv").option("header", "false").load(r"c:\Users\kiera\Downloads\1800.csv")

# Give the positional columns readable names (same positions the RDD parser
# uses: 0 = station, 2 = entry type, 3 = value).
df = (df.withColumnRenamed('_c0', 'StationID')
        .withColumnRenamed('_c1', 'Entry')
        .withColumnRenamed('_c2', 'Type')
        .withColumnRenamed('_c3', 'Temp'))

# Keep only minimum-temperature rows and the two columns needed below.
df = df.filter(df['Type'] == 'TMIN').select(['StationID', 'Temp'])

# Temp was read as a string, so cast it explicitly instead of relying on
# implicit coercion, then convert tenths of a degree Celsius to Fahrenheit.
df = df.withColumn('TempF', df['Temp'].cast('double') * 0.1 * (9.0 / 5.0) + 32.0)

# NOTE(review): `from pyspark.sql.functions import *` in this cell shadows
# Python builtins such as min/max for the whole notebook; only the `sf`
# alias is needed here.
# Lowest Fahrenheit reading per station, with a readable result column name.
df.groupBy(['StationID']).agg(sf.min('TempF').alias('MinTempF')).head(10)
[Row(StationID='ITE00100554', min(TempF)=5.359999999999999),
 Row(StationID='EZE00100082', min(TempF)=7.699999999999999)]
In [ ]: