Написание простого MapReduce-приложения

MapReduce — модель распределённых вычислений, используемая для параллельных вычислений над очень большими, несколько петабайт, наборами данных в компьютерных кластерах. Преимущество MapReduce заключается в том, что он позволяет распределенно производить операции предварительной обработки и свертки. Операции предварительной обработки работают независимо друг от друга и могут производиться параллельно (хотя на практике это ограничено источником входных данных и/или количеством используемых процессоров). Аналогично, множество рабочих узлов могут осуществлять свертку — для этого необходимо только чтобы все результаты предварительной обработки с одним конкретным значением ключа обрабатывались одним рабочим узлом в один момент времени. Хотя этот процесс может быть менее эффективным по сравнению с более последовательными алгоритмами, MapReduce может быть применен к большим объёмам данных, которые могут обрабатываться большим количеством серверов. Так, MapReduce может быть использован для сортировки петабайта данных, что займет всего лишь несколько часов. Параллелизм также дает некоторые возможности восстановления после частичных сбоев серверов: если в рабочем узле, производящем операцию предварительной обработки или свертки, возникает сбой, то его работа может быть передана другому рабочему узлу (при условии, что входные данные для проводимой операции доступны).

Создаем тестовую директорию в файловой системе hdfs:

$ su - hdfs

$ hdfs dfs -mkdir /user/mytest

$ hdfs dfs -ls /user/

Found 6 items

drwxr-xr-x - hdfs supergroup 0 2016-12-27 14:43 /user/hadoop

drwxr-xr-x - hdfs supergroup 0 2016-12-27 12:39 /user/hdfs

drwxrwxrwx - mapred hadoop 0 2016-12-26 14:19 /user/history

drwxrwxr-t - hive hive 0 2016-12-27 10:26 /user/hive

drwxrwxr-x - hue hue 0 2016-12-27 10:27 /user/hue

drwxr-xr-x - hdfs supergroup 0 2016-12-27 12:23 /user/mytest

Создаем текстовый файл любого содержания и переносим его в HDFS: 

$ hdfs dfs -copyFromLocal /tmp/datatest /user/mytest/

$ hdfs dfs -cat /user/mytest/datatest

This guide describes how to quickly install Cloudera software and create initial deployments for proof of concept (POC) or development. It describes how to download and use the QuickStart virtual machines, which provide everything you need to start a basic installation. It also shows you how to create a new installation of Cloudera Manager 5, CDH 5, and managed services on a cluster of four hosts. QuickStart installations should be used for demonstrations and POC applications only and are not recommended for production.

Создадим простое MapReduce-приложение на языке Python для подсчета слов.

Map:

$ cat > /tmp/map.py

#!/usr/bin/env python

import sys

# input comes from STDIN (standard input)

for line in sys.stdin:

    # remove leading and trailing whitespace

    line = line.strip()

    # split the line into words

    words = line.split()

    # increase counters

    for word in words:

        # write the results to STDOUT (standard output);

        # what we output here will be the input for the

        # Reduce step, i.e. the input for reducer.py

        #

        # tab-delimited; the trivial word count is 1

        print '%s\t%s' % (word, 1)


Reduce:

$ cat > /tmp/reduce.py

#!/usr/bin/env python

from operator import itemgetter

import sys

current_word = None

current_count = 0

word = None

# input comes from STDIN

for line in sys.stdin:

    # remove leading and trailing whitespace

    line = line.strip()

    # parse the input we got from mapper.py

    word, count = line.split('\t', 1)

    # convert count (currently a string) to int

    try:

        count = int(count)

        except ValueError:

            # count was not a number, so silently

            # ignore/discard this line

            continue

        # this IF-switch only works because Hadoop sorts map output

        # by key (here: word) before it is passed to the reducer

        if current_word == word:

            current_count += count

        else:

            if current_word:

                # write result to STDOUT

                print '%s\t%s' % (current_word, current_count)

            current_count = count

            current_word = word

# do not forget to output the last word if needed!

if current_word == word:

    print '%s\t%s' % (current_word, current_count)

Вызываем MapReduce приложение в среде Hadoop:

$ hadoop jar /opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/jars/hadoop-streaming-2.6.0-cdh5.9.0.jar -files /tmp/map.py,/tmp/reduce.py -mapper map.py -reducer reduce.py -input /user/mytest/datatest -output /user/mytest/outputtest

packageJobJar: [] [/opt/cloudera/parcels/CDH-5.9.0-1.cdh5.9.0.p0.23/jars/hadoop-streaming-2.6.0-cdh5.9.0.jar] /tmp/streamjob6732053312263900298.jar tmpDir=null

17/01/10 12:03:06 INFO client.ConfiguredRMFailoverProxyProvider: Failing over to rm122

17/01/10 12:03:07 INFO mapred.FileInputFormat: Total input paths to process : 1

17/01/10 12:03:07 INFO mapreduce.JobSubmitter: number of splits:2

17/01/10 12:03:07 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1484035485729_0001

17/01/10 12:03:08 INFO impl.YarnClientImpl: Submitted application application_1484035485729_0001

17/01/10 12:03:08 INFO mapreduce.Job: The url to track the job: http://master2.hadoop.stage.int.nic.ru:8088/proxy/application_1484035485729_0001/

17/01/10 12:03:08 INFO mapreduce.Job: Running job: job_1484035485729_0001

17/01/10 12:03:16 INFO mapreduce.Job: Job job_1484035485729_0001 running in uber mode : false

17/01/10 12:03:16 INFO mapreduce.Job: map 0% reduce 0%

17/01/10 12:03:25 INFO mapreduce.Job: map 100% reduce 0%

17/01/10 12:03:34 INFO mapreduce.Job: map 100% reduce 100%

17/01/10 12:03:35 INFO mapreduce.Job: Job job_1484035485729_0001 completed successfully

17/01/10 12:03:35 INFO mapreduce.Job: Counters: 50

File System Counters

FILE: Number of bytes read=616

FILE: Number of bytes written=393679

FILE: Number of read operations=0

FILE: Number of large read operations=0

FILE: Number of write operations=0

HDFS: Number of bytes read=973

HDFS: Number of bytes written=550

HDFS: Number of read operations=9

HDFS: Number of large read operations=0

HDFS: Number of write operations=2

Job Counters

Launched map tasks=2

Launched reduce tasks=1

Data-local map tasks=1

Rack-local map tasks=1

Total time spent by all maps in occupied slots (ms)=13480

Total time spent by all reduces in occupied slots (ms)=5332

Total time spent by all map tasks (ms)=13480

Total time spent by all reduce tasks (ms)=5332

Total vcore-seconds taken by all map tasks=13480

Total vcore-seconds taken by all reduce tasks=5332

Total megabyte-seconds taken by all map tasks=13803520

Total megabyte-seconds taken by all reduce tasks=5459968

Map-Reduce Framework

Map input records=1

Map output records=83

Map output bytes=692

Map output materialized bytes=628

Input split bytes=184

Combine input records=0

Combine output records=0

Reduce input groups=61

Reduce shuffle bytes=628

Reduce input records=83

Reduce output records=61

Spilled Records=166

Shuffled Maps =2

Failed Shuffles=0

Merged Map outputs=2

GC time elapsed (ms)=151

CPU time spent (ms)=3110

Physical memory (bytes) snapshot=1091223552

Virtual memory (bytes) snapshot=4757819392

Total committed heap usage (bytes)=813170688

Shuffle Errors

BAD_ID=0

CONNECTION=0

IO_ERROR=0

WRONG_LENGTH=0

WRONG_MAP=0

WRONG_REDUCE=0

File Input Format Counters

Bytes Read=789

File Output Format Counters

Bytes Written=550

17/01/10 12:03:35 INFO streaming.StreamJob: Output directory: /user/mytest/outputtest


Т.е. в скрипт map.py отправляется на вход текстовый файл, а на вход reduce.py отправляется результат работы скрипта map.py.

Ответ в директории /user/mytest/outputtest.

 

unix-way