Spark lets you write data processing scripts very flexibly. It provides APIs in Java, Python, and Scala. Writing in Python is easier, but the programs tend to run slower and be a little less reliable. Spark itself is written in Scala, so that language is generally recommended. Scala is not too different from Python in spirit, so the transition is not too hard. Spark's flexibility allows you to run advanced analyses, such as iterative algorithms and interactive analyses.

The most important difference from other technologies such as Hive is that Spark has a rich ecosystem built on top of it: Spark Streaming (for real-time data), Spark SQL (a SQL interface for writing queries and functions), MLLib (a library for running distributed versions of machine learning algorithms on a dataset in Spark), and GraphX (for analyzing graphs, for example social networks).

Spark can run on Hadoop, but it doesn’t have to. You can also pick other cluster managers for Spark to run on.

Spark is very fast for two main reasons. First, it is memory-based rather than disk-based: it tries to keep as much as possible in RAM, even between jobs. The second factor that contributes to its 10-100x speedup over standard Hadoop MapReduce is its use of directed acyclic graphs (more on this later) to optimize workflows.

Resilient Distributed Datasets (RDDs)

The central concept in Spark is the Resilient Distributed Dataset, or RDD: an abstraction for a large dataset. In a typical Spark application, you load one or more RDDs, perform transformations on them to create a target RDD, and then store that result on your file system.

You can create RDDs to experiment with from the Spark shell, or load files from your local file system, from HDFS, or from HBase. You can even load data from Amazon’s S3 service with the s3n:// protocol. RDDs can also be created from any database reachable via JDBC.
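
For instance, creating RDDs from the PySpark shell might look roughly like the following sketch (the shell already provides a SparkContext named sc; the file paths are placeholders):

# Minimal sketch from the PySpark shell; `sc` is the SparkContext the shell provides,
# and the file paths are placeholders.
numbers = sc.parallelize([1, 2, 3, 4, 5])                         # RDD from a local Python list
localLines = sc.textFile("file:///tmp/some_local_file.txt")       # RDD from a local file
hdfsLines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")  # RDD from HDFS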

Transforming RDDs is done through operations such as map, flatMap, filter, distinct, and sample, or set operations like union. These look similar to Pig, but Spark is a lot more powerful.

Reducing RDDs can be done by so-called actions, for example collect, count, countByValue, take or reduce.

Spark uses lazy evaluation: it only executes a query when it hits an action. At that point it can work backward and figure out the most efficient way of wrangling the data.
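
As a small sketch of how this feels in practice (again from the PySpark shell, with sc available): the two transformations below only describe the computation, and nothing actually runs until the count() action at the end.

words = sc.parallelize(["spark", "hadoop", "spark", "pig", "hive"])
pairs = words.map(lambda w: (w, 1))                     # transformation: lazy, nothing runs yet
sparkOnly = pairs.filter(lambda kv: kv[0] == "spark")   # transformation: still lazy
print(sparkOnly.count())                                # action: triggers the job, prints 2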

An example Spark 1.0 script

Here is a Spark script (in Python) that analyzes the MovieLens dataset I worked with in the other posts here. It computes the average rating for each movie and prints out the 10 worst-rated movies.

Note that this is the Spark 1 way of doing it. A script in Spark 2 will be a bit simpler.

from pyspark import SparkConf, SparkContext
 
# This function just creates a Python "dictionary" we can later
# use to convert movie ID's to movie names while printing out
# the final results.
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:  # this is loaded locally, not from HDFS
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames
 
# Take each line of u.data and convert it to (movieID, (rating, 1.0))
# This way we can then add up all the ratings for each movie, and
# the total number of ratings for each movie (which lets us compute the average)
def parseInput(line):
    fields = line.split()
    return (int(fields[1]), (float(fields[2]), 1.0))
 
if __name__ == "__main__":
    # The main script - create our SparkContext
    conf = SparkConf().setAppName("WorstMovies")
    sc = SparkContext(conf = conf)
 
    # Load up our movie ID -> movie name lookup table
    movieNames = loadMovieNames()
 
    # Load up the raw u.data file
    lines = sc.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
 
    # Convert to (movieID, (rating, 1.0))
    movieRatings = lines.map(parseInput)
 
    # Reduce to (movieID, (sumOfRatings, totalRatings))
    ratingTotalsAndCount = movieRatings.reduceByKey(lambda movie1, movie2: ( movie1[0] + movie2[0], movie1[1] + movie2[1] ) )
 
    # Filter out movies rated 10 or fewer times
    popularTotalsAndCount = ratingTotalsAndCount.filter(lambda key_val: key_val[1][1] > 10)
 
    # Map to (rating, averageRating)
    averageRatings = popularTotalsAndCount.mapValues(lambda totalAndCount : totalAndCount[0] / totalAndCount[1])
 
    # Sort by average rating
    sortedMovies = averageRatings.sortBy(lambda x: x[1])
 
    # Take the first 10 results (the lowest-rated movies)
    results = sortedMovies.take(10)
 
    # Print them out:
    for result in results:
        print(movieNames[result[0]], result[1])

To run it, don’t invoke Python directly, but use the spark-submit command:

spark-submit LowestRatedMovieSpark.py

This should give you some log output, followed by the 10 worst-rated movies in this dataset:

[maria_dev@sandbox ~]$ spark-submit LowestRatedMovieSpark.py 
Multiple versions of Spark are installed but SPARK_MAJOR_VERSION is not set
Spark1 will be picked by default
('3 Ninjas: High Noon At Mega Mountain (1998)', 1.0)                            
('Beyond Bedlam (1993)', 1.0)
('Power 98 (1995)', 1.0)
('Bloody Child, The (1996)', 1.0)
('Amityville: Dollhouse (1996)', 1.0)
('Babyfever (1994)', 1.0)
('Homage (1995)', 1.0)
('Somebody to Love (1994)', 1.0)
('Crude Oasis, The (1995)', 1.0)
('Every Other Weekend (1990)', 1.0)

I loved “3 Ninjas” as a kid!

Spark 2.0, Spark SQL, DataFrames and DataSets

Spark 2.0 extends RDDs to a “DataFrame” object. These DataFrames contain Row objects that carry column names and types. You can run SQL queries (the Hadoop community seems to love SQL), read from and write to JSON or Hive, and communicate with JDBC/ODBC clients and even Tableau.

In Python, you can read a DataFrame from Hive via HiveContext(), or from a JSON file via spark.read.json(), and then run SQL commands on it.

You can even perform many operations directly in Python, for example by issuing myDataFrame.groupBy("movieID").mean(). This computes the column means per movie ID. Very handy, isn’t it?
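
Here is a minimal sketch of these ideas in the Spark 2 style, assuming a hypothetical ratings.json file with movieID and rating fields (the path and field names are placeholders):

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("DataFrameSketch").getOrCreate()

# Read a DataFrame from a (hypothetical) JSON file with one record per line
df = spark.read.json("hdfs:///user/maria_dev/ratings.json")

# SQL route: register the DataFrame as a temporary view and query it
df.createOrReplaceTempView("ratings")
avgBySql = spark.sql("SELECT movieID, AVG(rating) AS avgRating FROM ratings GROUP BY movieID")

# DataFrame API route: the same aggregation directly in Python
avgByApi = df.groupBy("movieID").mean("rating")

avgBySql.show(5)
avgByApi.show(5)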

To convert a DataFrame back to its underlying RDD, simply access myDataFrame.rdd (in PySpark this is a property, not a method) and then work at the RDD level, e.g. myDataFrame.rdd.map(myMapperFunction).
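
A short sketch of that round trip (the DataFrame name and columns are illustrative):

# Assuming myDataFrame has movieID and rating columns
ratingRDD = myDataFrame.rdd.map(lambda row: (row.movieID, row.rating))  # down to the RDD level
backToDF = ratingRDD.toDF(["movieID", "rating"])                        # and back to a DataFrame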

A DataFrame is really a DataSet of Row objects. The more general DataSet class can hold objects other than Rows. This lets Spark know more about the data structure upfront and provides advantages such as compile-time (as opposed to run-time) errors when writing in Java or Scala. In Spark 2.0, it is recommended to use DataSets instead of DataFrames where possible. In addition to providing some speed-ups, they are the unified API across the subsystems of Spark 2 (e.g. the machine learning library).

Extending Spark SQL with User-defined functions (UDFs)

It is possible to write Python functions and use them within SQL calls! This example squares a numeric column while selecting it:

from pyspark.sql.types import IntegerType
hiveCtx.registerFunction("square", lambda x: x*x, IntegerType())
df = hiveCtx.sql("SELECT square(someNumericField) FROM tableName")
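
For reference, the Spark 2 way of registering the same UDF on a SparkSession would look roughly like this (tableName and someNumericField are placeholders):

from pyspark.sql.types import IntegerType

# `spark` is a SparkSession; register the Python function for use in SQL
spark.udf.register("square", lambda x: x * x, IntegerType())
df = spark.sql("SELECT square(someNumericField) FROM tableName")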

Revisiting the worst rated movies in Spark 2.0

Coming back to the Spark 1 script from above, we’ll now reimplement it in Spark 2.0.

Instead of a SparkContext, you now create a SparkSession, which encompasses both a Spark context and a SQL context. The parseInput() function no longer returns a plain tuple but a Row object that carries column names and types. The line that creates averageRatings is very elegant and shows the advantage of the DataFrame API over plain RDDs here:

from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql import functions
 
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1]
    return movieNames
 
def parseInput(line):
    fields = line.split()
    return Row(movieID = int(fields[1]), rating = float(fields[2]))
 
if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("PopularMovies").getOrCreate()
 
    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()
 
    # Get the raw data
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.data")
    # Convert it to an RDD of Row objects with (movieID, rating)
    movies = lines.map(parseInput)
    # Convert that to a DataFrame
    movieDataset = spark.createDataFrame(movies)
 
    # Compute average rating for each movieID
    averageRatings = movieDataset.groupBy("movieID").avg("rating")
 
    # Compute count of ratings for each movieID
    counts = movieDataset.groupBy("movieID").count()
 
    # Join the two together (We now have movieID, avg(rating), and count columns)
    averagesAndCounts = counts.join(averageRatings, "movieID")
 
    # Filter movies rated 10 or fewer times
    popularAveragesAndCounts = averagesAndCounts.filter("count > 10")
    # Pull the first 10 results (the lowest-rated movies)
    topTen = popularAveragesAndCounts.orderBy("avg(rating)").take(10)
 
    # Print them out, converting movie ID's to names as we go.
    for movie in topTen:
        print (movieNames[movie[0]], movie[1], movie[2])
 
    # Stop the session
    spark.stop()

We also computed the rating counts and then joined the two tables together. This was done through the DataFrame API in Python, but we could also have used SQL commands. The output now shows the average rating of the worst movies, plus the number of people who rated each one.
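
Just as a sketch, the SQL variant of that aggregation could reuse the movieDataset DataFrame from the script above via a temporary view:

# Alternative to the groupBy/join/filter calls above, expressed as one SQL query
movieDataset.createOrReplaceTempView("ratings")
worstMovies = spark.sql("""
    SELECT movieID, COUNT(*) AS cnt, AVG(rating) AS avgRating
    FROM ratings
    GROUP BY movieID
    HAVING COUNT(*) > 10
    ORDER BY avgRating
    LIMIT 10
""")
topTen = worstMovies.collect()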

[maria_dev@sandbox ~]$ export SPARK_MAJOR_VERSION=2
[maria_dev@sandbox ~]$ spark-submit LowestRatedMovieDataFrame.py
 
[...]
 
('Amityville: A New Generation (1993)', 5, 1.0)
('Hostile Intentions (1994)', 1, 1.0)
('Lotto Land (1995)', 1, 1.0)
('Careful (1992)', 1, 1.0)
('Falling in Love Again (1980)', 2, 1.0)
('Power 98 (1995)', 1, 1.0)
('Low Life, The (1994)', 1, 1.0)
('Amityville: Dollhouse (1996)', 3, 1.0)
('Further Gesture, A (1996)', 1, 1.0)
('Touki Bouki (Journey of the Hyena) (1973)', 1, 1.0)

Using MLLib on Spark

The previous examples are rather simple and could have been implemented in Pig or Hive as well. The real power of Spark shows with more complex analyses. The machine learning library (MLLib) built on top of Spark lets you do exactly these complex analyses. Implementing something like alternating least squares (ALS) in pure MapReduce would be vastly harder.

In the following script, you train the ALS algorithm on the movie ratings dataset and create a “dummy user” with the user ID 0. This user has rated Star Wars and The Empire Strikes Back (movie IDs 50 and 172) with 5 stars, and Gone With the Wind (movie ID 133) with 1 star. I used Hive from Ambari to insert these three rows:

INSERT INTO ratings VALUES
 (0, 50, 5, 881250949),
 (0, 172, 5, 881250949),
 (0, 133, 1, 881250949);

Using this information, the following script then predicts his ratings for all the other movies.

from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
from pyspark.sql.functions import lit
 
# Load up movie ID -> movie name dictionary
def loadMovieNames():
    movieNames = {}
    with open("ml-100k/u.item") as f:
        for line in f:
            fields = line.split('|')
            movieNames[int(fields[0])] = fields[1].decode('ascii', 'ignore')
    return movieNames
 
# Convert u.data lines into (userID, movieID, rating) rows
def parseInput(line):
    fields = line.value.split()
    return Row(userID = int(fields[0]), movieID = int(fields[1]), rating = float(fields[2]))
 
 
if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("MovieRecs").getOrCreate()
 
    # Load up our movie ID -> name dictionary
    movieNames = loadMovieNames()
 
    # Get the raw data
    lines = spark.read.text("hdfs:///user/maria_dev/ml-100k/u.data").rdd
 
    # Convert it to an RDD of Row objects with (userID, movieID, rating)
    ratingsRDD = lines.map(parseInput)
 
    # Convert to a DataFrame and cache it
    ratings = spark.createDataFrame(ratingsRDD).cache()
 
    # Create an ALS collaborative filtering model from the complete data set
    als = ALS(maxIter=5, regParam=0.01, userCol="userID", itemCol="movieID", ratingCol="rating")
    model = als.fit(ratings)
 
    # Print out ratings from user 0:
    print("\nRatings for user ID 0:")
    userRatings = ratings.filter("userID = 0")
    for rating in userRatings.collect():
        print movieNames[rating['movieID']], rating['rating']
 
    print("\nTop 20 recommendations:")
    # Find movies rated more than 100 times
    ratingCounts = ratings.groupBy("movieID").count().filter("count > 100")
    # Construct a "test" dataframe for user 0 with every movie rated more than 100 times
    popularMovies = ratingCounts.select("movieID").withColumn('userID', lit(0))
 
    # Run our model on that list of popular movies for user ID 0
    recommendations = model.transform(popularMovies)
 
    # Get the top 20 movies with the highest predicted rating for this user
    topRecommendations = recommendations.sort(recommendations.prediction.desc()).take(20)
 
    for recommendation in topRecommendations:
        print (movieNames[recommendation['movieID']], recommendation['prediction'])
 
    spark.stop()

Don’t forget to export SPARK_MAJOR_VERSION=2 again, and run this Spark job as before. Here is the output of this script:

Ratings for user ID 0:
Star Wars (1977) 5.0
Empire Strikes Back, The (1980) 5.0
Gone with the Wind (1939) 1.0
 
Top 20 recommendations:
(u'Wrong Trousers, The (1993)', 5.749821662902832)                              
(u'Fifth Element, The (1997)', 5.2325282096862793)
(u'Close Shave, A (1995)', 5.0506253242492676)
(u'Monty Python and the Holy Grail (1974)', 4.9965953826904297)
(u'Star Wars (1977)', 4.9895496368408203)
(u'Army of Darkness (1993)', 4.980320930480957)
(u'Empire Strikes Back, The (1980)', 4.9729299545288086)
(u'Princess Bride, The (1987)', 4.9577054977416992)
(u'Blade Runner (1982)', 4.9106745719909668)
(u'Return of the Jedi (1983)', 4.7780814170837402)
(u'Rumble in the Bronx (1995)', 4.6917591094970703)
(u'Raiders of the Lost Ark (1981)', 4.6367182731628418)
(u"Jackie Chan's First Strike (1996)", 4.632108211517334)
(u'Twelve Monkeys (1995)', 4.6148405075073242)
(u'Spawn (1997)', 4.5741710662841797)
(u'Terminator, The (1984)', 4.5611510276794434)
(u'Alien (1979)', 4.5415172576904297)
(u'Terminator 2: Judgment Day (1991)', 4.529487133026123)
(u'Usual Suspects, The (1995)', 4.5179119110107422)
(u'Mystery Science Theater 3000: The Movie (1996)', 4.5095906257629395)

Very advanced stuff, and relatively simple to run thanks to MLLib!