The idea is to access the cascading FlowProcess through a cascading operator. We use an anonymous function for the operator which does nothing except incrementing a specified hadoop counter.
The counter will appear on the job details page of the job in the job tracker, and will count all the tuples that make it past the filter.
Adapted from here.
Update (2013/08/29): had to add a dummy field which I finally discard. Otherwise, no tuples were being emitted when the method was called, even though they were being counted.
def addCounter(pipe : Pipe, group : String, counter : String) = { pipe.each(() -> ('addCounter)) { fields => new BaseOperation[Any](fields) with Function[Any] { def operate(flowProcess : FlowProcess[_], functionCall : FunctionCall[Any]) { try { flowProcess.asInstanceOf[HadoopFlowProcess] .increment(group, counter, 1L) functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*)) } catch { case cce: ClassCastException => // HadoopFlowProcess is not available in local mode } } } }.discard('addCounter) }In the scalding job, the tuples passing through any part of the Flow can be counted using code like this:
.then { p => addCounter(p, "my_group", "my_counter") }
The counter will appear on the job details page of the job in the job tracker, and will count all the tuples that make it past the filter.
Adapted from here.
Update (2013/08/29): had to add a dummy field which I finally discard. Otherwise, no tuples were being emitted when the method was called, even though they were being counted.
Can you share a more elaborate example (for example word count) with the counters?
ReplyDeleteAlso, how can you use logic with a specific scalding step that decides whether or not to omit a value? Also, how do you decide how to increment by a certain value?
Not sure if this answers your questions but here are a few pointer's. An example with a very trivial job:
ReplyDelete1. Tsv("input0").read.mapTo((0, 1) -> ('x1, 'x2)) { input : (String, String) => input }
2. .then { p => addCounter(p, "my_group", "my_counter") }
3. .write(Tsv("output"))
The jobtracker will show a count of 1 for this counter under the map column. Step 2 just counts all the tuples passing through that point of the code. In this case that is just one tuple.
Hadoop counters can only be incremented by 1 at a time as far as I know.
To use counters within other scalding methods check out this pull request: https://github.com/twitter/scalding/pull/733
Hey @sundar, I'm glad you were able to get something out of this blog.
ReplyDeleteCheers!