What is Storm?

Storm is a free and open source distributed real-time computation system. Storm makes it easy to reliably process unbounded streams of data, doing for real-time processing what Hadoop did for batch processing. It was created in 2011 by BackType, which was acquired by Twitter that same year.

Who’s using it?

  • Groupon: At Groupon we use Storm to build real-time data integration systems. Storm helps us analyze, clean, normalize, and resolve large amounts of non-unique data points with low latency and high throughput.
  • Twitter: Storm powers Twitter’s publisher analytics product, processing every tweet and click that happens on Twitter to provide analytics for Twitter’s publisher partners. Storm integrates with the rest of Twitter’s infrastructure, including Cassandra, the Kestrel infrastructure, and Mesos. Many other projects are underway using Storm, including projects in the areas of revenue optimization, anti-spam, and content discovery.
  • Alipay: Alipay is China’s leading third-party online payment platform. We are using Storm in many scenarios:
    • Calculating real-time trade quantity and trade amount, top-N seller trading information, and user registration counts, over more than 100 million messages per day.
    • Log processing, at more than 6 TB of data per day.
  • Ooyala: Ooyala powers personalized multi-screen video experiences for some of the world’s largest networks, brands and media companies. At the core of our technology is an analytics engine that processes over two billion analytics events each day, derived from nearly 200 million viewers worldwide who watch video on an Ooyala-powered player. Ooyala will be deploying Storm in production to give our customers real-time streaming analytics on consumer viewing behavior and digital content trends. Storm enables us to rapidly mine one of the world’s largest online video data sets to deliver up-to-the-minute business intelligence ranging from real-time viewing patterns to personalized content recommendations to dynamic programming guides and dozens of other insights for maximizing revenue with online video.
  • Rubicon Project: Storm is being used in production at the Rubicon Project to analyze the results of auctions of ad impressions on its RTB exchange as they occur. It is currently processing around 650 million auction results daily across three data centers (with three separate Storm clusters). One simple application is identifying new creatives (ads) in real time for ad quality purposes. A more sophisticated application is an “Inventory Valuation Service” that uses DRPC to return appraisals of new impressions before the auction takes place.

Find more examples at: https://github.com/nathanmarz/storm/wiki/Powered-By


How does it work?

A Storm cluster is superficially similar to a Hadoop cluster. Whereas on Hadoop you run “MapReduce jobs”, on Storm you run “topologies”. “Jobs” and “topologies” themselves are very different — one key difference is that a MapReduce job eventually finishes, whereas a topology processes messages forever (or until you kill it).

There are two kinds of nodes on a Storm cluster: the master node and the worker nodes. The master node runs a daemon called “Nimbus” that is similar to Hadoop’s “JobTracker”. Nimbus is responsible for distributing code around the cluster, assigning tasks to machines, and monitoring for failures.

Each worker node runs a daemon called the “Supervisor”. The supervisor listens for work assigned to its machine and starts and stops worker processes as necessary based on what Nimbus has assigned to it. Each worker process executes a subset of a topology; a running topology consists of many worker processes spread across many machines.

All coordination between Nimbus and the Supervisors is done through a Zookeeper cluster. Additionally, the Nimbus daemon and Supervisor daemons are fail-fast and stateless; all state is kept in Zookeeper or on local disk. This means you can kill -9 Nimbus or the Supervisors and they’ll start back up like nothing happened. This design leads to Storm clusters being incredibly stable. We’ve had topologies running for months without requiring any maintenance.

The core abstraction in Storm is the “stream”. A stream is an unbounded sequence of tuples. Storm provides the primitives for transforming a stream into a new stream in a distributed and reliable way. For example, you may transform a stream of tweets into a stream of trending topics.

The basic primitives Storm provides for doing stream transformations are “spouts” and “bolts”. Spouts and bolts have interfaces that you implement to run your application-specific logic.

A spout is a source of streams. For example, a spout may read tuples off of a Kestrel queue and emit them as a stream. Or a spout may connect to the Twitter API and emit a stream of tweets.
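
As a quick illustration, here is a minimal spout sketched against Storm’s Java API (the backtype.storm packages of this era). RandomSentenceSpout is an illustrative example class, much like the one in the storm-starter project, that emits a random sentence each time Storm asks it for a tuple:

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import java.util.Map;
    import java.util.Random;

    public class RandomSentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private Random random;

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector; // Storm hands us the collector we emit through
            this.random = new Random();
        }

        public void nextTuple() {
            String[] sentences = {"the cow jumped over the moon",
                                  "four score and seven years ago"};
            // In a real spout this would read from a queue or an API instead
            collector.emit(new Values(sentences[random.nextInt(sentences.length)]));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence")); // the stream's single field
        }
    }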

A bolt does single-step stream transformations. It creates new streams based on its input streams. Complex stream transformations, like computing a stream of trending topics from a stream of tweets, require multiple steps and thus multiple bolts.
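
Here is a matching sketch of a simple bolt that splits those sentences into words. SplitSentenceBolt is again an illustrative class; the anchored emit and the ack call are part of Storm’s reliability API and tie into the guaranteed message processing described under “Under the Hood” below:

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.Map;

    public class SplitSentenceBolt extends BaseRichBolt {
        private OutputCollector collector;

        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        public void execute(Tuple input) {
            for (String word : input.getString(0).split(" ")) {
                // Anchoring the emit to the input ties the new tuple into the
                // tuple tree that Storm tracks for guaranteed processing
                collector.emit(input, new Values(word));
            }
            collector.ack(input); // tell Storm this input was fully processed
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }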

Multi-step stream transformations are packaged into a “topology” which is the top-level abstraction that you submit to Storm clusters for execution. A topology is a graph of stream transformations where each node is a spout or bolt. Edges in the graph indicate which bolts are subscribing to which streams. When a spout or bolt emits a tuple to a stream, it sends the tuple to every bolt that subscribed to that stream.
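
Putting the pieces together, here is a sketch of wiring and submitting a topology. The component names and parallelism hints are illustrative, and WordCountBolt is a hypothetical counting bolt sketched a bit further down; shuffleGrouping and fieldsGrouping are real TopologyBuilder calls, explained next:

    import backtype.storm.Config;
    import backtype.storm.StormSubmitter;
    import backtype.storm.topology.TopologyBuilder;
    import backtype.storm.tuple.Fields;

    public class WordCountTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("sentences", new RandomSentenceSpout(), 2);
            // Each sentence goes to a random splitter task (shuffle grouping)
            builder.setBolt("split", new SplitSentenceBolt(), 4)
                   .shuffleGrouping("sentences");
            // The same word always reaches the same counter task (fields grouping)
            builder.setBolt("count", new WordCountBolt(), 4)
                   .fieldsGrouping("split", new Fields("word"));

            Config conf = new Config();
            conf.setNumWorkers(4); // worker processes to spread across the cluster
            StormSubmitter.submitTopology("word-count", conf, builder.createTopology());
        }
    }

The numbers passed to setSpout and setBolt are parallelism hints that control how many parallel tasks run for each component.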

Everything in Storm runs in parallel in a distributed way. Spouts and bolts execute as many tasks across the cluster, and they pass messages to each other in a distributed way. Messages never pass through any sort of central router, and there are no intermediate queues. A tuple is passed directly from the thread that created it to the threads that need to consume it.

When a task for Bolt A emits a tuple to Bolt B, which task should it send the tuple to?
A “stream grouping” answers this question by telling Storm how to send tuples between sets of tasks. There are a few different kinds of stream groupings.
The simplest kind of grouping is called a “shuffle grouping” which sends the tuple to a random task. It has the effect of evenly distributing the work of processing the tuples across all of Bolt B’s tasks.

A more interesting kind of grouping is the “fields grouping”. A fields grouping lets you group a stream by a subset of its fields. This causes equal values for that subset of fields to go to the same task.
Fields groupings are the basis of implementing streaming joins and streaming aggregations, as well as a plethora of other use cases. Under the hood, fields groupings are implemented using consistent hashing.
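
To see why this matters, here is a sketch of the hypothetical WordCountBolt wired in earlier. It can keep its counts in a plain in-memory map only because the fields grouping on “word” guarantees that every tuple for a given word arrives at the same task:

    import backtype.storm.topology.BasicOutputCollector;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseBasicBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;
    import java.util.HashMap;
    import java.util.Map;

    // BaseBasicBolt acks each input automatically after execute() returns
    public class WordCountBolt extends BaseBasicBolt {
        private Map<String, Integer> counts = new HashMap<String, Integer>();

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            count = (count == null) ? 1 : count + 1;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }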


Under the Hood

  1. Guaranteed message processing: Storm guarantees that each tuple coming off a spout will be fully processed by the topology. To do this, Storm tracks the tree of messages that a tuple triggers. If a tuple fails to be fully processed, Storm will replay the tuple from the spout. Storm incorporates some clever tricks to track the tree of messages in an efficient way.
  2. Robust process management: One of Storm’s main tasks is managing processes around the cluster. When a new worker is assigned to a supervisor, that worker should be started as quickly as possible. When that worker is no longer assigned to that supervisor, it should be killed and cleaned up.
  3. Fault detection and automatic reassignment: Tasks in a running topology heartbeat to Nimbus to indicate that they are running smoothly. Nimbus monitors heartbeats and will reassign tasks that have timed out. Additionally, all the tasks throughout the cluster that were sending messages to the failed tasks quickly reconnect to the new location of the tasks.
  4. Efficient message passing: No intermediate queuing is used for message passing between tasks. Instead, messages are passed directly between tasks using ZeroMQ. This is simpler and way more efficient than using intermediate queuing. ZeroMQ is a clever “super-socket” library that employs a number of tricks for maximizing the throughput of messages. For example, it will detect if the network is busy and automatically batch messages to the destination.
  5. Local mode and distributed mode: Storm has a “local mode” where it simulates a Storm cluster completely in-process. This lets you iterate on your topologies quickly and write unit tests for them. You can run the same code in local mode as you run on the cluster; see the sketch after this list.
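
As a sketch of local mode, assuming the same builder wiring shown earlier, the topology can be run in-process with LocalCluster instead of being submitted with StormSubmitter:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.topology.TopologyBuilder;

    public class LocalRun {
        public static void main(String[] args) throws Exception {
            TopologyBuilder builder = new TopologyBuilder();
            // ... the same setSpout/setBolt wiring shown earlier ...

            Config conf = new Config();
            conf.setDebug(true); // log every tuple as it is emitted

            LocalCluster cluster = new LocalCluster(); // in-process simulated cluster
            cluster.submitTopology("word-count-local", conf, builder.createTopology());
            Thread.sleep(10000); // let the topology run for ten seconds
            cluster.shutdown();
        }
    }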

Dependencies

Storm requires the following dependencies to be installed on the Nimbus and worker machines.


  • ZeroMQ 2.1.7. Note that you should not install version 2.1.10, as that version has some serious bugs that can cause strange issues in a Storm cluster. In some rare cases, users have reported an “IllegalArgumentException” bubbling up from the ZeroMQ code when using 2.1.7; in these cases, downgrading to 2.1.4 fixed the problem.
  • JZMQ
  • Java 6
  • Python 2.6.6
  • unzip


Experience at Globant

Storm was introduced at our client Metamarkets to provide real-time analytics of data coming from multiple queues. It is also being used internally at Globant as part of an effort to provide real-time processing of streams coming from the Twitter API in order to analyze and predict stock market trends.

Similar Technologies

  • Esper: Uses a tailored Event Processing Language (EPL) to register queries against its engine.
  • Darkstar: Being used by NYSE Euronext, Mass Exchange, and Oyster Consulting. A technical overview is available on request.
  • Streambase: Proprietary software, available free as a Developer Edition, with the option to purchase the Enterprise Edition.
  • S4: Developed by Yahoo! and currently an Apache Incubator project. A new version was released in August 2012 including a complete refactoring of the previous version.

For more information and examples, please visit the Twitter Engineering Blog.

