Let’s take an in-depth look at a real-time analysis of popular Uber locations using Apache APIs.
According to Gartner, smart cities will be using about 1.39 billion connected cars, IoT sensors, and devices by 2020. The analysis of location and behavior patterns within cities will allow optimization of traffic, better planning decisions, and smarter advertising. For example, the analysis of GPS car data can allow cities to optimize traffic flows based on real-time traffic information. Telecom companies are using mobile phone location data to provide insights by identifying and predicting the location activity trends and patterns of a population in a large metropolitan area. Machine Learning applied to geolocation data is being used in telecom, travel, marketing, and manufacturing to identify patterns and trends for services such as recommendations, anomaly detection, and fraud detection.
In this article, we discuss using Spark Structured Streaming in a data processing pipeline for cluster analysis on Uber event data to detect and visualize popular Uber locations.
We start with a review of several Structured Streaming concepts, then explore the end-to-end use case. (Note: the code in this example is not from Uber, only the data.)
Publish-Subscribe Event Streams with MapR-ES
MapR-ES is a distributed publish-subscribe event streaming system that enables producers and consumers to exchange events in real time in a parallel and fault-tolerant manner via the Apache Kafka API.
A stream represents a continuous sequence of events that goes from producers to consumers, where an event is defined as a key-value pair.
A topic is a logical stream of events. Topics organize events into categories and decouple producers from consumers. Topics are partitioned for throughput and scalability. MapR-ES can scale to very high throughput levels, easily delivering millions of messages per second using very modest hardware.
You can think of a partition like an event log: new events are appended to the end and are assigned a sequential ID number called the offset.
Like a queue, events are delivered in the order they are received.
Unlike a queue, however, messages are not deleted when read. They remain on the partition, available to other consumers. Messages, once published, are immutable and can be retained forever.
Not deleting messages when they are read allows for high performance at scale and also for processing of the same messages by different consumers for different purposes such as multiple views with polyglot persistence.
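To make the log analogy concrete, here is a minimal plain-Scala model of a single partition, assuming an in-memory buffer in place of MapR-ES storage. Event, PartitionLog, and Consumer are hypothetical names for illustration, not part of the Kafka API. Appending returns the new event's offset, and each consumer keeps its own offset, so reads by one consumer never remove events that another consumer still needs.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical, in-memory illustration of one topic partition.
// This is not the Kafka or MapR-ES API.
final case class Event(key: String, value: String)

final class PartitionLog {
  private val events = ArrayBuffer.empty[Event]

  // Appending assigns the next sequential offset, starting at 0.
  def append(e: Event): Long = {
    events += e
    (events.length - 1).toLong
  }

  // Reading returns (offset, event) pairs and never removes anything.
  def read(fromOffset: Long, maxEvents: Int): List[(Long, Event)] =
    events.iterator.zipWithIndex
      .drop(fromOffset.toInt)
      .take(maxEvents)
      .map { case (e, i) => (i.toLong, e) }
      .toList
}

// Each consumer tracks its own next offset, independently of other consumers.
final class Consumer(log: PartitionLog) {
  private var nextOffset = 0L

  def poll(maxEvents: Int): List[Event] = {
    val batch = log.read(nextOffset, maxEvents)
    batch.lastOption.foreach { case (offset, _) => nextOffset = offset + 1 }
    batch.map(_._2)
  }
}
```

Two consumers polling the same log both see every event, each at its own pace, which is what allows the same messages to feed different views.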
Spark Dataset, DataFrame, SQL
A Spark Dataset is a distributed collection of typed objects partitioned across multiple nodes in a cluster. A Dataset can be manipulated using functional transformations (map, flatMap, filter, etc.) and/or Spark SQL. A DataFrame is a Dataset of Row objects and represents a table of data with rows and columns.
Spark Structured Streaming
Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. Structured Streaming enables you to view data published to Kafka as an unbounded DataFrame and process this data with the same DataFrame, Dataset, and SQL APIs used for batch processing.
As streaming data continues to arrive, the Spark SQL engine incrementally and continuously processes it and updates the final result.
Stream processing of events is useful for real-time ETL, filtering, transforming, creating counters and aggregations, correlating values, enriching with other data sources or Machine Learning, persisting to files or databases, and publishing to a different topic for pipelines.
Spark Structured Streaming Use Case Example Code
Below is the data processing pipeline for this use case of cluster analysis on Uber event data to detect popular pickup locations.
- Uber trip data is published to a MapR-ES topic using the Kafka API.
- A Spark Structured Streaming application subscribed to the topic:
  - Ingests a stream of Uber trip data
  - Uses a deployed Machine Learning model to enrich the trip data with a cluster ID and cluster location
  - Stores the transformed and enriched data in MapR-DB JSON
Example Use Case Data
The example data set is Uber trip data, which you can read more about in this post on cluster analysis of Uber event data to detect popular pickup locations using Spark machine learning. The incoming data is in CSV format; an example record is shown below, preceded by the header:
date/time,latitude,longitude,base, reverse timestamp
2014-08-06T05:29:00.000-07:00, 40.7276, -74.0033, B02682, 9223370505593280605
We enrich this data with the cluster ID and location then transform it into the following JSON object:
Loading the K-Means Model
The Spark KMeansModel class is used to load a k-means model, which was fitted on the historical Uber trip data and then saved to the MapR-XD cluster. Next, a Dataset of cluster center IDs and locations is created to join later with the Uber trip locations.
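Conceptually, a fitted k-means model is a set of cluster centers (Spark's KMeansModel exposes them as clusterCenters), and predicting a cluster means picking the nearest center, with Euclidean distance as Spark's default measure. The sketch below assumes two-dimensional (latitude, longitude) centers and uses the hypothetical names ClusterCenter, centerTable, and predict. It builds the center-ID-to-location table that is later joined with the trips and reproduces the nearest-center rule in plain Scala; it is not the Spark ML implementation.

```scala
// Hypothetical center record: cluster ID plus center latitude/longitude.
final case class ClusterCenter(cid: Int, clat: Double, clon: Double)

// Index centers by position, as Spark indexes clusterCenters by cluster ID.
def centerTable(centers: Seq[(Double, Double)]): Seq[ClusterCenter] =
  centers.zipWithIndex.map { case ((lat, lon), i) => ClusterCenter(i, lat, lon) }

// Squared Euclidean distance is enough to find the nearest center.
def squaredDistance(lat1: Double, lon1: Double, lat2: Double, lon2: Double): Double = {
  val dLat = lat1 - lat2
  val dLon = lon1 - lon2
  dLat * dLat + dLon * dLon
}

// Assign a point to the closest center, mirroring k-means prediction.
def predict(centers: Seq[ClusterCenter], lat: Double, lon: Double): Int =
  centers.minBy(c => squaredDistance(lat, lon, c.clat, c.clon)).cid
```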
Below, the cluster centers are displayed on a Google map in a Zeppelin notebook:
Reading Data from Kafka Topics
In order to read from Kafka, we must first specify the stream format, topic, and offset options. For more information on the configuration parameters, see the MapR Streams documentation.
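As a sketch, the reading options can be gathered in a Map. The option names are those of Spark's Kafka source (kafka.bootstrap.servers, subscribe, startingOffsets, failOnDataLoss, maxOffsetsPerTrigger); the topic path, host, and values are hypothetical placeholders, and whether MapR-ES makes use of the bootstrap servers setting is an assumption to check against the MapR documentation.

```scala
// Hypothetical MapR-ES topic path in "/stream-path:topic-name" form.
val topic = "/apps/uberstream:ubers"

// Options for Spark's Kafka source; all values are placeholders.
val kafkaOptions: Map[String, String] = Map(
  "kafka.bootstrap.servers" -> "maprdemo:9092", // hypothetical host:port
  "subscribe"               -> topic,           // topic to read from
  "startingOffsets"         -> "earliest",      // oldest retained offset when there is no checkpoint
  "failOnDataLoss"          -> "false",         // keep running if expected offsets are gone
  "maxOffsetsPerTrigger"    -> "1000"           // upper bound on records per micro-batch
)
```

With a SparkSession named spark, these options would be applied as spark.readStream.format("kafka").options(kafkaOptions).load(), which defines the streaming DataFrame.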
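This returns a DataFrame with the standard Kafka source schema: key and value (binary), topic (string), partition (int), offset (long), timestamp (timestamp), and timestampType (int).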
The next step is to parse and transform the binary value column into a Dataset of Uber objects.
Parsing the Message Values into a Dataset of Uber Objects
A Scala Uber case class defines the schema corresponding to the CSV records. The parseUber function parses a comma-separated value string into an Uber object.
In the code below, we register a user-defined function (UDF) to deserialize the message value strings using the parseUber function. Then we use the UDF in a select expression with a string cast of the df1 value column, which returns a DataFrame of Uber objects.
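The parsing step can be sketched in plain Scala. The field names other than dt (lat, lon, base, rdt) are assumptions, as is the choice to trim whitespace, which the sample record above needs because of the spaces after its commas.

```scala
// Hypothetical schema for one CSV record; field names other than dt are assumptions.
final case class Uber(dt: String, lat: Double, lon: Double, base: String, rdt: String)

// Parse a line such as
// "2014-08-06T05:29:00.000-07:00, 40.7276, -74.0033, B02682, 9223370505593280605"
// into an Uber object, trimming the spaces that follow each comma.
def parseUber(line: String): Uber = {
  val p = line.split(",").map(_.trim)
  require(p.length == 5, s"expected 5 fields, got ${p.length}: $line")
  Uber(p(0), p(1).toDouble, p(2).toDouble, p(3), p(4))
}
```

In the streaming job, a function like this could be wrapped with org.apache.spark.sql.functions.udf and applied to the value column after casting it to a string, matching the UDF approach described above.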
Enriching the Dataset of Uber Objects with Cluster Center IDs and Location
A VectorAssembler is used to transform and return a new DataFrame with the latitude and longitude feature columns in a vector column.
The k-means model is used to get the clusters from the features with the model transform method, which returns a DataFrame with the cluster ID (labeled predictions). This resulting Dataset is joined with the cluster center Dataset created earlier (ccdf) to create a Dataset of UberC objects, which contain the trip information combined with the cluster center ID and location.
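The enrichment amounts to attaching a predicted cluster ID to each trip and then joining on that ID with the cluster center table. The plain-Scala sketch below assumes the predictions are already available as (trip, cluster ID) pairs; the Uber, ClusterCenter, and UberC field names are assumptions, and a Map lookup stands in for Spark's join while keeping its inner-join behavior.

```scala
// Hypothetical record types; field names are assumptions.
final case class Uber(dt: String, lat: Double, lon: Double, base: String, rdt: String)
final case class ClusterCenter(cid: Int, clat: Double, clon: Double)
final case class UberC(dt: String, lat: Double, lon: Double, base: String, rdt: String,
                       cid: Int, clat: Double, clon: Double)

// Join each (trip, predicted cluster ID) pair with its cluster center.
def enrich(predicted: Seq[(Uber, Int)], centers: Seq[ClusterCenter]): Seq[UberC] = {
  val byId: Map[Int, ClusterCenter] = centers.map(c => c.cid -> c).toMap
  predicted.flatMap { case (u, cid) =>
    // Inner-join semantics: trips whose cluster ID has no center are dropped.
    byId.get(cid).map(c => UberC(u.dt, u.lat, u.lon, u.base, u.rdt, cid, c.clat, c.clon))
  }
}
```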
The final Dataset transformation is to add a unique ID to our objects for storing in MapR-DB JSON. The createUberwId function creates a unique ID consisting of the cluster ID and the reverse timestamp. Since MapR-DB partitions and sorts rows by the _id, the rows will be sorted by cluster ID with the most recent first. This function is used with a map to create a Dataset of UberwId objects.
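A minimal sketch of the ID scheme follows, assuming the reverse timestamp is Long.MaxValue minus the event time in epoch milliseconds (a common convention; the formula is not spelled out here) and assuming an underscore separator. reverseTimestamp and createId are hypothetical names. Because Long.MaxValue minus any present-day epoch-millisecond value has 19 digits, string order matches numeric order, so a newer event gets a smaller ID and sorts first within its cluster.

```scala
// Assumption: reverse timestamp = Long.MaxValue - event time in epoch millis,
// so newer events get smaller values.
def reverseTimestamp(epochMillis: Long): Long = Long.MaxValue - epochMillis

// Hypothetical ID format "<cluster ID>_<reverse timestamp>".
def createId(cid: Int, reverseTs: Long): String = s"${cid}_$reverseTs"
```

The cluster prefix only sorts numerically while cluster IDs are single digits; with two-digit IDs, "10_..." would sort between "0_..." and "1_..." unless the cluster ID were zero-padded.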
Writing to a Memory Sink
We have now set up the enrichments and transformations on the streaming data. Next, for debugging purposes, we can start receiving data and store it in an in-memory table, which can then be queried.
Here is example output from %sql select * from uber limit 10:
Now we can query the streaming data to ask questions like which hours and clusters have the highest number of pickups? (Output is shown in a Zeppelin notebook.)
SELECT hour(uber.dt) AS hr, cid, count(cid) AS ct FROM uber GROUP BY hour(uber.dt), cid
Spark Streaming Writing to MapR-DB
The MapR-DB Connector for Apache Spark enables you to use MapR-DB as a sink for Spark Structured Streaming or Spark Streaming.
One of the challenges when processing lots of streaming data is deciding where to store it. For this application, MapR-DB JSON, a high-performance NoSQL database, was chosen for its scalability and for the flexibility and ease of use of its JSON document model.
JSON Schema Flexibility
MapR-DB supports JSON documents as a native data store. MapR-DB makes it easy to store, query, and build applications with JSON documents. The Spark connector makes it easy to build real-time or batch pipelines between your JSON data and MapR-DB and leverage Spark within the pipeline.
With MapR-DB, a table is automatically partitioned into tablets across a cluster by key range, providing scalable and fast reads and writes by row key. In this use case, the row key (_id) consists of the cluster ID and reverse timestamp, so the table is automatically partitioned and sorted by cluster ID with the most recent first.
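Range partitioning can be illustrated with a small model in which tablets are defined by sorted split keys and a row belongs to the tablet whose key range contains its _id. The split keys, tabletFor, and scanCluster are hypothetical; MapR-DB chooses and manages splits itself. The second function shows why the cluster-first key helps: in a sorted key space, all rows of one cluster are contiguous, so reading a cluster is a single range scan.

```scala
// Hypothetical model of range partitioning. splitKeys must be sorted ascending;
// tablet i holds keys in [splitKeys(i - 1), splitKeys(i)), open-ended at both ends.
def tabletFor(id: String, splitKeys: IndexedSeq[String]): Int =
  splitKeys.count(split => split <= id)

// Assumes sortedIds is sorted and IDs look like "<cid>_<reverse timestamp>".
// Skip keys before the cluster prefix, then take the contiguous run with it.
def scanCluster(sortedIds: Seq[String], cid: Int): Seq[String] = {
  val prefix = s"${cid}_"
  sortedIds.dropWhile(id => id < prefix).takeWhile(_.startsWith(prefix))
}
```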
The Spark MapR-DB Connector leverages the Spark DataSource API. The connector architecture has a connection object in every Spark Executor, allowing for distributed parallel writes, reads, or scans with MapR-DB tablets (partitions).
Writing to a MapR-DB Sink
To write a Spark Stream to MapR-DB, specify the format with the tablePath, idFieldPath, createTable, bulkMode, and sampleSize parameters. The following example writes out the cdf DataFrame to MapR-DB and starts the stream.
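As a sketch, the sink parameters named above can be collected in a Map. The parameter names come from the text; the table path, checkpoint path, and every value are hypothetical placeholders, and the exact format name for the MapR-DB streaming sink should be taken from the MapR connector documentation.

```scala
// Hypothetical MapR-DB sink options keyed by the parameter names above.
// All values are placeholders.
val mapRDBSinkOptions: Map[String, String] = Map(
  "tablePath"   -> "/apps/ubertable", // hypothetical MapR-DB JSON table path
  "idFieldPath" -> "_id",             // document field used as the row key
  "createTable" -> "false",           // placeholder value
  "bulkMode"    -> "true",            // placeholder value
  "sampleSize"  -> "1000"             // placeholder value
)

// Streaming queries also need a checkpoint location (hypothetical path).
val checkpointLocation = "/tmp/uberdbcheckpoint"
```

These could then be passed as cdf.writeStream.format(...).options(mapRDBSinkOptions).option("checkpointLocation", checkpointLocation).start(), with the format left elided because its exact value depends on the connector version.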
Querying MapR-DB JSON with Spark SQL
The Spark MapR-DB Connector enables users to perform complex SQL queries and updates on top of MapR-DB using a Spark Dataset while applying critical techniques such as projection and filter pushdown, custom partitioning, and data locality.
Loading Data from MapR-DB into a Spark Dataset
To load data from a MapR-DB JSON table into an Apache Spark Dataset, we invoke the loadFromMapRDB method on a SparkSession object, providing the tableName, schema, and case class. This returns a Dataset of UberwId objects:
Explore and Query the Uber Data with Spark SQL
Now we can query the data that is continuously streaming into MapR-DB to ask questions with the Spark DataFrames domain-specific language or with Spark SQL.
Show the first rows. Note how the rows are partitioned and sorted by the _id, which is composed of the cluster ID and the reverse timestamp; the reverse timestamp sorts the most recent rows first.
How many pickups occurred in each cluster?
or with Spark SQL:
%sql SELECT COUNT(cid), cid FROM uber GROUP BY cid ORDER BY COUNT(cid) DESC
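In the DataFrame DSL, with org.apache.spark.sql.functions._ imported, the per-cluster count could be written as df.groupBy("cid").count().orderBy(desc("count")).show(). The plain-Scala sketch below computes the same result over a list of cluster IDs to make the semantics explicit; countByCluster is a hypothetical name, and ties are broken by cluster ID, which the SQL query leaves unspecified.

```scala
// Plain-Scala analogue of
// SELECT COUNT(cid), cid FROM uber GROUP BY cid ORDER BY COUNT(cid) DESC
// returning (cid, count) pairs.
def countByCluster(cids: Seq[Int]): Seq[(Int, Int)] =
  cids.groupBy(identity)
    .map { case (cid, rows) => (cid, rows.size) }
    .toSeq
    .sortBy { case (cid, count) => (-count, cid) } // highest count first, ties by cid
```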
With an Angular and Google Maps script in a Zeppelin notebook, we can display cluster center markers and the latest 5000 trip locations on a map, which shows that the most popular clusters (0, 3, and 9) are in Manhattan.
Which hours have the highest number of pickups for cluster 0?
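df.filter($"_id" <= "1") // _id values for cluster 0 start with "0"
  .groupBy(hour($"dt").alias("hour")).count().orderBy(desc("count")).show()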
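The filter relies on string comparison. Assuming IDs of the form "<cid>_<reverse timestamp>", every cluster 0 ID starts with "0" and so compares less than "1", while IDs for clusters 1 or 10 have "1" as a proper prefix and compare greater. The plain-Scala sketch below shows the filter and the hour count; isCluster0 and pickupsPerHourCluster0 are hypothetical names, and the hour is taken in each record's own UTC offset, whereas Spark's hour() uses the session time zone.

```scala
import java.time.OffsetDateTime

// Assumption: _id has the form "<cid>_<reverse timestamp>".
// Lexicographically, "0..." < "1" < "1_..." and "1" < "10_...".
def isCluster0(id: String): Boolean = id <= "1"

// rows are (_id, ISO-8601 date-time) pairs; returns (hour, count),
// highest count first, ties broken by hour.
def pickupsPerHourCluster0(rows: Seq[(String, String)]): Seq[(Int, Int)] =
  rows.collect { case (id, dt) if isCluster0(id) => OffsetDateTime.parse(dt).getHour }
    .groupBy(identity)
    .map { case (hour, hs) => (hour, hs.size) }
    .toSeq
    .sortBy { case (hour, count) => (-count, hour) }
```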
Which hours of the day and which cluster had the highest number of pickups?
%sql SELECT hour(uber.dt), cid, count(cid) FROM uber GROUP BY hour(uber.dt), cid
Display cluster counts for Uber trips by datetime.
%sql select cid, dt, count(cid) as count from uber group by dt, cid order by dt, cid limit 100
In this post, you learned how to use the following:
- A Spark Machine Learning model in a Spark Structured Streaming application
- Spark Structured Streaming with MapR-ES to ingest messages using the Kafka API
- Spark Structured Streaming to persist to MapR-DB, making continuously arriving data rapidly available for SQL analysis
All of the components of the use case architecture we just discussed can run on the same cluster with the MapR Data Platform.