Using indexed LZO files in Scalding

One of the most important preliminary steps was getting the gplcompression jar (hadoop-lzo-cdh4-0.4.15-gplextras.jar) that matches the version of CDH we are using (4.1.0). It is not installed automatically when you upgrade. (We upgraded directly from a pre-4 version.) Make sure that both the native libraries and the jars are installed on the cluster.

Indexing LZO files is what makes them splittable; the process is explained here. The LZO input formats are all defined in Elephant Bird. The code we use to tap LZO files is borrowed from here and looks like this:

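import cascading.scheme.Scheme
import cascading.scheme.local.{TextDelimited => CLTextDelimited}
import cascading.tap.SinkMode
import cascading.tuple.Fields
import com.twitter.elephantbird.cascading2.scheme.LzoTextDelimited
import com.twitter.scalding.{DelimitedScheme, FixedPathSource}
import org.apache.hadoop.mapred.{JobConf, OutputCollector, RecordReader}

// Casts a generic Cascading scheme to the Hadoop-typed scheme Scalding expects.
object HadoopSchemeInstance {
  def apply(scheme : Scheme[_,_,_,_,_]) =
    scheme.asInstanceOf[Scheme[JobConf,RecordReader[_,_],OutputCollector[_,_],_,_]]
}

trait LzoTsvScheme extends DelimitedScheme {
  // Local mode falls back to Cascading's plain delimited-text scheme.
  override def localScheme = {
    println("This does not work yet")
    new CLTextDelimited(fields, separator, types)
  }
  // On Hadoop, read and write through Elephant Bird's LZO-aware delimited scheme.
  override def hdfsScheme = HadoopSchemeInstance(new LzoTextDelimited(fields, separator, types))
}

// A delimited-text source backed by LZO files at path p.
case class LzoTsv(p : String,
  override val fields : Fields = new Fields("line"),
  override val skipHeader : Boolean = false,
  override val writeHeader: Boolean = false,
  val sinkMode: SinkMode = SinkMode.REPLACE) extends FixedPathSource(p) with LzoTsvScheme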
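
To show how the source above might be used, here is a minimal sketch of a Scalding job that reads an indexed LZO file through LzoTsv. The job name, the field names ('user and 'url), and the --input/--output arguments are hypothetical and chosen only for illustration; the sketch also assumes the LzoTsv definition above is compiled alongside it, with Scalding and Elephant Bird on the classpath.

```scala
import com.twitter.scalding._

// Hypothetical job: counts rows per user in a tab-separated, indexed LZO input.
class LzoUserCount(args: Args) extends Job(args) {
  // Field names are assumptions for illustration; match them to your data.
  LzoTsv(args("input"), ('user, 'url))
    .read
    // Adds a 'size field holding the number of rows for each user.
    .groupBy('user) { _.size }
    .write(Tsv(args("output")))
}
```

Since localScheme is flagged as not working yet, a job like this would need to run in Hadoop mode (--hdfs) rather than local mode.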
