Share via


Framework for .Net Hadoop MapReduce Job Submission TextOutput Type

Some recent changes made to the “Generics based Framework for .Net Hadoop MapReduce Job Submission” code were to support Json and Binary Serialization from Mapper, in and out of Combiners, and out from the Reducer. However, this precluded one from controlling the format of the Text output. Say one wanted to create a tab delimited string from the Reducer.  This could only be done using Json Serialization. To better support allowing one to construct the final text output I have created a new TextOutput type.

This TextOutput type is simple in structure. However, when this type is encountered during the serialization process, both Json and Binary serialization are bypassed and the text is written out in its raw format; including tabs and other characters usually escaped by the Json serializer.

As an example here is a modified version of one of the C# Reducer samples that supports both Json and Text output:

namespace MSDN.Hadoop.MapReduceCSharp
{
    [DataContract]
    public class MobilePhoneRange
    {
        [DataMember] public TimeSpan MinTime { get; set; }
        [DataMember] public TimeSpan MaxTime { get; set; }

        public MobilePhoneRange(TimeSpan minTime, TimeSpan maxTime)
        {
            this.MinTime = minTime;
            this.MaxTime = maxTime;
        }

        public TextOutput ToText(string format)
        {
            return new TextOutput(String.Format(@"({0}, {1})", this.MinTime.ToString(format), this.MaxTime.ToString(format)));
        }
    }

    public class MobilePhoneRangeReducer : ReducerBase<TimeSpan, MobilePhoneRange>
    {
        public override IEnumerable<Tuple<string, MobilePhoneRange>> Reduce(string key, IEnumerable<TimeSpan> value)
        {
            var baseRange = new MobilePhoneRange(TimeSpan.MaxValue, TimeSpan.MinValue);
            var rangeValue = value.Aggregate(baseRange, (accSpan, timespan) =>
                new MobilePhoneRange((timespan < accSpan.MinTime) ? timespan : accSpan.MinTime, (timespan > accSpan.MaxTime ) ? timespan : accSpan.MaxTime));

            yield return new Tuple<string, MobilePhoneRange>(key, rangeValue);
        }
    }

    public class MobilePhoneRangeTextReducer : ReducerBase<TimeSpan, TextOutput>
    {
        public override IEnumerable<Tuple<string, TextOutput>> Reduce(string key, IEnumerable<TimeSpan> value)
        {
            var baseRange = new MobilePhoneRange(TimeSpan.MaxValue, TimeSpan.MinValue);
            var rangeValue = value.Aggregate(baseRange, (accSpan, timespan) =>
                new MobilePhoneRange((timespan < accSpan.MinTime) ? timespan : accSpan.MinTime, (timespan > accSpan.MaxTime) ? timespan : accSpan.MaxTime));

            yield return new Tuple<string, TextOutput>(key, rangeValue.ToText("G"));
        }
    }
}

For the sample Reducer above the Json serialization output would be:

Android {"MaxTime":"PT23H59M54S","MinTime":"PT6S"}
RIM OS {"MaxTime":"PT23H59M58S","MinTime":"PT1M7S"}
Unknown {"MaxTime":"PT23H52M36S","MinTime":"PT36S"}
Windows Phone {"MaxTime":"PT23H55M17S","MinTime":"PT32S"}
iPhone OS {"MaxTime":"PT23H59M50S","MinTime":"PT1S"}

The corresponding Text Output would be:

Android (0:00:00:06.0000000, 0:23:59:54.0000000)
RIM OS (0:00:01:07.0000000, 0:23:59:58.0000000)
Unknown (0:00:00:36.0000000, 0:23:52:36.0000000)
Windows Phone (0:00:00:32.0000000, 0:23:55:17.0000000)
iPhone OS (0:00:00:01.0000000, 0:23:59:50.0000000)

As mentioned the actual definition of the TextOutput type is simple and is just a wrapper over a string, although depending on needs this may change:

type TextOutput(value:string) =

    // Internal text value
    let mutable text = value

    /// String value of the TextOutput class
    member this.Text
        with get () = text
        and set (value) = text <- value

    /// Byte array value of the TextOutput class
    member this.Bytes
        with get() = Encoding.UTF8.GetBytes(text)
        and set (value:byte array) = text <- Encoding.UTF8.GetString(value)

    new(value:byte array) =
        TextOutput(Encoding.UTF8.GetString(value))

    new(value:TextOutput) =
        TextOutput(value.Text)

    new() =
        TextOutput(String.Empty)

    /// Clear Text
    member this.Clear() =
        text <- String.Empty

    /// Append Text
    member this.Append(value:string) =
        text <- text + value
        text

    /// ToString override
    override this.ToString() =
        text

    ///Equals override
    override this.Equals(value:obj) =
        if not (value.GetType() = typeof<TextOutput>) then
            false
        else
            let objText = (value :?> TextOutput)
            text.Equals(objText.Text)

    /// GetHasCode override
    override this.GetHashCode() =
        text.GetHashCode()

One of the main rationales for adding the TextOutput support is so that data output by the framework can be easily used by Hive CREATE TABLE statements.

Hope you find this change useful.