This post is part of my preparation series for the Cloudera CCA175 exam, “Certified Spark and Hadoop Developer”. It is intentionally concise, to serve me as a cheat sheet.

There are two categories of operations on RDDs: transformations take an RDD and return a new RDD (e.g. filter out some lines), while actions trigger a computation and return a plain Python object (e.g. a list or a number).

I’ll show examples with two RDDs: one consists of only values (think “one column”), the other of key/value pairs. These are the example RDDs:

# only values:
v_RDD = sc.parallelize([1, 2, 4, 8, 16])

# keys (year) and values (temperature):
kv_RDD = sc.parallelize([(1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78)])

A key/value RDD contains two-element tuples, where the first item is the key and the second item is the value (the value can be a list, too). The best way to follow along is to open a pyspark shell and type the examples in. You can always “print out” an RDD with its .collect() method.
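
For example, collecting the two example RDDs returns plain Python lists (output shown as comments):

v_RDD.collect()   # [1, 2, 4, 8, 16]
kv_RDD.collect()  # [(1950, 0), (1950, 22), (1950, -11), (1949, 111), (1949, 78)]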

In the following, I’ll go through a quick explanation and an example for the most common methods. A comprehensive list is available here.

Transformations

  • map: Transform your data row-wise and 1:1 with a function
import math
v_RDD.map(lambda x: math.log(x, 2))  # transform each value into its log2
kv_RDD.map(lambda row: (row[0], float(row[1]-32)*5/9))  # convert temperatures to Celsius
  • flatMap: Like map, but “flattens” the results, i.e. loses one dimension (think unlist() in R). In the following example, y has five elements after flatMap(), instead of two elements after map().
>>> x = sc.parallelize(["one two three", "four five"])

>>> y = x.map(lambda x: x.split(' '))
>>> y.collect()
[['one', 'two', 'three'], ['four', 'five']]

>>> y = x.flatMap(lambda x: x.split(' '))
>>> y.collect()
['one', 'two', 'three', 'four', 'five']
  • reduceByKey: Merges the values for each key with a combining function and returns an RDD (unlike reduce, which is an action)
kv_RDD.reduceByKey(max)  # create RDD with (year, maxTemperature)
kv_RDD.reduceByKey(lambda x, y: x+y)  # sums up temperatures by year
  • groupByKey: Groups the RDD into unique keys and an Iterable of all their values. The result can then be passed to mapValues, for example. The following line is one of many ways to count the number of elements per key:
kv_RDD.groupByKey().mapValues(len).collect()
  • mapValues, flatMapValues: Transform only the values of a key/value RDD. They are more efficient than map and flatMap because the keys stay untouched, so Spark can maintain the partitioning. Prefer them where possible.
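A quick sketch using kv_RDD from above:
kv_RDD.mapValues(lambda t: float(t-32)*5/9).collect()  # transform values only; the keys stay untouched
kv_RDD.flatMapValues(lambda t: [t, t*2]).collect()  # one input pair can yield several output pairs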

  • sortByKey: Sorts the keys in ascending order. You can use sortBy to define a custom sorting function (e.g. lambda x: x[1] for the second “column”)
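A quick sketch on kv_RDD:
kv_RDD.sortByKey().collect()  # sort by year, ascending
kv_RDD.sortBy(lambda x: x[1]).collect()  # sort by temperature instead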

  • filter: Select only interesting entries from your RDD

kv_RDD.filter(lambda x: x[1] > 0)  # keep only positive temperatures
  • distinct: Keep only unique elements
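For example:
sc.parallelize([1, 1, 2, 2, 3]).distinct().collect()  # [1, 2, 3] (order may vary)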

  • join, rightOuterJoin, leftOuterJoin, fullOuterJoin: Performs joins on the keys of two RDDs, and returns the keys along with the pairs of values.

users = sc.parallelize([(0, "Alex"), (1, "Bert"), (2, "Curt"), (3, "Don")])
hobbies = sc.parallelize([(0, "writing"), (0, "gym"), (1, "swimming")])

users.join(hobbies).collect()
# [(0, ('Alex', 'writing')), (0, ('Alex', 'gym')), (1, ('Bert', 'swimming'))]

users.join(hobbies).map(lambda x: x[1][0] + " likes " + x[1][1]).collect()
# ['Alex likes writing', 'Alex likes gym', 'Bert likes swimming']

users.leftOuterJoin(hobbies).collect()
# [(0, ('Alex', 'writing')), (0, ('Alex', 'gym')), (1, ('Bert', 'swimming')), (2, ('Curt', None)), (3, ('Don', None))]
  • union, intersection, subtract, cartesian: These set operations take two RDDs and combine them.
kv_RDD.union(v_RDD).collect()  # combine both RDDs (and print them using collect())
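
The remaining set operations work analogously; a quick sketch with two small throwaway RDDs:

a = sc.parallelize([1, 2, 3])
b = sc.parallelize([3, 4])
a.intersection(b).collect()  # [3]
a.subtract(b).collect()      # [1, 2] (order may vary)
a.cartesian(b).collect()     # all pairs (order may vary): [(1, 3), (1, 4), (2, 3), (2, 4), (3, 3), (3, 4)]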

Actions

  • collect: Dumps all elements, i.e. converts the RDD into a Python list
  • count: Returns the number of elements in an RDD
  • countByValue: Returns a dictionary mapping each unique value to its count. This is similar to doing map(lambda x: (x, 1)).reduceByKey(lambda x, y: x+y).collect(). Note that you need the dictionary’s .items() method to turn it into a list of (value, count) tuples.
  • take, top: Return the first n elements (take) or the n largest elements (top)
  • reduce: Aggregates all elements of the RDD into a single value by repeatedly combining two of them (use reduceByKey if you need this per key); see the examples after this list
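
A few of these on the example RDDs (output shown as comments):

v_RDD.count()                     # 5
v_RDD.take(2)                     # [1, 2]
v_RDD.top(2)                      # [16, 8]
v_RDD.reduce(lambda x, y: x + y)  # 31
kv_RDD.countByValue()             # defaultdict mapping each (year, temperature) pair to 1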

Detailed explanations are available here.

Note that transformations return RDDs, but actions return “normal” Python objects. After an action, you can use standard Python on these objects again.

Your RDDs can contain single values per element (e.g. one string representing an entire line of a text file) or key/value pairs. With keys, you can work on RDDs much like on NoSQL databases. You create a key/value RDD by having a map output a two-element tuple for each input, e.g. with z = y.map(lambda x: (x, 1)). Here, every value will be 1. You can also use lists as values.
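
A minimal sketch of this pattern (the classic word count) with made-up data:

words = sc.parallelize(["a", "b", "a"])
pairs = words.map(lambda x: (x, 1))              # [('a', 1), ('b', 1), ('a', 1)]
pairs.reduceByKey(lambda x, y: x + y).collect()  # [('a', 2), ('b', 1)] (order may vary)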

Some advanced calls

Using this data set (year, temperature, quality code):

1950	0	1
1950	22	1
1950	-11	1
1949	111	1
1949	78	1

In Python, the following piece of code keeps all records whose temperature is not 9999 (a missing-value marker) and whose quality code is one of 0, 1, 4, 5, and 9. It then selects the year (as key) and temperature (as value), takes the maximum temperature per year, and writes a text file with the two lines (1949, 111) and (1950, 22).

from pyspark import SparkContext
import re, sys

sc = SparkContext("local", "Max Temperature")
sc.textFile(sys.argv[1]) \
  .map(lambda s: s.split("\t")) \
  .filter(lambda rec: (rec[1] != "9999" and re.match("[01459]", rec[2]))) \
  .map(lambda rec: (int(rec[0]), int(rec[1]))) \
  .reduceByKey(max) \
  .saveAsTextFile(sys.argv[2])

The function reduceByKey can take an anonymous function that is associative and commutative and combines two arguments. For example: reduceByKey(lambda x,y: x+y) would just add up all values by key.

Custom map functions to parse lines

Suppose we have this sample data:

0,Alex,30,123
1,Bert,32,234
2,Curt,28,312
3,Don,32,89

For each age, we want to get the average number of friends.

You can write your own mapper function like this:

def parseLine(line):
    fields = line.split(",")  # split line into list at comma positions
    age = int(fields[2])  # extract and typecast relevant fields
    numFriends = int(fields[3])
    return (age, numFriends)

lines = sc.textFile("hdfs:///...")
rdd = lines.map(parseLine)  # each line gets sent through parseLine()

To get the average, we first get the sum total and the number of entries per age.

totalsByAge = rdd \
    .mapValues(lambda x: (x, 1)) \
    .reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))

We transform each value, e.g. 123 friends, into a pair (123, 1). reduceByKey then sums up both elements of the pairs per key, and dividing the total sum by the total count gives the average.

averagesByAge = totalsByAge.mapValues(lambda x: float(x[0]) / x[1])  # float() avoids integer division in Python 2
averagesByAge.collect()

Advanced concepts

  • You can use distData = sc.broadcast(someData) to distribute your object someData so that the cluster can access it quickly. Use distData.value to access it.
  • Accumulators allow all task executors to increment a shared variable. Create one with myCounter = sc.accumulator(0). Increase it with myCounter.add(1) and access the value with myCounter.value (see the sketch after this list).
  • Spark discards RDDs after you’ve called an action on them. If you want to keep them for further evaluation, use .cache() or .persist() on them.
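
A minimal sketch combining both, counting the negative temperatures in kv_RDD from above (the variable names are made up):

threshold = sc.broadcast(0)    # read-only value shipped to all executors
negatives = sc.accumulator(0)  # shared counter that tasks can increment

def check(pair):
    if pair[1] < threshold.value:
        negatives.add(1)

kv_RDD.foreach(check)   # foreach is an action, so the tasks actually run
print(negatives.value)  # 1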

DataFrames and Spark SQL

These two concepts extend the RDD concept to a “DataFrame” object that contains structured data. DataFrames contain Row objects, which allows you to issue SQL queries. Because the data has a schema, Spark can run some optimizations on storage and querying. You can also read and write JSON, Hive, or Parquet more easily, and connect via JDBC/ODBC or even to Tableau.
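
For example, reading and writing structured formats becomes a one-liner (a sketch with made-up paths; sqlContext is created as in the script below):

df = sqlContext.read.json("file:///tmp/people.json")  # infer the schema from JSON
df.write.parquet("file:///tmp/people.parquet")        # write out as Parquet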

Some examples using the following data again:

0,Alex,30,123
1,Bert,32,234
2,Curt,28,312
3,Don,32,89

The following script loads this data and creates a DataFrame. Note that with Spark 2.0, this will be a bit easier. This is the Spark 1.6 solution.

from pyspark import SparkContext
from pyspark.sql import SQLContext, Row

sc = SparkContext()
sqlContext = SQLContext(sc)

def processToRDD(line):
    f = line.split(",")
    return f

myRDD = sc.textFile("file:///home/cloudera/Downloads/4lineCSV.txt").map(processToRDD)
# myRDD.collect()

def processToDF(rdd_line):
    return(Row(ID=int(rdd_line[0]), name=rdd_line[1], age=int(rdd_line[2]), nFriends=int(rdd_line[3])))

myRows = myRDD.map(processToDF)
# myRows.collect()  # this is an RDD of Row objects now. Almost finished!

myDF = sqlContext.createDataFrame(myRows)
# or:
myDF = myRows.toDF()
myDF.show()  # pretty!

# Working with DataFrames
myDF.show()                             # top 20 rows
myDF.select(myDF.ID, myDF.age)          # select columns
myDF.filter(myDF.age > 30)              # filter rows
myDF.groupBy('age').mean()              # aggregations

myDF.rdd.map(mapFct)                    # transform back to an RDD (rdd is a property, not a method)

myDF.groupBy("age").count().orderBy("age").show()  # chained commands
myDF.groupBy("age").agg({'nFriends': 'mean'}).show()  # "agg" knows avg, max, min, sum, count.

Technicalities: DataFrames appeared in Spark 1.3. In Spark 2.0, a DataFrame became a DataSet of Row objects (in the Scala/Java API), and you should use DataSets where possible; they are more general and can contain elements of other classes as well. The CCA175 currently only comes with Spark 1.6, though.

# Actual Spark SQL:

sqlContext.registerDataFrameAsTable(myDF, "myTab")
myDF.registerTempTable("myTab")  # alternatively

res = sqlContext.sql("SELECT * FROM myTab WHERE age > 30")

# Rank data:
res = sqlContext.sql("SELECT *, RANK() OVER (ORDER BY age) AS rank FROM myTab")

# Add new columns:
from pyspark.sql.functions import lit
myDF.withColumn("newCol", lit(0))  # or, instead of lit(0): functions.exp("age")

Joins with DataFrames or SparkSQL

Let’s create a second DataFrame with some users’ hobbies, and join the user name to it:

hobbies_RDD = sc.parallelize([(0, "writing"), (0, "gym"), (1, "swimming")])
hobbies_DF = hobbies_RDD.map(lambda x: Row(ID=int(x[0]), hobby=x[1])).toDF()

hobbies_DF.join(myDF, myDF.ID == hobbies_DF.ID)

# The above solution unfortunately returns two columns, both called "ID". In
#  the case of the join columns having the same name, refer to it with a string
#  to keep only one column:

res = hobbies_DF.join(myDF, "ID")

res.show()

# +---+--------+---+--------+----+
# | ID|   hobby|age|nFriends|name|
# +---+--------+---+--------+----+
# |  0| writing| 30|     123|Alex|
# |  0|     gym| 30|     123|Alex|
# |  1|swimming| 32|     234|Bert|
# +---+--------+---+--------+----+

The exact same result can be obtained via “actual” SQL as follows:

sqlContext.registerDataFrameAsTable(myDF, "users")
sqlContext.registerDataFrameAsTable(hobbies_DF, "hobbies")

res = sqlContext.sql("""
    SELECT hobbies.*, users.age, users.nFriends, users.name
    FROM hobbies
    LEFT JOIN users ON hobbies.ID = users.ID
""")
res.show()

Ranking data with DataFrames or SparkSQL

We’ll rank the heights of these six people, first globally, and then grouped per gender. I’ll show it first using DataFrames, and then via Spark SQL.

from pyspark.sql import Row

rdd = sc.parallelize([
    (0, "Anna", "female", 172),
    (1, "Bert", "male", 182),
    (2, "Curt", "male", 170),
    (3, "Doris", "female", 164),
    (4, "Edna", "female", 171),
    (5, "Felix", "male", 160)
])

df = rdd.map(lambda r: Row(ID=int(r[0]), name=r[1], sex=r[2], height=int(r[3]))).toDF()

DataFrame solutions:

from pyspark.sql import Window
from pyspark.sql.functions import rank, min

df.select("name", "height",
          rank().over(Window.orderBy("height")).alias("ronk")).show()

df.select("name", "sex", "height",
          rank().over(Window.partitionBy("sex").orderBy("height")).alias("ronk")).show()

SparkSQL solutions:

sqlContext.registerDataFrameAsTable(df, "people")

ranked_df = sqlContext.sql("""
    SELECT *, RANK() OVER (ORDER BY height) AS ronk
    FROM people
""")

group_ranked_df = sqlContext.sql("""
    SELECT *, RANK() OVER (PARTITION BY sex ORDER BY height) AS grouped_ronk
    FROM people
""")