Limiting Hadoop Part Files using 'shard' and a Common Gotcha

This is accomplished trivially with the shard(n: Int) method on Scalding's RichPipe, which forces a random shuffle of the tuples across exactly n reducers, so that a subsequent write produces n part files on disk.
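
As a minimal sketch (the job name, input source, field name, and paths here are hypothetical, not from the original post), a job that limits its output to 100 part files might look like this:

import com.twitter.scalding._

class LimitPartFilesJob(args: Args) extends Job(args) {
  // Hypothetical TSV input with a single field.
  Tsv(args("input"), 'myField)
    .read
    .shard(100)                  // random shuffle across exactly 100 reducers
    .write(Tsv(args("output")))  // the write then produces 100 part files
}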

A major gotcha to watch out for is reusing a pipe that ends in the shard-and-write combination. If further operations are performed on that pipe elsewhere in the job code, the Cascading job planner silently ignores the shard step. This bit me several times before I finally realized what was happening. In short, when you shard a pipe before writing it, make sure the resulting pipe is not reused anywhere else in the Scalding job.

Here's an example of the gotcha: the number of part files generated under my-dir will not be 100 as one might expect. (In fact, it will be 2000, the number of reducers used by the groupBy.)



val writeAndReuse = MySource()
  .read
  .groupBy('myField) { _.reducers(2000).size }
  .shard(100)
  .write(Tsv("my-dir"))

// Reusing the sharded pipe below causes the planner to drop the shard(100) step.
writeAndReuse
  .map('myField -> 'newField) { i: Int => i + 1 }
  .write(Tsv("my-dir2"))


Here's how this code should be written:



val read = MySource().read
  .groupBy('myField) { _.reducers(2000).size }

// The sharded pipe is written and never reused.
read
  .shard(100)
  .write(Tsv("my-dir"))

read
  .map('myField -> 'newField) { i: Int => i + 1 }
  .write(Tsv("my-dir2"))


We can see here that the sharded pipe is written once and never reused, unlike in the first example, so my-dir ends up with the expected 100 part files.
