Cascading: Anatomy of a Cascade

Here is a quick introduction to the Cascading API showing how we assemble connected flows and bind them into a cascade for execution on a Hadoop cluster. What's a flow, and what's a cascade? Read on...

If you would rather skip reading through a bunch of Java code, check out our Cascading Overview.

Note that this example is contrived to be more transparent than a real Cascading application would be in a production environment. This lets us introduce the core concepts and show how they work together without a lot of back and forth.

So let's say we have an Apache log file we want to push onto a Hadoop distributed filesystem (hdfs), and we want to calculate the number of requests per second and the number of requests per minute our web server received.

The first step is to initialize all the paths and our 'connector' factory objects.

FlowConnector flowConnector = new FlowConnector();
CascadeConnector cascadeConnector = new CascadeConnector();

String inputPath = args[ 0 ];
String logsPath = args[ 1 ] + "/logs/";
String arrivalRatePath = args[ 1 ] + "/arrivalrate/";
String arrivalRateSecPath = arrivalRatePath + "sec";
String arrivalRateMinPath = arrivalRatePath + "min";

Next we want to build a pipe assembly that will read the text log file, parse the file into some useful fields, and save the results as a native Hadoop sequence file on our hdfs.

// declares: "time", "method", "event", "status", "size"
RegexParser APACHE_PARSER = new RegexParser( APACHE_GROUP_FIELDS, APACHE_REGEX, APACHE_GROUPS );
Pipe importPipe = new Each( "import", new Fields( "line" ), APACHE_PARSER );

This is a very simple assembly. It declares an Each pipe that will apply a Function to each element in the data stream.

In detail, we instantiated a new Each instance with the name "import". It looks for a field named "line" in the input stream and passes it to the RegexParser function. This function in turn applies a regular expression to the value of "line". The Each pipe then returns a tuple with the fields "time", "method", "event", "status", and "size" to the stream.
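To make the parsing step concrete, here is a minimal plain-Java sketch of what a regex parser does to one "line" value. It does not use the Cascading API. The pattern is an assumption written for the Apache common log format (the post's own APACHE_REGEX is not shown), and the class name, method name, and the reading of "event" as the requested path are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ApacheLineSketch {
    // Assumed pattern for the Apache common log format; NOT the post's APACHE_REGEX.
    // Groups: 1 = time, 2 = method, 3 = requested path, 4 = status, 5 = size.
    static final Pattern LOG_LINE = Pattern.compile(
        "^\\S+ \\S+ \\S+ \\[([^\\]]+)\\] \"(\\S+) (\\S+)[^\"]*\" (\\d{3}) (\\d+|-)$");

    // Field names taken from the post; "event" is assumed to be the requested path.
    static final String[] FIELD_NAMES = {"time", "method", "event", "status", "size"};

    // Returns the parsed fields in declaration order, or null if the line does not match.
    static Map<String, String> parse(String line) {
        Matcher matcher = LOG_LINE.matcher(line);
        if (!matcher.matches()) {
            return null;
        }
        Map<String, String> tuple = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_NAMES.length; i++) {
            tuple.put(FIELD_NAMES[i], matcher.group(i + 1));
        }
        return tuple;
    }

    public static void main(String[] args) {
        String line = "127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] "
            + "\"GET /apache_pb.gif HTTP/1.0\" 200 2326";
        System.out.println(parse(line));
    }
}
```

In the real assembly the Each pipe does this once per tuple, and the incoming "line" field is replaced by the five parsed fields.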

Next we need to define the resource locations we will use by creating appropriate Tap instances. A Tap represents the physical location of a resource, be it local, on an hdfs, on Amazon S3, or reachable over ftp, http, etc. Taps take Scheme objects that declare the format of the resource and its field names, if any.

// create tap to read a resource from the local file system
// by default TextLine declares the fields "offset" and "line"
Tap localLogTap = new Lfs( new TextLine(), inputPath );
// create a tap to read/write from a Hadoop distributed filesystem
Tap parsedLogTap = new Dfs( APACHE_GROUP_FIELDS, logsPath );

Next we connect the pipe assembly and our taps into a new Flow instance. A Flow is an executable instance of a pipe assembly.

// connect the assembly to source and sink taps
Flow importLogFlow = flowConnector.connect( localLogTap, parsedLogTap, importPipe );

At this point we could just call importLogFlow.start() to execute our flow. But we want to tie this to the next Flow, which we have yet to create.

Here we start our next assembly.

// create an assembly to parse out the time field into a timestamp
// then count the number of requests per second and per minute

// apply a text parser to create a timestamp with 'second' granularity
// declares field "ts"
DateParser APACHE_DATE_PARSER = new DateParser( APACHE_DATE_FORMAT );
Pipe tsPipe = new Each( "arrival rate", new Fields( "time" ), APACHE_DATE_PARSER );

This gives us a new Each pipe that applies a DateParser function to the "time" field found in each tuple. The DateParser uses the Apache time format pattern and returns a new long value representing the time in milliseconds. This value is returned in the field "ts", for timestamp.
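As a rough illustration of the conversion, the sketch below turns an Apache-style time string into epoch milliseconds with the standard java.time classes. It is not the DateParser implementation. The pattern string is an assumption (the post's APACHE_DATE_FORMAT constant is not shown), and the class and method names are hypothetical.

```java
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Locale;

final class ApacheTimeSketch {
    // Assumed Apache access-log time pattern, e.g. "10/Oct/2000:13:55:36 -0700".
    // The post's APACHE_DATE_FORMAT is not shown, so this is a stand-in.
    static final DateTimeFormatter APACHE_TIME =
        DateTimeFormatter.ofPattern("dd/MMM/yyyy:HH:mm:ss Z", Locale.ENGLISH);

    // Parses the "time" value and returns milliseconds since the epoch, the "ts" value.
    static long toMillis(String time) {
        return OffsetDateTime.parse(time, APACHE_TIME).toInstant().toEpochMilli();
    }

    public static void main(String[] args) {
        System.out.println(toMillis("10/Oct/2000:13:55:36 -0700"));
    }
}
```

Because the log format only records whole seconds, every "ts" value produced this way is a multiple of 1000, which is what gives the field its 'second' granularity.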

Note that any other fields entering the tsPipe are discarded and replaced by the "ts" field. More on this below.

Next we need to group the stream on the "ts" field and count the number of values in each grouping.

// name the per second assembly and split on tsPipe
Pipe tsCountPipe = new Pipe( "tsCount", tsPipe );
tsCountPipe = new GroupBy( tsCountPipe, new Fields( "ts" ) );
tsCountPipe = new Every( tsCountPipe, Fields.KEYS, new Count() );

This is the most interesting assembly we have created so far. I'll address each line individually.

The first line does two things. First it extends the tsPipe assembly, and second, it labels this location in the new assembly. This becomes important when you need to bind Tap sinks to a particular branch in an assembly.

The second line instantiates a GroupBy pipe that will group on the field "ts". Also note that it takes tsCountPipe in its constructor.

The last line instantiates a new Every pipe instance that applies an Aggregator function to every group created by the previous GroupBy pipe. The Fields.KEYS value tells the Every instance to pass the grouping key fields as arguments to the Count aggregator. In this case, the field "ts" is passed as the argument. Alternatively, we could have just created a new Fields instance declaring the field "ts". The Count aggregator will count the number of values in the current group and return the number as the field "count", by default.
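For readers new to the GroupBy and Every pairing, the effect is roughly what this single-machine, plain-Java sketch computes: bucket the stream by key, then count the values in each bucket. It is only an analogy, not how Cascading or Hadoop executes the assembly, and the class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class GroupCountSketch {
    // Groups the incoming "ts" values and counts how many fall into each group.
    // Each map entry corresponds to one output tuple with the fields ("ts", "count").
    static Map<Long, Long> countPerKey(List<Long> timestamps) {
        Map<Long, Long> counts = new TreeMap<>();
        for (Long ts : timestamps) {
            counts.merge(ts, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Three requests in one second, one request in the next second.
        List<Long> timestamps = Arrays.asList(1000L, 2000L, 1000L, 1000L);
        System.out.println(countPerKey(timestamps));
    }
}
```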

Now we add a new branch to our assembly.

// apply expression to create a timestamp with 'minute' granularity
// declares field "tm"
Pipe tmPipe = new Each( tsPipe, new ExpressionFunction( new Fields( "tm" ), "ts - (ts % (60 * 1000))", long.class ) );

// name the per minute assembly and split on tmPipe
Pipe tmCountPipe = new Pipe( "tmCount", tmPipe );
tmCountPipe = new GroupBy( tmCountPipe, new Fields( "tm" ) );
tmCountPipe = new Every( tmCountPipe, Fields.KEYS, new Count() );

The key thing to notice is that tmPipe extends the tsPipe assembly. This is where we branch away from the tsCountPipe sub-assembly.

Also see that we are using an ExpressionFunction to pass simple Java code to be parsed and compiled at runtime. This is quite handy if you have lots of functions that are too simple to bother writing custom function classes for.
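The expression itself is ordinary Java arithmetic: subtracting the remainder of a division by 60,000 rounds a millisecond timestamp down to the start of its minute. The sketch below evaluates the same expression as a plain static method; the class and method names are hypothetical and no Cascading classes are involved.

```java
final class MinuteBucketSketch {
    // Same arithmetic as the expression passed to ExpressionFunction:
    // drop the milliseconds that have elapsed since the start of the minute.
    static long toMinute(long ts) {
        return ts - (ts % (60 * 1000));
    }

    public static void main(String[] args) {
        long first = 971211336000L;   // 20:55:36 UTC
        long second = 971211356000L;  // 20:55:56 UTC, same minute
        // Both values map to the same "tm" bucket.
        System.out.println(toMinute(first) == toMinute(second));
    }
}
```

Every request that arrives within the same minute ends up with an identical "tm" value, which is what lets the following GroupBy count requests per minute.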

Lastly we create the necessary Tap instances and connect it all into a new Flow.

// create taps to write the results to a Hadoop distributed file system, using the given fields
Tap tsSinkTap = new Dfs( new TextLine( new Fields( "ts", "count" ) ), arrivalRateSecPath );
Tap tmSinkTap = new Dfs( new TextLine( new Fields( "tm", "count" ) ), arrivalRateMinPath );

// a convenience method for binding taps and pipes, order is significant
Map<String, Tap> sinks = Cascades.tapsMap( Pipe.pipes( tsCountPipe, tmCountPipe ), Tap.taps( tsSinkTap, tmSinkTap ) );

// connect the assembly to the source and sink taps
Flow arrivalRateFlow = flowConnector.connect( parsedLogTap, sinks, tsCountPipe, tmCountPipe );

Generally I'm against magic, but the convenience here is worth it. Note the last line where we connect the taps and assemblies.

The first two arguments are the source tap and the map of sink taps, respectively. The final arguments are the tail ends of our pipe assembly. Since we started with a single Pipe instance named "arrival rate" and branched into tsCountPipe and tmCountPipe, we can easily find the head end of the assembly from the tails. The head will be bound to the given source tap parsedLogTap.

If we had two heads and two tails, we would have had to manually make a tap map, giving the Tap instances names that correspond to the Pipe instances they should be bound to.
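The idea behind a tap map is simply a lookup from a pipe name to the resource bound to it. The sketch below pairs names and resources by position, which is why order matters for a helper of this kind. It uses plain strings as stand-ins for Pipe and Tap objects, the paths are hypothetical, and it is not the Cascades.tapsMap implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class TapsMapSketch {
    // Pairs the i-th pipe name with the i-th sink path.
    // Strings stand in for Pipe names and Tap instances; this is an illustration only.
    static Map<String, String> tapsMap(String[] pipeNames, String[] sinkPaths) {
        if (pipeNames.length != sinkPaths.length) {
            throw new IllegalArgumentException("need exactly one sink per pipe");
        }
        Map<String, String> bindings = new LinkedHashMap<>();
        for (int i = 0; i < pipeNames.length; i++) {
            bindings.put(pipeNames[i], sinkPaths[i]);
        }
        return bindings;
    }

    public static void main(String[] args) {
        // Hypothetical output paths; the post builds its paths from args[ 1 ].
        Map<String, String> sinks = tapsMap(
            new String[] {"tsCount", "tmCount"},
            new String[] {"output/arrivalrate/sec", "output/arrivalrate/min"});
        System.out.println(sinks);
    }
}
```

Swapping the two paths without swapping the names would silently send the per-second counts to the per-minute location, so it pays to keep the two argument lists visibly aligned.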

Finally we connect our Flow instances into a Cascade.

// connect the flows by their dependencies, order is not significant
Cascade cascade = cascadeConnector.connect( importLogFlow, arrivalRateFlow );

A Cascade will stuff all the Flow instances into a directed graph where the edges are the taps that connect them. When the Cascade is executed, the flows are executed in dependency order, and a flow runs only if the taps it sinks data into are stale relative to its sources or don't exist. Once the sinks of one flow are up to date, the next flow is addressed, until the whole Cascade graph has been walked.
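To illustrate the scheduling idea, here is a small plain-Java sketch that orders flows by the paths they read and write, plus a staleness check of the kind described above. It is a simplified model under stated assumptions, not Cascading's implementation: FlowStub, order, and shouldRun are hypothetical names, and paths and modification times are passed in as plain values.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class CascadeOrderSketch {
    // Hypothetical stand-in for a Flow: a name plus the paths it reads and writes.
    static final class FlowStub {
        final String name;
        final Set<String> sources;
        final Set<String> sinks;

        FlowStub(String name, List<String> sources, List<String> sinks) {
            this.name = name;
            this.sources = new HashSet<>(sources);
            this.sinks = new HashSet<>(sinks);
        }
    }

    // Returns flow names so that each flow comes after every flow that writes its sources.
    static List<String> order(List<FlowStub> flows) {
        List<String> ordered = new ArrayList<>();
        List<FlowStub> pending = new ArrayList<>(flows);
        while (!pending.isEmpty()) {
            // Paths that some pending flow has yet to write.
            Set<String> unwritten = new HashSet<>();
            for (FlowStub flow : pending) {
                unwritten.addAll(flow.sinks);
            }
            FlowStub ready = null;
            for (FlowStub flow : pending) {
                boolean blocked = false;
                for (String source : flow.sources) {
                    if (unwritten.contains(source)) {
                        blocked = true;
                    }
                }
                if (!blocked) {
                    ready = flow;
                    break;
                }
            }
            if (ready == null) {
                throw new IllegalStateException("flows depend on each other in a cycle");
            }
            ordered.add(ready.name);
            pending.remove(ready);
        }
        return ordered;
    }

    // A flow should run if its sink is missing (null) or older than its newest source.
    static boolean shouldRun(long newestSourceModified, Long sinkModified) {
        return sinkModified == null || sinkModified < newestSourceModified;
    }

    public static void main(String[] args) {
        FlowStub arrivalRate = new FlowStub("arrivalRate",
            Arrays.asList("logs"), Arrays.asList("arrivalrate/sec", "arrivalrate/min"));
        FlowStub importLog = new FlowStub("importLog",
            Arrays.asList("access.log"), Arrays.asList("logs"));
        // Passed in the "wrong" order on purpose; the dependency on "logs" fixes it.
        System.out.println(order(Arrays.asList(arrivalRate, importLog)));
    }
}
```

This is why the order of the arguments to cascadeConnector.connect is not significant: the dependency between flows is recoverable from the taps alone.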

All that's left is to start it, and block until it is complete.

// execute the cascade, which in turn executes each flow in dependency order
cascade.complete();

One really important thing to remember: this code, thanks to Hadoop, can execute on your local desktop or across a thousand nodes in a remote data center. The only difference is where you choose to launch it.

To help visualize what we just built...

ArrivalRate.png

Be advised this post will be edited periodically to improve clarity or address changes in our API.
