package org.apache.flink.test.hadoopcompatibility.mapred;

import java.io.IOException;
import java.util.Locale;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.FlatMapOperator;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration tests that run Hadoop {@code mapred} {@link Mapper} implementations
 * through {@link HadoopMapFunction} as a Flink flat-map.
 *
 * <p>Each test applies one of the nested mappers to the data set returned by
 * {@code HadoopTestData.getKVPairDataSet}, writes the result as text into a
 * temporary file and compares the written lines with a hard-coded expectation.
 */
@RunWith(Parameterized.class)
public class HadoopMapFunctionITCase extends MultipleProgramsTestBase {

    /** Job configuration key from which {@link ConfigurableMapper} reads its prefix. */
    private static final String FILTER_PREFIX_KEY = "my.filterPrefix";

    /** Supplies the file each test writes its result to. */
    @Rule
    public final TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Creates the test case for one execution mode.
     *
     * @param testExecutionMode the mode handed through to {@link MultipleProgramsTestBase}
     */
    public HadoopMapFunctionITCase(MultipleProgramsTestBase.TestExecutionMode testExecutionMode) {
        super(testExecutionMode);
    }

    /** A mapper that only emits records whose value contains {@code "bananas"}. */
    @Test
    public void testNonPassingMapper() throws Exception {
        HadoopMapFunction<IntWritable, Text, IntWritable, Text> mapFunction =
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new NonPassingMapper());

        assertMapperOutput(mapFunction, "\n");
    }

    /** A mapper that emits every record twice: unchanged and with an upper-cased value. */
    @Test
    public void testDataDuplicatingMapper() throws Exception {
        HadoopMapFunction<IntWritable, Text, IntWritable, Text> mapFunction =
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new DuplicatingMapper());

        String expected =
                "(1,Hi)\n(1,HI)\n"
                        + "(2,Hello)\n(2,HELLO)\n"
                        + "(3,Hello world)\n(3,HELLO WORLD)\n"
                        + "(4,Hello world, how are you?)\n(4,HELLO WORLD, HOW ARE YOU?)\n"
                        + "(5,I am fine.)\n(5,I AM FINE.)\n"
                        + "(6,Luke Skywalker)\n(6,LUKE SKYWALKER)\n"
                        + "(7,Comment#1)\n(7,COMMENT#1)\n"
                        + "(8,Comment#2)\n(8,COMMENT#2)\n"
                        + "(9,Comment#3)\n(9,COMMENT#3)\n"
                        + "(10,Comment#4)\n(10,COMMENT#4)\n"
                        + "(11,Comment#5)\n(11,COMMENT#5)\n"
                        + "(12,Comment#6)\n(12,COMMENT#6)\n"
                        + "(13,Comment#7)\n(13,COMMENT#7)\n"
                        + "(14,Comment#8)\n(14,COMMENT#8)\n"
                        + "(15,Comment#9)\n(15,COMMENT#9)\n"
                        + "(16,Comment#10)\n(16,COMMENT#10)\n"
                        + "(17,Comment#11)\n(17,COMMENT#11)\n"
                        + "(18,Comment#12)\n(18,COMMENT#12)\n"
                        + "(19,Comment#13)\n(19,COMMENT#13)\n"
                        + "(20,Comment#14)\n(20,COMMENT#14)\n"
                        + "(21,Comment#15)\n(21,COMMENT#15)\n";

        assertMapperOutput(mapFunction, expected);
    }

    /** A mapper whose filter prefix is supplied through the {@link JobConf}. */
    @Test
    public void testConfigurableMapper() throws Exception {
        JobConf jobConf = new JobConf();
        jobConf.set(FILTER_PREFIX_KEY, "Hello");
        HadoopMapFunction<IntWritable, Text, IntWritable, Text> mapFunction =
                new HadoopMapFunction<IntWritable, Text, IntWritable, Text>(new ConfigurableMapper(), jobConf);

        assertMapperOutput(mapFunction, "(2,Hello)\n(3,Hello world)\n(4,Hello world, how are you?)\n");
    }

    /**
     * Applies {@code mapFunction} to the key/value test data, writes the result as text
     * to a fresh temporary file, executes the program and compares the written lines
     * with {@code expected}.
     *
     * @param mapFunction the wrapped Hadoop mapper under test
     * @param expected the expected output, one record per line
     * @throws Exception if creating the file, executing the program or the comparison fails
     */
    private void assertMapperOutput(
            HadoopMapFunction<IntWritable, Text, IntWritable, Text> mapFunction, String expected)
            throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        FlatMapOperator<?, ?> mapped = HadoopTestData.getKVPairDataSet(env).flatMap(mapFunction);

        String resultPath = tempFolder.newFile().toURI().toString();
        // OVERWRITE is required because newFile() has already created the target file.
        mapped.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
        env.execute();

        TestBaseUtils.compareResultsByLinesInMemory(expected, resultPath);
    }

    /** Emits a record only if its value contains the substring {@code "bananas"}. */
    public static class NonPassingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {

        @Override
        public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
                throws IOException {
            if (value.toString().contains("bananas")) {
                out.collect(key, value);
            }
        }

        @Override
        public void configure(JobConf jobConf) {
            // nothing to configure
        }

        @Override
        public void close() throws IOException {
            // nothing to release
        }
    }

    /** Emits every record twice: once unchanged and once with its value upper-cased. */
    public static class DuplicatingMapper implements Mapper<IntWritable, Text, IntWritable, Text> {

        @Override
        public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
                throws IOException {
            out.collect(key, value);
            // Locale.ROOT keeps the casing independent of the JVM's default locale,
            // so the hard-coded ASCII expectation holds everywhere.
            out.collect(key, new Text(value.toString().toUpperCase(Locale.ROOT)));
        }

        @Override
        public void configure(JobConf jobConf) {
            // nothing to configure
        }

        @Override
        public void close() throws IOException {
            // nothing to release
        }
    }

    /**
     * Emits only records whose value starts with the prefix read from the job
     * configuration in {@link #configure(JobConf)}.
     */
    public static class ConfigurableMapper implements Mapper<IntWritable, Text, IntWritable, Text> {

        /** Prefix a value must start with; assigned in {@link #configure(JobConf)}. */
        private String filterPrefix;

        @Override
        public void map(IntWritable key, Text value, OutputCollector<IntWritable, Text> out, Reporter reporter)
                throws IOException {
            if (value.toString().startsWith(filterPrefix)) {
                out.collect(key, value);
            }
        }

        @Override
        public void configure(JobConf jobConf) {
            filterPrefix = jobConf.get(FILTER_PREFIX_KEY);
        }

        @Override
        public void close() throws IOException {
            // nothing to release
        }
    }
}
