package org.apache.flink.test.hadoopcompatibility.mapred;

import java.io.IOException;
import java.util.Iterator;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction;
import org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.hamcrest.core.IsEqual;
import org.junit.Assume;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase.class */
public class HadoopReduceCombineFunctionITCase extends MultipleProgramsTestBase {

    // JUnit rule that supplies the temporary files the tests write their results to.
    // The instance is created in the constructor rather than at the declaration.
    @Rule
    public TemporaryFolder tempFolder;

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$ConfigurableCntReducer.class */
    public static class ConfigurableCntReducer implements Reducer<IntWritable, Text, IntWritable, IntWritable> {
        private String countPrefix;

        public void reduce(IntWritable intWritable, Iterator<Text> it, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
            int i = 0;
            while (it.hasNext()) {
                if (it.next().toString().startsWith(this.countPrefix)) {
                    i++;
                }
            }
            outputCollector.collect(intWritable, new IntWritable(i));
        }

        public void configure(JobConf jobConf) {
            this.countPrefix = jobConf.get("my.cntPrefix");
        }

        public void close() throws IOException {
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
            reduce((IntWritable) obj, (Iterator<Text>) it, (OutputCollector<IntWritable, IntWritable>) outputCollector, reporter);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$KeyChangingReducer.class */
    public static class KeyChangingReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        public void reduce(IntWritable intWritable, Iterator<IntWritable> it, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
            while (it.hasNext()) {
                outputCollector.collect(new IntWritable(intWritable.get() % 4), it.next());
            }
        }

        public void configure(JobConf jobConf) {
        }

        public void close() throws IOException {
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
            reduce((IntWritable) obj, (Iterator<IntWritable>) it, (OutputCollector<IntWritable, IntWritable>) outputCollector, reporter);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$Mapper1.class */
    public static class Mapper1 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
        private static final long serialVersionUID = 1;
        Tuple2<IntWritable, IntWritable> outT = new Tuple2<>();

        public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> tuple2) throws Exception {
            this.outT.f0 = new IntWritable(((IntWritable) tuple2.f0).get() / 6);
            this.outT.f1 = new IntWritable(1);
            return this.outT;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$Mapper2.class */
    public static class Mapper2 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
        private static final long serialVersionUID = 1;
        Tuple2<IntWritable, IntWritable> outT = new Tuple2<>();

        public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> tuple2) throws Exception {
            this.outT.f0 = new IntWritable(0);
            this.outT.f1 = tuple2.f0;
            return this.outT;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$Mapper3.class */
    public static class Mapper3 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> {
        private static final long serialVersionUID = 1;
        Tuple2<IntWritable, IntWritable> outT = new Tuple2<>();

        public Tuple2<IntWritable, IntWritable> map(Tuple2<IntWritable, Text> tuple2) throws Exception {
            this.outT.f0 = tuple2.f0;
            this.outT.f1 = new IntWritable(1);
            return this.outT;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$Mapper4.class */
    public static class Mapper4 implements MapFunction<Tuple2<IntWritable, Text>, Tuple2<IntWritable, Text>> {
        private static final long serialVersionUID = 1;

        public Tuple2<IntWritable, Text> map(Tuple2<IntWritable, Text> tuple2) throws Exception {
            tuple2.f0 = new IntWritable(((IntWritable) tuple2.f0).get() % 5);
            return tuple2;
        }
    }

    /* loaded from: input_file:org/apache/flink/test/hadoopcompatibility/mapred/HadoopReduceCombineFunctionITCase$SumReducer.class */
    public static class SumReducer implements Reducer<IntWritable, IntWritable, IntWritable, IntWritable> {
        public void reduce(IntWritable intWritable, Iterator<IntWritable> it, OutputCollector<IntWritable, IntWritable> outputCollector, Reporter reporter) throws IOException {
            int i = 0;
            while (true) {
                int i2 = i;
                if (!it.hasNext()) {
                    outputCollector.collect(intWritable, new IntWritable(i2));
                    return;
                }
                i = i2 + it.next().get();
            }
        }

        public void configure(JobConf jobConf) {
        }

        public void close() throws IOException {
        }

        public /* bridge */ /* synthetic */ void reduce(Object obj, Iterator it, OutputCollector outputCollector, Reporter reporter) throws IOException {
            reduce((IntWritable) obj, (Iterator<IntWritable>) it, (OutputCollector<IntWritable, IntWritable>) outputCollector, reporter);
        }
    }

    /**
     * Creates the test case for one execution mode and sets up the temporary-folder rule.
     *
     * @param executionMode the execution mode, passed on to {@link MultipleProgramsTestBase}
     */
    public HadoopReduceCombineFunctionITCase(MultipleProgramsTestBase.TestExecutionMode executionMode) {
        super(executionMode);
        tempFolder = new TemporaryFolder();
    }

    /**
     * Groups the test data by key and sums per group, with {@link SumReducer} serving as both the
     * Hadoop reducer and the Hadoop combiner, then compares the written result with the expected
     * per-key totals.
     *
     * @throws Exception if the temporary file cannot be created or the job fails
     */
    @Test
    public void testStandardCountingWithCombiner() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Mapper1 turns each record into (key / 6, 1), so the per-group sum is a record count.
        GroupReduceOperator<Tuple2<IntWritable, IntWritable>, Tuple2<IntWritable, IntWritable>> counts =
                HadoopTestData.getKVPairDataSet(env)
                        .map(new Mapper1())
                        .groupBy(0)
                        .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                                new SumReducer(), new SumReducer()));

        String resultPath = tempFolder.newFile().toURI().toString();
        counts.writeAsText(resultPath);
        env.execute();

        TestBaseUtils.compareResultsByLinesInMemory("(0,5)\n(1,6)\n(2,6)\n(3,4)\n", resultPath);
    }

    /**
     * Applies the Hadoop reduce/combine wrapper to the whole, ungrouped data set and checks that
     * a single total is written.
     *
     * @throws Exception if the temporary file cannot be created or the job fails
     */
    @Test
    public void testUngroupedHadoopReducer() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Mapper2 turns each record into (0, originalKey), so the sum adds up all original keys.
        GroupReduceOperator<Tuple2<IntWritable, IntWritable>, Tuple2<IntWritable, IntWritable>> sum =
                HadoopTestData.getKVPairDataSet(env)
                        .map(new Mapper2())
                        .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                                new SumReducer(), new SumReducer()));

        String resultPath = tempFolder.newFile().toURI().toString();
        sum.writeAsText(resultPath);
        env.execute();

        TestBaseUtils.compareResultsByLinesInMemory("(0,231)\n", resultPath);
    }

    /**
     * Runs a grouped reduce with {@link SumReducer} as the reducer and {@link KeyChangingReducer}
     * as the combiner, then compares the written result with the expected totals. The test is
     * skipped unless the execution mode is {@code CLUSTER}.
     *
     * @throws Exception if the temporary file cannot be created or the job fails
     */
    @Test
    public void testCombiner() throws Exception {
        // NOTE(review): presumably restricted to CLUSTER mode because the expected output relies
        // on the combiner actually being executed - confirm.
        Assume.assumeThat(
                mode,
                new IsEqual<MultipleProgramsTestBase.TestExecutionMode>(
                        MultipleProgramsTestBase.TestExecutionMode.CLUSTER));

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Mapper3 turns each record into (key, 1); the combiner re-keys records to key % 4 and the
        // reducer sums the ones.
        GroupReduceOperator<Tuple2<IntWritable, IntWritable>, Tuple2<IntWritable, IntWritable>> counts =
                HadoopTestData.getKVPairDataSet(env)
                        .map(new Mapper3())
                        .groupBy(0)
                        .reduceGroup(new HadoopReduceCombineFunction<IntWritable, IntWritable, IntWritable, IntWritable>(
                                new SumReducer(), new KeyChangingReducer()));

        String resultPath = tempFolder.newFile().toURI().toString();
        counts.writeAsText(resultPath);
        env.execute();

        TestBaseUtils.compareResultsByLinesInMemory("(0,5)\n(1,6)\n(2,5)\n(3,5)\n", resultPath);
    }

    /**
     * Checks that a value set on the {@link JobConf} reaches the wrapped Hadoop reducer: the
     * prefix {@code "Hello"} is stored under {@code my.cntPrefix} and
     * {@link ConfigurableCntReducer} counts the values starting with it for each key.
     *
     * @throws Exception if the temporary file cannot be created or the job fails
     */
    @Test
    public void testConfigurationViaJobConf() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        JobConf jobConf = new JobConf();
        jobConf.set("my.cntPrefix", "Hello");

        // Mapper4 replaces each key with key % 5 before grouping.
        GroupReduceOperator<Tuple2<IntWritable, Text>, Tuple2<IntWritable, IntWritable>> helloCounts =
                HadoopTestData.getKVPairDataSet(env)
                        .map(new Mapper4())
                        .groupBy(0)
                        .reduceGroup(new HadoopReduceFunction<IntWritable, Text, IntWritable, IntWritable>(
                                new ConfigurableCntReducer(), jobConf));

        String resultPath = tempFolder.newFile().toURI().toString();
        helloCounts.writeAsText(resultPath);
        env.execute();

        TestBaseUtils.compareResultsByLinesInMemory("(0,0)\n(1,0)\n(2,1)\n(3,1)\n(4,1)\n", resultPath);
    }
}
