/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.javaApiOperators;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.functions.MapPartitionFunction;
import org.apache.flink.api.common.functions.RichMapPartitionFunction;
import org.apache.flink.api.common.io.GenericInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.util.TestNonRichInputFormat;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.MapPartitionOperator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.GenericInputSplit;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.Collector;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

public class RemoteEnvironmentITCase {
    private static final int TM_SLOTS = 4;
    private static final int NUM_TM = 1;
    private static final int USER_DOP = 2;
    private static final String INVALID_STARTUP_TIMEOUT = "0.001 ms";
    private static final String VALID_STARTUP_TIMEOUT = "100 s";
    private static ForkableFlinkMiniCluster cluster;

    @BeforeClass
    public static void setupCluster() {
        try {
            Configuration config = new Configuration();
            config.setInteger("local.number-taskmanager", 1);
            config.setInteger("taskmanager.numberOfTaskSlots", 4);
            cluster = new ForkableFlinkMiniCluster(config, false);
            cluster.start();
        }
        catch (Exception e) {
            e.printStackTrace();
            Assert.fail((String)("Error starting test cluster: " + e.getMessage()));
        }
    }

    @AfterClass
    public static void tearDownCluster() {
        try {
            cluster.stop();
        }
        catch (Throwable t) {
            t.printStackTrace();
            Assert.fail((String)("Cluster shutdown caused an exception: " + t.getMessage()));
        }
    }

    @Test(expected=IllegalArgumentException.class)
    public void testInvalidAkkaConfiguration() throws Throwable {
        Configuration config = new Configuration();
        config.setString("akka.startup-timeout", INVALID_STARTUP_TIMEOUT);
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)cluster.hostname(), (int)cluster.getLeaderRPCPort(), (Configuration)config, (String[])new String[0]);
        env.getConfig().disableSysoutLogging();
        DataSource result = env.createInput((InputFormat)new TestNonRichInputFormat());
        result.output((OutputFormat)new LocalCollectionOutputFormat(new ArrayList()));
        try {
            env.execute();
            Assert.fail((String)"Program should not run successfully, cause of invalid akka settings.");
        }
        catch (IOException ex) {
            throw ex.getCause();
        }
    }

    @Test
    public void testUserSpecificParallelism() throws Exception {
        Configuration config = new Configuration();
        config.setString("akka.startup-timeout", VALID_STARTUP_TIMEOUT);
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment((String)cluster.hostname(), (int)cluster.getLeaderRPCPort(), (Configuration)config, (String[])new String[0]);
        env.setParallelism(2);
        env.getConfig().disableSysoutLogging();
        MapPartitionOperator result = env.createInput((InputFormat)new ParallelismDependentInputFormat()).rebalance().mapPartition((MapPartitionFunction)new RichMapPartitionFunction<Integer, Integer>(){

            public void mapPartition(Iterable<Integer> values, Collector<Integer> out) throws Exception {
                out.collect((Object)this.getRuntimeContext().getIndexOfThisSubtask());
            }
        });
        List resultCollection = result.collect();
        Assert.assertEquals((long)2L, (long)resultCollection.size());
    }

    private static class ParallelismDependentInputFormat
    extends GenericInputFormat<Integer> {
        private transient boolean emitted;

        private ParallelismDependentInputFormat() {
        }

        public GenericInputSplit[] createInputSplits(int numSplits) throws IOException {
            Assert.assertEquals((long)2L, (long)numSplits);
            return super.createInputSplits(numSplits);
        }

        public boolean reachedEnd() {
            return this.emitted;
        }

        public Integer nextRecord(Integer reuse) {
            if (this.emitted) {
                return null;
            }
            this.emitted = true;
            return 1;
        }
    }
}

