/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.misc;

import java.io.IOException;
import java.util.ArrayList;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class AutoParallelismITCase {
    private static final int NUM_TM = 2;
    private static final int SLOTS_PER_TM = 7;
    private static final int PARALLELISM = 14;
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void setupCluster() {
        Configuration config = new Configuration();
        config.setInteger("localinstancemanager.numtaskmanager", 2);
        config.setInteger("taskmanager.numberOfTaskSlots", 7);
        cluster = new ForkableFlinkMiniCluster(config, false);
    }

    @AfterClass
    public static void teardownCluster() {
        try {
            cluster.stop();
        }
        catch (Throwable t) {
            System.err.println("Error stopping cluster on shutdown");
            t.printStackTrace();
            Assert.fail((String)("Cluster shutdown caused an exception: " + t.getMessage()));
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testProgramWithAutoParallelism() {
        try {
            ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)"localhost", (int)cluster.getJobManagerRPCPort(), (String[])new String[0]);
            env.setParallelism(Integer.MAX_VALUE);
            env.getConfig().disableSysoutLogging();
            MapPartitionOperator result = env.createInput((InputFormat)new ParallelismDependentInputFormat()).rebalance().mapPartition((MapPartitionFunction)new ParallelismDependentMapPartition());
            ArrayList resultCollection = new ArrayList();
            result.output((OutputFormat)new LocalCollectionOutputFormat(resultCollection));
            env.execute();
            Assert.assertEquals((long)14L, (long)resultCollection.size());
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)e.getMessage());
        }
        finally {
            try {
                cluster.stop();
            }
            catch (Throwable throwable) {}
        }
    }

    private static class ParallelismDependentMapPartition
    extends RichMapPartitionFunction<Integer, Integer> {
        private ParallelismDependentMapPartition() {
        }

        public void mapPartition(Iterable<Integer> values, Collector<Integer> out) {
            out.collect((Object)this.getRuntimeContext().getIndexOfThisSubtask());
        }
    }

    private static class ParallelismDependentInputFormat
    extends GenericInputFormat<Integer> {
        private transient boolean emitted;

        private ParallelismDependentInputFormat() {
        }

        public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
            Assert.assertEquals((long)14L, (long)numSplits);
            return super.createInputSplits(numSplits);
        }

        public boolean reachedEnd() {
            return this.emitted;
        }

        public Integer nextRecord(Integer reuse) {
            if (this.emitted) {
                return null;
            }
            this.emitted = true;
            return 1;
        }
    }
}

