AWS EMR Spark and Data Lake

An ETL pipeline that extracts data from S3, processes it with Spark, and loads it back into S3 as a set of dimensional tables.

Create a Data Lake with Spark and AWS EMR

  1. create an SSH key pair to securely connect to the EMR cluster
  2. create an EMR cluster (a boto3 sketch for steps 1-2 follows this list)

  3. ssh into the master node
  4. access the Jupyter notebook on the master node
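
Steps 1 and 2 can also be scripted. A minimal boto3 sketch; the region, key name, instance types, and cluster name here are assumptions, not values taken from this project:

import boto3

# create the key pair and save the private key locally (assumed region: us-west-2)
ec2 = boto3.client('ec2', region_name='us-west-2')
key = ec2.create_key_pair(KeyName='emr-key')  # hypothetical key name
with open('emr-key.pem', 'w') as f:
    f.write(key['KeyMaterial'])

# launch a small Spark cluster that stays alive after its steps finish
emr = boto3.client('emr', region_name='us-west-2')
cluster = emr.run_job_flow(
    Name='spark-data-lake',        # hypothetical cluster name
    ReleaseLabel='emr-5.28.0',     # assumption: any Spark-capable release works
    Applications=[{'Name': 'Spark'}],
    Instances={
        'MasterInstanceType': 'm5.xlarge',
        'SlaveInstanceType': 'm5.xlarge',
        'InstanceCount': 3,
        'Ec2KeyName': 'emr-key',
        'KeepJobFlowAliveWhenNoSteps': True,
    },
    JobFlowRole='EMR_EC2_DefaultRole',
    ServiceRole='EMR_DefaultRole',
)
print(cluster['JobFlowId'])
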
print("Welcome to my EMR Notebook!")
Starting Spark application
ID  YARN Application ID             Kind     State  Spark UI  Driver log  Current session?
1   application_1573680517609_0004  pyspark  idle   Link      Link
SparkSession available as 'spark'.



Welcome to my EMR Notebook!
%%info

Current session configs: {'conf': {'spark.pyspark.python': 'python3', 'spark.pyspark.virtualenv.enabled': 'true', 'spark.pyspark.virtualenv.type': 'native', 'spark.pyspark.virtualenv.bin.path': '/usr/bin/virtualenv'}, 'kind': 'pyspark'}

ID  YARN Application ID             Kind     State  Spark UI  Driver log  Current session?
1   application_1573680517609_0004  pyspark  idle   Link      Link
sc.list_packages()
Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1  
boto                       2.49.0 
jmespath                   0.9.4  
lxml                       4.4.1  
mysqlclient                1.4.4  
nltk                       3.4.5  
nose                       1.3.4  
numpy                      1.14.5 
pip                        19.3.1 
py-dateutil                2.2    
python36-sagemaker-pyspark 1.2.6  
pytz                       2019.3 
PyYAML                     3.11   
setuptools                 41.6.0 
six                        1.12.0 
soupsieve                  1.9.4  
wheel                      0.33.6 
windmill                   1.6
sc.install_pypi_package("pandas==0.25.1")
Collecting pandas==0.25.1
  Downloading https://files.pythonhosted.org/packages/73/9b/52e228545d14f14bb2a1622e225f38463c8726645165e1cb7dde95bfe6d4/pandas-0.25.1-cp36-cp36m-manylinux1_x86_64.whl (10.5MB)
Requirement already satisfied: numpy>=1.13.3 in /usr/local/lib64/python3.6/site-packages (from pandas==0.25.1) (1.14.5)
Collecting python-dateutil>=2.6.1
  Downloading https://files.pythonhosted.org/packages/d4/70/d60450c3dd48ef87586924207ae8907090de0b306af2bce5d134d78615cb/python_dateutil-2.8.1-py2.py3-none-any.whl (227kB)
Requirement already satisfied: pytz>=2017.2 in /usr/local/lib/python3.6/site-packages (from pandas==0.25.1) (2019.3)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.6/site-packages (from python-dateutil>=2.6.1->pandas==0.25.1) (1.12.0)
Installing collected packages: python-dateutil, pandas
Successfully installed pandas-0.25.1 python-dateutil-2.8.1
sc.install_pypi_package("matplotlib", "https://pypi.org/simple") #Install matplotlib from given PyPI repository
Collecting matplotlib
  Downloading https://files.pythonhosted.org/packages/57/4f/dd381ecf6c6ab9bcdaa8ea912e866dedc6e696756156d8ecc087e20817e2/matplotlib-3.1.1-cp36-cp36m-manylinux1_x86_64.whl (13.1MB)
Requirement already satisfied: numpy>=1.11 in /usr/local/lib64/python3.6/site-packages (from matplotlib) (1.14.5)
Collecting kiwisolver>=1.0.1
  Downloading https://files.pythonhosted.org/packages/f8/a1/5742b56282449b1c0968197f63eae486eca2c35dcd334bab75ad524e0de1/kiwisolver-1.1.0-cp36-cp36m-manylinux1_x86_64.whl (90kB)
Requirement already satisfied: python-dateutil>=2.1 in /mnt/tmp/1573683191129-0/lib/python3.6/site-packages (from matplotlib) (2.8.1)
Collecting pyparsing!=2.0.4,!=2.1.2,!=2.1.6,>=2.0.1
  Downloading https://files.pythonhosted.org/packages/c0/0c/fc2e007d9a992d997f04a80125b0f183da7fb554f1de701bbb70a8e7d479/pyparsing-2.4.5-py2.py3-none-any.whl (67kB)
Collecting cycler>=0.10
  Downloading https://files.pythonhosted.org/packages/f7/d2/e07d3ebb2bd7af696440ce7e754c59dd546ffe1bbe732c8ab68b9c834e61/cycler-0.10.0-py2.py3-none-any.whl
Requirement already satisfied: setuptools in /mnt/tmp/1573683191129-0/lib/python3.6/site-packages (from kiwisolver>=1.0.1->matplotlib) (41.6.0)
Requirement already satisfied: six>=1.5 in /usr/local/lib/python3.6/site-packages (from python-dateutil>=2.1->matplotlib) (1.12.0)
Installing collected packages: kiwisolver, pyparsing, cycler, matplotlib
Successfully installed cycler-0.10.0 kiwisolver-1.1.0 matplotlib-3.1.1 pyparsing-2.4.5
sc.list_packages()
Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1  
boto                       2.49.0 
cycler                     0.10.0 
jmespath                   0.9.4  
kiwisolver                 1.1.0  
lxml                       4.4.1  
matplotlib                 3.1.1  
mysqlclient                1.4.4  
nltk                       3.4.5  
nose                       1.3.4  
numpy                      1.14.5 
pandas                     0.25.1 
pip                        19.3.1 
py-dateutil                2.2    
pyparsing                  2.4.5  
python-dateutil            2.8.1  
python36-sagemaker-pyspark 1.2.6  
pytz                       2019.3 
PyYAML                     3.11   
setuptools                 41.6.0 
six                        1.12.0 
soupsieve                  1.9.4  
wheel                      0.33.6 
windmill                   1.6
df = spark.read.parquet('s3://amazon-reviews-pds/parquet/product_category=Books/*.parquet')
df.printSchema()
num_of_books = df.select('product_id').distinct().count()
print(f'Number of Books: {num_of_books:,}')
root
 |-- marketplace: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- review_id: string (nullable = true)
 |-- product_id: string (nullable = true)
 |-- product_parent: string (nullable = true)
 |-- product_title: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- helpful_votes: integer (nullable = true)
 |-- total_votes: integer (nullable = true)
 |-- vine: string (nullable = true)
 |-- verified_purchase: string (nullable = true)
 |-- review_headline: string (nullable = true)
 |-- review_body: string (nullable = true)
 |-- review_date: date (nullable = true)
 |-- year: integer (nullable = true)

Number of Books: 3,423,743
  5. install python libraries on the master node
sudo easy_install-3.6 pip
sudo /usr/local/bin/pip3 install paramiko nltk scipy scikit-learn pandas
  6. upload the ETL script to the master node
  7. spark-submit the job (a boto3 sketch for steps 6-7 follows this list)
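
Steps 6 and 7 can also be submitted remotely as an EMR step. A sketch with boto3; the cluster id and the S3 location of the script are hypothetical:

import boto3

emr = boto3.client('emr', region_name='us-west-2')
emr.add_job_flow_steps(
    JobFlowId='j-XXXXXXXXXXXXX',      # hypothetical cluster id
    Steps=[{
        'Name': 'etl',
        'ActionOnFailure': 'CONTINUE',
        'HadoopJarStep': {
            'Jar': 'command-runner.jar',  # EMR's generic command runner
            # hypothetical script location; upload etl.py to S3 first
            'Args': ['spark-submit', 's3://testemrs3/etl.py'],
        },
    }],
)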

import configparser
from datetime import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.functions import year, month, dayofmonth, hour, weekofyear, date_format
from pyspark.sql import functions as F
from pyspark.sql import types as T

import pandas as pd
pd.set_option('display.max_columns', 500)
config = configparser.ConfigParser()

# Normally this file would be in ~/.aws/credentials
config.read_file(open('dl.cfg'))

os.environ["AWS_ACCESS_KEY_ID"]= config['AWS']['AWS_ACCESS_KEY_ID']
os.environ["AWS_SECRET_ACCESS_KEY"]= config['AWS']['AWS_SECRET_ACCESS_KEY']
def create_spark_session():
    # get or create a SparkSession; the hadoop-aws package provides the
    # s3a:// filesystem used by the input and output paths below
    spark = SparkSession \
        .builder \
        .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:2.7.0") \
        .getOrCreate()
    return spark
# create the spark session
spark = create_spark_session()
# read data from my S3 bucket (the same data as in the workspace)
songPath = 's3a://testemrs3/song_data/*/*/*/*.json'
logPath = 's3a://testemrs3/log_data/*.json'
# define output paths
output = 's3a://testemrs3/schema/'

Process song data

Create song_table

# Step 1: Read in the song data
df_song = spark.read.json(songPath)
# check the schema
df_song.printSchema()
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
# Step 2: extract columns to create songs table
song_cols = ['song_id', 'title', 'artist_id', 'year', 'duration']
# groupby song_id and select the first record's title in the group.
t1 = df_song.select(F.col('song_id'), 'title') \
    .groupBy('song_id') \
    .agg({'title': 'first'}) \
    .withColumnRenamed('first(title)', 'title1')

t2 = df_song.select(song_cols)
t1.toPandas().head()
song_id title1
0 SOGOSOV12AF72A285E ¿Dónde va Chichi?
1 SOMZWCG12A8C13C480 I Didn't Mean To
2 SOUPIRU12A6D4FA1E1 Der Kleine Dompfaff
3 SOXVLOJ12AB0189215 Amor De Cabaret
4 SOWTBJW12AC468AC6E Broken-Down Merry-Go-Round
t2.toPandas().head()
song_id title artist_id year duration
0 SOBAYLL12A8C138AF9 Sono andati? Fingevo di dormire ARDR4AC1187FB371A1 0 511.16363
1 SOOLYAZ12A6701F4A6 Laws Patrolling (Album Version) AREBBGV1187FB523D2 0 173.66159
2 SOBBUGU12A8C13E95D Setting Fire to Sleeping Giants ARMAC4T1187FB3FA4C 2004 207.77751
3 SOAOIBZ12AB01815BE I Hold Your Hand In Mine [Live At Royal Albert... ARPBNLO1187FB3D52F 2000 43.36281
4 SONYPOM12A8C13B2D7 I Think My Wife Is Running Around On Me (Taco ... ARDNS031187B9924F0 2005 186.48771
song_table_df = t1.join(t2, 'song_id') \
                .where(F.col("title1") == F.col("title")) \
                .select(song_cols)
song_table_df.toPandas().head()
song_id title artist_id year duration
0 SOBAYLL12A8C138AF9 Sono andati? Fingevo di dormire ARDR4AC1187FB371A1 0 511.16363
1 SOOLYAZ12A6701F4A6 Laws Patrolling (Album Version) AREBBGV1187FB523D2 0 173.66159
2 SOBBUGU12A8C13E95D Setting Fire to Sleeping Giants ARMAC4T1187FB3FA4C 2004 207.77751
3 SOAOIBZ12AB01815BE I Hold Your Hand In Mine [Live At Royal Albert... ARPBNLO1187FB3D52F 2000 43.36281
4 SONYPOM12A8C13B2D7 I Think My Wife Is Running Around On Me (Taco ... ARDNS031187B9924F0 2005 186.48771
song_table_df.toPandas().shape
(71, 5)
df_song.toPandas().shape
(71, 10)
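The groupBy/first plus self-join above keeps one row per song_id; since both frames have 71 rows, this sample happens to contain no duplicate song_ids. A simpler sketch, not the pipeline's method, that achieves the same deduplication, assuming duplicate song_ids would carry identical attributes:

# one row per song_id; dropDuplicates picks an arbitrary row within each group
songs_dedup = df_song.select(song_cols).dropDuplicates(['song_id'])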
# Step 3: Write this to a parquet file
song_table_df.write.parquet('data/songs_table', partitionBy=['year', 'artist_id'], mode='Overwrite')
# write this to s3 bucket
song_table_df.write.parquet(output + 'songs_table', partitionBy=['year', 'artist_id'], mode='Overwrite')

Create artists_table

# define the cols
artists_cols = ["artist_id", "artist_name", "artist_location", "artist_latitude", "artist_longitude"]
df_song.printSchema()
root
 |-- artist_id: string (nullable = true)
 |-- artist_latitude: double (nullable = true)
 |-- artist_location: string (nullable = true)
 |-- artist_longitude: double (nullable = true)
 |-- artist_name: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- num_songs: long (nullable = true)
 |-- song_id: string (nullable = true)
 |-- title: string (nullable = true)
 |-- year: long (nullable = true)
# groupby artist_id and select the first record's artist_name in the group.
t1 = df_song.select(F.col('artist_id'), 'artist_name') \
    .groupBy('artist_id') \
    .agg({'artist_name': 'first'}) \
    .withColumnRenamed('first(artist_name)', 'artist_name1')

t2 = df_song.select(artists_cols)
t1.toPandas().head()
artist_id artist_name1
0 AR9AWNF1187B9AB0B4 Kenny G featuring Daryl Hall
1 AR0IAWL1187B9A96D0 Danilo Perez
2 AR0RCMP1187FB3F427 Billie Jo Spears
3 AREDL271187FB40F44 Soul Mekanik
4 ARI3BMM1187FB4255E Alice Stuart
t2.toPandas().head()
artist_id artist_name artist_location artist_latitude artist_longitude
0 ARDR4AC1187FB371A1 Montserrat Caballé;Placido Domingo;Vicente Sar... NaN NaN
1 AREBBGV1187FB523D2 Mike Jones (Featuring CJ_ Mello & Lil' Bran) Houston, TX NaN NaN
2 ARMAC4T1187FB3FA4C The Dillinger Escape Plan Morris Plains, NJ 40.82624 -74.47995
3 ARPBNLO1187FB3D52F Tiny Tim New York, NY 40.71455 -74.00712
4 ARDNS031187B9924F0 Tim Wilson Georgia 32.67828 -83.22295
artists_table_df = t1.join(t2, 'artist_id') \
                .where(F.col("artist_name1") == F.col("artist_name")) \
                .select(artists_cols)
artists_table_df.toPandas().head()
artist_id artist_name artist_location artist_latitude artist_longitude
0 ARDR4AC1187FB371A1 Montserrat Caballé;Placido Domingo;Vicente Sar... NaN NaN
1 AREBBGV1187FB523D2 Mike Jones (Featuring CJ_ Mello & Lil' Bran) Houston, TX NaN NaN
2 ARMAC4T1187FB3FA4C The Dillinger Escape Plan Morris Plains, NJ 40.82624 -74.47995
3 ARPBNLO1187FB3D52F Tiny Tim New York, NY 40.71455 -74.00712
4 ARDNS031187B9924F0 Tim Wilson Georgia 32.67828 -83.22295
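The same dropDuplicates shortcut sketched for the songs table applies here as well:

# a sketch: one row per artist_id
artists_dedup = df_song.select(artists_cols).dropDuplicates(['artist_id'])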
# write this to s3 bucket
artists_table_df.write.parquet(output + 'artists_table', mode='Overwrite')
artists_table_df.write.parquet('data/artists_table', mode='Overwrite')
# read the artists table back (this table is not partitioned)
df_artists_read = spark.read.option("mergeSchema", "true").parquet("data/artists_table")

Process log data

# Step 1: Read in the log data
df_log = spark.read.json(logPath)
df_log.printSchema()
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
df_log.filter(F.col("page") == "NextSong").toPandas().head()
artist auth firstName gender itemInSession lastName length level location method page registration sessionId song status ts userAgent userId
0 Harmonia Logged In Ryan M 0 Smith 655.77751 free San Jose-Sunnyvale-Santa Clara, CA PUT NextSong 1.541017e+12 583 Sehr kosmisch 200 1542241826796 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26
1 The Prodigy Logged In Ryan M 1 Smith 260.07465 free San Jose-Sunnyvale-Santa Clara, CA PUT NextSong 1.541017e+12 583 The Big Gundown 200 1542242481796 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26
2 Train Logged In Ryan M 2 Smith 205.45261 free San Jose-Sunnyvale-Santa Clara, CA PUT NextSong 1.541017e+12 583 Marry Me 200 1542242741796 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26
3 Sony Wonder Logged In Samuel M 0 Gonzalez 218.06975 free Houston-The Woodlands-Sugar Land, TX PUT NextSong 1.540493e+12 597 Blackbird 200 1542253449796 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... 61
4 Van Halen Logged In Tegan F 2 Levine 289.38404 paid Portland-South Portland, ME PUT NextSong 1.540794e+12 602 Best Of Both Worlds (Remastered Album Version) 200 1542260935796 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... 80
# Step 2: filter by actions for song plays
df_log = df_log.filter(F.col("page") == "NextSong")
df_log.toPandas().shape
(6820, 18)
# Step 3: extract columns for users table
users_cols = ["userId", "firstName", "lastName", "gender", "level"]
df_log.select(users_cols).toPandas().head()
userId firstName lastName gender level
0 26 Ryan Smith M free
1 26 Ryan Smith M free
2 26 Ryan Smith M free
3 61 Samuel Gonzalez M free
4 80 Tegan Levine F paid
df_log.select(users_cols).toPandas().shape
(6820, 5)
df_log.select(users_cols).dropDuplicates().toPandas().shape
(104, 5)
df_log.select(users_cols).dropDuplicates().toPandas().head()
userId firstName lastName gender level
0 57 Katherine Gay F free
1 84 Shakira Hunt F free
2 22 Sean Wilson F free
3 52 Theodore Smith M free
4 80 Tegan Levine F paid
users_table_df = df_log.select(users_cols).dropDuplicates()
# write this to s3 bucket
users_table_df.write.parquet(output + 'users_table', mode='Overwrite')
users_table_df.write.parquet('data/users_table', mode='Overwrite')
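Note that dropDuplicates over all five columns keeps one row per distinct combination, so a user whose level changed from free to paid appears twice. A sketch that keeps only each user's most recent record, assuming ts orders the events:

from pyspark.sql import Window

# keep the latest event per user, so level reflects the current value
w = Window.partitionBy('userId').orderBy(F.col('ts').desc())
users_latest = (df_log
    .withColumn('rn', F.row_number().over(w))
    .filter(F.col('rn') == 1)
    .select(users_cols))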

Time table

# create a timestamp column from the original epoch-millisecond ts column;
# explore the data first, then define get_timestamp below
df_log.select('ts').toPandas().head()
ts
0 1542241826796
1 1542242481796
2 1542242741796
3 1542253449796
4 1542260935796
# reference pattern for converting an epoch column (note: cast() interprets
# the numeric value as seconds):
# df.withColumn('epoch', F.date_format(df.epoch.cast(dataType=T.TimestampType()), "yyyy-MM-dd"))

# applying the cast directly to ts gives absurd dates because ts is in
# milliseconds, which the cast reads as seconds (off by a factor of 1000):
df_log.withColumn('ts', F.date_format(df_log.ts.cast(dataType=T.TimestampType()), "yyyy-MM-dd")).toPandas().head()
artist auth firstName gender itemInSession lastName length level location method page registration sessionId song status ts userAgent userId
0 Harmonia Logged In Ryan M 0 Smith 655.77751 free San Jose-Sunnyvale-Santa Clara, CA PUT NextSong 1.541017e+12 583 Sehr kosmisch 200 50841-09-12 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26
1 The Prodigy Logged In Ryan M 1 Smith 260.07465 free San Jose-Sunnyvale-Santa Clara, CA PUT NextSong 1.541017e+12 583 The Big Gundown 200 50841-09-19 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26
2 Train Logged In Ryan M 2 Smith 205.45261 free San Jose-Sunnyvale-Santa Clara, CA PUT NextSong 1.541017e+12 583 Marry Me 200 50841-09-22 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26
3 Sony Wonder Logged In Samuel M 0 Gonzalez 218.06975 free Houston-The Woodlands-Sugar Land, TX PUT NextSong 1.540493e+12 597 Blackbird 200 50842-01-24 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... 61
4 Van Halen Logged In Tegan F 2 Levine 289.38404 paid Portland-South Portland, ME PUT NextSong 1.540794e+12 602 Best Of Both Worlds (Remastered Album Version) 200 50842-04-21 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... 80
df_time = df_log.select('ts')
df_time.take(5)
[Row(ts=1542241826796),
 Row(ts=1542242481796),
 Row(ts=1542242741796),
 Row(ts=1542253449796),
 Row(ts=1542260935796)]
@udf
def gettimestamp(time):
    import datetime
    # ts is in epoch milliseconds; convert to seconds before parsing
    time = time / 1000
    return datetime.datetime.fromtimestamp(time).strftime("%m-%d-%Y %H:%M:%S")
df_time.withColumn("timestamp", gettimestamp("ts")).show()
+-------------+-------------------+
|           ts|          timestamp|
+-------------+-------------------+
|1542241826796|11-15-2018 00:30:26|
|1542242481796|11-15-2018 00:41:21|
|1542242741796|11-15-2018 00:45:41|
|1542253449796|11-15-2018 03:44:09|
|1542260935796|11-15-2018 05:48:55|
|1542261224796|11-15-2018 05:53:44|
|1542261356796|11-15-2018 05:55:56|
|1542261662796|11-15-2018 06:01:02|
|1542262057796|11-15-2018 06:07:37|
|1542262233796|11-15-2018 06:10:33|
|1542262434796|11-15-2018 06:13:54|
|1542262456796|11-15-2018 06:14:16|
|1542262679796|11-15-2018 06:17:59|
|1542262728796|11-15-2018 06:18:48|
|1542262893796|11-15-2018 06:21:33|
|1542263158796|11-15-2018 06:25:58|
|1542263378796|11-15-2018 06:29:38|
|1542265716796|11-15-2018 07:08:36|
|1542265929796|11-15-2018 07:12:09|
|1542266927796|11-15-2018 07:28:47|
+-------------+-------------------+
only showing top 20 rows
df_time.printSchema()
root
 |-- ts: long (nullable = true)

get_timestamp = F.udf(lambda x: datetime.fromtimestamp( (x/1000.0) ), T.TimestampType()) 
get_hour = F.udf(lambda x: x.hour, T.IntegerType()) 
get_day = F.udf(lambda x: x.day, T.IntegerType()) 
get_week = F.udf(lambda x: x.isocalendar()[1], T.IntegerType()) 
get_month = F.udf(lambda x: x.month, T.IntegerType()) 
get_year = F.udf(lambda x: x.year, T.IntegerType()) 
get_weekday = F.udf(lambda x: x.weekday(), T.IntegerType()) 
df_log = df_log.withColumn("timestamp", get_timestamp(df_log.ts))
df_log = df_log.withColumn("hour", get_hour(df_log.timestamp))
df_log = df_log.withColumn("day", get_day(df_log.timestamp))
df_log = df_log.withColumn("week", get_week(df_log.timestamp))
df_log = df_log.withColumn("month", get_month(df_log.timestamp))
df_log = df_log.withColumn("year", get_year(df_log.timestamp))
df_log = df_log.withColumn("weekday", get_weekday(df_log.timestamp))
df_log.limit(5).toPandas()
artist auth firstName gender itemInSession lastName length level location method ... ts userAgent userId timestamp hour day week month year weekday
0 Harmonia Logged In Ryan M 0 Smith 655.77751 free San Jose-Sunnyvale-Santa Clara, CA PUT ... 1542241826796 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26 2018-11-15 00:30:26.796 0 15 46 11 2018 3
1 The Prodigy Logged In Ryan M 1 Smith 260.07465 free San Jose-Sunnyvale-Santa Clara, CA PUT ... 1542242481796 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26 2018-11-15 00:41:21.796 0 15 46 11 2018 3
2 Train Logged In Ryan M 2 Smith 205.45261 free San Jose-Sunnyvale-Santa Clara, CA PUT ... 1542242741796 "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... 26 2018-11-15 00:45:41.796 0 15 46 11 2018 3
3 Sony Wonder Logged In Samuel M 0 Gonzalez 218.06975 free Houston-The Woodlands-Sugar Land, TX PUT ... 1542253449796 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... 61 2018-11-15 03:44:09.796 3 15 46 11 2018 3
4 Van Halen Logged In Tegan F 2 Levine 289.38404 paid Portland-South Portland, ME PUT ... 1542260935796 "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... 80 2018-11-15 05:48:55.796 5 15 46 11 2018 3

5 rows × 25 columns

time_cols = ["timestamp", "hour", "day", "week", "month", "year", "weekday"]
time_table_df = df_log.select(time_cols)
# write to a local parquet file, partitioned by year and month
time_table_df.write.parquet('data/time_table', partitionBy=['year', 'month'], mode='Overwrite')
# write this to s3 bucket
time_table_df.write.parquet(output + 'time_table', partitionBy=['year', 'month'], mode='Overwrite')
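
The per-field Python UDFs above work, but the built-in column functions already imported at the top of the script (year, month, dayofmonth, hour, weekofyear) derive the same columns without Python serialization overhead. A sketch, assuming Spark >= 2.3 for F.dayofweek (which counts 1 = Sunday, unlike datetime.weekday's 0 = Monday):

# equivalent time columns using built-in functions; dividing by 1000
# converts epoch milliseconds to seconds before the timestamp cast
df_time2 = (df_log
    .withColumn('timestamp', (F.col('ts') / 1000).cast(T.TimestampType()))
    .withColumn('hour', hour('timestamp'))
    .withColumn('day', dayofmonth('timestamp'))
    .withColumn('week', weekofyear('timestamp'))
    .withColumn('month', month('timestamp'))
    .withColumn('year', year('timestamp'))
    .withColumn('weekday', F.dayofweek('timestamp')))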

SongPlay table

df_log.printSchema()
root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: double (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- hour: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- week: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- weekday: integer (nullable = true)
songplay_cols_temp = ["timestamp", "userId", "sessionId", "location", "userAgent", "level"]
df_log.select(songplay_cols_temp).toPandas().head()
timestamp userId sessionId location userAgent level
0 2018-11-15 00:30:26.796 26 583 San Jose-Sunnyvale-Santa Clara, CA "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... free
1 2018-11-15 00:41:21.796 26 583 San Jose-Sunnyvale-Santa Clara, CA "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... free
2 2018-11-15 00:45:41.796 26 583 San Jose-Sunnyvale-Santa Clara, CA "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... free
3 2018-11-15 03:44:09.796 61 597 Houston-The Woodlands-Sugar Land, TX "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... free
4 2018-11-15 05:48:55.796 80 602 Portland-South Portland, ME "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4... paid
# read the partitioned data
df_artists_read = spark.read.option("mergeSchema", "true").parquet("data/artists_table")
df_songs_read = spark.read.option("mergeSchema", "true").parquet("data/songs_table")
df_artists_read.toPandas().head()
artist_id artist_name artist_location artist_latitude artist_longitude
0 ARDR4AC1187FB371A1 Montserrat Caballé;Placido Domingo;Vicente Sar... NaN NaN
1 AREBBGV1187FB523D2 Mike Jones (Featuring CJ_ Mello & Lil' Bran) Houston, TX NaN NaN
2 ARMAC4T1187FB3FA4C The Dillinger Escape Plan Morris Plains, NJ 40.82624 -74.47995
3 ARPBNLO1187FB3D52F Tiny Tim New York, NY 40.71455 -74.00712
4 ARDNS031187B9924F0 Tim Wilson Georgia 32.67828 -83.22295
df_songs_read.toPandas().head()
song_id title duration year artist_id
0 SOAOIBZ12AB01815BE I Hold Your Hand In Mine [Live At Royal Albert... 43.36281 2000 ARPBNLO1187FB3D52F
1 SONYPOM12A8C13B2D7 I Think My Wife Is Running Around On Me (Taco ... 186.48771 2005 ARDNS031187B9924F0
2 SODREIN12A58A7F2E5 A Whiter Shade Of Pale (Live @ Fillmore West) 326.00771 0 ARLTWXK1187FB5A3F8
3 SOYMRWW12A6D4FAB14 The Moon And I (Ordinary Day Album Version) 267.70240 0 ARKFYS91187B98E58F
4 SOWQTQZ12A58A7B63E Streets On Fire (Explicit Album Version) 279.97995 0 ARPFHN61187FB575F6
# merge song and artists
df_songs_read.join(df_artists_read, 'artist_id').toPandas().head()
artist_id song_id title duration year artist_name artist_location artist_latitude artist_longitude
0 ARPBNLO1187FB3D52F SOAOIBZ12AB01815BE I Hold Your Hand In Mine [Live At Royal Albert... 43.36281 2000 Tiny Tim New York, NY 40.71455 -74.00712
1 ARDNS031187B9924F0 SONYPOM12A8C13B2D7 I Think My Wife Is Running Around On Me (Taco ... 186.48771 2005 Tim Wilson Georgia 32.67828 -83.22295
2 ARLTWXK1187FB5A3F8 SODREIN12A58A7F2E5 A Whiter Shade Of Pale (Live @ Fillmore West) 326.00771 0 King Curtis Fort Worth, TX 32.74863 -97.32925
3 ARKFYS91187B98E58F SOYMRWW12A6D4FAB14 The Moon And I (Ordinary Day Album Version) 267.70240 0 Jeff And Sheri Easter NaN NaN
4 ARPFHN61187FB575F6 SOWQTQZ12A58A7B63E Streets On Fire (Explicit Album Version) 279.97995 0 Lupe Fiasco Chicago, IL 41.88415 -87.63241
df_joined_songs_artists = df_songs_read.join(df_artists_read, 'artist_id').select("artist_id", "song_id", "title", "artist_name")
songplay_cols = ["timestamp", "userId", "song_id", "artist_id", "sessionId", "location", "userAgent", "level", "month", "year"]
# join df_logs with df_joined_songs_artists
df_log.join(df_joined_songs_artists, df_log.artist == df_joined_songs_artists.artist_name).select(songplay_cols).toPandas().head()
timestamp userId song_id artist_id sessionId location userAgent level month year
0 2018-11-10 07:47:51.796 44 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 350 Waterloo-Cedar Falls, IA Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r... paid 11 2018
1 2018-11-06 18:34:31.796 97 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 293 Lansing-East Lansing, MI "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... paid 11 2018
2 2018-11-06 16:04:44.796 2 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 126 Plymouth, IN "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... free 11 2018
3 2018-11-28 23:22:57.796 24 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 984 Lake Havasu City-Kingman, AZ "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... paid 11 2018
4 2018-11-14 13:11:26.796 34 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 495 Milwaukee-Waukesha-West Allis, WI Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r... free 11 2018
songplay_table_df = df_log.join(df_joined_songs_artists, df_log.artist == df_joined_songs_artists.artist_name).select(songplay_cols)
songplay_table_df = songplay_table_df.withColumn("songplay_id", F.monotonically_increasing_id())
songplay_table_df.toPandas().head()
timestamp userId song_id artist_id sessionId location userAgent level month year songplay_id
0 2018-11-10 07:47:51.796 44 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 350 Waterloo-Cedar Falls, IA Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r... paid 11 2018 0
1 2018-11-06 18:34:31.796 97 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 293 Lansing-East Lansing, MI "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/5... paid 11 2018 1
2 2018-11-06 16:04:44.796 2 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 126 Plymouth, IN "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... free 11 2018 2
3 2018-11-28 23:22:57.796 24 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 984 Lake Havasu City-Kingman, AZ "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK... paid 11 2018 3
4 2018-11-14 13:11:26.796 34 SOWQTQZ12A58A7B63E ARPFHN61187FB575F6 495 Milwaukee-Waukesha-West Allis, WI Mozilla/5.0 (Macintosh; Intel Mac OS X 10.9; r... free 11 2018 4
# write to a local parquet file, partitioned by year and month
songplay_table_df.write.parquet('data/songplays_table', partitionBy=['year', 'month'], mode='Overwrite')
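
Note that the join above matches on artist name alone, so every song by a matching artist gets attached to each play. A stricter sketch, not the pipeline's method, that also matches the song title (duration could be added as a further check):

songplays_strict = (df_log
    .join(df_joined_songs_artists,
          (df_log.artist == df_joined_songs_artists.artist_name) &
          (df_log.song == df_joined_songs_artists.title))
    .select(songplay_cols)
    .withColumn("songplay_id", F.monotonically_increasing_id()))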



Test the dedup logic on sample data

from pyspark.sql import functions as F
from glob import glob
test_df = spark.read.json(glob("test/*.json"))
test_df.select(['song_id', 'title', 'artist_id', 'year', 'duration']).toPandas()
song_id title artist_id year duration
0 SOYMRWW12A6D4FAB14 The Moon And I (Ordinary Day Album Version) ARKFYS91187B98E58F 0 267.70240
1 SOHKNRJ12A6701D1F8 Drop of Rain AR10USD1187B99F3F1 0 189.57016
2 SOYMRWW12A6D4FAB14 The Moon And SUN ASKFYS91187B98E58F 0 269.70240
3 SOUDSGM12AC9618304 Insatiable (Instrumental Version) ARNTLGG11E2835DDB9 0 266.39628
test_df.select(['song_id', 'title', 'artist_id', 'year', 'duration']).groupBy('song_id').count().show()
+------------------+-----+
|           song_id|count|
+------------------+-----+
|SOUDSGM12AC9618304|    1|
|SOYMRWW12A6D4FAB14|    2|
|SOHKNRJ12A6701D1F8|    1|
+------------------+-----+
test_df.select(F.col('song_id'), 'title') \
    .groupBy('song_id') \
    .agg({'title': 'first'}) \
    .withColumnRenamed('first(title)', 'title1') \
    .show()
+------------------+--------------------+
|           song_id|               title|
+------------------+--------------------+
|SOUDSGM12AC9618304|Insatiable (Instr...|
|SOYMRWW12A6D4FAB14|The Moon And I (O...|
|SOHKNRJ12A6701D1F8|        Drop of Rain|
+------------------+--------------------+
t1 = test_df.select(F.col('song_id'), 'title') \
    .groupBy('song_id') \
    .agg({'title': 'first'}) \
    .withColumnRenamed('first(title)', 'title1')
t2 = test_df.select(['song_id', 'title', 'artist_id', 'year', 'duration'])
t1.join(t2, 'song_id').where(F.col("title1") == F.col("title")).select(["song_id", "title", "artist_id", "year", "duration"]).show()
+------------------+--------------------+------------------+----+---------+
|           song_id|               title|         artist_id|year| duration|
+------------------+--------------------+------------------+----+---------+
|SOYMRWW12A6D4FAB14|The Moon And I (O...|ARKFYS91187B98E58F|   0| 267.7024|
|SOHKNRJ12A6701D1F8|        Drop of Rain|AR10USD1187B99F3F1|   0|189.57016|
|SOUDSGM12AC9618304|Insatiable (Instr...|ARNTLGG11E2835DDB9|   0|266.39628|
+------------------+--------------------+------------------+----+---------+