package org.apache.flink.test.iterative.nephele;

import java.util.Collection;
import java.util.Iterator;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.operators.util.UserCodeClassWrapper;
import org.apache.flink.api.common.typeutils.record.RecordComparatorFactory;
import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
import org.apache.flink.api.java.record.functions.MapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.channels.ChannelType;
import org.apache.flink.runtime.iterative.task.IterationHeadPactTask;
import org.apache.flink.runtime.iterative.task.IterationTailPactTask;
import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.OutputFormatVertex;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.CollectorMapDriver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.GroupReduceDriver;
import org.apache.flink.runtime.operators.chaining.ChainedCollectorMapDriver;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.test.recordJobs.kmeans.udfs.CoordVector;
import org.apache.flink.test.recordJobs.kmeans.udfs.PointInFormat;
import org.apache.flink.test.recordJobs.kmeans.udfs.PointOutFormat;
import org.apache.flink.test.util.RecordAPITestBase;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.Record;
import org.apache.flink.util.Collector;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase.class */
public class IterationWithChainingNepheleITCase extends RecordAPITestBase {
    private static final String INPUT_STRING = "0|%d.25|\n1|%d.25|\n";
    private String dataPath;
    private String resultPath;

    /* loaded from: input_file:org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase$DummyMapper.class */
    public static final class DummyMapper extends MapFunction {
        private static final long serialVersionUID = 1;

        public void map(Record record, Collector<Record> collector) {
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase$DummyReducer.class */
    public static final class DummyReducer implements GroupReduceFunction<Record, Record> {
        private static final long serialVersionUID = 1;

        public void reduce(Iterable<Record> iterable, Collector<Record> collector) {
            Iterator<Record> it = iterable.iterator();
            while (it.hasNext()) {
                collector.collect(it.next());
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/test/iterative/nephele/IterationWithChainingNepheleITCase$IncrementCoordinatesMapper.class */
    public static final class IncrementCoordinatesMapper extends MapFunction {
        private static final long serialVersionUID = 1;

        public void map(Record record, Collector<Record> collector) {
            CoordVector field = record.getField(1, CoordVector.class);
            double[] coordinates = field.getCoordinates();
            for (int i = 0; i < coordinates.length; i++) {
                int i2 = i;
                coordinates[i2] = coordinates[i2] + 1.0d;
            }
            record.setField(1, field);
            collector.collect(record);
        }

        public /* bridge */ /* synthetic */ void map(Object obj, Collector collector) throws Exception {
            map((Record) obj, (Collector<Record>) collector);
        }
    }

    public IterationWithChainingNepheleITCase(Configuration configuration) {
        super(configuration);
        setTaskManagerNumSlots(4);
    }

    protected void preSubmit() throws Exception {
        this.dataPath = createTempFile("data_points.txt", String.format(INPUT_STRING, 1, 2));
        this.resultPath = getTempFilePath("result");
    }

    protected void postSubmit() throws Exception {
        int integer = this.config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1);
        compareResultsByLinesInMemory(String.format(INPUT_STRING, Integer.valueOf(1 + integer), Integer.valueOf(2 + integer)), this.resultPath);
    }

    @Parameterized.Parameters
    public static Collection<Object[]> getConfigurations() {
        Configuration configuration = new Configuration();
        configuration.setInteger("ChainedMapperNepheleITCase#NoSubtasks", 4);
        configuration.setInteger("ChainedMapperNepheleITCase#MaxIterations", 2);
        return toParameterList(new Configuration[]{configuration});
    }

    protected JobGraph getJobGraph() throws Exception {
        return getTestJobGraph(this.dataPath, this.resultPath, this.config.getInteger("ChainedMapperNepheleITCase#NoSubtasks", 1), this.config.getInteger("ChainedMapperNepheleITCase#MaxIterations", 1));
    }

    private JobGraph getTestJobGraph(String str, String str2, int i, int i2) {
        JobGraph jobGraph = new JobGraph("Iteration Tail with Chaining");
        RecordSerializerFactory recordSerializerFactory = RecordSerializerFactory.get();
        RecordComparatorFactory recordComparatorFactory = new RecordComparatorFactory(new int[]{0}, new Class[]{IntValue.class});
        InputFormatVertex createInput = JobGraphUtils.createInput(new PointInFormat(), str, "Input", jobGraph, i);
        TaskConfig taskConfig = new TaskConfig(createInput.getConfiguration());
        taskConfig.setOutputSerializer(recordSerializerFactory);
        taskConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);
        AbstractJobVertex createTask = JobGraphUtils.createTask(IterationHeadPactTask.class, "Iteration Head", jobGraph, i);
        TaskConfig taskConfig2 = new TaskConfig(createTask.getConfiguration());
        taskConfig2.setIterationId(1);
        taskConfig2.addInputToGroup(0);
        taskConfig2.setInputSerializer(recordSerializerFactory, 0);
        taskConfig2.setInputLocalStrategy(0, LocalStrategy.NONE);
        taskConfig2.setIterationHeadPartialSolutionOrWorksetInputIndex(0);
        taskConfig2.setOutputSerializer(recordSerializerFactory);
        taskConfig2.addOutputShipStrategy(ShipStrategyType.PARTITION_HASH);
        taskConfig2.setOutputComparator(recordComparatorFactory, 0);
        TaskConfig taskConfig3 = new TaskConfig(new Configuration());
        taskConfig3.setOutputSerializer(recordSerializerFactory);
        taskConfig3.addOutputShipStrategy(ShipStrategyType.FORWARD);
        taskConfig2.setIterationHeadFinalOutputConfig(taskConfig3);
        taskConfig2.setIterationHeadIndexOfSyncOutput(2);
        taskConfig2.setDriver(CollectorMapDriver.class);
        taskConfig2.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        taskConfig2.setStubWrapper(new UserCodeClassWrapper(DummyMapper.class));
        taskConfig2.setRelativeBackChannelMemory(1.0d);
        AbstractJobVertex createTask2 = JobGraphUtils.createTask(IterationTailPactTask.class, "Chained Iteration Tail", jobGraph, i);
        TaskConfig taskConfig4 = new TaskConfig(createTask2.getConfiguration());
        taskConfig4.setIterationId(1);
        taskConfig4.addInputToGroup(0);
        taskConfig4.setInputSerializer(recordSerializerFactory, 0);
        taskConfig4.addOutputShipStrategy(ShipStrategyType.FORWARD);
        taskConfig4.setOutputSerializer(recordSerializerFactory);
        taskConfig4.setDriver(GroupReduceDriver.class);
        taskConfig4.setDriverStrategy(DriverStrategy.SORTED_GROUP_REDUCE);
        taskConfig4.setDriverComparator(recordComparatorFactory, 0);
        taskConfig4.setStubWrapper(new UserCodeClassWrapper(DummyReducer.class));
        TaskConfig taskConfig5 = new TaskConfig(new Configuration());
        taskConfig5.setDriverStrategy(DriverStrategy.COLLECTOR_MAP);
        taskConfig5.setStubWrapper(new UserCodeClassWrapper(IncrementCoordinatesMapper.class));
        taskConfig5.setInputLocalStrategy(0, LocalStrategy.NONE);
        taskConfig5.setInputSerializer(recordSerializerFactory, 0);
        taskConfig5.setOutputSerializer(recordSerializerFactory);
        taskConfig5.setIsWorksetUpdate();
        taskConfig4.addChainedTask(ChainedCollectorMapDriver.class, taskConfig5, "Chained ID Mapper");
        OutputFormatVertex createFileOutput = JobGraphUtils.createFileOutput(jobGraph, "Output", i);
        TaskConfig taskConfig6 = new TaskConfig(createFileOutput.getConfiguration());
        taskConfig6.addInputToGroup(0);
        taskConfig6.setInputSerializer(recordSerializerFactory, 0);
        taskConfig6.setStubWrapper(new UserCodeClassWrapper(PointOutFormat.class));
        taskConfig6.setStubParameter("flink.output.file", str2);
        AbstractJobVertex createSync = JobGraphUtils.createSync(jobGraph, i);
        TaskConfig taskConfig7 = new TaskConfig(createSync.getConfiguration());
        taskConfig7.setNumberOfIterations(i2);
        taskConfig7.setIterationId(1);
        JobGraphUtils.connect(createInput, createTask, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(createTask, createTask2, ChannelType.IN_MEMORY, DistributionPattern.BIPARTITE);
        taskConfig4.setGateIterativeWithNumberOfEventsUntilInterrupt(0, i);
        JobGraphUtils.connect(createTask, createFileOutput, ChannelType.IN_MEMORY, DistributionPattern.POINTWISE);
        JobGraphUtils.connect(createTask, createSync, ChannelType.NETWORK, DistributionPattern.POINTWISE);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        createInput.setSlotSharingGroup(slotSharingGroup);
        createTask.setSlotSharingGroup(slotSharingGroup);
        createTask2.setSlotSharingGroup(slotSharingGroup);
        createFileOutput.setSlotSharingGroup(slotSharingGroup);
        createSync.setSlotSharingGroup(slotSharingGroup);
        createTask2.setStrictlyCoLocatedWith(createTask);
        return jobGraph;
    }
}
