MapReduce для обработки слабоструктурированных данных в HDInsight

 

В данном примере мы разберем создание и выполнение типового задания MapReduce в облачной реализации Hadoop от Microsoft, которая носит название HDInsight.

В предыдущем примере мы создали 3-узловой кластер Hadoop и загрузили абстрактный журнал слабоструктурированного формата, который сейчас предстоит обработать. Журнал представляет собой в общем случае большой (в нашем конкретном примере маленький, но на принципиальную демонстрацию идеи это не влияет) текстовый файл, содержащий строки с признаками TRACE, DEBUG, INFO, WARN, ERROR, FATAL. Наша элементарная задача будет состоять в том, чтобы подсчитать количество строк с каждым признаком, т.е. сколько раз возникала ситуация WARN, сколько ERROR и т.д. Выражаясь в терминах SQL, нужно сделать COUNT() … GROUP BY по полю признака. Понятно, что поля как такового нет, поскольку файл представляет собой не табличку, а набор строк с текстовым описанием проблемы, в которых встречается подстрока с названием признака. Нужно пробежаться по всем строкам, выделить подстроку признака и просуммировать. Проще говоря, из

 

2012-02-03 18:35:34 SampleClass6 [INFO] everything normal for id 577725851

2012-02-03 18:35:34 SampleClass4 [FATAL] system problem at id 1991281254

2012-02-03 18:35:34 SampleClass3 [DEBUG] detail for id 1304807656

2012-02-03 18:35:34 SampleClass3 [WARN] missing id 423340895

2012-02-03 18:35:34 SampleClass5 [TRACE] verbose detail for id 2082654978

2012-02-03 18:35:34 SampleClass0 [ERROR] incorrect id 1886438513

...

Скрипт 1

 

требуется получить что-нибудь типа

[TRACE] 10

[DEBUG] 20

[INFO] 30

[WARN] 555

[ERROR] 777

[FATAL] 1

Скрипт 2

Идея модели MapReduce очень проста. При наличии распределенной системы, каковой является кластер Hadoop, общее задание разделяется (Map) на параллельные подзадания. Как отмечалось в предыдущем примере, подлежащий обработке при сохранении в файловую систему Hadoop прозрачно для пользователя разбивается на фрагменты по узлам. Теоретически эти узлы могут быть распределены территориально, т.е. находиться в различных географических локациях. Чтобы минимизировать издержки, связанные с передачей данных между ЦОДами (или просто между отдельными узлами), Hadoop учитывает территориальную близость данных - каждое подзадание работает со своим фрагментом данных. В нашем случае узлов в кластере всего 3, не до роскоши. Подзадания будут выполняться на тех же узлах, где лежат фрагменты данных. Результаты выполнения подзаданий затем агрегируются функциями Reduce в единый результат, возвращаемый пользователю. Иными словами, каждый узел выдаст свой частный подрезультат, например, первый -

[TRACE] 1

[DEBUG] 2

[INFO]  3

...

Скрипт 3

 

второй -

 

[TRACE] 9

[DEBUG] 5

[INFO] 7

...

Скрипт 4

из которых в итоге будет составлен общий результат Скрипт 2. Это общая идея параллельной обработки, получившая реализацию в том числе в традиционных реляционных серверах баз данных (напр., Oracle RAC, Microsoft SQL Server Parallel Datawarehouse и т.д.) и облачных сервисах реляционной обработки данных (напр., федерированная БД в Windows Azure SQL Database, ранее известная как шардинг в SQL Azure). Но в данном случае мы имеем дело не с реляционным, а слабоструктурированным форматом входных данных, поэтому, вместо скриптов SQL, функции, выполняющие роль Map / Reduce нам придется написать самостоятельно. Идея MapReduce реализована на различных языках. Например, в бесплатном проекте Apache Hadoop с открытым исходным кодом для этих целей используется Java. Поскольку Microsoft HDInsight совместим с Apache Hadoop, мы также будем использовать язык Java и пакет org.apache.hadoop.mapreduce.

Вначале реализуется класс Map, производный от Mapper. Класс Mapper преобразует исходный набор пар ключ/значение в промежуточный. В нашем случае входными значениями являются строки текстового файла журнала - параметр value типа Text метода map. Внутри метода в каждой value ищем квадратные скобки, вытаскиваем то, что находится между ними, сравниваем с константным набором признаков, который в начале положили в переменную pattern и, если соответствует (if(matcher.matches())), формируем выходную пару ключ-значение. Ключом является подстрока признака TRACE / DEBUG / … (текстовая переменная logLevel), а значением 1. Значение содержится в переменной accumulator типа IntWritable, которую мы инициализировали в конструкторе единицей. IntWriteable является оберткой вокруг явовского типа int, имплементирующей интерфейс Writable. Hadoop использует собственный формат сериализации. Эти единички мы будем складывать в функции Reduce, чтобы подсчитать число вхождений каждого признака. Промежуточные (выходные) значения группируются средой Hadoop для каждого выходного ключа. На этапе маппинга можно выполнить предварительную агрегацию при помощи setCombinerClass, чтобы сократить данные. передаваемые в Reducer. В данном примере эта возможность не используется. Класс Reporter (последний параметр метода map) предназначен для отображения статуса и прогресса выполнения, обновления счетчиков и т.п. В нашем простом примере он также не используется.

Класс Reduce, производный от Reducer, решает обратную задачу. Он собирает промежуточные результаты маппирования и агрегирует их, выполняя в данном случае пресловутый COUNT() значений, т.к. GROUP BY по ключам (включая сортировку) был выполнен в ходе маппирования. Входные типы (Text, IntWritable) для Reduce должны соответствовать выходным от Map. В ходе слияния результатов на этапе Reduce среда Hadoop выполняет вторичную сортировку, поскольку результаты, полученные от различных мапперов могут иметь одинаковые ключи. Таким образом, входной результат для метода Reduce представляет собой набор строк ключ - коллекция соответствующих ему значений. Например, одной из строк будет TRACE (ключ) и коллекция из стольких единичек, сколько вхождений этого признака определил тот или иной экземпляр маппера. Нам остается пробежаться по коллекции и просуммировать единички в переменную count. В OutputCollector записываем традиционную пару ключ-значение, только значением здесь будет результат агрегации по ключу.

Метод main() используется для создания задания Hadoop на основе созданных классов Map и Reduce и его выполнения. Объект JobConf формирует спецификацию задания. Код записывается в JAR-файл, который Hadoop будет распространять по кластеру. Вместо явного указания имени файла, можно передать объемлющий класс, содержащий выполняемый код, (MapReduceTest) в конструктор JobConf, по которому Hadoop найдет соответствующий JAR-файл. Методы setOutputKeyClass() и setOutputValueClass() задают выходные типы для функций Map и Reduce. Как правило, они совпадают, т.е. Map выдает то же, что Reduce. Если они отличаются, выходные типы функции Map можно оговорить с помощью методов setMapOutputKeyClass() и setMapOutputValueClass(). Какой класс будет делать Map, а какой Reduce, как несложно догадаться, задается при помощи методов setMapperClass() и setReducerClass(). Осталось прописать формат ввода/вывода. Это делается методами setInputFormat() и setOutputFormat(). В данном случае этого можно было не делать, т.к. текстовый формат принят по умолчанию. В заключение нужно прописать пути к файлам с исходными данными и рзультатами при помощи статических методов FileInputFormat.setInputPaths() и FileOutputFormat.setOutputPath(). Мы будем передавать имена файлов через аргументы командной строки. Как видно из названия метода, входных файлов может быть несколько. Может быть директория, тогда будут взяты все содержащиеся в ней файлы. Можно указать шаблон имени файла. В качестве локации, куда будут складываться файлы результатов назначается директория. Она не должна существовать, иначе при выполнении произойдет ошибка. Своего рода мера защиты, чтобы одно задание не перетерло результат выполнения другого. Удалить директорию можно при помощи команды hadoop fs -rmr.

Собирая сказанное вместе, получаем следующий код:

//Стандартный явовский импорт

import java.io.IOException;

import java.util.Iterator;

import java.util.regex.Matcher;

import java.util.regex.Pattern;

//Импорт, относящийся к Hadoop

import org.apache.hadoop.fs.Path;

import org.apache.hadoop.io.IntWritable;

import org.apache.hadoop.io.LongWritable;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapred.FileInputFormat;

import org.apache.hadoop.mapred.FileOutputFormat;

import org.apache.hadoop.mapred.JobClient;

import org.apache.hadoop.mapred.JobConf;

import org.apache.hadoop.mapred.MapReduceBase;

import org.apache.hadoop.mapred.Mapper;

import org.apache.hadoop.mapred.OutputCollector;

import org.apache.hadoop.mapred.Reducer;

import org.apache.hadoop.mapred.Reporter;

import org.apache.hadoop.mapred.TextInputFormat;

import org.apache.hadoop.mapred.TextOutputFormat;

public class MapReduceTest

{

/*

 * Маппирование

 */

            public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable>

            {

                        private static final Pattern pattern = Pattern.compile("(TRACE)|(DEBUG)|(INFO)|(WARN)|(ERROR)|(FATAL)"); //список паттернов признаков

                        private static final IntWritable accumulator = new IntWritable(1); //константная единичка в кач-ве значения, если признак найден

   private Text logLevel = new Text();

                        public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable>collector, Reporter reporter)

                                                throws IOException

                                                { // поиск по разделителям '[' и ']'

                                    final String[] tokens = value.toString().split("[ \\[\\]]");

                                    if(tokens != null)

                                    {

                                                //вычленяем признак logLevel

                                                for(final String token : tokens)

    {

                                                            final Matcher matcher = pattern.matcher(token);

                                                            if(matcher.matches()) //если найден

      {

                                                                        logLevel.set(token);

                                                                        collector.collect(logLevel, accumulator); //формируем пары ключ-значение

                                                            }

                                                }

                                    }

                        }

            }

/*

 * Редуцирование

 */

            public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable>

            {

                        public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> collector, Reporter reporter) throws IOException

                        {

                                    int count = 0;

                  //агрегируем в count число вхождений признака

      while(values.hasNext())

                                    { count += values.next().get(); }

                                    System.out.println(key + "\t" + count);

                                    collector.collect(key, new IntWritable(count));

                        }

            }

/*

 * Создаем задание

 */

            public static void main(String[] args) throws Exception

            {

            //конфигурация джобы с назаначением объемлющего класса и классов, выполняющих Map/Reduce

                        final JobConf conf = new JobConf(MapReduceTest.class);

                        conf.setOutputKeyClass(Text.class);

                        conf.setOutputValueClass(IntWritable.class);

                        conf.setMapperClass(Map.class);

                        conf.setReducerClass(Reduce.class);

                        conf.setInputFormat(TextInputFormat.class);

                        conf.setOutputFormat(TextOutputFormat.class);

            //входные-выходные пути берутся из аргументов командной строки

                        FileInputFormat.setInputPaths(conf, new Path(args[0]));

                        FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            //выполняем задание

                        JobClient.runJob(conf);

            }

}

Скрипт 5

Зайдем на хадуповский кластер через Remote Desktop, как показывалось в предыдущей статье, и сохраним этот код в файл MapReduceTest.java, скажем, в той же d:\Temp. Библиотеки поддержки Java в HDInsight находятся в C:\apps\java\bin. Hadoop про это не знает. Имеет смысл зайти в окно командной строки Hadoop (D:\Windows\system32\cmd.exe /k pushd "c:\apps\dist\hadoop-1.1.0-SNAPSHOT" && "c:\apps\dist\hadoop-1.1.0-SNAPSHOT\bin\hadoop.cmd", для удобства на рабочем столе HDInsight имеется ярлык) и прописать этот путь в переменную окружения %path%:

set PATH=%PATH%;C:\apps\java\bin

Скрипт 6

Перейдем в директорию d:\Temp и скомпилируем явовский файл в файлы класса байт-кода. Свитч -encoding потребовался, поскольку я сохранил MapReduceTest.java в юникодовской кодировке.

javac -encoding UNICODE -classpath C:\apps\dist\hadoop-1.1.0-SNAPSHOT\hadoop-core-*.jar d:\Temp\MapReduceTest.java

Скрипт 7

В d:\Temp образовался файл MapReduceTest.class и файлы MapReduceTest$Map.class и MapReduceTest$Reduce.class, соответствующие аложенным классам. Построим сборку:

jar -cvf MapReduceTest.jar *.class

Скрипт 8

image

Рис.1 

По текущему пути d:\Tempобразовался явовский архив MapReduceTest.jar.

 hadoop jar MapReduceTest.jar MapReduceTest Sample1/input/Sample.log Sample1/output

 Скрипт 9

 image

Рис.2 

Здесь Sample1/input/Sample.log - подлежащий обработке файл журнала, загруженный из локальной директории d:\Temp в директорию HDFS /Sample1/Input - см. Рис.5 предыдущей статьи. В прошлый раз я забыл заострить внимание, что перед загрузкой необходимо в явном виде создать входную директорию HDFS (hadoopfs -mkdirSample1/input/) и только после этого класть в нее файл (hadoop fs -put d:\Temp\Sample.log Sample1/input/). Если попытаться загрузить файл без предварительного создания директории, она создается, но файл в нее не загружается, в чем можно убедиться hadoop fs -ls Sample1/input/.

Тем временем задание успешно отработало. В выходной директории HDFSSample1/output образовался файл с результатами, содержащий число появлений каждого признака в журнале, как заказывали: 

hadoop fs -cat Sample1/output/part-00000

Скрипт 10 

image

Рис.3

Алексей Шуленин