Moving Local Data Pipelines to Spark: The Easy Way with R and Python

R
R: sparklyr
Python
Python: PySpark
Spark
It might not be easy, but it’ll certainly be less painful
Author

Gus Lipkin

Published

August 27, 2022

Intro

As data people, we know that “the cloud” is usually just a server somewhere: maybe one hosted on Azure, a server in a closet, or a ten-year-old laptop underneath someone’s desk. When we hear people ask about moving data and processes to the cloud, it’s hard not to think of the Little Green Men from Toy Story worshiping “The Claw”. Within a few short breaths, you’ve been asked to move all your current processes to the cloud. It’s a daunting task, but with a few handy tricks, we can make the SQL conversion relatively painless.

Three Little Green Men from Toy Story saying 'The Cloud'

Getting Started

Tip

I’ve generally tried to keep the R and Python code as similar as possible, but that’s not always the wisest move. If you have any questions, don’t hesitate to reach out.

Our first step is to load our packages and connect to Spark. We’ll create a local Spark instance to use rather than link to a server. On the R side of things, our core packages are sparklyr and glue, while Python uses pyspark and Python 3’s f-string functionality.

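# R
library(sparklyr)
library(dplyr)  # dplyr verbs (filter, mutate, select) are used on the Spark tables below
library(glue)

sc <- spark_connect(master = "local")

# Python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

sc = SparkSession.builder.getOrCreate()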

Prepping the Data

At this point “in the real world” you’ll have loaded your data into the table format of your choosing. Where I work, all of our tables are in delta or parquet formats. For this example, rather than create dummy data for a post like I usually do, I’ve downloaded the Olympic Historical Dataset From Olympedia.org from Kaggle and moved everything to my data folder.

# R
tbl_athlete_bio <- 
  spark_read_csv(sc, name = "athlete_bio",
                 path = "../assets/data/Olympic_Athlete_Bio.csv")

tbl_athlete_results <- 
  spark_read_csv(sc, name = "athlete_results", 
                 path = "../assets/data/Olympic_Athlete_Event_Results.csv")

# NOTE: same source file as athlete_results above, registered under a second name
tbl_results <- 
  spark_read_csv(sc, name = "results",
                 path = "../assets/data/Olympic_Athlete_Event_Results.csv")

tbl_medal_tally <- 
  spark_read_csv(sc, name = "medal_tally",
                 path = "../assets/data/Olympic_Games_Medal_Tally.csv")

tbl_games <- 
  spark_read_csv(sc, name = "games",
                 path = "../assets/data/Olympics_Games.csv")

tbl_country <- 
  spark_read_csv(sc, name = "country",
                 path = "../assets/data/Olympics_Country.csv")

# Python
tbl_athlete_bio = sc.read.csv(
  "../assets/data/Olympic_Athlete_Bio.csv", header = True)
tbl_athlete_bio.createOrReplaceTempView("athlete_bio")

tbl_athlete_results = sc.read.csv(
    "../assets/data/Olympic_Athlete_Event_Results.csv", header = True)
tbl_athlete_results.createOrReplaceTempView("athlete_results")

# NOTE: same source file as athlete_results above, registered under a second name
tbl_results = sc.read.csv(
  "../assets/data/Olympic_Athlete_Event_Results.csv", header = True)
tbl_results.createOrReplaceTempView("results")

tbl_medal_tally = sc.read.csv(
  "../assets/data/Olympic_Games_Medal_Tally.csv", header = True)
tbl_medal_tally.createOrReplaceTempView("medal_tally")

tbl_games = sc.read.csv(
  "../assets/data/Olympics_Games.csv", header = True)
tbl_games.createOrReplaceTempView("games")

tbl_country = sc.read.csv(
  "../assets/data/Olympics_Country.csv", header = True)
tbl_country.createOrReplaceTempView("country")

Because each table is assigned to an R or Python variable and also registered under a name in Spark, we can reach the data from R or Python code and from SQL.

# R
head(tbl_country)
# SQL
sdf_sql(sc, 'SELECT * FROM country LIMIT 6')
# Source: spark<?> [?? x 2]
  country_noc country       
  <chr>       <chr>         
1 AFG         Afghanistan   
2 ALB         Albania       
3 ALG         Algeria       
4 ASA         American Samoa
5 AND         Andorra       
6 ANG         Angola        
# Source: spark<?> [?? x 2]
  country_noc country       
  <chr>       <chr>         
1 AFG         Afghanistan   
2 ALB         Albania       
3 ALG         Algeria       
4 ASA         American Samoa
5 AND         Andorra       
6 ANG         Angola        
# Python
tbl_country.show(6)
# SQL
sc.sql('SELECT * FROM country LIMIT 6').show()
+-----------+--------------+
|country_noc|       country|
+-----------+--------------+
|        AFG|   Afghanistan|
|        ALB|       Albania|
|        ALG|       Algeria|
|        ASA|American Samoa|
|        AND|       Andorra|
|        ANG|        Angola|
+-----------+--------------+
only showing top 6 rows
+-----------+--------------+
|country_noc|       country|
+-----------+--------------+
|        AFG|   Afghanistan|
|        ALB|       Albania|
|        ALG|       Algeria|
|        ASA|American Samoa|
|        AND|       Andorra|
|        ANG|        Angola|
+-----------+--------------+

Working With Views

Let’s say your database administrators (if they aren’t also you) are kind enough to have done a small amount of data cleaning, and you usually access the data through a mount point that provides a view. You pull the definition of the athlete_bio_vw view and get the following SQL, which references athlete_bio.

SELECT 
  athlete_id, name, sex, CAST(born AS DATE), 
  CAST(height AS DOUBLE), CAST(weight AS DOUBLE), 
  country, country_noc
FROM athlete_bio 
WHERE 
  height != "na" AND
  weight != "na"

We can re-create the view three ways. The first is to use R or Python to rebuild the view from scratch, referencing the table variable name rather than the internal Spark name. The second is to wrap a SQL statement in our language of choice and use the internal Spark name. Either way, we assign the new data frame to tbl_athlete_bio_vw. However, because the result only exists as a variable and is not registered under a name in Spark, we no longer have direct and easy access to the view from further SQL queries, and would have to use R or Python to do any more analysis.

# R
tbl_athlete_bio_vw <-
  tbl_athlete_bio |> 
    filter(height != "na", weight != "na") |>
    mutate(born = as.Date(born), 
           height = as.double(height), 
           weight = as.double(weight)) |>
    select(athlete_id, name, sex, born, height, weight, country, country_noc)
# SQL
tbl_athlete_bio_vw <- sdf_sql(sc, '
  SELECT 
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE), 
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio 
  WHERE 
    height != "na" AND
    weight != "na"')

head(tbl_athlete_bio_vw)
# Source: spark<?> [?? x 8]
  athlete_id name                sex    born       height weight country count…¹
       <int> <chr>               <chr>  <date>      <dbl>  <dbl> <chr>   <chr>  
1      43737 Andrzej Socharski   Male   1947-08-31    173     72 " Pola… POL    
2      50147 Nathalie Wunderlich Female 1971-06-03    170     50 " Swit… SUI    
3       5085 Miha Lokar          Male   1935-09-10    182     76 " Yugo… YUG    
4     136329 Austin Hack         Male   1992-05-17    203    100 " Unit… USA    
5      38633 Tsuneo Ogasawara    Male   1942-07-30    181     80 " Japa… JPN    
6      77095 Fulgence Rwabu      Male   1947-11-23    165     51 " Ugan… UGA    
# … with abbreviated variable name ¹​country_noc
# Python
tbl_athlete_bio_vw = tbl_athlete_bio.filter('height != "na" AND weight != "na"'
    ).select(['athlete_id', 'name', 'sex', col('born').cast('date'), 
              col('height').cast('double'), col('weight').cast('double'), 
              'country', 'country_noc'])
# SQL
tbl_athlete_bio_vw = sc.sql('''
  SELECT 
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE), 
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio 
  WHERE 
    height != "na" AND
    weight != "na"''')
    
tbl_athlete_bio_vw.show(6)
+----------+-------------------+------+----------+------+------+--------------+-----------+
|athlete_id|               name|   sex|      born|height|weight|       country|country_noc|
+----------+-------------------+------+----------+------+------+--------------+-----------+
|     43737|  Andrzej Socharski|  Male|1947-08-31| 173.0|  72.0|        Poland|        POL|
|     50147|Nathalie Wunderlich|Female|1971-06-03| 170.0|  50.0|   Switzerland|        SUI|
|      5085|         Miha Lokar|  Male|1935-09-10| 182.0|  76.0|    Yugoslavia|        YUG|
|    136329|        Austin Hack|  Male|1992-05-17| 203.0| 100.0| United States|        USA|
|     38633|   Tsuneo Ogasawara|  Male|1942-07-30| 181.0|  80.0|         Japan|        JPN|
|     77095|     Fulgence Rwabu|  Male|1947-11-23| 165.0|  51.0|        Uganda|        UGA|
+----------+-------------------+------+----------+------+------+--------------+-----------+
only showing top 6 rows

The third way is a little chaotic, but is actually my preferred method because I can save the queries I want to use as subqueries later on. First we save the query as a string in the athlete_bio_vw_qry variable, and then use the R {glue} package or Python f-strings to insert that string into the call that runs the SQL. Because this method lends itself to subqueries so well, I don’t usually use it unless I intend to reuse the query later on.

If you’re not familiar, both glue and f-strings take a regular string and insert the value of the variable named inside curly braces {}. For example, if you have a variable named name that holds someone’s name, you can use glue or f-strings to write "hello {name}". When this is evaluated, if the name is Gus, it will produce “hello Gus.”
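Here is the same greeting as a minimal Python sketch. The variable names are only the ones from the example above, and glue in R behaves analogously with glue("hello {name}").

```python
# The variable name in curly braces is replaced by its value
# at the moment the f-string is evaluated.
name = "Gus"
greeting = f"hello {name}"
print(greeting)  # hello Gus
```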

athlete_bio_vw_qry <- '
  SELECT 
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE), 
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio 
  WHERE 
    height != "na" AND
    weight != "na"'
tbl_athlete_bio_vw <- sdf_sql(sc, glue("{athlete_bio_vw_qry}"))

head(tbl_athlete_bio_vw)
# Source: spark<?> [?? x 8]
  athlete_id name                sex    born       height weight country count…¹
       <int> <chr>               <chr>  <date>      <dbl>  <dbl> <chr>   <chr>  
1      43737 Andrzej Socharski   Male   1947-08-31    173     72 " Pola… POL    
2      50147 Nathalie Wunderlich Female 1971-06-03    170     50 " Swit… SUI    
3       5085 Miha Lokar          Male   1935-09-10    182     76 " Yugo… YUG    
4     136329 Austin Hack         Male   1992-05-17    203    100 " Unit… USA    
5      38633 Tsuneo Ogasawara    Male   1942-07-30    181     80 " Japa… JPN    
6      77095 Fulgence Rwabu      Male   1947-11-23    165     51 " Ugan… UGA    
# … with abbreviated variable name ¹​country_noc
athlete_bio_vw_qry = '''
  SELECT 
    athlete_id, name, sex, CAST(born AS DATE), CAST(height AS DOUBLE), 
    CAST(weight AS DOUBLE), country, country_noc
  FROM athlete_bio 
  WHERE 
    height != "na" AND
    weight != "na"'''
tbl_athlete_bio_vw = sc.sql(f'''{athlete_bio_vw_qry}''')

tbl_athlete_bio_vw.show(6)
+----------+-------------------+------+----------+------+------+--------------+-----------+
|athlete_id|               name|   sex|      born|height|weight|       country|country_noc|
+----------+-------------------+------+----------+------+------+--------------+-----------+
|     43737|  Andrzej Socharski|  Male|1947-08-31| 173.0|  72.0|        Poland|        POL|
|     50147|Nathalie Wunderlich|Female|1971-06-03| 170.0|  50.0|   Switzerland|        SUI|
|      5085|         Miha Lokar|  Male|1935-09-10| 182.0|  76.0|    Yugoslavia|        YUG|
|    136329|        Austin Hack|  Male|1992-05-17| 203.0| 100.0| United States|        USA|
|     38633|   Tsuneo Ogasawara|  Male|1942-07-30| 181.0|  80.0|         Japan|        JPN|
|     77095|     Fulgence Rwabu|  Male|1947-11-23| 165.0|  51.0|        Uganda|        UGA|
+----------+-------------------+------+----------+------+------+--------------+-----------+
only showing top 6 rows

Subqueries

You probably know that the idea behind subqueries is that they let you nest operations. The idea behind my third method (I really need a better name for it) is the same, except that instead of nesting queries directly, we’re nesting strings which are then evaluated as queries. In this example, we’re going to make another subquery called athlete_results_qry that selects the athlete_id column from the athlete_results Spark table for medal winners. We then want to return all rows from the athlete_bio_vw query for people who have won medals.

athlete_results_qry <- '
  SELECT athlete_id 
  FROM athlete_results
  WHERE medal != "na"'
sdf_sql(sc, glue('
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})')) |>
  head()
# Source: spark<?> [?? x 8]
  athlete_id name             sex    born       height weight country    count…¹
       <int> <chr>            <chr>  <date>      <dbl>  <dbl> <chr>      <chr>  
1    1700071 Lee Myung-Hwa    Female 1964-09-07    168     57 " Republi… KOR    
2      58599 István Kozma     Male   1939-11-27    198    125 " Hungary" HUN    
3      31969 Valter Matošević Male   1970-06-11    194     99 " Croatia" CRO    
4      91237 Volha Puzhevich  Female 1983-03-17    167     43 " Belarus" BLR    
5     104818 Marko Kemppainen Male   1976-07-13    184    100 " Finland" FIN    
6       5695 Lew Beck         Male   1922-04-19    183     75 " United … USA    
# … with abbreviated variable name ¹​country_noc
athlete_results_qry = '''
  SELECT athlete_id 
  FROM athlete_results
  WHERE medal != "na"'''
sc.sql(f'''
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})''').show(6)
+----------+----------------+------+----------+------+------+------------------+-----------+
|athlete_id|            name|   sex|      born|height|weight|           country|country_noc|
+----------+----------------+------+----------+------+------+------------------+-----------+
|   1700071|   Lee Myung-Hwa|Female|1964-09-07| 168.0|  57.0| Republic of Korea|        KOR|
|     58599|    István Kozma|  Male|1939-11-27| 198.0| 125.0|           Hungary|        HUN|
|     31969|Valter Matošević|  Male|1970-06-11| 194.0|  99.0|           Croatia|        CRO|
|     91237| Volha Puzhevich|Female|1983-03-17| 167.0|  43.0|           Belarus|        BLR|
|    104818|Marko Kemppainen|  Male|1976-07-13| 184.0| 100.0|           Finland|        FIN|
|      5695|        Lew Beck|  Male|1922-04-19| 183.0|  75.0|     United States|        USA|
+----------+----------------+------+----------+------+------+------------------+-----------+
only showing top 6 rows
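If you want to try the string-nesting pattern without a Spark session, here is a rough sketch that swaps Spark for Python’s built-in sqlite3 module. The rows are made up for illustration (they are not from the Kaggle data), and the string literals use single quotes because that is the safer choice in SQLite. Only the way the saved query strings are pasted into the outer query is meant to carry over.

```python
import sqlite3

# Toy stand-ins for the Spark tables (made-up rows, not the Kaggle data).
con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE athlete_bio (athlete_id INTEGER, name TEXT, height TEXT, weight TEXT);
    CREATE TABLE athlete_results (athlete_id INTEGER, medal TEXT);
    INSERT INTO athlete_bio VALUES
        (1, 'Athlete A', '170', '60'),
        (2, 'Athlete B', 'na', '75'),
        (3, 'Athlete C', '182', '80');
    INSERT INTO athlete_results VALUES
        (1, 'Gold'), (2, 'Silver'), (3, 'na');
""")

# Each query is saved as a plain string so it can be reused as a subquery.
athlete_bio_vw_qry = """
    SELECT athlete_id, name,
           CAST(height AS REAL) AS height,
           CAST(weight AS REAL) AS weight
    FROM athlete_bio
    WHERE height != 'na' AND weight != 'na'"""

athlete_results_qry = """
    SELECT athlete_id
    FROM athlete_results
    WHERE medal != 'na'"""

# The f-string pastes both saved queries into the outer query text.
medal_athletes = con.execute(f"""
    SELECT *
    FROM ({athlete_bio_vw_qry})
    WHERE athlete_id IN ({athlete_results_qry})
    ORDER BY athlete_id""").fetchall()

print(medal_athletes)
con.close()
```

Athlete B is dropped by the height filter and Athlete C has no medal, so only Athlete A comes back.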

If, for whatever reason, we wanted to go even deeper with the subqueries, we could. All we have to do is make sure we create our innermost queries first and then build outward into the larger queries. Let’s save our query for athletes who have won medals as medal_athlete_qry. It’s important that when we save it, we use glue/f-strings so that the inner query strings are expanded at that point. We can then use medal_athlete_qry as a subquery to count the medal-winning athletes from each country (each athlete appears once in the subquery, so medal_count below is a count of athletes rather than of individual medals).

medal_athlete_qry <- glue('
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})')
tbl_country_medal <- sdf_sql(sc, glue('
  SELECT country, COUNT(country) AS medal_count
  FROM ({medal_athlete_qry})
  GROUP BY country
  ORDER BY medal_count DESC'))

head(tbl_country_medal)
# Source: spark<?> [?? x 2]
  country          medal_count
  <chr>                  <dbl>
1 " United States"        3116
2 " Soviet Union"         1319
3 " Germany"              1005
4 " Canada"                886
5 " France"                803
6 " Australia"             797
medal_athlete_qry = f'''
  SELECT *
  FROM ({athlete_bio_vw_qry})
  WHERE athlete_id IN ({athlete_results_qry})'''
tbl_country_medal = sc.sql(f'''
  SELECT country, COUNT(country) AS medal_count
  FROM ({medal_athlete_qry})
  GROUP BY country
  ORDER BY medal_count DESC''')
                   
tbl_country_medal.show(6)
+--------------+-----------+
|       country|medal_count|
+--------------+-----------+
| United States|       3116|
|  Soviet Union|       1319|
|       Germany|       1005|
|        Canada|        886|
|        France|        803|
|     Australia|        797|
+--------------+-----------+
only showing top 6 rows
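To see why the intermediate query has to be saved with an f-string (or glue), here is a small pure-Python sketch. The query text is shortened for illustration and nothing is sent to Spark; it only compares the two resulting strings.

```python
inner_qry = "SELECT athlete_id FROM athlete_results WHERE medal != 'na'"

# Without the f prefix the placeholder is kept as literal text,
# so the database would receive the characters {inner_qry}.
not_expanded = "SELECT * FROM athlete_bio WHERE athlete_id IN ({inner_qry})"

# With the f prefix the inner query text is substituted immediately,
# so the saved string is already a complete query.
expanded = f"SELECT * FROM athlete_bio WHERE athlete_id IN ({inner_qry})"

print(not_expanded)
print(expanded)
```

The expansion happens once, when the string is created, which is why the innermost queries need to be defined first.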

Assigning Variable Tables to Spark Tables

At this point, you might decide you want to assign your new tables back to Spark, rather than only keeping them in your R or Python environment. This turns out to be a relatively simple operation: we register tbl_athlete_bio_vw in Spark under the name athlete_bio_vw. We can then check the tables available to us to make sure the operation succeeded, and then run a short SQL query to be extra sure it worked.

tbl_athlete_bio_vw <- copy_to(sc, tbl_athlete_bio_vw, 'athlete_bio_vw')

dplyr::src_tbls(sc)
[1] "athlete_bio"     "athlete_bio_vw"  "athlete_results" "country"        
[5] "games"           "medal_tally"     "results"        
sdf_sql(sc, 'SELECT * FROM athlete_bio_vw LIMIT 6')
# Source: spark<?> [?? x 8]
  athlete_id name                sex    born       height weight country count…¹
       <int> <chr>               <chr>  <date>      <dbl>  <dbl> <chr>   <chr>  
1      43737 Andrzej Socharski   Male   1947-08-31    173     72 " Pola… POL    
2      50147 Nathalie Wunderlich Female 1971-06-03    170     50 " Swit… SUI    
3       5085 Miha Lokar          Male   1935-09-10    182     76 " Yugo… YUG    
4     136329 Austin Hack         Male   1992-05-17    203    100 " Unit… USA    
5      38633 Tsuneo Ogasawara    Male   1942-07-30    181     80 " Japa… JPN    
6      77095 Fulgence Rwabu      Male   1947-11-23    165     51 " Ugan… UGA    
# … with abbreviated variable name ¹​country_noc
tbl_athlete_bio_vw.createOrReplaceTempView('athlete_bio_vw')

sc.sql("show tables").show()
+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|         |    athlete_bio|       true|
|         | athlete_bio_vw|       true|
|         |athlete_results|       true|
|         |        country|       true|
|         |          games|       true|
|         |    medal_tally|       true|
|         |        results|       true|
+---------+---------------+-----------+
sc.sql('SELECT * FROM athlete_bio_vw LIMIT 6').show()
+----------+-------------------+------+----------+------+------+--------------+-----------+
|athlete_id|               name|   sex|      born|height|weight|       country|country_noc|
+----------+-------------------+------+----------+------+------+--------------+-----------+
|     43737|  Andrzej Socharski|  Male|1947-08-31| 173.0|  72.0|        Poland|        POL|
|     50147|Nathalie Wunderlich|Female|1971-06-03| 170.0|  50.0|   Switzerland|        SUI|
|      5085|         Miha Lokar|  Male|1935-09-10| 182.0|  76.0|    Yugoslavia|        YUG|
|    136329|        Austin Hack|  Male|1992-05-17| 203.0| 100.0| United States|        USA|
|     38633|   Tsuneo Ogasawara|  Male|1942-07-30| 181.0|  80.0|         Japan|        JPN|
|     77095|     Fulgence Rwabu|  Male|1947-11-23| 165.0|  51.0|        Uganda|        UGA|
+----------+-------------------+------+----------+------+------+--------------+-----------+
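The same idea, giving a query result a name so later SQL can refer to it directly, can be sketched without Spark using Python’s built-in sqlite3 module. This is only an analogy with made-up rows: SQLite’s CREATE TEMP VIEW stands in for the registration step above, and Spark SQL has a comparable CREATE OR REPLACE TEMPORARY VIEW statement if you would rather stay in SQL.

```python
import sqlite3

con = sqlite3.connect(":memory:")
con.executescript("""
    CREATE TABLE athlete_bio (athlete_id INTEGER, name TEXT, height TEXT);
    INSERT INTO athlete_bio VALUES (1, 'Athlete A', '170'), (2, 'Athlete B', 'na');
""")

athlete_bio_vw_qry = """
    SELECT athlete_id, name, CAST(height AS REAL) AS height
    FROM athlete_bio
    WHERE height != 'na'"""

# Register the saved query under a name; later SQL can use the name directly.
con.execute(f"CREATE TEMP VIEW athlete_bio_vw AS {athlete_bio_vw_qry}")

# List what is registered, then query the view by name.
view_names = [row[0] for row in con.execute(
    "SELECT name FROM sqlite_temp_master WHERE type = 'view'")]
rows = con.execute("SELECT * FROM athlete_bio_vw").fetchall()
print(view_names, rows)
con.close()
```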

Dynamic Queries

The last piece that ties all of this together is using variables as different components of your data operations. A relatively common task could be to change dates on a monthly query to filter for the last thirty days. However, I don’t think any Olympians have been born in the last thirty days, probably because the last Olympics was more than thirty days ago. Nevertheless, we can filter for athletes that were born in the last thirty years. In R, the {lubridate} package makes the year subtraction a breeze, while python-dateutil does all the heavy lifting in Python.

library(lubridate)

thirtyYears <- ymd(Sys.Date()) - years(30)

sdf_sql(sc, glue('
  SELECT *
  FROM athlete_bio_vw
  WHERE born >= "{thirtyYears}"
  LIMIT 6'))
# Source: spark<?> [?? x 8]
  athlete_id name               sex    born       height weight country  count…¹
       <int> <chr>              <chr>  <date>      <dbl>  <dbl> <chr>    <chr>  
1     136346 Pedro Pascual      Male   1996-03-15    185     70 " Unite… USA    
2     131521 Ana Dascăl         Female 2002-09-12    183     60 " Roman… ROU    
3     137091 Saskia Alusalu     Female 1994-04-14    175     64 " Eston… EST    
4     134220 Jason Osborne      Male   1994-03-20    178     72 " Germa… GER    
5     143729 C. A. Bhavani Devi Female 1993-08-27    168     58 " India" IND    
6     138162 Oskar Svensson     Male   1995-09-07    190     86 " Swede… SWE    
# … with abbreviated variable name ¹​country_noc
from datetime import date
from dateutil.relativedelta import relativedelta

thirtyYears = date.today() - relativedelta(years = 30)
sc.sql(f'''
  SELECT *
  FROM athlete_bio_vw
  WHERE born >= "{thirtyYears}"
  LIMIT 6''').show()
+----------+------------------+------+----------+------+------+--------------+-----------+
|athlete_id|              name|   sex|      born|height|weight|       country|country_noc|
+----------+------------------+------+----------+------+------+--------------+-----------+
|    136346|     Pedro Pascual|  Male|1996-03-15| 185.0|  70.0| United States|        USA|
|    131521|        Ana Dascăl|Female|2002-09-12| 183.0|  60.0|       Romania|        ROU|
|    137091|    Saskia Alusalu|Female|1994-04-14| 175.0|  64.0|       Estonia|        EST|
|    134220|     Jason Osborne|  Male|1994-03-20| 178.0|  72.0|       Germany|        GER|
|    143729|C. A. Bhavani Devi|Female|1993-08-27| 168.0|  58.0|         India|        IND|
|    138162|    Oskar Svensson|  Male|1995-09-07| 190.0|  86.0|        Sweden|        SWE|
+----------+------------------+------+----------+------+------+--------------+-----------+
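If python-dateutil isn’t available, the cutoff date can be computed with the standard library alone. The sketch below uses a hypothetical helper, years_ago, which is not part of any library, and a fixed example date so the output is reproducible. Falling back to 28 February when the start date is 29 February is an assumption about the behavior you want.

```python
from datetime import date

def years_ago(day: date, years: int) -> date:
    """Return the same calendar day `years` earlier (hypothetical helper)."""
    try:
        return day.replace(year=day.year - years)
    except ValueError:
        # 29 February has no match in a non-leap year; fall back to the 28th.
        return day.replace(year=day.year - years, day=28)

# Fixed date for illustration; use date.today() in a real monthly job.
cutoff = years_ago(date(2022, 8, 27), 30)

# The date is formatted as YYYY-MM-DD when it is pasted into the query text.
query = f"""
    SELECT *
    FROM athlete_bio_vw
    WHERE born >= "{cutoff}"
    LIMIT 6"""
print(query)
```

Pasting values into query text like this is reasonable for values your own code computes, such as a date; anything typed in by a user deserves more care.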

Disconnecting From Spark

Before we wrap up, let’s disconnect from Spark real quick.

# R
spark_disconnect(sc)

# Python
sc.stop()

Wrapping Up

At the beginning of this post, I made a promise that moving your processes to the cloud doesn’t have to be super painful. It probably still will be painful, but I’m hoping that it’ll now be at most regular painful. We learned that we can translate our queries to R or Python, run SQL directly, or save the query as a string and insert it into later queries, where it will be run as a subquery. We can then take our new tables and register them back in Spark so we can reference the tables directly without any shenanigans involving glue or f-strings. Lastly, we can use those same glue and f-string tricks to dynamically change our queries based on R or Python variables.

Resources

Both R and Python have some really great resources out there to help you get started. I highly recommend starting with the documentation for sparklyr and PySpark, as each has everything you need to know (besides the info in my post!) to get started.


All code in this article is available here. If you want to see more from me, check out my GitHub or guslipkin.github.io. If you want to hear from me, I’m also on Twitter @guslipkin.

Gus Lipkin is a Data Scientist, Business Analyst, and occasional bike mechanic