Writing a Hadoop MapReduce Program in Python
First, the mapper, mapper.py. It reads lines from STDIN, splits each line into words, and emits every word with a count of 1:

#!/usr/bin/env python
import sys

# input comes from STDIN (standard input)
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # split the line into words
    words = line.split()
    # increase counters
    for word in words:
        # write the results to STDOUT (standard output);
        # what we output here will be the input for the
        # Reduce step, i.e. the input for reducer.py
        # tab-delimited; the trivial word count is 1
        print('%s\t%s' % (word, 1))
Next, the reducer, reducer.py. It reads the mapper's tab-delimited output from STDIN and sums the counts for each word:
#!/usr/bin/env python
import sys

current_word = None
current_count = 0
word = None

# input comes from STDIN
for line in sys.stdin:
    # remove leading and trailing whitespace
    line = line.strip()
    # parse the input we got from mapper.py
    word, count = line.split('\t', 1)
    # convert count (currently a string) to int
    try:
        count = int(count)
    except ValueError:
        # count was not a number, so silently
        # ignore/discard this line
        continue
    # this IF-switch only works because Hadoop sorts map output
    # by key (here: word) before it is passed to the reducer
    if current_word == word:
        current_count += count
    else:
        if current_word:
            # write result to STDOUT
            print('%s\t%s' % (current_word, current_count))
        current_count = count
        current_word = word

# do not forget to output the last word if needed!
if current_word == word:
    print('%s\t%s' % (current_word, current_count))
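Because the reducer's input arrives sorted by key, the same grouping can also be expressed with itertools.groupby. The following is an optional sketch of an equivalent reducer.py, not part of the walkthrough above; it produces the same output:

#!/usr/bin/env python
import sys
from itertools import groupby
from operator import itemgetter

def read_mapper_output(stream, separator='\t'):
    # yield (word, count) pairs parsed from the mapper's output
    for line in stream:
        yield line.rstrip().split(separator, 1)

# groupby collects consecutive pairs that share the same word,
# which works because Hadoop sorts map output by key
for word, group in groupby(read_mapper_output(sys.stdin), itemgetter(0)):
    try:
        total_count = sum(int(count) for _, count in group)
        print('%s\t%d' % (word, total_count))
    except ValueError:
        # malformed line or non-numeric count; skip this group
        pass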
Before submitting anything to Hadoop, it is worth testing both scripts locally. Feeding a line of text to the mapper shows the raw (word, 1) pairs it emits:
echo "foo foo quux labs foo bar quux" | python ./mapper.py foo 1 foo 1 quux 1 labs 1 foo 1 bar 1 quux 1
Going further, piping the sorted output through the reducer completes the word count end to end:
echo "foo foo quux labs foo bar quux" | python ./mapper.py | sort -k1,1 | ./reducer.py bar 1 foo 3 labs 1 quux 2
Once the scripts behave correctly locally, the job can be submitted to the cluster through the Hadoop Streaming jar:
hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
    -mapper mapperfile -file mapper_file_path \
    -reducer reducefile -file reducer_file_path \
    -input input_path -output output_path
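For the word-count scripts above, a concrete invocation might look like the following; the HDFS paths are hypothetical and depend on your cluster layout:

# copy some local input into HDFS (path is hypothetical)
hadoop fs -put local_input.txt /user/hadoop/wordcount/input

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
    -mapper mapper.py -file ./mapper.py \
    -reducer reducer.py -file ./reducer.py \
    -input /user/hadoop/wordcount/input \
    -output /user/hadoop/wordcount/output

# inspect the result (the part file name may vary)
hadoop fs -cat /user/hadoop/wordcount/output/part-00000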
For reference, the full usage of the streaming jar is:
Usage: $HADOOP_HOME/bin/hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-streaming.jar [options]
Options:
  -input    <path>                DFS input file(s) for the Map step
  -output   <path>                DFS output directory for the Reduce step
  -mapper   <cmd|JavaClassName>   The streaming command to run
  -combiner <JavaClassName>       Combiner has to be a Java class
  -reducer  <cmd|JavaClassName>   The streaming command to run
  -file     <file>                File/dir to be shipped in the Job jar file
  -dfs      <h:p>|local           Optional. Override DFS configuration
  -jt       <h:p>|local           Optional. Override JobTracker configuration
  -additionalconfspec specfile    Optional.
  -inputformat TextInputFormat(default)|SequenceFileAsTextInputFormat|JavaClassName  Optional.
  -outputformat TextOutputFormat(default)|JavaClassName  Optional.
  -partitioner JavaClassName      Optional.
  -numReduceTasks <num>           Optional.
  -inputreader <spec>             Optional.
  -jobconf  <n>=<v>               Optional. Add or override a JobConf property
  -cmdenv   <n>=<v>               Optional. Pass env.var to streaming commands
  -cacheFile fileNameURI
  -cacheArchive fileNameURI
  -verbose
A brief explanation of the most important parameters:

-input: HDFS path of the input file(s) for the Map step.
-output: HDFS directory where the Reduce step writes its results; it must not exist before the job runs.
-mapper: the command (or Java class) to run as the mapper, e.g. mapper.py.
-reducer: the command (or Java class) to run as the reducer, e.g. reducer.py.
-file: a local file or directory to ship with the job, so that every node receives mapper.py and reducer.py; pass it once per file.
-numReduceTasks: the number of reduce tasks to launch.
-jobconf: add or override a JobConf property as a name=value pair.
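As an example of -jobconf, the number of reduce tasks can be fixed at submission time. mapred.reduce.tasks is the configuration key used by Hadoop versions of this vintage; the paths are the same hypothetical ones as above:

hadoop jar $HADOOP_HOME/contrib/streaming/hadoop-*streaming*.jar \
    -jobconf mapred.reduce.tasks=2 \
    -mapper mapper.py -file ./mapper.py \
    -reducer reducer.py -file ./reducer.py \
    -input /user/hadoop/wordcount/input \
    -output /user/hadoop/wordcount/output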