
Map Reduce at Google App Engine

March 29, 2012

I recently had to prepare a Cloud Computing 101 lecture, including exercises, for a teacher exchange week at the Oulu University of Applied Sciences.

As the curriculum was targeted at internet computing, I wanted to present Google App Engine as a PaaS alternative and MapReduce as an important background technology of the cloud. Luckily, the two work together; the MapReduce library for App Engine is available at an experimental stage here. I quickly found out that the Python support is much more advanced than the Java support (which has no reducer), so I decided to have a go with it.

The typical WordCount example is available, but neither its map nor its reduce function is written in a way that lets you include a combiner (which is also undocumented at the moment) in the MapReduce pipeline. Below is the changed map/reduce code.

def word_count_map(data):
  """Word count map function."""
  (entry, text_fn) = data
  text = text_fn()
  logging.debug("Got %s", entry.filename)
  for s in split_into_sentences(text):
    for w in split_into_words(s.lower()):
      # original - not working with a combiner:
      # yield (w, "")
      yield (w, "1")  # emit an explicit count so reducer and combiner can sum

def word_count_reduce(key, values):
  """Word count reduce function."""
  # original - not working with a combiner:
  # yield "%s: %d\n" % (key, len(values))
  value_ints = [int(x) for x in values]
  yield "%s: %d\n" % (key, sum(value_ints))

The combiner has to be included in the pipeline via:

def run(self, filekey, blobkey):
  logging.debug("filename is %s", filekey)
  output = yield mapreduce_pipeline.MapreducePipeline(
      "word_count",
      "main.word_count_map",
      "main.word_count_reduce",
      combiner_spec="main.TestCombiner",
      input_reader_spec="mapreduce.input_readers.BlobstoreZipInputReader",
      output_writer_spec="mapreduce.output_writers.BlobstoreOutputWriter",
      mapper_params={
          "blob_key": blobkey,
      },
      reducer_params={
          "mime_type": "text/plain",
      },
      shards=16)
  yield StoreOutput("WordCount", filekey, output)

and the Combiner has to be defined as follows:

class TestCombiner(object):
  """Test combine handler."""
  invocations = []

  def __call__(self, key, values, combiner_values):
    # 'values' holds the not-yet-combined values for the key,
    # 'combiner_values' the results of earlier combiner calls for the same key.
    self.invocations.append((key, values, combiner_values))

    value_ints = [int(x) for x in values]
    combiner_values_int = [int(x) for x in combiner_values]
    yield sum(value_ints + combiner_values_int)
    # requires: from mapreduce import operation
    yield operation.counters.Increment("combiner-call")

  @classmethod
  def reset(cls):
    cls.invocations = []
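
A quick way to see what the combiner does is to call the class directly, outside of the pipeline (this assumes the mapreduce library is importable for operation.counters; the key and values are made up):

# Three new "1" values and a previously combined partial sum of 2
# are folded into a single partial sum for the key.
combiner = TestCombiner()
results = list(combiner("hello", ["1", "1", "1"], [2]))
print(results[0])  # 5 - the partial sum passed on for "hello"
# results[1] is the counter increment operation recorded by the framework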

With this in place, you can play around with GAE/MR using two pipelines, one including the combiner and the other not, and compare the results (see the sketch below).
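
For the comparison run, here is a minimal sketch of the same pipeline without the combiner; the pipeline name and output key are just placeholders, everything else matches the demo code above:

def run(self, filekey, blobkey):
  # Same job as above, but without combiner_spec: the reducer now receives
  # one "1" per word occurrence instead of pre-aggregated partial sums.
  output = yield mapreduce_pipeline.MapreducePipeline(
      "word_count_no_combiner",
      "main.word_count_map",
      "main.word_count_reduce",
      input_reader_spec="mapreduce.input_readers.BlobstoreZipInputReader",
      output_writer_spec="mapreduce.output_writers.BlobstoreOutputWriter",
      mapper_params={
          "blob_key": blobkey,
      },
      reducer_params={
          "mime_type": "text/plain",
      },
      shards=16)
  yield StoreOutput("WordCountNoCombiner", filekey, output)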
