Camus for Scalding on CDH4 without Avro

Camus is an open source library developed by LinkedIn for downloading data from Kafka to HDFS. The main Hadoop job downloads all whitelisted topics to HDFS and stores the offset of the latest downloaded message on HDFS itself, tracked per topic and partition.
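Conceptually, the bookkeeping Camus persists between runs is a map from (topic, partition) to the last offset pulled. A minimal sketch of such a record as a Hadoop Writable -- the field names are illustrative, not Camus's actual on-disk schema:

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import org.apache.hadoop.io.Writable;

    // Illustrative only: the per-(topic, partition) state Camus keeps on
    // HDFS between runs. The real code stores this in its own writables;
    // the field names here are not Camus's actual schema.
    public class TopicPartitionOffset implements Writable {
        private String topic;
        private int partition;
        private long latestOffset;  // offset of the last message downloaded

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(topic);
            out.writeInt(partition);
            out.writeLong(latestOffset);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            topic = in.readUTF();
            partition = in.readInt();
            latestOffset = in.readLong();
        }
    }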

So this is a really helpful library, but the source code does not inspire confidence. The main issue is code formatting: both the indentation and the way long lines are split are nauseatingly inconsistent, which makes you wary of running this code in production. The fact that the compiled jars are not available from any repository also suggests that the code is meant as an example rather than a finished product. So that is how we treated it -- we rewrote parts of it for our purposes. Specifically, the reasons were:
  1. Make it work with Hadoop 2, since we use CDH4.
  2. Make the deserialization of messages independent of Camus. The original code deserializes Avro-encoded messages before storing them on HDFS. We modified it so that the serialized messages are stored on HDFS as-is, with deserialization handled by a secondary job.
  3. Make it compatible with Cascading.
The corresponding changes were:
  1. Convert all static methods, and their call sites, that take a JobContext to take a Configuration instead, since JobContext is an interface in Hadoop 2 (see the first sketch after this list).
  2. Rewrite EtlInputFormat, EtlRecordReader, and EtlReader so that the value type of the generated key-value pairs is BytesWritable rather than CamusWrapper, which assumes the value is Avro-serialized (second sketch below).
  3. Bridge the APIs: the input format and record reader are written against the mapreduce API, but Cascading requires the older mapred API. We made that conversion with DeprecatedInputFormatWrapper from Twitter's excellent elephant-bird library (third sketch below).
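For change (1) the rewrite is mechanical. A hedged before/after sketch -- the getKafkaBrokers helper and the kafka.brokers key are stand-ins for Camus's actual accessors, not verbatim code:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.JobContext;

    public class ConfigAccessors {
        // Before: fine under Hadoop 1, where JobContext is a concrete
        // class, but awkward under Hadoop 2, where it is an interface and
        // callers holding a bare Configuration have nothing to wrap it in.
        public static String getKafkaBrokersOld(JobContext context) {
            return context.getConfiguration().get("kafka.brokers");
        }

        // After: take the Configuration directly; call sites that have a
        // JobContext pass context.getConfiguration().
        public static String getKafkaBrokers(Configuration conf) {
            return conf.get("kafka.brokers");
        }
    }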
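For change (2), the essence is that the record reader hands the raw Kafka payload downstream instead of running it through an Avro decoder. A minimal sketch, with the Kafka fetch loop abstracted behind an Iterator<byte[]> and a Text key standing in for Camus's key type:

    import java.io.IOException;
    import java.util.Iterator;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class RawRecordReader extends RecordReader<Text, BytesWritable> {
        private final Text key = new Text();
        private final BytesWritable value = new BytesWritable();
        private Iterator<byte[]> messages;  // stands in for the Kafka fetcher

        @Override
        public void initialize(InputSplit split, TaskAttemptContext context) {
            // In the real reader: connect to the broker named in the split
            // and position the fetch at the stored offset.
        }

        @Override
        public boolean nextKeyValue() throws IOException {
            if (messages == null || !messages.hasNext())
                return false;
            byte[] payload = messages.next();
            // The essential change: no Avro decoding, just the raw bytes.
            value.set(payload, 0, payload.length);
            return true;
        }

        @Override public Text getCurrentKey() { return key; }
        @Override public BytesWritable getCurrentValue() { return value; }
        @Override public float getProgress() { return 0f; }
        @Override public void close() { }
    }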
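For change (3), elephant-bird's wrapper registers the mapreduce input format behind a mapred facade. Roughly, with EtlInputFormat being our modified input format:

    import org.apache.hadoop.mapred.JobConf;
    import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;

    public class WrapperSetup {
        public static void configure(JobConf conf) {
            // Sets DeprecatedInputFormatWrapper as the job's (mapred)
            // input format and records EtlInputFormat as the mapreduce
            // input format it should delegate to.
            DeprecatedInputFormatWrapper.setInputFormat(EtlInputFormat.class, conf);
        }
    }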
We also had to add the Cascading pieces -- a tap and a scheme -- plus a source to make it usable from Scalding. A sketch of the scheme follows.
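The scheme is the interesting piece: on the source side it registers the wrapped input format and turns each key-value pair into a Cascading tuple. A condensed, source-only sketch against the Cascading 2.x API, assuming the wrapper setup above and our modified EtlInputFormat; error handling and the sink side are omitted:

    import java.io.IOException;
    import cascading.flow.FlowProcess;
    import cascading.scheme.Scheme;
    import cascading.scheme.SinkCall;
    import cascading.scheme.SourceCall;
    import cascading.tap.Tap;
    import cascading.tuple.Fields;
    import cascading.tuple.Tuple;
    import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.OutputCollector;
    import org.apache.hadoop.mapred.RecordReader;

    public class CamusScheme
            extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Void> {

        public CamusScheme() {
            super(new Fields("key", "payload"));
        }

        @Override
        public void sourceConfInit(FlowProcess<JobConf> flow,
                Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
            // Bridge our mapreduce-API input format to the mapred API.
            DeprecatedInputFormatWrapper.setInputFormat(EtlInputFormat.class, conf);
        }

        @Override
        public boolean source(FlowProcess<JobConf> flow,
                SourceCall<Object[], RecordReader> sourceCall) throws IOException {
            RecordReader reader = sourceCall.getInput();
            Object key = reader.createKey();
            Object value = reader.createValue();   // the BytesWritable payload
            if (!reader.next(key, value))
                return false;                      // split exhausted
            sourceCall.getIncomingEntry().setTuple(new Tuple(key, value));
            return true;
        }

        @Override
        public void sinkConfInit(FlowProcess<JobConf> flow,
                Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
            throw new UnsupportedOperationException("source only");
        }

        @Override
        public void sink(FlowProcess<JobConf> flow,
                SinkCall<Void, OutputCollector> sinkCall) {
            throw new UnsupportedOperationException("source only");
        }
    }

The Scalding source is then a thin wrapper whose createTap returns a Hadoop tap built on this scheme.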

Currently, the input format creates a request object for each topic and partition combination; each request records the host of the leader broker at the time the splits are created. The problem with this approach is that if a leader changes after the splits have been assigned, the job fails. An improvement would be to also put the followers in the request object and, if a fetch from the leader fails, retry the request against the followers, giving the job a better chance of succeeding (sketched below).
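A hedged sketch of what that retry could look like in the fetch path, using the Kafka 0.8 SimpleConsumer API; the broker list, client id, timeouts, and buffer sizes are illustrative. The reasoning: if the old leader is gone, one of the followers will have been elected leader, so walking the candidate list lets the mapper recover:

    import java.util.List;
    import kafka.api.FetchRequest;
    import kafka.api.FetchRequestBuilder;
    import kafka.javaapi.FetchResponse;
    import kafka.javaapi.consumer.SimpleConsumer;

    public class FailoverFetcher {
        // brokerHosts comes from the request object: leader first, then
        // the followers of the partition.
        public static FetchResponse fetch(List<String> brokerHosts, int port,
                String topic, int partition, long offset) {
            for (String host : brokerHosts) {
                SimpleConsumer consumer =
                        new SimpleConsumer(host, port, 30000, 64 * 1024, "camus");
                try {
                    FetchRequest request = new FetchRequestBuilder()
                            .clientId("camus")
                            .addFetch(topic, partition, offset, 1024 * 1024)
                            .build();
                    FetchResponse response = consumer.fetch(request);
                    if (!response.hasError())
                        return response;  // this broker served the data
                    // Leadership moved or the broker is unhealthy: fall
                    // through and try the next candidate.
                } finally {
                    consumer.close();
                }
            }
            throw new RuntimeException(
                    "all candidate brokers failed for " + topic + "/" + partition);
        }
    }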
