/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.util.List;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.assertj.core.api.Assertions;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class ReactiveModeITCase
extends TestLogger {
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int INITIAL_NUMBER_TASK_MANAGERS = 1;
    private static final Configuration configuration = ReactiveModeITCase.getReactiveModeConfiguration();
    private static final ObjectMapper OBJECT_MAPPER = JacksonMapperFactory.createObjectMapper();
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    @Rule
    public final MiniClusterWithClientResource miniClusterResource = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(1).setNumberSlotsPerTaskManager(2).build());

    private static Configuration getReactiveModeConfiguration() {
        Configuration conf = new Configuration();
        conf.set(JobManagerOptions.SCHEDULER_MODE, (Object)SchedulerExecutionMode.REACTIVE);
        return conf;
    }

    @Test
    public void testScaleLimitByMaxParallelism() throws Exception {
        this.startAdditionalTaskManager();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        SingleOutputStreamOperator input = env.addSource((SourceFunction)new FailOnParallelExecutionSource()).setMaxParallelism(1);
        input.sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 1);
    }

    @Test
    public void testScaleUpOnAdditionalTaskManager() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.addSource((SourceFunction)new DummySource());
        input.sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 2);
        this.miniClusterResource.getMiniCluster().startTaskManager();
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 4);
    }

    @Test
    public void testJsonPlanParallelismAfterRescale() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource input = env.addSource((SourceFunction)new DummySource());
        input.sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        int initialParallelism = 2;
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), initialParallelism);
        ArchivedExecutionGraph archivedExecutionGraph = (ArchivedExecutionGraph)this.miniClusterResource.getMiniCluster().getArchivedExecutionGraph(jobClient.getJobID()).get();
        Assertions.assertThat((List)OBJECT_MAPPER.readTree(archivedExecutionGraph.getJsonPlan()).findValues("parallelism")).allMatch(n -> n.asInt() == initialParallelism);
        this.miniClusterResource.getMiniCluster().startTaskManager();
        int rescaledParallelism = 4;
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), rescaledParallelism);
        archivedExecutionGraph = (ArchivedExecutionGraph)this.miniClusterResource.getMiniCluster().getArchivedExecutionGraph(jobClient.getJobID()).get();
        Assertions.assertThat((List)OBJECT_MAPPER.readTree(archivedExecutionGraph.getJsonPlan()).findValues("parallelism")).allMatch(n -> n.asInt() == rescaledParallelism);
    }

    @Test
    public void testScaleDownOnTaskManagerLoss() throws Exception {
        this.startAdditionalTaskManager();
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RestartStrategyUtils.configureFixedDelayRestartStrategy((StreamExecutionEnvironment)env, (int)1, (long)0L);
        DataStreamSource input = env.addSource((SourceFunction)new DummySource());
        input.sinkTo((Sink)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 4);
        this.miniClusterResource.getMiniCluster().terminateTaskManager(0).get();
        ReactiveModeITCase.waitUntilParallelismForVertexReached(this.miniClusterResource.getRestClusterClient(), jobClient.getJobID(), 4);
    }

    private int getNumberOfConnectedTaskManagers() throws ExecutionException, InterruptedException {
        return ((ClusterOverview)this.miniClusterResource.getMiniCluster().requestClusterOverview().get()).getNumTaskManagersConnected();
    }

    private void startAdditionalTaskManager() throws Exception {
        this.miniClusterResource.getMiniCluster().startTaskManager();
        CommonTestUtils.waitUntilCondition(() -> this.getNumberOfConnectedTaskManagers() == 2);
    }

    public static void waitUntilParallelismForVertexReached(RestClusterClient<?> restClusterClient, JobID jobId, int targetParallelism) throws Exception {
        CommonTestUtils.waitUntilCondition(() -> {
            JobDetailsInfo detailsInfo = (JobDetailsInfo)restClusterClient.getJobDetails(jobId).get();
            for (JobDetailsInfo.JobVertexDetailsInfo jobVertexInfo : detailsInfo.getJobVertexInfos()) {
                if (!jobVertexInfo.getName().contains("Source:") || jobVertexInfo.getParallelism() != targetParallelism) continue;
                return true;
            }
            return false;
        });
    }

    private static class FailOnParallelExecutionSource
    extends RichParallelSourceFunction<String> {
        private volatile boolean running = true;

        private FailOnParallelExecutionSource() {
        }

        public void open(OpenContext openContext) throws Exception {
            if (this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks() > 1) {
                throw new IllegalStateException("This is not supposed to be executed in parallel, despite extending the right base class.");
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)"test");
                }
                Thread.sleep(100L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }

    private static class DummySource
    implements SourceFunction<String> {
        private volatile boolean running = true;

        private DummySource() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<String> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)"test");
                }
                Thread.sleep(10L);
            }
        }

        public void cancel() {
            this.running = false;
        }
    }
}

