In [ ]:
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as sqlfunc
from pyspark.sql.types import *
import argparse, sys
import pandas as pd
import time
from datetime import datetime
import os
import glob
def create_session(appname):
    spark_session = SparkSession\
        .builder\
        .appName(appname)\
        .master('yarn')\
        .config("hive.metastore.uris", "thrift://server_host_name:9083")\
        .enableHiveSupport()\
        .getOrCreate()
    return spark_session
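# NOTE: "server_host_name" above is a placeholder for the Hive metastore host.
# For quick local testing without a YARN cluster or Hive, a session could be
# built along these lines (illustrative sketch only, not part of the pipeline):
#
#     spark_local = SparkSession\
#         .builder\
#         .appName('testing_files_local')\
#         .master('local[*]')\
#         .getOrCreate()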
### START MAIN ###
if __name__ == '__main__':
    spark_session = create_session('testing_files')
    #import file into dataframe
    start = time.time()
    #--------------------------------------------------------------------------------------------
    #-----------------------------CALCULATE DATES AND TIMES FOR QUERY----------------------------
    #--------------------------------------------------------------------------------------------
    dt_now = datetime.now()
    target_hour = int(dt_now.timestamp()) - 60*60*12  #epoch seconds for 12 hours ago
    today_date = datetime.fromtimestamp(target_hour).strftime('%Y%m%d')
    hour = datetime.fromtimestamp(target_hour).strftime('%H')
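    # Example: if the job runs at 2021-06-01 14:30 local time, target_hour points at
    # 02:30 that morning, so today_date == '20210601' and hour == '02'.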
    #--------------------------------------------------------------------------------------------
    #-----------------------------------CREATE DF FROM FILES ------------------------------------
    #--------------------------------------------------------------------------------------------
    schema = [
        StructField('domain', StringType(), True),
        StructField('use', LongType(), True)]
    final_structure = StructType(schema)

    df = spark_session.read\
        .option("header", "false")\
        .option("delimiter", "\t")\
        .csv('directory-path/*/*', final_structure)\
        .select('domain', 'use')
    df2 = df.filter(df.domain != '----').groupby('domain').agg(sqlfunc.sum(df.use).alias('totaluse'))
    df2.show()
    df3 = df2.toPandas()
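    # Note: toPandas() pulls the aggregated result onto the driver, so the cleanup below
    # runs as plain pandas; this assumes the per-domain summary is small enough to fit
    # in driver memory.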

    #--------------------------------------------------------------------------------------------
    #-----------------------------DEFINE REQUIRED LOOKUP LISTS-----------------------------------
    #--------------------------------------------------------------------------------------------
    tld = ('co.uk', 'com', 'org', 'gov.uk', 'co', 'net', 'news', 'it', 'in', 'es', 'tw', 'pe', 'io', 'ca', 'cat', 'com.au',
        'com.ar', 'com.mt', 'com.co', 'ws', 'to', 'es', 'de', 'us', 'br', 'im', 'gr', 'cc', 'cn', 'org.uk', 'me', 'ovh', 'be',
        'tv', 'tech', '..', 'life', 'com.mx', 'pl', 'uk', 'ru', 'cz', 'st', 'info', 'mobi', 'today', 'eu', 'fi', 'jp', 'life',
        '1', '2', '3', '4', '5', '6', '7', '8', '9', '0', 'earth', 'ninja', 'ie', 'im', 'ai', 'at', 'ch', 'ly', 'market', 'click',
        'fr', 'nl', 'se')
    cdns = ('akamai', 'akamaized', 'maxcdn', 'cloudflare')
    cleandomain = []
    #--------------------------------------------------------------------------------------------
    #-----------------------------SPLIT DOMAIN AT EVERY DOT--------------------------------------
    #--------------------------------------------------------------------------------------------
    index = df3.domain.str.split('.').tolist()
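    # Example: 'maps.google.co.uk' becomes ['maps', 'google', 'co', 'uk'].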
    #--------------------------------------------------------------------------------------------
    #------------------DEFINE FUNCTION FOR DOMAIN MANIPULATION-----------------------------------
    #--------------------------------------------------------------------------------------------
    def domfunction(x):
        #if anything below fails (e.g. the value is not a list of labels), fall back to the raw value
        try:
            #if the last label is numeric (e.g. an IP address), keep the first two octets and mask the rest
            if str(x[-1]).isdigit():
                try:
                    cleandomain.append(str(x[0])+'.'+str(x[1])+'.*.*')
                except IndexError:
                    cleandomain.append(str(x))
            #if it's in the CDN list, keep a subdomain as well
            elif len(x) > 3 and str(x[-2]).rstrip() in cdns:
                try:
                    cleandomain.append(str(x[-3])+'.'+str(x[-2])+'.'+str(x[-1]))
                except IndexError:
                    cleandomain.append(str(x))
            elif len(x) > 3 and str(x[-3]).rstrip() in cdns:
                try:
                    cleandomain.append(str(x[-4])+'.'+str(x[-3])+'.'+str(x[-2])+'.'+str(x[-1]))
                except IndexError:
                    cleandomain.append(str(x))
            #if the last two labels form an entry in the TLD list, keep three labels
            elif len(x) > 2 and str(x[-2]).rstrip()+'.'+str(x[-1]).rstrip() in tld:
                try:
                    cleandomain.append(str(x[-3])+'.'+str(x[-2])+'.'+str(x[-1]))
                except IndexError:
                    cleandomain.append(str(x))
            #if only the last label is in the TLD list, keep two labels
            elif len(x) > 2 and str(x[-1]) in tld:
                try:
                    cleandomain.append(str(x[-2])+'.'+str(x[-1]))
                except IndexError:
                    cleandomain.append(str(x))
            #otherwise keep the value as-is
            else:
                cleandomain.append(str(x))
        except IndexError:
            cleandomain.append(str(x))
        except TypeError:
            cleandomain.append(str(x))
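    # A few worked examples of what the rules above produce:
    #   ['www', 'example', 'co', 'uk']      -> 'example.co.uk'      (two-part TLD match)
    #   ['img', 'site', 'akamaized', 'net'] -> 'site.akamaized.net' (CDN match keeps the subdomain)
    #   ['10', '0', '0', '1']               -> '10.0.*.*'           (numeric last label is masked)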
    #--------------------------------------------------------------------------------------------
    #-------------LOOP OVER ITEMS WITHIN THE INDEX & CONCAT REQUIRED ELEMENTS--------------------
    #--------------------------------------------------------------------------------------------
    for x in index:
        domfunction(x)
    #--------------------------------------------------------------------------------------------
    #-------------------------------CONFIGURE OUTPUTS--------------------------------------------
    #--------------------------------------------------------------------------------------------
    #add the cleaned-domain column to the dataframe
    se = pd.Series(cleandomain)
    df3['newdomain2'] = se.values
    #group by the new domain column and sum the usage
    df5 = df3.groupby(['newdomain2'], as_index=False)[['totaluse']].sum()
    df6 = df5.sort_values(['totaluse'], ascending=True)
    print(df6)
    #convert the pandas result back into a Spark dataframe
    mySchema = StructType([StructField("newdomain2", StringType(), True)
                          ,StructField("totaluse", LongType(), True)])
    spark_df = spark_session.createDataFrame(df6, schema=mySchema)
    spark_df.show()
    end = time.time()
    print("RunTime:")
    print(end-start)
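    #--------------------------------------------------------------------------------------------
    #------------------OPTIONAL: SPARK-ONLY SKETCH OF THE SAME CLEANUP---------------------------
    #--------------------------------------------------------------------------------------------
    # Illustrative sketch only, not part of the pipeline above: the pandas round trip could in
    # principle be avoided by wrapping the same per-domain logic in a Spark UDF and keeping the
    # aggregation in Spark. "clean_domain" is a hypothetical helper that would contain the body
    # of domfunction() but return the cleaned string instead of appending to a list.
    #
    #     clean_udf = sqlfunc.udf(lambda d: clean_domain(d.split('.')), StringType())
    #     df_alt = df2.withColumn('newdomain2', clean_udf(df2.domain))\
    #         .groupby('newdomain2')\
    #         .agg(sqlfunc.sum('totaluse').alias('totaluse'))\
    #         .orderBy('totaluse')
    #     df_alt.show()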