package org.apache.flink.runtime.operators.chaining;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.operators.BatchTask;
import org.apache.flink.runtime.operators.DataSourceTask;
import org.apache.flink.runtime.operators.DataSourceTaskTest;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.FlatMapDriver;
import org.apache.flink.runtime.operators.FlatMapTaskTest;
import org.apache.flink.runtime.operators.ReduceTaskTest;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.testutils.TaskTestBase;
import org.apache.flink.runtime.operators.testutils.UniformRecordGenerator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.runtime.testutils.recordutils.RecordComparatorFactory;
import org.apache.flink.runtime.testutils.recordutils.RecordSerializerFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/operators/chaining/ChainTaskTest.class */
public class ChainTaskTest extends TaskTestBase {
    private static final int MEMORY_MANAGER_SIZE = 3145728;
    private static final int NETWORK_BUFFER_SIZE = 1024;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private final List<Record> outList = new ArrayList();
    private final RecordComparatorFactory compFact = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class}, new boolean[]{true});
    private final RecordSerializerFactory serFact = RecordSerializerFactory.get();

    /* loaded from: input_file:org/apache/flink/runtime/operators/chaining/ChainTaskTest$MockDuplicateLastValueMapFunction.class */
    public static class MockDuplicateLastValueMapFunction<T> extends RichFlatMapFunction<T, T> {
        private boolean closed = false;
        private transient T value;
        private transient Collector<T> out;

        public void flatMap(T t, Collector<T> collector) throws Exception {
            if (this.closed) {
                throw new IllegalStateException("Task is already closed.");
            }
            this.value = t;
            this.out = collector;
            collector.collect(t);
        }

        public void close() throws Exception {
            this.closed = true;
            this.out.collect(this.value);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/operators/chaining/ChainTaskTest$MockFailingCombineStub.class */
    public static final class MockFailingCombineStub implements GroupReduceFunction<Record, Record>, GroupCombineFunction<Record, Record> {
        private static final long serialVersionUID = 1;
        private int cnt = 0;

        public void reduce(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
            int i = this.cnt + 1;
            this.cnt = i;
            if (i >= 5) {
                throw new RuntimeException("Expected Test Exception");
            }
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }

        public void combine(Iterable<Record> iterable, Collector<Record> collector) throws Exception {
            reduce(iterable, collector);
        }
    }

    @Test
    public void testMapTask() {
        try {
            initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
            addInput(new UniformRecordGenerator(100, 20, false), 0);
            addOutput(this.outList);
            TaskConfig taskConfig = new TaskConfig(new Configuration());
            taskConfig.addInputToGroup(0);
            taskConfig.setInputSerializer(this.serFact, 0);
            taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
            taskConfig.setOutputSerializer(this.serFact);
            taskConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
            taskConfig.setDriverComparator(this.compFact, 0);
            taskConfig.setDriverComparator(this.compFact, 1);
            taskConfig.setRelativeMemoryDriver(1.0d);
            taskConfig.setStubWrapper(new UserCodeClassWrapper(ReduceTaskTest.MockCombiningReduceStub.class));
            getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, taskConfig, "combine");
            registerTask(FlatMapDriver.class, FlatMapTaskTest.MockMapStub.class);
            try {
                new BatchTask(this.mockEnv).invoke();
            } catch (Exception e) {
                e.printStackTrace();
                Assert.fail("Invoke method caused exception.");
            }
            Assert.assertEquals(100L, this.outList.size());
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testFailingMapTask() {
        try {
            initEnvironment(3145728L, 1038336);
            addInput(new UniformRecordGenerator(100, 20, false), 0);
            addOutput(this.outList);
            TaskConfig taskConfig = new TaskConfig(new Configuration());
            taskConfig.addInputToGroup(0);
            taskConfig.setInputSerializer(this.serFact, 0);
            taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
            taskConfig.setOutputSerializer(this.serFact);
            taskConfig.setDriverStrategy(DriverStrategy.SORTED_GROUP_COMBINE);
            taskConfig.setDriverComparator(this.compFact, 0);
            taskConfig.setDriverComparator(this.compFact, 1);
            taskConfig.setRelativeMemoryDriver(1.0d);
            taskConfig.setStubWrapper(new UserCodeClassWrapper(MockFailingCombineStub.class));
            getTaskConfig().addChainedTask(SynchronousChainedCombineDriver.class, taskConfig, "combine");
            registerTask(FlatMapDriver.class, FlatMapTaskTest.MockMapStub.class);
            boolean z = false;
            try {
                new BatchTask(this.mockEnv).invoke();
            } catch (Exception e) {
                z = true;
            }
            Assert.assertTrue("Function exception was not forwarded.", z);
        } catch (Exception e2) {
            e2.printStackTrace();
            Assert.fail(e2.getMessage());
        }
    }

    @Test
    public void testBatchTaskOutputInCloseMethod() {
        try {
            initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
            addInput(new UniformRecordGenerator(100, 10, false), 0);
            addOutput(this.outList);
            registerTask(FlatMapDriver.class, FlatMapTaskTest.MockMapStub.class);
            for (int i = 0; i < 10; i++) {
                TaskConfig taskConfig = new TaskConfig(new Configuration());
                taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
                taskConfig.setOutputSerializer(this.serFact);
                taskConfig.setStubWrapper(new UserCodeClassWrapper(MockDuplicateLastValueMapFunction.class));
                getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, taskConfig, "chained-" + i);
            }
            new BatchTask(this.mockEnv).invoke();
            Assert.assertEquals(1010L, this.outList.size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail(e.getMessage());
        }
    }

    @Test
    public void testDataSourceTaskOutputInCloseMethod() throws IOException {
        File file = new File(this.tempFolder.getRoot(), UUID.randomUUID().toString());
        DataSourceTaskTest.InputFilePreparator.prepareInputFile(new UniformRecordGenerator(100, 10, false), file, true);
        initEnvironment(3145728L, NETWORK_BUFFER_SIZE);
        addOutput(this.outList);
        DataSourceTask dataSourceTask = new DataSourceTask(this.mockEnv);
        registerFileInputTask(dataSourceTask, DataSourceTaskTest.MockInputFormat.class, file.toURI().toString(), "\n");
        for (int i = 0; i < 10; i++) {
            TaskConfig taskConfig = new TaskConfig(new Configuration());
            taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
            taskConfig.setOutputSerializer(this.serFact);
            taskConfig.setStubWrapper(new UserCodeClassWrapper(MockDuplicateLastValueMapFunction.class));
            getTaskConfig().addChainedTask(ChainedFlatMapDriver.class, taskConfig, "chained-" + i);
        }
        try {
            dataSourceTask.invoke();
            Assert.assertEquals(1010L, this.outList.size());
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Invoke method caused exception.");
        }
    }
}
