This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.

In this post, I’ll very briefly summarize the Spark SQL functions necessary for the CCA175 exam. I also have a longer article on Spark available that goes into more detail and spans a few more topics.

Introduction

  • RDDs can contain any kind of unstructured data
  • Spark 1.3 introduced DataFrames (and Spark 1.6 Datasets; since Spark 2.0 a DataFrame is simply a Dataset of Row objects). These Row objects hold structured data, i.e. they have a schema: column names and types
  • Big advantages:
    • You can run SQL queries on DataFrames
    • You can read from and write to JSON, Hive, and Parquet
    • You can communicate with JDBC/ODBC
  • Note: The all-encompassing SparkSession creator only appeared in Spark 2.0. Currently, the CCA175 exam provides Spark 1.6, where you still work with a SparkContext, and a SQLContext or HiveContext “on top”.
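
Since the exam environment is Spark 1.6, a minimal setup without SparkSession looks roughly like this (a sketch; in the interactive pyspark shell, sc and sqlContext already exist, and the app name is just a placeholder):

from pyspark import SparkContext
from pyspark.sql import SQLContext, HiveContext

sc = SparkContext(appName="cca175-prep")  # placeholder app name
sqlContext = SQLContext(sc)               # plain SQL support
# sqlContext = HiveContext(sc)            # use this instead for Hive/metastore access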

Creating DataFrames and DataSets

Spark has .toDS (Scala only) and .toDF methods that convert an RDD into a Dataset or DataFrame. Note that you need to map the lines into Row objects first:

from pyspark.sql import Row

rdd = sc.parallelize([1, 2, 3, 4])
df = rdd.map(lambda l: Row(value=l)).toDF()  # name the field so the column gets a readable name

You can also use an sqlContext to directly load data as a DataFrame:

df = sqlContext.read.load("examples/src/main/resources/people.json", format="json")

df.select("name", "age").write.save("namesAndAges.parquet", format="parquet")

Two ways to play

Consider the following table people.csv:

0, "Anna", 32, "female", 172
1, "Bert", 21, "male", 182
2, "Curt", 42, "male", 170
3, "Doris", 43, "female", 164
4, "Edna", 22, "female", 171
5, "Felix", 19, "male", 160

from pyspark.sql import Row

# JSON data could have been loaded simply by:
# myDF = sqlContext.read.format('json').load('/home/cloudera/people.json')

myDF = sc.textFile("file:///home/cloudera/people.csv"). \
    map(lambda l: l.split(",")). \
    map(lambda l: Row(ID=int(l[0]), name=l[1], age=int(l[2]), sex=l[3], height=int(l[4]))).toDF()

There are two ways of programming Spark SQL queries. One is to issue actual SQL statements, like so:

# You must register a (temporary) table before being able to issue SQL queries:
myDF.registerTempTable("myDF_tab")

sqlContext.sql("SELECT sex, AVG(height) AS avgheight FROM myDF_tab WHERE age > 21 GROUP BY sex").show()

The other way is to chain the respective DataFrame methods together, here in Python:

myDF.filter('age > 21').groupBy('sex').mean('height').show()

This is similar to the intuitive piping style of the R packages dplyr and magrittr. What’s more, this way you don’t have to register a temporary table first.

A basic Spark/Python script

Coming back to the mini analysis in this post, let’s re-implement it in Spark SQL with a DataFrame:

myDF = sc.parallelize([
  ("Monday", 1.24),
  ("Tuesday", 1.42),
  ("Sunday", 1.33),
  ("Sunday", 1.21),
  ("Monday", 1.18)
]). \
    map(lambda l: Row(day=l[0], price=l[1])). \
    toDF()  # float typecasting is not necessary

# Total mean price, just for fun:
myDF.groupBy().mean('price')  # group by empty because 'mean' is only available for GroupedData objects

myDF.select('day', (myDF.price * 1.18 / 0.265).alias('priceUSDperGallon')). \
    groupBy('day'). \
    mean('priceUSDperGallon'). \
    show()

Much faster and more elegant, right?

Reading and writing data

Spark < 2.0 does not ship with a CSV reader; instead there is the external spark-csv package, which must be supplied when calling pyspark from the command line. But importing CSVs as an RDD and mapping the lines to a DataFrame, as shown above, works too.
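
A sketch of the spark-csv route (the package coordinates are an example and depend on your Spark/Scala version; the options must match your file):

# pyspark --packages com.databricks:spark-csv_2.10:1.5.0

peopleDF = sqlContext.read. \
    format('com.databricks.spark.csv'). \
    options(header='false', inferSchema='true'). \
    load('file:///home/cloudera/people.csv')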

The DataFrame write method always writes out a directory of partitioned part files. If you want plain text output, convert to an RDD and use its saveAsTextFile method (optionally with coalesce(1) to get a single part file).

myDF.write.format('json').save('file:///home/cloudera/myDFjson')
myDF = sqlContext.read.format('json').load('file:///home/cloudera/myDFjson')
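
A minimal sketch of the plain-text route (the output path is a placeholder, and coalesce(1) is only advisable for small data):

myDF.rdd. \
    map(lambda row: ','.join(str(field) for field in row)). \
    coalesce(1). \
    saveAsTextFile('file:///home/cloudera/myDFtext')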

Use metastore tables as an input source or output sink

On the Cloudera sandbox, you connect to the metastore database by issuing

mysql -u root -pcloudera metastore

(there is no space between -p and cloudera).

From there, you can list all stored databases via SELECT * FROM DBS;. By default, there is one toy database listed in the DBS table, whose warehouse location is hdfs:///user/hive/warehouse. The table TBLS shows which tables exist in which DB. And lastly, you get the columns and their types from the table COLUMNS_V2.

This is how you would access and modify the metastore, but I don’t see the advantage of diving in there. It seems both easier and safer to just modify the tables from within, say, pyspark.
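
For example, with a HiveContext you can use metastore tables directly as a source or sink from pyspark (a sketch; the table names below are placeholders):

from pyspark.sql import HiveContext

sqlContext = HiveContext(sc)

# Read a metastore table as a DataFrame:
df = sqlContext.table('some_db.some_table')
# or: df = sqlContext.sql('SELECT * FROM some_db.some_table')

# Write a DataFrame back to the metastore as a new table:
df.write.saveAsTable('some_db.some_table_copy')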

Filter, aggregate, join (between disparate sources), rank, and sort datasets

Consider the following two DataFrame objects, users and hobbies:

# users
# ID, name
0, "Hans"
1, "Peter"
# hobbies
# userID, hobby, frequency
0, "gym", "daily"
0, "drawing", "weekly"
1, "reading", "daily"
1, "guitar", "weekly"
1, "gaming", "daily"
1, "movies", "daily"
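
For reference, one way to build these two DataFrames in pyspark (a sketch mirroring the data above):

from pyspark.sql import Row

users = sc.parallelize([(0, "Hans"), (1, "Peter")]). \
    map(lambda l: Row(ID=l[0], name=l[1])).toDF()

hobbies = sc.parallelize([
    (0, "gym", "daily"), (0, "drawing", "weekly"),
    (1, "reading", "daily"), (1, "guitar", "weekly"),
    (1, "gaming", "daily"), (1, "movies", "daily")
]). \
    map(lambda l: Row(userID=l[0], hobby=l[1], frequency=l[2])).toDF()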

To practice all objectives (filter, join, aggregate, rank) in one task, we are now going to rank the users by their number of daily hobbies. That is:

  1. Filter and keep only the daily hobbies.
  2. Join users to the (daily) hobbies.
  3. Aggregate and count the daily hobbies per user.
  4. Rank the resulting table by the number of daily hobbies.

First, we do it using pyspark methods:

from pyspark.sql.functions import col

result = hobbies. \
    filter(hobbies.frequency == "daily"). \
    join(users, users.ID == hobbies.userID). \
    groupBy("ID", "name"). \
    agg({"hobby": "count"}). \
    select(users.name, col("count(hobby)").alias("n"))

# result:
# name, n
# Hans, 1
# Peter, 3

Ranking is an inefficient operation, and still “a bit” tedious to write in pyspark (note that in Spark 1.6 window functions require a HiveContext):

from pyspark.sql.window import Window
from pyspark.sql.functions import rank, desc

result. \
    select("name", "n", col("n").alias("rankN")). \
    withColumn("rankN", rank().over(Window.orderBy(desc("rankN")))). \
    show()

# name, n, rankN
# Peter, 3, 1
# Hans, 1, 2

Next, we do the same thing using Spark SQL:

users.registerTempTable("users")
hobbies.registerTempTable("hobbies")

sqlContext.sql("""
SELECT doot.name, doot.n, RANK() OVER (ORDER BY doot.n DESC) AS rankN
FROM (
    SELECT u.name, COUNT(u.name) AS n
    FROM hobbies h LEFT JOIN users u ON u.ID = h.userID
    WHERE h.frequency = 'daily'
    GROUP BY u.name
) doot
""").show()