Storm is a distributed realtime computation system. In this article we'll walk through a working example that explains the basic concepts.
Citing from the Rationale page on the wiki:
The past decade has seen a revolution in data processing. MapReduce, Hadoop, and related technologies have made it possible to store and process data at scales previously unthinkable. Unfortunately, these data processing technologies are not realtime systems, nor are they meant to be. There’s no hack that will turn Hadoop into a realtime system; realtime data processing has a fundamentally different set of requirements than batch processing.
However, realtime data processing at massive scale is becoming more and more of a requirement for businesses. The lack of a “Hadoop of realtime” has become the biggest hole in the data processing ecosystem.
Storm fills that hole.
For a full introduction see the following presentation: http://www.infoq.com/presentations/Storm
Our example connects to Twitter's Streaming API, performs a query, and retrieves the results; it then arbitrarily filters out a percentage of them, randomly assigns a score to the rest, and saves everything to the filesystem.
The code of the example is attached and can be downloaded here: prueba1.tar.gz. It uses Maven to handle dependencies, and you'll need a valid Twitter user to run it. To configure it, rename the file curl_settings.cfg.example to curl_settings.cfg and fill in your credentials and, optionally, a proxy for the connection (you'll also need the command-line curl tool).
The example is configured to run a self-contained local cluster.
Storm has two basic units of processing: Spouts and Bolts.
Spouts are the elements that generate the data to be processed. They may get that data from external sources or generate it themselves, but their mission is to introduce it into the cluster.
Bolts are processing units: they receive data from the Spouts and perform work on it, optionally generating more data to be processed by other Bolts.
The data that flows inside the cluster is organized in Streams: homogeneous lists of named Tuples whose structure is known in advance. When a Spout is created it declares the kind of Tuple it will emit; likewise, when a Bolt is meant to generate further data to be processed, it declares the kind of Tuple it will emit for the next Bolt in the chain.
This architecture of connected Bolts and Spouts is known as the "Topology". In our example we'll create a Topology such as the one shown in the following diagram:
Dotted lines represent data moving inside the cluster while solid lines are data coming or going to the outside.
A more complete explanation of the concepts can be found on the wiki: https://github.com/nathanmarz/storm/wiki/Concepts
The main class of the program is Prueba1. It has some static attributes that are used as configuration options:
- debug – a boolean indicating we're in debug mode, which in turn makes Storm output more information when running
- num_processes – an integer indicating the number of processes Storm should launch on the cluster to run the jobs; the concurrency options are not the focus of this first article, so they're not explained in detail
- duration_secs – in normal usage Storm is a persistent daemon that accepts jobs and runs them indefinitely until they're removed via the same interface; in this example we forcefully abort the process after this many seconds
- topology_name – the identifier used to register this Topology with the cluster, and also used to stop it
- archive_target_dir – the base directory where the results will be stored; inside it, the example creates another directory named after the current date/time for the results of this run
- filter_mod_n – the ModNFilterBolt class uses a very simple filtering mechanism: it drops tweets whose id is a multiple of the filter_mod_n value, so if it is 3 it'll exclude approximately a third of the received tweets
The Prueba1 class has a static setup_archive method that checks and prepares the base directory for the archiver Bolts. The main method reads the command-line arguments, which should be the search keywords for Twitter, and stores them on disk.
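As a rough sketch of what a setup_archive-style helper does (the names below are illustrative, not the actual Prueba1 code), the per-run directory can be created under the archive base like this:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.text.SimpleDateFormat;
import java.util.Date;

// Illustrative sketch: prepare a run-specific results directory,
// named after the current date/time, under the archive base dir.
public class ArchiveSetup {
    public static Path setupArchive(Path baseDir) throws IOException {
        String runName = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
        Path runDir = baseDir.resolve(runName);
        Files.createDirectories(runDir); // also creates baseDir if it's missing
        return runDir;
    }
}
```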
Next we create instances of the Spout and Bolts and assign ids to them. Storm requires you to arbitrarily set the ids of the nodes; they are used to connect the nodes into the Topology.
The next section creates a Config instance to configure the cluster and a Topology instance to define the flow. There are several ways to determine how output is sent to the listeners; here we use shuffleGrouping, which distributes it evenly among the available workers (an example of a different grouping can be found in the Tutorial).
Notice that we connect both the TweetArchiverBolt and the ModNFilterBolt to the output of CurlStreamingAPISpout, as in our diagram. This doesn't mean the output will be split between them: each of them receives all of it.
Finally we create the local cluster instance and run our Topology for the specified time.
This class implements the IRichSpout interface, which declares it as an "emitter" for the Topology: it will be a source of data for the Bolts in the cluster to process.
The class must declare the Fields of the Tuples in the Stream it'll emit; that happens in the declareOutputFields method. In our case we output one-item Tuples containing a JSON structure that represents a single tweet, which we'll transport as a String.
In the constructor we build the command to be executed. For simplicity we rely on the curl CLI tool and direct user:pass authentication (Basic Auth); in a more sophisticated setup we would connect to Twitter directly using OAuth.
The open method is called when a new worker is about to start emitting Tuples; there we create the external process that performs the query. We also save the SpoutOutputCollector instance, which is used to emit the Tuples.
Once the worker is ready, Storm starts calling its nextTuple method to retrieve items to process; every time this method is called we should generate a new Tuple for the Stream. In our case we read a line from the curl process, which should give us a new JSON tweet for the specified query.
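The read loop can be sketched in isolation. This is an illustrative stand-in, not the actual spout code: the real spout launches curl against Twitter's Streaming API, while here the command is whatever the caller passes in.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

// Illustrative sketch: spawn an external process (the real spout runs
// curl) and pull one line of its stdout per nextTuple-style call.
public class LineReaderSketch {
    private final BufferedReader reader;

    public LineReaderSketch(String... command) throws IOException {
        Process process = new ProcessBuilder(command).start();
        reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
    }

    // Called once per nextTuple: returns the next JSON line, or null at EOF.
    public String nextLine() throws IOException {
        return reader.readLine();
    }
}
```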
The emit method of SpoutOutputCollector has several variants; here we use the tweet's id as the id of the emitted message. It isn't used in our example, but that id is sent back to the ack and fail methods, which can be used to handle errors and control the flow if necessary.
The isDistributed method returns a boolean indicating whether this class can be run concurrently. Storm doesn't care whether the Tuples are generated or retrieved from external sources, but since we're connecting to an external service we want a single instance of this worker, to avoid duplicates (and bans).
The close method is only guaranteed to be called when running a local cluster, because in a real cluster nodes may go down at any time without notice, whether through failures or by having their processes killed. Here we use it to terminate the shell command.
The ModNFilterBolt is a very simple implementation of a filter bolt. It extends BaseBolt and declares its output to be the same as CurlStreamingAPISpout's, which means it'll generate more data to be processed by other Bolts.
The execute method is called for each Tuple this Bolt receives; it may skip the Tuple or emit new Tuples (one or more, all matching its output declaration). Here we exclude tweets whose numeric id is a multiple of the argument passed to the constructor and emit the others.
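The filtering rule itself is a one-liner. As a minimal illustration (a hypothetical helper, not the actual ModNFilterBolt code):

```java
// Illustrative sketch of the filter_mod_n rule: drop tweets whose id
// is a multiple of n, keep the rest.
public class ModNFilter {
    public static boolean keep(long tweetId, int n) {
        return tweetId % n != 0; // with n = 3, roughly a third is dropped
    }
}
```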
After all resulting Tuples are emitted (if any), the Bolt must acknowledge to the cluster that the incoming Tuple was fully processed. This is because, if the worker dies before finishing its work, a message is sent back to the originating Spout so it may resend the Tuple or handle the error (in our example we ignore the error).
The RandomScorerBolt is even simpler than the ModNFilterBolt. It also extends BaseBolt but declares a different output: this bolt emits two-item Tuples composed of the original tweet and the assigned score.
The prepare method is analogous to a Spout's open method: it's called when a worker is about to be put to work. Here we use it to initialize the random number generator.
In execute we emit the original tweet along with a random number from 1 to 5, then call ack to let Storm know we're done.
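The scoring step can be sketched as follows (illustrative names, not the actual RandomScorerBolt code):

```java
import java.util.Random;

// Illustrative sketch: the generator is created once, in what would be
// the bolt's prepare method, and reused on every execute call.
public class RandomScorer {
    private final Random rng = new Random();

    public int score() {
        return rng.nextInt(5) + 1; // uniform integer from 1 to 5
    }
}
```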
The TweetArchiverBolt class handles persistence of the raw tweet data coming from Twitter. It declares no output, since it acts as an observer of CurlStreamingAPISpout's output.
It subclasses BaseArchiverBolt and saves the tweets as JSON files on disk, using the tweet id as the filename.
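The write itself might look like this (an illustrative sketch with hypothetical names, not the actual archiver code):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative sketch: persist the raw tweet JSON as <dir>/<tweetId>.json.
public class TweetWriter {
    public static Path save(Path dir, long tweetId, String json) throws IOException {
        Path file = dir.resolve(tweetId + ".json");
        Files.write(file, json.getBytes(StandardCharsets.UTF_8));
        return file;
    }
}
```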
Even though it emits nothing, the Bolt must still acknowledge its input as "fully processed" to the cluster.
The ScoreArchiverBolt class is very similar to the TweetArchiverBolt class; the only difference is that it saves just the score as a plain-text file, using the tweet id as the filename so it can be associated with the original message.