MongoDB (the name comes from “humongous”) is a popular choice in corporations because it’s built and supported by an actual company, instead of being a “pure” open source project. Its website also looks a lot like it’s targeting large corporations.

In the CAP-theorem triangle, MongoDB sits on the side of “consistency + partition tolerance”. It has a single master node that you have to talk to in order to ensure consistency. But if that master node goes down, you lose availability (at least until a new primary has been elected).

MongoDB has a document-based data model that looks like JSON. This means you can put data of almost any structure into the database, for example blog posts. MongoDB does not enforce a schema, so you can have different fields in every document if you want (although that’s usually not a good idea). Unlike in Cassandra, you don’t need a single unique key identifier. Instead, you can create indices on any field or combination of fields.
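
To get a feel for what such schema-free documents look like, here is a small, hypothetical example (the posts collection and its field names are made up purely for illustration and are not part of the movielens data used below): two blog posts inserted into the same collection, each with a different set of fields:

> // hypothetical "posts" collection, for illustration only
> db.posts.insertOne( { title: "Hello MongoDB", author: "alice", tags: ["nosql", "intro"] } )
> db.posts.insertOne( { title: "Going schema-free", author: "bob", comments: [ { user: "alice", text: "Nice post!" } ] } )

MongoDB happily stores both, even though only the first document has tags and only the second has embedded comments.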

Terminology

MongoDB uses a slightly different terminology to highlight the fact that it’s different from relational databases.

  • Databases: Basically the same as a database in the relational sense.
  • Collections: A collection is the equivalent of a table in a relational database.
  • Documents: Since the JSON model allows you to store very unstructured data, the word “row” might not make sense here. Instead of tables consisting of rows, in MongoDB we talk of collections consisting of individual documents (see the short shell example after this list).
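
You can see this hierarchy directly in the shell. A minimal walk-through, assuming the movielens database and users collection used in the examples below, goes from database to collection to a single document:

> use movielens
> show collections
> db.users.findOne()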

MongoDB has extensions that substitute for some of Hadoop’s core tools. For example, MongoDB has its own file system, GridFS, and it has built-in aggregation capabilities, so in some cases you don’t even have to use Hadoop’s MapReduce anymore. But you can still integrate MongoDB with other tools like Spark.

The MongoDB shell

You can access the MongoDB shell by issuing mongo on your command line. From there you can execute JavaScript code to query your database. For example, the following commands return all documents with the user_id 100 (just one document, in this case):

> use movielens
switched to db movielens
> db.users.find( {user_id: 100} )
{ "_id" : ObjectId("5964e7f946e0fb4607ef6540"), "age" : NumberLong(36),
"gender" : "M", "occupation" : "executive", "user_id" : NumberLong(100),
"zip" : "90254" }

There is one problem though: user_id is not yet declared as an index. So when you search for the user ID 100, MongoDB can’t do this efficiently, but instead scans the entire collection. If you want to speed up these queries, you have to define user_id as an index:

> db.users.createIndex( {user_id: 1} )

The ‘1’ here just means the index is sorted in ascending order. The difference between having an index and not having one can be visualized by having MongoDB explain what it will do when you execute a query. You can make it show the “winning plan” for a query like this:

> db.users.explain().find({user_id: 100})

This outputs a JSON document that shows you how MongoDB would go about finding the user with the user_id 100. Without an index, you would find the keyword COLLSCAN (a full collection scan) in the response, and with an index, it would do the more efficient IXSCAN (an index scan).
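
For reference, this is roughly the relevant part of that JSON once the index exists (abridged; the exact structure varies between MongoDB versions):

{
    "queryPlanner" : {
        "winningPlan" : {
            "stage" : "FETCH",
            "inputStage" : {
                "stage" : "IXSCAN",
                "keyPattern" : { "user_id" : 1 },
                ...
            }
        }
    }
}

Without the index, the winning plan instead consists of a single COLLSCAN stage.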

Aggregating data in the Mongo shell

As mentioned before, MongoDB has built-in aggregation functions. The syntax is slightly odd, but this is how you would group the user data by occupation, and get the average age in each group:

> db.users.aggregate( [ { $group: { _id: { job: "$occupation"}, avgAge: { $avg: "$age" } } } ] )
{ "_id" : { "job" : "doctor" }, "avgAge" : 43.57142857142857 }
{ "_id" : { "job" : "healthcare" }, "avgAge" : 41.5625 }
{ "_id" : { "job" : "none" }, "avgAge" : 26.555555555555557 }
{ "_id" : { "job" : "engineer" }, "avgAge" : 36.38805970149254 }
{ "_id" : { "job" : "homemaker" }, "avgAge" : 32.57142857142857 }
{ "_id" : { "job" : "marketing" }, "avgAge" : 37.61538461538461 }
{ "_id" : { "job" : "artist" }, "avgAge" : 31.392857142857142 }
{ "_id" : { "job" : "librarian" }, "avgAge" : 40 }
{ "_id" : { "job" : "entertainment" }, "avgAge" : 29.22222222222222 }
{ "_id" : { "job" : "scientist" }, "avgAge" : 35.54838709677419 }
{ "_id" : { "job" : "salesman" }, "avgAge" : 35.666666666666664 }
{ "_id" : { "job" : "educator" }, "avgAge" : 42.01052631578948 }
{ "_id" : { "job" : "lawyer" }, "avgAge" : 36.75 }
{ "_id" : { "job" : "student" }, "avgAge" : 22.081632653061224 }
{ "_id" : { "job" : "programmer" }, "avgAge" : 33.121212121212125 }
{ "_id" : { "job" : "administrator" }, "avgAge" : 38.74683544303797 }
{ "_id" : { "job" : "writer" }, "avgAge" : 36.31111111111111 }
{ "_id" : { "job" : "retired" }, "avgAge" : 63.07142857142857 }
{ "_id" : { "job" : "executive" }, "avgAge" : 38.71875 }
{ "_id" : { "job" : "other" }, "avgAge" : 34.523809523809526 }
Type "it" for more
> it
{ "_id" : { "job" : "technician" }, "avgAge" : 33.148148148148145 }
>
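
The pipeline passed to aggregate() can contain more than one stage. As a sketch of how you might build on the query above, the following first filters out the retirees with a $match stage and then sorts the groups by average age in descending order with a $sort stage:

> db.users.aggregate( [ { $match: { occupation: { $ne: "retired" } } }, { $group: { _id: { job: "$occupation" }, avgAge: { $avg: "$age" } } }, { $sort: { avgAge: -1 } } ] )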

Integrating MongoDB with Spark

The following Python script uses Spark to import the user table from the ml-100k data set into MongoDB, and then reads back all users under 20 years of age. This is the same task, and almost the same script, as in the Cassandra case. If you compare the two, the differences amount to only a few lines.

from pyspark.sql import SparkSession
from pyspark.sql import Row
 
def parseInput(line):
    fields = line.split('|')
    return Row(user_id = int(fields[0]), age = int(fields[1]), gender = fields[2], occupation = fields[3], zip = fields[4])
 
if __name__ == "__main__":
    # Create a SparkSession
    spark = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()
 
    # Get the raw data
    lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
    # Convert it to an RDD of Row objects with (user_id, age, gender, occupation, zip)
    users = lines.map(parseInput)
    # Convert that to a DataFrame
    usersDataset = spark.createDataFrame(users)
 
    # Write it into MongoDB
    usersDataset.write\
        .format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri","mongodb://127.0.0.1/movielens.users")\
        .mode('append')\
        .save()
 
    # Read it back from MongoDB into a new DataFrame
    readUsers = spark.read\
        .format("com.mongodb.spark.sql.DefaultSource")\
        .option("uri","mongodb://127.0.0.1/movielens.users")\
        .load()
 
    readUsers.createOrReplaceTempView("users")
 
    sqlDF = spark.sql("SELECT * FROM users WHERE age < 20")
    sqlDF.show()
 
    # Stop the session
    spark.stop()

Run it by issuing:

spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 MongoSpark.py

You’ll again see a lot of info messages, and the following table:

+--------------------+---+------+-------------+-------+-----+
|                 _id|age|gender|   occupation|user_id|  zip|
+--------------------+---+------+-------------+-------+-----+
|[5964e7f946e0fb46...|  7|     M|      student|     30|55436|
|[5964e7f946e0fb46...| 19|     F|      student|     36|93117|
|[5964e7f946e0fb46...| 18|     F|      student|     52|55105|
|[5964e7f946e0fb46...| 16|     M|         none|     57|84010|
|[5964e7f946e0fb46...| 17|     M|      student|     67|60402|
|[5964e7f946e0fb46...| 19|     M|      student|     68|22904|
|[5964e7f946e0fb46...| 15|     M|      student|    101|05146|
|[5964e7f946e0fb46...| 19|     M|      student|    110|77840|
|[5964e7f946e0fb46...| 13|     M|        other|    142|48118|
|[5964e7f946e0fb46...| 15|     M|entertainment|    179|20755|
|[5964e7f946e0fb46...| 14|     F|      student|    206|53115|
|[5964e7f946e0fb46...| 19|     M|      student|    221|20685|
|[5964e7f946e0fb46...| 19|     F|      student|    223|47906|
|[5964e7f946e0fb46...| 19|     M|      student|    246|28734|
|[5964e7f946e0fb46...| 17|     M|      student|    257|77005|
|[5964e7f946e0fb46...| 19|     F|      student|    258|77801|
|[5964e7f946e0fb46...| 19|     F|      student|    262|78264|
|[5964e7f946e0fb46...| 18|     F|      student|    270|63119|
|[5964e7f946e0fb46...| 15|     F|      student|    281|06059|
|[5964e7f946e0fb46...| 11|     M|         none|    289|94619|
+--------------------+---+------+-------------+-------+-----+
only showing top 20 rows
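
If you want to cross-check the result from the mongo shell (assuming you are still in the movielens database), the $lt (“less than”) operator gives you the same selection of documents:

> db.users.find( { age: { $lt: 20 } } )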