/*
 * Decompiled with CFR 0.152.
 */
package cascading.tuple.hadoop;

import cascading.PlatformTestCase;
import cascading.flow.Flow;
import cascading.flow.FlowProcess;
import cascading.operation.Aggregator;
import cascading.operation.BaseOperation;
import cascading.operation.Function;
import cascading.operation.FunctionCall;
import cascading.operation.OperationCall;
import cascading.operation.aggregator.Count;
import cascading.operation.regex.RegexParser;
import cascading.operation.regex.RegexSplitter;
import cascading.pipe.CoGroup;
import cascading.pipe.Each;
import cascading.pipe.Every;
import cascading.pipe.GroupBy;
import cascading.pipe.Pipe;
import cascading.scheme.Scheme;
import cascading.scheme.hadoop.SequenceFile;
import cascading.scheme.hadoop.TextLine;
import cascading.tap.SinkMode;
import cascading.tap.Tap;
import cascading.tap.hadoop.Hfs;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.tuple.TupleEntryIterator;
import cascading.tuple.coerce.Coercions;
import cascading.tuple.hadoop.BigDecimalSerialization;
import cascading.tuple.hadoop.BytesSerialization;
import cascading.tuple.hadoop.NoTokenTestBytesSerialization;
import cascading.tuple.hadoop.NoTokenTestSerialization;
import cascading.tuple.hadoop.TestSerialization;
import cascading.tuple.hadoop.TestText;
import cascading.tuple.hadoop.TestTextComparator;
import cascading.tuple.hadoop.TupleSerializationProps;
import cascading.tuple.hadoop.util.BytesComparator;
import data.InputData;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Type;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.junit.Test;

public class SerializedPipesPlatformTest
extends PlatformTestCase {
    /**
     * Creates the test case, forwarding a fixed configuration to the
     * {@link PlatformTestCase} constructor.
     */
    public SerializedPipesPlatformTest() {
        // NOTE(review): the meaning of (true, 4, 2) is defined by PlatformTestCase and is not
        // visible here; presumably a cluster flag followed by two task counts (confirm).
        super(true, 4, 2);
    }

    /**
     * Parses the leading token of each Apache log line into {@code ip}, attaches a
     * {@link BytesWritable} column, groups and counts by {@code ip}, and finally appends a
     * {@link BooleanWritable} column. {@link BooleanWritable} is registered under
     * serialization token 1000 before the flow is planned.
     *
     * @throws Exception if the flow cannot be planned or executed
     */
    @Test
    public void testSimpleGroup() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileApache);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        assembly = new Each(assembly, new InsertBytes(new Fields("bytes"), "inserted text as bytes"), Fields.ALL);
        assembly = new GroupBy(assembly, new Fields("ip"));
        assembly = new Every(assembly, new Count(), new Fields("ip", "count"));
        assembly = new Each(assembly, new InsertBoolean(new Fields("boolean"), false), Fields.ALL);

        Tap sink = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("serialization"), SinkMode.REPLACE);

        Map jobProperties = getProperties();
        TupleSerializationProps.addSerializationToken(jobProperties, 1000, BooleanWritable.class.getName());

        Flow flow = getPlatform().getFlowConnector(jobProperties).connect(source, sink, assembly);
        flow.complete();

        // The source has 10 lines; the grouped sink is checked for 8 rows.
        validateLength(flow.openSource(), 10);
        validateLength(flow, 8, null);
    }

    /**
     * Groups on a raw {@code byte[]} column produced by {@link InsertRawBytes}, using
     * {@link BytesComparator} as the grouping comparator and {@link BytesSerialization} to
     * move the arrays through the flow.
     *
     * @throws Exception if the flow cannot be planned or executed
     */
    @Test
    public void testSimpleGroupOnBytes() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Tap source = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileApache);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("ip"));
        assembly = new Each(assembly, new InsertRawBytes(new Fields("bytes"), "inserted text as bytes", true, true), Fields.ALL);

        // byte[] is not Comparable, so the grouping field carries an explicit comparator.
        Fields groupingFields = new Fields("bytes");
        groupingFields.setComparator("bytes", new BytesComparator());

        assembly = new GroupBy(assembly, groupingFields);
        assembly = new Every(assembly, new Count(), new Fields("bytes", "count"));

        Tap sink = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("grouponbytes"), SinkMode.REPLACE);

        Map properties = getProperties();
        TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName());

        Flow flow = getPlatform().getFlowConnector(properties).connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 10);
    }

    /**
     * Co-groups the lower and upper inputs on a {@link BytesWritable} {@code group} column,
     * with a second {@link BytesWritable} column carried as a plain value.
     *
     * @throws Exception if the flow cannot be planned or executed
     */
    @Test
    public void testCoGroupWritableAsKeyValue() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Tap sourceUpper = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileUpper);

        // Typed as Map<String, Tap>: a HashMap<String, Hfs> is not assignable to the
        // Map<String, Tap> parameter of FlowConnector.connect because generics are invariant.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("writablekeyvalue"), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Each(pipeLower, new InsertBytes(new Fields("group"), "inserted text as bytes"), Fields.ALL);
        pipeLower = new Each(pipeLower, new InsertBytes(new Fields("value"), "inserted text as bytes"), Fields.ALL);

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new Each(pipeUpper, new InsertBytes(new Fields("group"), "inserted text as bytes"), Fields.ALL);
        pipeUpper = new Each(pipeUpper, new InsertBytes(new Fields("value"), "inserted text as bytes"), Fields.ALL);

        // Each side contributes four columns, hence eight declared positions.
        Pipe splice = new CoGroup(pipeLower, new Fields("group"), pipeUpper, new Fields("group"), Fields.size(8));

        Flow countFlow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, splice);
        countFlow.complete();

        validateLength(countFlow, 25);
    }

    /**
     * Replaces the {@code char} column on both inputs with a {@link BytesWritable}, co-groups
     * on {@code num}, and writes the joined rows as text lines.
     *
     * @throws Exception if the flow cannot be planned or executed
     */
    @Test
    public void testCoGroupBytesWritableAsKeyValue() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Tap sourceUpper = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileUpper);

        // Typed as Map<String, Tap>: a HashMap<String, Hfs> is not assignable to the
        // Map<String, Tap> parameter of FlowConnector.connect because generics are invariant.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new TextLine(new Fields("line")), getOutputPath("byteswritablekeyvalue"), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Each(pipeLower, new Fields("char"), new ReplaceAsBytes(new Fields("char")), Fields.REPLACE);

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new Each(pipeUpper, new Fields("char"), new ReplaceAsBytes(new Fields("char")), Fields.REPLACE);

        // Two columns per side, hence four declared positions.
        Pipe splice = new CoGroup(pipeLower, new Fields("num"), pipeUpper, new Fields("num"), Fields.size(4));

        Flow flow = getPlatform().getFlowConnector(getProperties()).connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 5);

        List results = getSinkAsList(flow);
        assertTrue(results.contains(new Tuple("1\t61\t1\t41")));
    }

    /**
     * Co-groups tuples that carry {@link TestText} columns while the spill list threshold is
     * set to 1, with {@link TestSerialization} configured as the {@code io.serializations}
     * value.
     *
     * @throws Exception if the flow cannot be planned or executed
     */
    @Test
    public void testCoGroupSpillCustomWritable() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Tap sourceUpper = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileUpper);

        // Typed as Map<String, Tap>: a HashMap<String, Hfs> is not assignable to the
        // Map<String, Tap> parameter of FlowConnector.connect because generics are invariant.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Tap sink = new Hfs(new SequenceFile(Fields.ALL), getOutputPath("customerwritable"), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Each(pipeLower, new InsertTestText(new Fields("group"), "inserted text as bytes", false), Fields.ALL);
        pipeLower = new Each(pipeLower, new InsertTestText(new Fields("value"), "inserted text as bytes", false), Fields.ALL);
        pipeLower = new Each(pipeLower, new InsertTestText(new Fields("text"), "inserted text as custom text", false), Fields.ALL);

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new Each(pipeUpper, new InsertTestText(new Fields("group"), "inserted text as bytes", false), Fields.ALL);
        pipeUpper = new Each(pipeUpper, new InsertTestText(new Fields("value"), "inserted text as bytes", false), Fields.ALL);
        pipeUpper = new Each(pipeUpper, new InsertTestText(new Fields("text"), "inserted text as custom text", false), Fields.ALL);

        // Five columns per side, hence ten declared positions.
        Pipe splice = new CoGroup(pipeLower, new Fields("group"), pipeUpper, new Fields("group"), Fields.size(10));

        Map properties = getProperties();
        properties.put("cascading.spill.list.threshold", 1);
        properties.put("io.serializations", TestSerialization.class.getName());

        Flow flow = getPlatform().getFlowConnector(properties).connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 25);
    }

    /**
     * Raw key/value co-group with explicit comparators, a secondary sort on the value,
     * token-aware serializations and single-field grouping.
     */
    @Test
    public void testCoGroupRawAsKeyValue() throws Exception {
        invokeRawAsKeyValue(
            false,   // useDefaultComparator
            true,    // secondarySortOnValue
            false,   // ignoreSerializationToken
            false);  // compositeGrouping
    }

    /**
     * Raw key/value co-group relying on default comparators, with a secondary sort on the
     * value, token-aware serializations and single-field grouping.
     */
    @Test
    public void testCoGroupRawAsKeyValueDefault() throws Exception {
        invokeRawAsKeyValue(
            true,    // useDefaultComparator
            true,    // secondarySortOnValue
            false,   // ignoreSerializationToken
            false);  // compositeGrouping
    }

    /**
     * Raw key/value co-group relying on default comparators, with a secondary sort on the
     * value, the no-token serializations and single-field grouping.
     */
    @Test
    public void testCoGroupRawAsKeyValueDefaultIgnoreToken() throws Exception {
        invokeRawAsKeyValue(
            true,    // useDefaultComparator
            true,    // secondarySortOnValue
            true,    // ignoreSerializationToken
            false);  // compositeGrouping
    }

    /**
     * Raw key/value co-group relying on default comparators, with a secondary sort on the
     * value, the no-token serializations and grouping on both {@code group} and {@code num}.
     */
    @Test
    public void testCoGroupRawAsKeyValueDefaultIgnoreTokenCompositeGrouping() throws Exception {
        invokeRawAsKeyValue(
            true,   // useDefaultComparator
            true,   // secondarySortOnValue
            true,   // ignoreSerializationToken
            true);  // compositeGrouping
    }

    /**
     * Raw key/value co-group with explicit comparators, no secondary sort, token-aware
     * serializations and single-field grouping.
     */
    @Test
    public void testCoGroupRawAsKeyValueNoSecondary() throws Exception {
        invokeRawAsKeyValue(
            false,   // useDefaultComparator
            false,   // secondarySortOnValue
            false,   // ignoreSerializationToken
            false);  // compositeGrouping
    }

    /**
     * Raw key/value co-group relying on default comparators, no secondary sort, token-aware
     * serializations and single-field grouping.
     */
    @Test
    public void testCoGroupRawAsKeyValueDefaultNoSecondary() throws Exception {
        invokeRawAsKeyValue(
            true,    // useDefaultComparator
            false,   // secondarySortOnValue
            false,   // ignoreSerializationToken
            false);  // compositeGrouping
    }

    /**
     * Raw key/value co-group relying on default comparators, no secondary sort, token-aware
     * serializations and grouping on both {@code group} and {@code num}.
     */
    @Test
    public void testCoGroupRawAsKeyValueDefaultNoSecondaryCompositeGrouping() throws Exception {
        invokeRawAsKeyValue(
            true,    // useDefaultComparator
            false,   // secondarySortOnValue
            false,   // ignoreSerializationToken
            true);   // compositeGrouping
    }

    /**
     * Runs a CoGroup followed by a GroupBy over tuples whose {@code group} column is a
     * {@link TestText} (sometimes {@code null}, sometimes wrapping a {@code null} string) and
     * whose {@code value} column is a raw {@code byte[]}, then verifies that the non-null
     * {@code group} values read back from the sink are strictly increasing.
     *
     * @param useDefaultComparator     when {@code false}, {@link TestTextComparator} and
     *                                 {@link BytesComparator} are set explicitly on the
     *                                 {@code group} and {@code value} fields
     * @param secondarySortOnValue     when {@code true}, the GroupBy also sorts on {@code value}
     * @param ignoreSerializationToken when {@code true}, the {@code NoToken*} serializations are
     *                                 registered instead of the regular ones
     * @param compositeGrouping        when {@code true}, grouping is on {@code group} and
     *                                 {@code num} rather than on {@code group} alone
     * @throws IOException if the input files cannot be copied or the sink cannot be read
     */
    private void invokeRawAsKeyValue(boolean useDefaultComparator, boolean secondarySortOnValue, boolean ignoreSerializationToken, boolean compositeGrouping) throws IOException {
        getPlatform().copyFromLocal(InputData.inputFileLower);
        getPlatform().copyFromLocal(InputData.inputFileUpper);

        Tap sourceLower = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileLower);
        Tap sourceUpper = new Hfs(new TextLine(new Fields("offset", "line")), InputData.inputFileUpper);

        // Typed as Map<String, Tap>: a HashMap<String, Hfs> is not assignable to the
        // Map<String, Tap> parameter of FlowConnector.connect because generics are invariant.
        Map<String, Tap> sources = new HashMap<String, Tap>();
        sources.put("lower", sourceLower);
        sources.put("upper", sourceUpper);

        Function splitter = new RegexSplitter(new Fields("num", "char"), " ");
        Fields fields = new Fields("num", "char", "group", "value", "num2", "char2", "group2", "value2");

        // One output directory per flag combination so the variants do not overwrite each other.
        Tap sink = new Hfs(new SequenceFile(fields), getOutputPath("/rawbyteskeyvalue/" + useDefaultComparator + "/" + secondarySortOnValue + "/" + ignoreSerializationToken + "/" + compositeGrouping), SinkMode.REPLACE);

        Pipe pipeLower = new Each(new Pipe("lower"), new Fields("line"), splitter);
        pipeLower = new Each(pipeLower, new InsertTestText(new Fields("group"), "inserted text as bytes", true, 3, 4), Fields.ALL);
        pipeLower = new Each(pipeLower, new InsertRawBytes(new Fields("value"), "inserted text as bytes", true), Fields.ALL);

        Pipe pipeUpper = new Each(new Pipe("upper"), new Fields("line"), splitter);
        pipeUpper = new Each(pipeUpper, new InsertTestText(new Fields("group"), "inserted text as bytes", true, 3, 4), Fields.ALL);
        pipeUpper = new Each(pipeUpper, new InsertRawBytes(new Fields("value"), "inserted text as bytes", true), Fields.ALL);

        Fields groupFields = new Fields("group");
        if (compositeGrouping) {
            groupFields = new Fields("group", "num");
        }
        if (!useDefaultComparator) {
            groupFields.setComparator("group", new TestTextComparator());
        }

        Fields declaredFields = new Fields("num", "char", "group", "value", "num2", "char2", "group2", "value2");

        // Declared as Pipe because the variable first holds a CoGroup and then a GroupBy.
        Pipe splice = new CoGroup(pipeLower, groupFields, pipeUpper, groupFields, declaredFields);

        Fields valueFields = new Fields("value");
        if (!useDefaultComparator) {
            valueFields.setComparator("value", new BytesComparator());
        }

        if (secondarySortOnValue) {
            splice = new GroupBy(splice, groupFields, valueFields);
        } else {
            splice = new GroupBy(splice, groupFields);
        }

        Map properties = getProperties();
        if (ignoreSerializationToken) {
            TupleSerializationProps.addSerialization(properties, NoTokenTestSerialization.class.getName());
            TupleSerializationProps.addSerialization(properties, NoTokenTestBytesSerialization.class.getName());
        } else {
            TupleSerializationProps.addSerialization(properties, TestSerialization.class.getName());
            TupleSerializationProps.addSerialization(properties, BytesSerialization.class.getName());
        }

        getPlatform().setNumMapTasks(properties, 1);
        getPlatform().setNumReduceTasks(properties, 1);
        getPlatform().setNumGatherPartitionTasks(properties, 1);

        Flow flow = getPlatform().getFlowConnector(properties).connect(sources, sink, splice);
        flow.complete();

        validateLength(flow, 5);

        TupleEntryIterator iterator = flow.openSink();
        try {
            String value = groupText((TupleEntry) iterator.next());
            while (iterator.hasNext()) {
                String next = groupText((TupleEntry) iterator.next());
                // Once a non-null value has been seen, a following null or non-greater value
                // is an ordering violation; report it rather than letting compareTo(null) throw.
                if (value != null && (next == null || value.compareTo(next) >= 0)) {
                    fail("not increasing: " + value + " " + next);
                }
                value = next;
            }
        } finally {
            // Close the sink iterator even when an assertion above fails.
            iterator.close();
        }
    }

    /** Returns the string wrapped by the {@code group} column of the entry, or {@code null} if the column is null. */
    private static String groupText(TupleEntry entry) {
        TestText target = (TestText) entry.getObject("group");
        return target == null ? null : target.value;
    }

    /**
     * Reads the line offset as a {@code BigDecimal}, groups and counts on it, and writes the
     * typed result to a sequence file with {@link BigDecimalSerialization} registered.
     *
     * @throws Exception if the flow cannot be planned or executed
     */
    @Test
    public void testBigDecimal() throws Exception {
        getPlatform().copyFromLocal(InputData.inputFileApache);

        Fields sourceFields = new Fields("offset", "line");
        sourceFields = sourceFields.applyTypes(new Type[]{Coercions.BIG_DECIMAL, String.class});
        Tap source = new Hfs(new TextLine(sourceFields), InputData.inputFileApache);

        Pipe assembly = new Pipe("test");
        assembly = new Each(assembly, new Fields("line"), new RegexParser(new Fields("ip"), "^[^ ]*"), new Fields("offset", "ip"));
        assembly = new GroupBy(assembly, new Fields("offset"));
        assembly = new Every(assembly, new Count(), new Fields("offset", "count"));

        Fields sinkFields = new Fields("offset", "count");
        sinkFields = sinkFields.applyTypes(new Type[]{Coercions.BIG_DECIMAL, Long.TYPE});
        Tap sink = new Hfs(new SequenceFile(sinkFields), getOutputPath("bigdecimal"), SinkMode.REPLACE);

        Map jobProperties = getProperties();
        TupleSerializationProps.addSerialization(jobProperties, BigDecimalSerialization.class.getName());

        Flow flow = getPlatform().getFlowConnector(jobProperties).connect(source, sink, assembly);
        flow.complete();

        validateLength(flow, 10);
    }

    public static class InsertTestText
    extends BaseOperation<Long>
    implements Function<Long> {
        private String testText;
        private boolean increment;
        private int moduloValueIsNull;
        private int moduloResultIsNull;

        public InsertTestText(Fields fieldDeclaration, String testText, boolean increment) {
            this(fieldDeclaration, testText, increment, -1, -1);
        }

        public InsertTestText(Fields fieldDeclaration, String testText, boolean increment, int moduloValueIsNull, int moduloResultIsNull) {
            super(fieldDeclaration);
            this.testText = testText;
            this.increment = increment;
            this.moduloValueIsNull = moduloValueIsNull;
            this.moduloResultIsNull = moduloResultIsNull;
        }

        public void prepare(FlowProcess flowProcess, OperationCall<Long> operationCall) {
            operationCall.setContext((Object)(this.increment ? 0L : -1L));
        }

        public void operate(FlowProcess flowProcess, FunctionCall<Long> functionCall) {
            String string = this.testText;
            if ((Long)functionCall.getContext() != -1L) {
                string = functionCall.getContext() + string;
                functionCall.setContext((Object)((Long)functionCall.getContext() + 1L));
                if (this.moduloValueIsNull != -1 && (Long)functionCall.getContext() % (long)this.moduloValueIsNull == 0L) {
                    string = null;
                }
            }
            TestText result = null;
            if (this.moduloResultIsNull != -1 && (Long)functionCall.getContext() % (long)this.moduloResultIsNull != 0L) {
                result = new TestText(string);
            }
            functionCall.getOutputCollector().add(new Tuple(new Object[]{result}));
        }
    }

    /**
     * Serializable wrapper around a string that is comparable against plain
     * {@link String} values.
     */
    public static class Container
    implements Serializable,
    Comparable<String> {
        String value;

        /**
         * @param value the wrapped string
         */
        public Container(String value) {
            this.value = value;
        }

        /**
         * Compares the wrapped string with the given one using {@link String#compareTo(String)}.
         *
         * @param o the string to compare against
         * @return the result of {@code value.compareTo(o)}
         */
        @Override
        public int compareTo(String o) {
            String wrapped = value;
            return wrapped.compareTo(o);
        }
    }

    public static class InsertBoolean
    extends BaseOperation
    implements Function {
        boolean asBoolean;

        public InsertBoolean(Fields fieldDeclaration, boolean asBoolean) {
            super(fieldDeclaration);
            this.asBoolean = asBoolean;
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            functionCall.getOutputCollector().add(new Tuple(new Object[]{new BooleanWritable(this.asBoolean)}));
        }
    }

    public static class InsertRawBytes
    extends BaseOperation<Long>
    implements Function<Long> {
        String asBytes;
        private boolean increment = false;
        private boolean randomIncrement = false;

        public InsertRawBytes(Fields fieldDeclaration, String asBytes, boolean increment, boolean randomIncrement) {
            super(fieldDeclaration);
            this.asBytes = asBytes;
            this.increment = increment;
            this.randomIncrement = randomIncrement;
        }

        public InsertRawBytes(Fields fieldDeclaration, String asBytes, boolean increment) {
            super(fieldDeclaration);
            this.asBytes = asBytes;
            this.increment = increment;
        }

        public void prepare(FlowProcess flowProcess, OperationCall<Long> operationCall) {
            operationCall.setContext((Object)(this.increment ? this.getIncrement(0L) : -1L));
        }

        private long getIncrement(long value) {
            if (this.randomIncrement) {
                return value + (long)(Math.random() * (double)new Object().hashCode());
            }
            return value + 1L;
        }

        public void operate(FlowProcess flowProcess, FunctionCall<Long> functionCall) {
            String string = this.asBytes;
            if ((Long)functionCall.getContext() != -1L) {
                string = functionCall.getContext() + string;
                functionCall.setContext((Object)((Long)functionCall.getContext() + 1L));
            }
            functionCall.getOutputCollector().add(new Tuple(new Object[]{string.getBytes()}));
        }
    }

    public static class ReplaceAsBytes
    extends BaseOperation
    implements Function {
        public ReplaceAsBytes(Fields fieldDeclaration) {
            super(fieldDeclaration);
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            functionCall.getOutputCollector().add(new Tuple(new Object[]{new BytesWritable(functionCall.getArguments().getString((Comparable)Integer.valueOf(0)).getBytes())}));
        }
    }

    public static class InsertBytes
    extends BaseOperation
    implements Function {
        String asBytes;

        public InsertBytes(Fields fieldDeclaration, String asBytes) {
            super(fieldDeclaration);
            this.asBytes = asBytes;
        }

        public void operate(FlowProcess flowProcess, FunctionCall functionCall) {
            functionCall.getOutputCollector().add(new Tuple(new Object[]{new BytesWritable(this.asBytes.getBytes())}));
        }
    }
}

