Using the LEAD function in Hive, we take each session's end time to be the timestamp of the terminating transaction, i.e. the next event for that user and session number:

lead(timestamp, 1) over (partition by userID, SessionNumber order by timestamp) as end_time

Then we convert those timestamps to Unix time (seconds), which makes them easier to compare in the Spark job:

unix_timestamp(start_time) as session_start,
unix_timestamp(end_time) as session_end

This gives us the correct input structure for the Spark job below.
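For reference, here is a minimal sketch of that Hive preparation step. The source table db.events, the raw `timestamp` column and the 'Prep_Sessions' app name are placeholder assumptions; only the LEAD and unix_timestamp logic comes from the snippets above, and the output table name matches the db.table the Spark job reads. The same SQL can be run directly in Hive, or through a Spark session as shown.

from pyspark.sql import SparkSession

#A sketch only: db.events, the `timestamp` column and the 'Prep_Sessions' app name are placeholders
spark = SparkSession.builder.appName('Prep_Sessions').enableHiveSupport().getOrCreate()

prep_sql = '''
create table db.table as
select
    userID,
    SessionNumber,
    start_time,
    end_time,
    unix_timestamp(start_time) as session_start,
    unix_timestamp(end_time)   as session_end
from (
    select
        userID,
        SessionNumber,
        `timestamp` as start_time,
        lead(`timestamp`, 1) over (partition by userID, SessionNumber
                                   order by `timestamp`) as end_time
    from db.events
) sessions
'''

#LEAD leaves end_time (and so session_end) NULL for a user's latest event, which the
#Spark job below treats as a still-open session
spark.sql(prep_sql)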

In [ ]:
from pyspark.sql import SparkSession
import datetime
import time

#Work out yesterday's date (YYYYMMDD) from the current time minus 24 hours
now = datetime.datetime.now()
yday = int(now.timestamp()) - 24*60*60
yday_date = datetime.datetime.fromtimestamp(yday).strftime('%Y%m%d')

spark = SparkSession \
    .builder \
    .appName('App_Name') \
    .master('yarn') \
    .enableHiveSupport() \
    .getOrCreate()

import pyspark.sql.functions as F
from pyspark.sql.functions import col

#The job below counts concurrent sessions per 15-minute block and writes the results to CSV,
#then copies the file to HDFS; adapt the output step if you prefer Hive or stdout

print('data load starting...')

#Read the pre-aggregated session table produced by the Hive step above
cmd = '''select * from db.table'''
df1 = spark.sql(cmd)
df1.printSchema()
print('data ingested successfully')

print('setting variables...')

#Split the date up, so we can inject it into the timestart and timeend variables
yyyy = yday_date[:4]
mm = yday_date[4:6]
dd = yday_date[6:8]

timestart= yyyy + '-' + mm + '-' + dd + ' 00:00:00'
timeend= yyyy + '-' + mm + '-' + dd + ' 23:59:59'
time_to_check = datetime.datetime.strptime(timestart, '%Y-%m-%d %H:%M:%S')

iters = 0
session = 0
add = []

#There are 96 fifteen-minute blocks in a day; we check the start of each block (plus the closing
#midnight boundary), adding 900 seconds (15 minutes) to the check time on each pass, so the loop runs 97 times.
#Dates were converted to Unix time (seconds) in Hive to make the comparisons easier.
print('begin iteration...')
while iters < 97:

    time_to_add = iters * 900
    time_to_checkx = time_to_check + datetime.timedelta(seconds=time_to_add)  # datetime of this 15-minute boundary
    time_to_checky = time.mktime(time_to_checkx.timetuple())                  # the same instant as Unix seconds
    stringtime = time_to_checkx.strftime("%m/%d/%Y, %H:%M:%S")                # label used in the output CSV

    iters = iters + 1

    #Parse the Hive string timestamps (Spark date patterns: yyyy for year, HH for the 24-hour clock);
    #this does not depend on the loop variable, so it could equally be done once before the loop
    spark_date_format = "yyyy-MM-dd HH:mm:ss"
    df1 = df1.withColumn('start_timestamp', F.to_timestamp(df1.start_time, spark_date_format))
    df1 = df1.withColumn('end_timestamp', F.to_timestamp(df1.end_time, spark_date_format))
    
    #Select only sessions that started before the check time and either ended after it
    #or are still open (no end_time recorded)
    filterx = df1.filter(
        ((df1.session_start < time_to_checky) & (df1.session_end > time_to_checky)) |
        ((df1.session_start < time_to_checky) & (F.isnull(df1.end_time)))
    )
    
    #Count the matching sessions and store the result against this time slot
    session = filterx.count()
    newrow = [stringtime, session]
    add.append(newrow)

    #Debug output: show a sample of the source data (prints on every pass)
    df1.show()


#Collect the per-slot counts into a pandas DataFrame and write them out as a CSV
import pandas as pd
output = pd.DataFrame.from_records(add)
output.columns = ['time', 'count']
output = output.groupby(['time'])[['count']].agg('sum').reset_index()
output.to_csv(yday_date + 'concsessions.csv', sep=',')

#Copy the output CSV to HDFS
command = 'hdfs dfs -copyFromLocal '+yday_date+'concsessions.csv /user/username/spark_exports/foldername/'

import os
os.system(command)

#Delete the local copy of the CSV
command2 = 'rm ' + yday_date + 'concsessions.csv'

os.system(command2)
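As an optional sanity check (a sketch that assumes you are still in the same Spark session and that the HDFS path matches the copy command above), the exported counts can be read back and the busiest 15-minute slots printed:

#Read the exported CSV back from HDFS; spark, col and yday_date come from the job above
check = spark.read.csv('/user/username/spark_exports/foldername/' + yday_date + 'concsessions.csv',
                       header=True, inferSchema=True)

#Show the five time slots with the highest concurrent-session counts
check.orderBy(col('count').desc()).show(5)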