Spark SQL with Python
This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.
In this post, I’ll very briefly summarize the Spark SQL functions necessary for the CCA175 exam. I also have a longer article on Spark available that goes into more detail and spans a few more topics.
Introduction
- RDDs can contain any kind of unstructured data
- Spark 1.3 introduced DataFrames (and 1.6 added DataSets); a DataFrame is a DataSet of Row objects. These Row objects contain structured data (i.e. they have a schema: names and types)
- Big advantages:
- You can run SQL queries on DataFrames
- You can read and write to JSON, Hive, and parquet
- You can communicate with JDBC/ODBC
- Note: The all-encompassing SparkSession creator only appeared in Spark 2.0. Currently, the CCA175 exam provides Spark 1.6, where you still work with a SparkContext, and a SQLContext or HiveContext “on top”.
Creating DataFrames and DataSets
Spark has .toDS and .toDF methods that convert an RDD to a DataSet or DataFrame. Note that you need to map the lines into Row objects first:
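In PySpark only .toDF applies (.toDS is a Scala/Java concept). A minimal sketch, with a made-up file path and column names:

from pyspark.sql import Row

lines = sc.textFile("people.csv")                                   # plain text RDD
parts = lines.map(lambda l: [x.strip().strip('"') for x in l.split(",")])
rows  = parts.map(lambda p: Row(id=int(p[0]), name=p[1], age=int(p[2])))
df    = rows.toDF()       # needs an existing SQLContext (the pyspark shell provides one)
df.printSchema()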
You can also use an sqlContext to directly load data as a DataFrame:
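For example (paths are placeholders; read.json and read.parquet are available in Spark 1.6):

df = sqlContext.read.json("hdfs:///user/cloudera/people.json")
df = sqlContext.read.parquet("hdfs:///user/cloudera/people.parquet")
df.show()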
Two ways to play
Consider the following table people.csv:
0, "Anna", 32, "female", 172
1, "Bert", 21, "male", 182
2, "Curt", 42, "male", 170
3, "Doris", 43, "female", 164
4, "Edna", 22, "female", 171
5, "Felix", 19, "male", 160
There are two ways of programming Spark SQL queries. One is to issue actual SQL statements, like so:
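Assuming people.csv has been loaded into a DataFrame called people with columns (id, name, age, gender, height) — the column names are my assumption — the SQL route looks roughly like this:

people.registerTempTable("people")                   # make the DataFrame visible to SQL
sqlContext.sql("SELECT name, age FROM people WHERE gender = 'female'").show()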
The other way is to chain the respective methods together, in this example, in Python:
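Roughly, with the same assumed column names:

people.filter(people.gender == "female").select("name", "age").show()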
This is similar to the intuitive piping style that the R packages dplyr and magrittr use. What’s more, this way you don’t have to create a temporary view.
A basic Spark/Python script
Coming back to the mini analysis in this post, let’s re-implement it in Spark SQL with a DataFrame:
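The original mini analysis isn’t reproduced here; as a stand-in, here is a sketch of a self-contained script that loads people.csv (column names assumed as above) and computes the average height per gender:

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import avg

sc = SparkContext(appName="people")
sqlContext = SQLContext(sc)

people = sc.textFile("people.csv") \
    .map(lambda l: [x.strip().strip('"') for x in l.split(",")]) \
    .map(lambda p: Row(id=int(p[0]), name=p[1], age=int(p[2]),
                       gender=p[3], height=int(p[4]))) \
    .toDF()

people.groupBy("gender").agg(avg("height")).show()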
Much faster and more elegant, right?
Reading and writing data
Spark < 2.0 has the spark-csv package to read CSVs, which must be supplied when calling pyspark from the command line. But importing CSVs as an RDD and mapping to DataFrames works, too.
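A sketch of the spark-csv route (the package coordinates and version are an assumption; check what the exam environment provides):

# start the shell with the package supplied:
# pyspark --packages com.databricks:spark-csv_2.10:1.5.0

df = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .options(header="false", inferSchema="true") \
    .load("people.csv")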
A DataFrame’s write method can only write to partitioned files (a directory of part files). If you want a single text file, go through the RDD’s saveAsTextFile method.
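For example (output paths are placeholders):

# DataFrame write: always produces a directory of part files
df.write.parquet("hdfs:///user/cloudera/people_parquet")

# single text file: go via the RDD and coalesce to one partition first
df.rdd.map(lambda row: ",".join(str(x) for x in row)) \
    .coalesce(1) \
    .saveAsTextFile("hdfs:///user/cloudera/people_txt")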
Use metastore tables as an input source or output sink
On the Cloudera sandbox, you connect to the metastore database by issuing the MySQL command shown below (note that there is no space between -p and cloudera).
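The exact command is my assumption, based on the standard quickstart VM credentials (root / cloudera):

mysql -u root -pcloudera
mysql> use metastore;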
From there, you can list all stored databases via SELECT * FROM DBS;. By default, you have one toy database stored in the DBS table, which is at hdfs:///user/hive/warehouse. The table TBLS shows which tables exist in which DB. And lastly, you get the columns with column types in the table COLUMNS_V2.
This is how you would access and modify the metastore, but I don’t see the advantage of diving in there. It seems both easier and safer to just modify the tables from within, say, pyspark.
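For instance, something along these lines works with a HiveContext (the table names are placeholders):

from pyspark.sql import HiveContext
hiveContext = HiveContext(sc)

orders = hiveContext.table("default.orders")                     # metastore table as input
hiveContext.sql("SELECT COUNT(*) FROM default.orders").show()    # or query it via SQL
orders.write.saveAsTable("default.orders_copy")                  # DataFrame as output sink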
Filter, aggregate, join (between disparate sources), rank, and sort datasets
Consider the following two DataFrame objects, users and hobbies:
# users
# ID, name
0, "Hans"
1, "Peter
# hobbies
# userID, hobby, frequency
0, "gym", "daily"
0, "drawing", "weekly"
1, "reading", "daily"
1, "guitar", "weekly"
1, "gaming", "daily"
1, "movies", "daily"
To use all objectives (filter, join, aggregate, rank) in one task, we are now going to do the following: Rank the users by number of daily hobbies. That is:
- Filter and keep only the daily hobbies.
- Join users to (daily) hobbies
- Aggregate and count the daily hobbies per user
- Rank the resulting table by number of daily hobbies
First, we do it using pyspark methods:
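(A sketch; the original snippet isn’t shown here, so it follows the column names in the tables above.)

from pyspark.sql.functions import col

daily  = hobbies.filter(col("frequency") == "daily")             # keep only daily hobbies
result = users.join(daily, users.ID == daily.userID) \
    .groupBy(users.name) \
    .count() \
    .withColumnRenamed("count", "n")
result.show()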
# result:
# name, n
Hans, 1
Peter, 3
Ranking is an inefficient operation, and still “a bit” tedious to write in pyspark:
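A sketch using a window function (in Spark 1.6 this needs a HiveContext under the hood):

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc

w = Window.orderBy(desc("n"))
result.withColumn("rankN", rank().over(w)).show()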
# name, n, rankN
Peter, 3, 1
Hans, 1, 2
Next, we do the same thing using Spark SQL:
users.registerTempTable("users")
hobbies.registerTempTable("hobbies")
sqlContext.sql("
SELECT doot.name, doot.n, RANK() OVER (ORDER BY doot.n DESC) AS rankN
FROM (
SELECT u.name, COUNT(u.name) as n
FROM hobbies h LEFT JOIN users u ON u.ID = h.userID
WHERE h.frequency = 'daily'
GROUP BY u.name
) doot
").show()