library(sparklyr)
library(glue)
sc <- spark_connect(master = "local")
Moving Local Data Pipelines to Spark: The Easy Way with R and Python
Data comes from the Olympic Historical Dataset From Olympedia.org
Intro
As data people, we know that “the cloud” is usually just a server somewhere: maybe hosted on Azure, maybe sitting in a closet, or maybe a ten-year-old laptop underneath someone’s desk. When we hear people ask about moving data and processes to the cloud, it’s hard not to think of the Little Green Men from Toy Story worshiping “The Claw”. Within a few short breaths, you’ve been asked to try and move all your current processes to the cloud. It’s a daunting task, but with a few handy tricks, we can make the SQL conversion relatively painless.
Getting Started
I’ve generally tried to keep the R and Python code as similar as possible, but that’s not always the wisest move. If you have any questions, don’t hesitate to reach out.
Our first step is to load our packages and connect to Spark. We’ll also create a local Spark instance to use rather than link to a server. On the R side of things, our core packages are sparklyr and glue, while Python is using pyspark and Python 3’s f-string functionality.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
sc = SparkSession.builder.getOrCreate()
Prepping the Data
At this point “in the real world” you’ll have loaded your data into the table format of your choosing. Where I work, all of our tables are in delta or parquet formats. For this example, rather than create dummy data for a post like I usually do, I’ve downloaded the Olympic Historical Dataset From Olympedia.org from Kaggle and moved everything to my data folder.
tbl_athlete_bio <-
  spark_read_csv(sc, name = "athlete_bio",
                 path = "../assets/data/Olympic_Athlete_Bio.csv")
tbl_athlete_results <-
  spark_read_csv(sc, name = "athlete_results",
                 path = "../assets/data/Olympic_Athlete_Event_Results.csv")
tbl_results <-
  spark_read_csv(sc, name = "results",
                 path = "../assets/data/Olympic_Athlete_Event_Results.csv")
tbl_medal_tally <-
  spark_read_csv(sc, name = "medal_tally",
                 path = "../assets/data/Olympic_Games_Medal_Tally.csv")
tbl_games <-
  spark_read_csv(sc, name = "games",
                 path = "../assets/data/Olympics_Games.csv")
tbl_country <-
  spark_read_csv(sc, name = "country",
                 path = "../assets/data/Olympics_Country.csv")
tbl_athlete_bio = sc.read.csv(
  "../assets/data/Olympic_Athlete_Bio.csv", header = True)
tbl_athlete_bio.createOrReplaceTempView("athlete_bio")
tbl_athlete_results = sc.read.csv(
  "../assets/data/Olympic_Athlete_Event_Results.csv", header = True)
tbl_athlete_results.createOrReplaceTempView("athlete_results")
tbl_results = sc.read.csv(
  "../assets/data/Olympic_Athlete_Event_Results.csv", header = True)
tbl_results.createOrReplaceTempView("results")
tbl_medal_tally = sc.read.csv(
  "../assets/data/Olympic_Games_Medal_Tally.csv", header = True)
tbl_medal_tally.createOrReplaceTempView("medal_tally")
tbl_games = sc.read.csv(
  "../assets/data/Olympics_Games.csv", header = True)
tbl_games.createOrReplaceTempView("games")
tbl_country = sc.read.csv(
  "../assets/data/Olympics_Country.csv", header = True)
tbl_country.createOrReplaceTempView("country")
By assigning the tables to both an R and Python variable and a name in Spark, we’re able to access the data from both an R and Python context, and a SQL context.
# R
head(tbl_country)
# SQL
sdf_sql(sc, 'SELECT * FROM country LIMIT 6')
# Source: spark<?> [?? x 2]
country_noc country
<chr> <chr>
1 AFG Afghanistan
2 ALB Albania
3 ALG Algeria
4 ASA American Samoa
5 AND Andorra
6 ANG Angola
# Source: spark<?> [?? x 2]
country_noc country
<chr> <chr>
1 AFG Afghanistan
2 ALB Albania
3 ALG Algeria
4 ASA American Samoa
5 AND Andorra
6 ANG Angola
# Python
tbl_country.show(6)
# SQL
sc.sql('SELECT * FROM country LIMIT 6').show()
+-----------+--------------+
|country_noc| country|
+-----------+--------------+
| AFG| Afghanistan|
| ALB| Albania|
| ALG| Algeria|
| ASA|American Samoa|
| AND| Andorra|
| ANG| Angola|
+-----------+--------------+
only showing top 6 rows
+-----------+--------------+
|country_noc| country|
+-----------+--------------+
| AFG| Afghanistan|
| ALB| Albania|
| ALG| Algeria|
| ASA|American Samoa|
| AND| Andorra|
| ANG| Angola|
+-----------+--------------+
Working With Views
Let’s say your database administrators (if they aren’t also you) are kind and have done a small amount of data cleaning, and you usually access the data from a mount point which provides a view. You pull the schema for the athlete_bio_vw table and get the following SQL that references athlete_bio.
SELECT
  athlete_id, name, sex, CAST(born AS DATE),
  CAST(height AS DOUBLE), CAST(weight AS DOUBLE),
  country, country_noc
FROM athlete_bio
WHERE
  height != "na" AND
  weight != "na"
We can re-create the view three ways. First, we can use R or Python to rebuild the view from scratch by referencing the table variable name, not the internal Spark name. Second, we can wrap a SQL statement in our language of choice and use the internal Spark name. Either way, we assign the new data frame to tbl_athlete_bio_vw. However, because the new data frame only exists as a variable, we no longer have direct and easy access to the view inside further SQL queries, and would have to use R or Python to do any more analysis.
# R
tbl_athlete_bio_vw <-
  tbl_athlete_bio |>
  filter(height != "na", weight != "na") |>
  mutate(born = as.Date(born),
         height = as.double(height),
         weight = as.double(weight)) |>
  select(athlete_id, name, sex, born, height, weight, country, country_noc)
# SQL
tbl_athlete_bio_vw <- sdf_sql(sc, '
  SELECT
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE),
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio
  WHERE
    height != "na" AND
    weight != "na"')

head(tbl_athlete_bio_vw)
# Source: spark<?> [?? x 8]
athlete_id name sex born height weight country count…¹
<int> <chr> <chr> <date> <dbl> <dbl> <chr> <chr>
1 43737 Andrzej Socharski Male 1947-08-31 173 72 " Pola… POL
2 50147 Nathalie Wunderlich Female 1971-06-03 170 50 " Swit… SUI
3 5085 Miha Lokar Male 1935-09-10 182 76 " Yugo… YUG
4 136329 Austin Hack Male 1992-05-17 203 100 " Unit… USA
5 38633 Tsuneo Ogasawara Male 1942-07-30 181 80 " Japa… JPN
6 77095 Fulgence Rwabu Male 1947-11-23 165 51 " Ugan… UGA
# … with abbreviated variable name ¹country_noc
# Python
tbl_athlete_bio_vw = tbl_athlete_bio.filter('height != "na" AND weight != "na"'
  ).select(['athlete_id', 'name', 'sex', col('born').cast('date'),
  col('height').cast('double'), col('weight').cast('double'),
  'country', 'country_noc'])
# SQL
tbl_athlete_bio_vw = sc.sql('''
  SELECT
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE),
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio
  WHERE
    height != "na" AND
    weight != "na"''')

tbl_athlete_bio_vw.show(6)
+----------+-------------------+------+----------+------+------+--------------+-----------+
|athlete_id| name| sex| born|height|weight| country|country_noc|
+----------+-------------------+------+----------+------+------+--------------+-----------+
| 43737| Andrzej Socharski| Male|1947-08-31| 173.0| 72.0| Poland| POL|
| 50147|Nathalie Wunderlich|Female|1971-06-03| 170.0| 50.0| Switzerland| SUI|
| 5085| Miha Lokar| Male|1935-09-10| 182.0| 76.0| Yugoslavia| YUG|
| 136329| Austin Hack| Male|1992-05-17| 203.0| 100.0| United States| USA|
| 38633| Tsuneo Ogasawara| Male|1942-07-30| 181.0| 80.0| Japan| JPN|
| 77095| Fulgence Rwabu| Male|1947-11-23| 165.0| 51.0| Uganda| UGA|
+----------+-------------------+------+----------+------+------+--------------+-----------+
only showing top 6 rows
The third way is a little chaotic, but is actually my preferred method because I can save the queries I want to use as subqueries later on. First we save the query as a string in the athlete_bio_vw_qry variable, then use the R {glue} package or f-strings from Python to run that string as a SQL query. Because this method lends itself to subqueries so well, I usually only use it when I intend to reuse the query later on.
If you’re not familiar, both glue and f-strings take a regular string and insert the value of the variable inside curly braces {}. For example, if you have a variable named name that holds someone’s name, you can use glue or f-strings to write "hello {name}" and, when this is evaluated, if the name is Gus, it will print “hello Gus.”
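As a quick illustration of the paragraph above, here is a minimal Python sketch of the f-string side. The variable names greeting and plain are only there for this example.

```python
# Hypothetical variable, used only for this illustration.
name = "Gus"

# The f prefix tells Python to replace {name} with the variable's value.
greeting = f"hello {name}"

# Without the f prefix, the braces are kept as literal text.
plain = "hello {name}"

print(greeting)  # hello Gus
print(plain)     # hello {name}
```

The same distinction applies to the query strings in this post: a string only picks up the value of a variable if it is written as an f-string (or passed through glue in R).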
athlete_bio_vw_qry <- '
  SELECT
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE),
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio
  WHERE
    height != "na" AND
    weight != "na"'

tbl_athlete_bio_vw <- sdf_sql(sc, glue("{athlete_bio_vw_qry}"))

head(tbl_athlete_bio_vw)
# Source: spark<?> [?? x 8]
athlete_id name sex born height weight country count…¹
<int> <chr> <chr> <date> <dbl> <dbl> <chr> <chr>
1 43737 Andrzej Socharski Male 1947-08-31 173 72 " Pola… POL
2 50147 Nathalie Wunderlich Female 1971-06-03 170 50 " Swit… SUI
3 5085 Miha Lokar Male 1935-09-10 182 76 " Yugo… YUG
4 136329 Austin Hack Male 1992-05-17 203 100 " Unit… USA
5 38633 Tsuneo Ogasawara Male 1942-07-30 181 80 " Japa… JPN
6 77095 Fulgence Rwabu Male 1947-11-23 165 51 " Ugan… UGA
# … with abbreviated variable name ¹country_noc
athlete_bio_vw_qry = '''
  SELECT
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE),
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio
  WHERE
    height != "na" AND
    weight != "na"'''

tbl_athlete_bio_vw = sc.sql(f'''{athlete_bio_vw_qry}''')

tbl_athlete_bio_vw.show(6)
+----------+-------------------+------+----------+------+------+--------------+-----------+
|athlete_id| name| sex| born|height|weight| country|country_noc|
+----------+-------------------+------+----------+------+------+--------------+-----------+
| 43737| Andrzej Socharski| Male|1947-08-31| 173.0| 72.0| Poland| POL|
| 50147|Nathalie Wunderlich|Female|1971-06-03| 170.0| 50.0| Switzerland| SUI|
| 5085| Miha Lokar| Male|1935-09-10| 182.0| 76.0| Yugoslavia| YUG|
| 136329| Austin Hack| Male|1992-05-17| 203.0| 100.0| United States| USA|
| 38633| Tsuneo Ogasawara| Male|1942-07-30| 181.0| 80.0| Japan| JPN|
| 77095| Fulgence Rwabu| Male|1947-11-23| 165.0| 51.0| Uganda| UGA|
+----------+-------------------+------+----------+------+------+--------------+-----------+
only showing top 6 rows
Subqueries
You probably know that the idea behind subqueries is that they let you nest operations. The idea behind my third method (I really need a better name for it) is the same. Instead of nesting queries directly, we’re nesting strings which are then evaluated as queries. In this example, we’re going to make another subquery called athlete_results_qry that selects the athlete_id of every medal winner in the athlete_results Spark table. We then want to return all rows from the athlete_bio_vw_qry query for people who have won medals.
athlete_results_qry <- '
  SELECT athlete_id
  FROM athlete_results
  WHERE medal != "na"'

sdf_sql(sc, glue('
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})')) |>
  head()
# Source: spark<?> [?? x 8]
athlete_id name sex born height weight country count…¹
<int> <chr> <chr> <date> <dbl> <dbl> <chr> <chr>
1 1700071 Lee Myung-Hwa Female 1964-09-07 168 57 " Republi… KOR
2 58599 István Kozma Male 1939-11-27 198 125 " Hungary" HUN
3 31969 Valter Matošević Male 1970-06-11 194 99 " Croatia" CRO
4 91237 Volha Puzhevich Female 1983-03-17 167 43 " Belarus" BLR
5 104818 Marko Kemppainen Male 1976-07-13 184 100 " Finland" FIN
6 5695 Lew Beck Male 1922-04-19 183 75 " United … USA
# … with abbreviated variable name ¹country_noc
athlete_results_qry = '''
  SELECT athlete_id
  FROM athlete_results
  WHERE medal != "na"'''

sc.sql(f'''
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})''').show(6)
+----------+----------------+------+----------+------+------+------------------+-----------+
|athlete_id| name| sex| born|height|weight| country|country_noc|
+----------+----------------+------+----------+------+------+------------------+-----------+
| 1700071| Lee Myung-Hwa|Female|1964-09-07| 168.0| 57.0| Republic of Korea| KOR|
| 58599| István Kozma| Male|1939-11-27| 198.0| 125.0| Hungary| HUN|
| 31969|Valter Matošević| Male|1970-06-11| 194.0| 99.0| Croatia| CRO|
| 91237| Volha Puzhevich|Female|1983-03-17| 167.0| 43.0| Belarus| BLR|
| 104818|Marko Kemppainen| Male|1976-07-13| 184.0| 100.0| Finland| FIN|
| 5695| Lew Beck| Male|1922-04-19| 183.0| 75.0| United States| USA|
+----------+----------------+------+----------+------+------+------------------+-----------+
only showing top 6 rows
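If you want to try the string-nesting idea without a Spark session, the sketch below uses Python’s built-in sqlite3 module as a stand-in engine. That swap is an assumption made purely so the example runs anywhere; the two tiny tables, their made-up rows, and the names bio_qry and medal_qry are hypothetical and are not taken from the Olympedia data.

```python
import sqlite3

# In-memory SQLite database standing in for Spark (an assumption for this sketch).
con = sqlite3.connect(":memory:")
con.execute("CREATE TABLE athlete_bio (athlete_id INTEGER, name TEXT, height TEXT)")
con.execute("CREATE TABLE athlete_results (athlete_id INTEGER, medal TEXT)")

# Made-up placeholder rows, not taken from the real dataset.
con.executemany(
    "INSERT INTO athlete_bio VALUES (?, ?, ?)",
    [(1, "Athlete A", "180"), (2, "Athlete B", "na"), (3, "Athlete C", "170")],
)
con.executemany(
    "INSERT INTO athlete_results VALUES (?, ?)",
    [(1, "Gold"), (2, "Silver"), (3, "na")],
)

# Each query lives in its own string...
bio_qry = """
  SELECT athlete_id, name, CAST(height AS REAL) AS height
  FROM athlete_bio
  WHERE height != 'na'"""
medal_qry = """
  SELECT athlete_id
  FROM athlete_results
  WHERE medal != 'na'"""

# ...and the f-string pastes them in as subqueries before the engine sees any SQL.
rows = con.execute(f"""
  SELECT name
  FROM ({bio_qry})
  WHERE athlete_id IN ({medal_qry})
  ORDER BY athlete_id""").fetchall()

print(rows)  # [('Athlete A',)]
con.close()
```

Only Athlete A survives both filters: Athlete B has no usable height and Athlete C has no medal. The engine never knows the query was assembled from pieces, which is why the same trick carries over to sc.sql().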
If, for whatever reason, we wanted to go even deeper with the subqueries, we could. All we have to do is make sure we create our innermost queries first, then build those out inside the larger queries. Let’s save our query for athletes who have won medals as medal_athlete_qry. It’s important that when we save it, we use glue/f-strings so the placeholders are expanded at that point. We can then use medal_athlete_qry as a subquery to count the medal-winning athletes from each country.
medal_athlete_qry <- glue('
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})')

tbl_country_medal <- sdf_sql(sc, glue('
  SELECT country, COUNT(country) AS medal_count
  FROM ({medal_athlete_qry})
  GROUP BY country
  ORDER BY medal_count DESC'))

head(tbl_country_medal)
# Source: spark<?> [?? x 2]
country medal_count
<chr> <dbl>
1 " United States" 3116
2 " Soviet Union" 1319
3 " Germany" 1005
4 " Canada" 886
5 " France" 803
6 " Australia" 797
medal_athlete_qry = f'''
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})'''

tbl_country_medal = sc.sql(f'''
  SELECT country, COUNT(country) AS medal_count
  FROM ({medal_athlete_qry})
  GROUP BY country
  ORDER BY medal_count DESC''')

tbl_country_medal.show(6)
+--------------+-----------+
| country|medal_count|
+--------------+-----------+
| United States| 3116|
| Soviet Union| 1319|
| Germany| 1005|
| Canada| 886|
| France| 803|
| Australia| 797|
+--------------+-----------+
only showing top 6 rows
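When the nesting gets this deep, it can help to look at the fully expanded SQL before handing it to Spark. The sketch below rebuilds the same strings in plain Python and prints the result; country_medal_qry is a hypothetical name introduced here, since the post passes that string straight to sc.sql().

```python
# Query strings copied from the post.
athlete_bio_vw_qry = '''
  SELECT
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE),
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio
  WHERE
    height != "na" AND
    weight != "na"'''

athlete_results_qry = '''
  SELECT athlete_id
  FROM athlete_results
  WHERE medal != "na"'''

# Innermost strings first: the f prefix expands both placeholders right here.
medal_athlete_qry = f'''
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})'''

# Hypothetical name for the outermost query string.
country_medal_qry = f'''
  SELECT country, COUNT(country) AS medal_count
  FROM ({medal_athlete_qry})
  GROUP BY country
  ORDER BY medal_count DESC'''

# Printing the final string shows exactly what Spark would be asked to run.
print(country_medal_qry)
```

If a curly-braced placeholder still shows up in the printed text, one of the intermediate strings was saved without glue or the f prefix.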
Assigning Variable Tables to Spark Tables
At this point, you might decide you want to assign your new tables back to Spark, rather than only keeping them in your R or Python environment. This turns out to be a relatively simple operation: we register tbl_athlete_bio_vw in Spark under the name athlete_bio_vw. We can then check the tables available to us to make sure the operation succeeded, and then run a short SQL query to be extra sure it worked.
tbl_athlete_bio_vw <- copy_to(sc, tbl_athlete_bio_vw, 'athlete_bio_vw')

dplyr::src_tbls(sc)
[1] "athlete_bio" "athlete_bio_vw" "athlete_results" "country"
[5] "games" "medal_tally" "results"
sdf_sql(sc, 'SELECT * FROM athlete_bio_vw LIMIT 6')
# Source: spark<?> [?? x 8]
athlete_id name sex born height weight country count…¹
<int> <chr> <chr> <date> <dbl> <dbl> <chr> <chr>
1 43737 Andrzej Socharski Male 1947-08-31 173 72 " Pola… POL
2 50147 Nathalie Wunderlich Female 1971-06-03 170 50 " Swit… SUI
3 5085 Miha Lokar Male 1935-09-10 182 76 " Yugo… YUG
4 136329 Austin Hack Male 1992-05-17 203 100 " Unit… USA
5 38633 Tsuneo Ogasawara Male 1942-07-30 181 80 " Japa… JPN
6 77095 Fulgence Rwabu Male 1947-11-23 165 51 " Ugan… UGA
# … with abbreviated variable name ¹country_noc
tbl_athlete_bio_vw.createOrReplaceTempView('athlete_bio_vw')

sc.sql("show tables").show()
+---------+---------------+-----------+
|namespace| tableName|isTemporary|
+---------+---------------+-----------+
| | athlete_bio| true|
| | athlete_bio_vw| true|
| |athlete_results| true|
| | country| true|
| | games| true|
| | medal_tally| true|
| | results| true|
+---------+---------------+-----------+
sc.sql('SELECT * FROM athlete_bio_vw LIMIT 6').show()
+----------+-------------------+------+----------+------+------+--------------+-----------+
|athlete_id| name| sex| born|height|weight| country|country_noc|
+----------+-------------------+------+----------+------+------+--------------+-----------+
| 43737| Andrzej Socharski| Male|1947-08-31| 173.0| 72.0| Poland| POL|
| 50147|Nathalie Wunderlich|Female|1971-06-03| 170.0| 50.0| Switzerland| SUI|
| 5085| Miha Lokar| Male|1935-09-10| 182.0| 76.0| Yugoslavia| YUG|
| 136329| Austin Hack| Male|1992-05-17| 203.0| 100.0| United States| USA|
| 38633| Tsuneo Ogasawara| Male|1942-07-30| 181.0| 80.0| Japan| JPN|
| 77095| Fulgence Rwabu| Male|1947-11-23| 165.0| 51.0| Uganda| UGA|
+----------+-------------------+------+----------+------+------+--------------+-----------+
Dynamic Queries
The last piece that ties all of this together is using variables as different components of your data operations. A relatively common task could be to change dates on a monthly query to filter for the last thirty days. However, I don’t think any Olympians have been born in the last thirty days, probably because the last Olympics was more than thirty days ago. Nevertheless, we can filter for athletes that were born in the last thirty years. In R, the {lubridate} package makes the year subtraction a breeze, while python-dateutil does all the heavy lifting in Python.
library(lubridate)

thirtyYears <- ymd(Sys.Date()) - years(30)

sdf_sql(sc, glue('
  SELECT *
  FROM athlete_bio_vw
  WHERE born >= "{thirtyYears}"
  LIMIT 6'))
# Source: spark<?> [?? x 8]
athlete_id name sex born height weight country count…¹
<int> <chr> <chr> <date> <dbl> <dbl> <chr> <chr>
1 136346 Pedro Pascual Male 1996-03-15 185 70 " Unite… USA
2 131521 Ana Dascăl Female 2002-09-12 183 60 " Roman… ROU
3 137091 Saskia Alusalu Female 1994-04-14 175 64 " Eston… EST
4 134220 Jason Osborne Male 1994-03-20 178 72 " Germa… GER
5 143729 C. A. Bhavani Devi Female 1993-08-27 168 58 " India" IND
6 138162 Oskar Svensson Male 1995-09-07 190 86 " Swede… SWE
# … with abbreviated variable name ¹country_noc
from datetime import date
from dateutil.relativedelta import relativedelta

thirtyYears = date.today() - relativedelta(years = 30)

sc.sql(f'''
  SELECT *
  FROM athlete_bio_vw
  WHERE born >= "{thirtyYears}"
  LIMIT 6''').show()
+----------+------------------+------+----------+------+------+--------------+-----------+
|athlete_id| name| sex| born|height|weight| country|country_noc|
+----------+------------------+------+----------+------+------+--------------+-----------+
| 136346| Pedro Pascual| Male|1996-03-15| 185.0| 70.0| United States| USA|
| 131521| Ana Dascăl|Female|2002-09-12| 183.0| 60.0| Romania| ROU|
| 137091| Saskia Alusalu|Female|1994-04-14| 175.0| 64.0| Estonia| EST|
| 134220| Jason Osborne| Male|1994-03-20| 178.0| 72.0| Germany| GER|
| 143729|C. A. Bhavani Devi|Female|1993-08-27| 168.0| 58.0| India| IND|
| 138162| Oskar Svensson| Male|1995-09-07| 190.0| 86.0| Sweden| SWE|
+----------+------------------+------+----------+------+------+--------------+-----------+
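If python-dateutil isn’t available, the cutoff date can also be computed with the standard library alone. The sketch below is one possible way to do it, not the approach used above: years_before is a hypothetical helper, and the fixed reference date is chosen only to show the leap-day edge case.

```python
from datetime import date


def years_before(day: date, years: int) -> date:
    """Return the date `years` years before `day` (hypothetical helper)."""
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # 29 February has no counterpart in a non-leap year; fall back to 28 February.
        return day.replace(year=day.year - years, day=28)


# A fixed reference date keeps the example reproducible; a real job would use date.today().
reference_day = date(2024, 2, 29)
cutoff = years_before(reference_day, 30)

# A date formats as YYYY-MM-DD inside an f-string, which is what the query expects.
query = f'''
  SELECT *
  FROM athlete_bio_vw
  WHERE born >= "{cutoff}"
  LIMIT 6'''

print(query)
```

The resulting string can be handed to sc.sql() in the same way as the dateutil version.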
Disconnecting From Spark
Before we wrap up, let’s disconnect from Spark real quick.
spark_disconnect(sc)
sc.stop()
Wrapping Up
At the beginning of this post, I made a promise that moving your processes to the cloud doesn’t have to be super painful. It probably still will be painful, but I’m hoping that it’ll now be at most regular painful. We learned that we can either translate our queries to R or Python, run SQL directly, or save the query as a string and insert it into later queries where it will be run directly. We can then take our new tables and transfer them back over to Spark so we can reference the tables directly without any shenanigans involving glue or f-strings. Lastly, we can use those same glue and f-string tricks to dynamically change our queries based on R or Python variables.
Resources
Both R and Python have some really great resources out there to help you get started. I highly recommend starting with the documentation for sparklyr and PySpark, as each has everything you need to know (besides the info in my post!) to get started.
Python: PySpark
All code in this article is available here. If you want to see more from me, check out my GitHub or guslipkin.github.io. If you want to hear from me, I’m also on Twitter @guslipkin.