Cascading and Pig Planners

I just completed a Proof of Concept (PoC) with a client where we wanted to see if Cascading could match Pig's performance. For an equivalent workload, it looks like Pig rendered 580 MapReduce jobs and Cascading planned 75. Running in local mode on a small dataset, Pig completed in 31 minutes and Cascading in 7 minutes.

A couple of quick points.

This was not a scientifically rigorous comparison. We tested just one really complex workload, developed in (dozens of lines of) PigLatin and translated to Cascading. So Pig may not always be slower; it could very well be faster for many workloads. We just don't know. That said, workloads of this complexity could very well be out of scope for Pig.

And as with most healthy software projects, Pig will only get better with time.

Back to the PoC.

We haven't run this on a full dataset on a real cluster yet, but we expect to see equivalent performance differences. Some logistical issues (Hadoop versions, etc.) will prevent this from happening within the next few weeks.

This PoC brought two things to light for me about Pig.

First, as we know, Pig was designed to offer both Physical and Algebraic (or Logical) optimizations. Unfortunately, it seems some of the Logical optimizations are not kicking in for some syntactic constructs.

Cascading does not offer a planner that looks for Logical optimizations (since there is no syntactic model to work from). But as the lead developer of the PoC translated code from PigLatin to the Cascading API, the API led him toward more optimal solutions. In the PoC Cascading code, however, we kept the more verbose form (performing unnecessary CoGroup and GroupBy operations, for example) in an attempt to keep the comparison apples to apples.

Second, as it was explained to me, Pig does not render all the work upfront. Instead, after each job completes, it recalculates the remaining work and submits the next job. I could be wrong about this.

Cascading renders all the work upfront and applies its planner rules to the whole set of jobs to make sure they are reasonable. This is how we find and leverage any Physical optimizations: by doing DAG manipulations on the whole workload.
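To make "planning the whole workload upfront" concrete, here is a minimal Python sketch. It is not the Cascading API or its actual planner rules; the job names and the `plan_upfront` helper are hypothetical, and the only "rules" applied are that every dependency must exist and that the job graph must be acyclic. The point is simply that the complete DAG is checked and ordered before any job is submitted.

```python
from graphlib import CycleError, TopologicalSorter


def plan_upfront(jobs):
    """Validate and order a whole workload before anything is submitted.

    `jobs` maps a job name to the set of job names it depends on.
    Returns the full submission order, or raises ValueError if the
    workload is not a well-formed DAG.
    """
    known = set(jobs)
    for name, deps in jobs.items():
        missing = deps - known
        if missing:
            raise ValueError(f"{name} depends on unknown jobs: {sorted(missing)}")
    try:
        # static_order() yields every job after all of its dependencies.
        return list(TopologicalSorter(jobs).static_order())
    except CycleError as err:
        raise ValueError(f"workload contains a cycle: {err.args[1]}") from err


# Hypothetical four-job workload: one shared upstream job feeding two
# groupings, which are then joined.
workload = {
    "load_filter": set(),
    "group_by_user": {"load_filter"},
    "group_by_day": {"load_filter"},
    "join_results": {"group_by_user", "group_by_day"},
}
order = plan_upfront(workload)
print(order)
```

Because the whole graph is in hand, a planner working this way can also rewrite it (merge, split, or reorder jobs) before execution, which is not possible if only the next job is known.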

Unfortunately I can't offer up more specific details and code examples. So take this all with a grain of salt, or just dismiss it entirely.

Anyway, I've always seen Cascading as complementary to Pig, and I fully expect Hadoop applications in the wild to be a mix of the two, using each for the strengths it offers.

[Update: 12/11/08]

I forgot to mention that the version of Pig used was svn trunk as of this week (Dec 8th or so), plus a number of patches to address memory issues and a couple of "user defined functions".

It's worth noting that none of the Pig "user defined functions" needed to be ported.

On the other hand, the PoC brought to light a couple of bugs in Cascading 0.9.0 that needed to be squashed: one with stream merging, and another with the Janino expression operations. Expect Cascading 0.10.0 to be out soon.

[Update: 12/12/08]

Below is a diagram of the Cascading-generated MR jobs and their relationships.

[Image: diagram of the Cascading-generated MR jobs]

The yellow oval is interesting. This is where Cascading chose to partition a handful of operations into their own Mapper and then feed the result to two successor MR jobs. Typically the Cascading planner assumes the cluster is "network bound" and would tend to let those operations run redundantly inside each of the successor MR jobs instead of breaking them out.

In one way this could be seen as a bug, but it shows there is still room for improvement in Cascading, especially if we could tell the planner to assume a network-bound or CPU-bound cluster and plan accordingly.

Even further improvements are possible on top of that, but I'll save those for a future discussion.
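As a rough illustration of that trade-off, the Python sketch below compares breaking shared operations out into their own job against duplicating them inside each successor job. Everything here is an assumption made for illustration: the `place_shared_ops` function, the cost units, and the weights are hypothetical and have nothing to do with how the Cascading planner actually decides.

```python
# Hypothetical relative weights for CPU work versus intermediate I/O.
# Real values would have to be measured on the cluster in question.
WEIGHTS = {
    "network": {"cpu": 1, "io": 10},  # network bound: moving data is expensive
    "cpu": {"cpu": 10, "io": 1},      # CPU bound: recomputing is expensive
}


def place_shared_ops(cpu_cost, io_cost, successors, bound):
    """Choose where to run operations shared by several successor jobs.

    cpu_cost:   cost of running the shared operations once
    io_cost:    cost of writing or reading their intermediate result once
    successors: number of downstream jobs that consume the result
    bound:      "network" or "cpu", the assumed cluster bottleneck
    """
    if bound not in WEIGHTS:
        raise ValueError(f"unknown bound: {bound!r}")
    w = WEIGHTS[bound]
    # Breaking out: compute once, write once, read once per successor.
    break_out = w["cpu"] * cpu_cost + w["io"] * io_cost * (1 + successors)
    # Duplicating: recompute inside every successor, no extra intermediate data.
    duplicate = w["cpu"] * cpu_cost * successors
    return "break_out" if break_out < duplicate else "duplicate"


choice_network = place_shared_ops(cpu_cost=5, io_cost=4, successors=2, bound="network")
choice_cpu = place_shared_ops(cpu_cost=5, io_cost=4, successors=2, bound="cpu")
print(choice_network, choice_cpu)
```

With these made-up numbers the network-bound assumption favors duplicating the operations and the CPU-bound assumption favors breaking them out, which is the kind of switch a configurable planner could expose.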

1 Comment

A few comments from a pig committer:

I think this is great. Thanks for doing this and posting the results. Even if you can't post the actual pig script you ran, could you post something similar to show the operations you were doing?

All pig work in the last 6 months has been going into the types branch rather than trunk. We're planning on merging that into trunk in the next few weeks. In our tests the types branch performs 2-8 times faster than the trunk code, depending on the query. It also does a much better job of using the combiner. So running these tests using the code from the types branch would be more representative of where pig is now.

As far as local vs map reduce mode, it would be very interesting to see these tests run again against a map reduce cluster. Local mode isn't where we focus on optimizing pig.

One weakness that pig currently has, which I think you've identified, is that it takes every store as a stopping point and executes its script to that point. For example, given a script like:

A = load ...
B = filter ...
store B
C = foreach B ...
store C

pig will load and filter A twice (once for store B and once for store C) rather than run that part once and then tee the output of B into the store and the foreach. I'm guessing that accounts for pig's ridiculously high number of jobs vs. cascading's. Pig is also not very efficient when there is an explicit split in the script. If your script includes splits you may have encountered that too. We plan to start working on a fix for these issues in the next few months.
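The effect of treating every store as a stopping point can be counted with a small Python sketch. This is only a model of the five-line script above, not Pig's planner: the `parents` table and both functions are hypothetical, and each alias simply stands for "the work needed to produce that alias".

```python
from collections import Counter

# Hypothetical model of the script above: each alias maps to the alias it
# reads from (None for the initial load), plus the aliases that are stored.
parents = {"A": None, "B": "A", "C": "B"}
stores = ["B", "C"]


def lineage(alias, parents):
    """Return the chain of aliases from the load down to `alias`."""
    chain = []
    while alias is not None:
        chain.append(alias)
        alias = parents[alias]
    return list(reversed(chain))


def executions_per_store(parents, stores):
    """Each store re-runs its whole lineage from the load."""
    counts = Counter()
    for alias in stores:
        counts.update(lineage(alias, parents))
    return counts


def executions_shared(parents, stores):
    """Each alias runs once and its output is teed to every consumer."""
    needed = set()
    for alias in stores:
        needed.update(lineage(alias, parents))
    return Counter({alias: 1 for alias in needed})


per_store = executions_per_store(parents, stores)
shared = executions_shared(parents, stores)
print(sum(per_store.values()), sum(shared.values()))
```

In this model the load (A) and the filter (B) each run twice under the store-as-stopping-point approach, for five executions in total, against three when the output of B is teed.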

To answer a few of your questions and comments, pig does not dynamically plan a script. It plans an entire script up front. And while we do have an optimizer for our planning, it's very rudimentary, basically just deciding when to use the combiner. Work is under way on a much better optimizer, but it is not yet ready for integration into the main system.

Alan Gates.
