/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.scheduling;

import java.io.File;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HeartbeatManagerOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.core.execution.CheckpointingMode;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobExceptionsHeaders;
import org.apache.flink.runtime.rest.messages.JobExceptionsInfoWithHistory;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.job.JobExceptionsMessageParameters;
import org.apache.flink.runtime.scheduler.stopwithsavepoint.StopWithSavepointStoppingException;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.legacy.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;
import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.assertj.core.api.AbstractStringAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class AdaptiveSchedulerITCase
extends TestLogger {
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    private static final int NUMBER_TASK_MANAGERS = 2;
    private static final int NUMBER_SLOTS_PER_TASK_MANAGER = 2;
    private static final int PARALLELISM = 4;
    private static final Configuration configuration = AdaptiveSchedulerITCase.getConfiguration();
    @ClassRule
    public static final MiniClusterWithClientResource MINI_CLUSTER_WITH_CLIENT_RESOURCE = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(configuration).setNumberTaskManagers(2).setNumberSlotsPerTaskManager(2).build());

    private static Configuration getConfiguration() {
        Configuration conf = new Configuration();
        conf.set(JobManagerOptions.SCHEDULER, (Object)JobManagerOptions.SchedulerType.Adaptive);
        conf.set(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, (Object)Duration.ofMillis(1000L));
        conf.set(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT, (Object)Duration.ofMillis(5000L));
        return conf;
    }

    @Before
    public void ensureAdaptiveSchedulerEnabled() {
        Assume.assumeTrue((boolean)ClusterOptions.isAdaptiveSchedulerEnabled((Configuration)configuration));
    }

    @After
    public void cancelRunningJobs() {
        MINI_CLUSTER_WITH_CLIENT_RESOURCE.cancelAllJobsAndWaitUntilSlotsAreFreed();
    }

    @Test
    public void testGlobalFailoverCanRecoverState() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);
        DataStreamSource input = env.addSource((SourceFunction)new SimpleSource());
        input.addSink((SinkFunction)new DiscardingSink());
        env.execute();
    }

    @Test
    public void testStopWithSavepointNoError() throws Exception {
        StreamExecutionEnvironment env = AdaptiveSchedulerITCase.getEnvWithSource(StopWithSavepointTestBehavior.NO_FAILURE);
        DummySource.resetForParallelism(4);
        JobClient client = env.executeAsync();
        DummySource.awaitRunning();
        File savepointDirectory = this.tempFolder.newFolder("savepoint");
        String savepoint = (String)client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL).get();
        Assert.assertThat((Object)savepoint, (Matcher)CoreMatchers.containsString((String)savepointDirectory.getAbsolutePath()));
        Assert.assertThat((Object)((JobStatus)client.getJobStatus().get()), (Matcher)CoreMatchers.is((Object)JobStatus.FINISHED));
    }

    @Test
    public void testStopWithSavepointFailOnCheckpoint() throws Exception {
        StreamExecutionEnvironment env = AdaptiveSchedulerITCase.getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT);
        RestartStrategyUtils.configureFixedDelayRestartStrategy((StreamExecutionEnvironment)env, (int)Integer.MAX_VALUE, (long)0L);
        DummySource.resetForParallelism(4);
        JobClient client = env.executeAsync();
        DummySource.awaitRunning();
        try {
            client.stopWithSavepoint(false, this.tempFolder.newFolder("savepoint").getAbsolutePath(), SavepointFormatType.CANONICAL).get();
            Assert.fail((String)"Expect exception");
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e, (Matcher)FlinkMatchers.containsCause(FlinkException.class));
        }
        CommonTestUtils.waitUntilCondition(() -> client.getJobStatus().get() == JobStatus.RUNNING);
    }

    @Test
    public void testStopWithSavepointFailOnStop() throws Throwable {
        StreamExecutionEnvironment env = AdaptiveSchedulerITCase.getEnvWithSource(StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT_COMPLETE);
        RestartStrategyUtils.configureFixedDelayRestartStrategy((StreamExecutionEnvironment)env, (int)Integer.MAX_VALUE, (long)0L);
        DummySource.resetForParallelism(4);
        JobClient client = env.executeAsync();
        DummySource.awaitRunning();
        CompletableFuture savepointCompleted = client.stopWithSavepoint(false, this.tempFolder.newFolder("savepoint").getAbsolutePath(), SavepointFormatType.CANONICAL);
        Throwable savepointException = ((ExecutionException)Assert.assertThrows(ExecutionException.class, savepointCompleted::get)).getCause();
        ExceptionUtils.assertThrowable((Throwable)savepointException, throwable -> throwable instanceof StopWithSavepointStoppingException && throwable.getMessage().startsWith("A savepoint has been created at: "));
        Assert.assertThat((Object)((JobStatus)client.getJobStatus().get()), (Matcher)CoreMatchers.either((Matcher)CoreMatchers.is((Object)JobStatus.FAILED)).or(CoreMatchers.is((Object)JobStatus.FAILING)));
    }

    @Test
    public void testStopWithSavepointFailOnFirstSavepointSucceedOnSecond() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        RestartStrategyUtils.configureFixedDelayRestartStrategy((StreamExecutionEnvironment)env, (int)1, (long)0L);
        env.setParallelism(4);
        env.addSource((SourceFunction)new DummySource(StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY)).addSink((SinkFunction)new DiscardingSink());
        DummySource.resetForParallelism(4);
        JobClient client = env.executeAsync();
        DummySource.awaitRunning();
        DummySource.resetForParallelism(4);
        File savepointDirectory = this.tempFolder.newFolder("savepoint");
        try {
            client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL).get();
            Assert.fail((String)"Expect failure of operation");
        }
        catch (ExecutionException e) {
            Assert.assertThat((Object)e, (Matcher)FlinkMatchers.containsCause(FlinkException.class));
        }
        DummySource.awaitRunning();
        CommonTestUtils.waitUntilCondition(() -> this.isDirectoryEmpty(savepointDirectory));
        String savepoint = (String)client.stopWithSavepoint(false, savepointDirectory.getAbsolutePath(), SavepointFormatType.CANONICAL).get();
        Assert.assertThat((Object)savepoint, (Matcher)CoreMatchers.containsString((String)savepointDirectory.getAbsolutePath()));
    }

    @Test
    public void testExceptionHistoryIsRetrievableFromTheRestAPI() throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.enableCheckpointing(20L, CheckpointingMode.EXACTLY_ONCE);
        env.addSource((SourceFunction)new FailOnCompletedCheckpointSource()).addSink((SinkFunction)new DiscardingSink());
        JobClient jobClient = env.executeAsync();
        CommonTestUtils.waitUntilCondition(() -> {
            List exceptions = AdaptiveSchedulerITCase.getJobExceptions(jobClient.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE).get().getExceptionHistory().getEntries();
            return !exceptions.isEmpty();
        });
        jobClient.cancel().get();
        CommonTestUtils.waitForJobStatus((JobClient)jobClient, Collections.singletonList(JobStatus.CANCELED));
    }

    @Test
    public void testGlobalFailureOnRestart() throws Exception {
        MiniCluster miniCluster = MINI_CLUSTER_WITH_CLIENT_RESOURCE.getMiniCluster();
        JobVertexID jobVertexId = new JobVertexID();
        JobVertex jobVertex = new JobVertex("jobVertex", jobVertexId);
        jobVertex.setInvokableClass(FailingInvokable.class);
        jobVertex.addOperatorCoordinator(new SerializedValue((Object)new FailingCoordinatorProvider(OperatorID.fromJobVertexID((JobVertexID)jobVertexId))));
        jobVertex.setParallelism(1);
        JobGraph jobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().addJobVertices(Collections.singletonList(jobVertex)).build();
        RestartStrategyUtils.configureFixedDelayRestartStrategy((JobGraph)jobGraph, (int)1, (Duration)Duration.ofHours(1L));
        miniCluster.submitJob(jobGraph).join();
        CommonTestUtils.waitUntilCondition(() -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.RESTARTING);
        FailingCoordinatorProvider.JOB_RESTARTING.countDown();
        FlinkAssertions.assertThatFuture(AdaptiveSchedulerITCase.getJobExceptions(jobGraph.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE)).eventuallySucceeds();
        miniCluster.cancelJob(jobGraph.getJobID());
        CommonTestUtils.waitUntilCondition(() -> miniCluster.getJobStatus(jobGraph.getJobID()).join() == JobStatus.CANCELED);
        JobExceptionsInfoWithHistory jobExceptions = AdaptiveSchedulerITCase.getJobExceptions(jobGraph.getJobID(), MINI_CLUSTER_WITH_CLIENT_RESOURCE).get();
        ((ListAssert)((ListAssert)Assertions.assertThat((List)jobExceptions.getExceptionHistory().getEntries()).hasSize(1)).allSatisfy(rootExceptionInfo -> ((AbstractStringAssert)Assertions.assertThat((String)rootExceptionInfo.getStacktrace()).contains(new CharSequence[]{"Local exception."})).doesNotContain(new CharSequence[]{"Global exception."}))).allSatisfy(rootExceptionInfo -> Assertions.assertThat((Collection)rootExceptionInfo.getConcurrentExceptions()).anySatisfy(exceptionInfo -> Assertions.assertThat((String)exceptionInfo.getStacktrace()).contains(new CharSequence[]{"Global exception."})));
    }

    private boolean isDirectoryEmpty(File directory) {
        File[] files = directory.listFiles();
        if (files.length > 0) {
            this.log.warn("There are still unexpected files: {}", (Object)Arrays.stream(files).map(File::getAbsolutePath).collect(Collectors.joining(", ")));
            return false;
        }
        return true;
    }

    private static StreamExecutionEnvironment getEnvWithSource(StopWithSavepointTestBehavior behavior) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);
        env.addSource((SourceFunction)new DummySource(behavior)).addSink((SinkFunction)new DiscardingSink());
        return env;
    }

    private static CompletableFuture<JobExceptionsInfoWithHistory> getJobExceptions(JobID jobId, MiniClusterWithClientResource minClusterRes) throws Exception {
        RestClusterClient restClusterClient = minClusterRes.getRestClusterClient();
        JobExceptionsMessageParameters params = new JobExceptionsMessageParameters();
        params.jobPathParameter.resolve((Object)jobId);
        return restClusterClient.sendRequest((MessageHeaders)JobExceptionsHeaders.getInstance(), (MessageParameters)params, (RequestBody)EmptyRequestBody.getInstance());
    }

    public static final class FailOnCompletedCheckpointSource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointListener {
        private volatile boolean running = true;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                    Thread.sleep(5L);
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            throw new RuntimeException("Test exception.");
        }
    }

    public static final class SimpleSource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointListener,
    CheckpointedFunction {
        private static final ListStateDescriptor<Boolean> unionStateListDescriptor = new ListStateDescriptor("state", Boolean.class);
        private volatile boolean running = true;
        @Nullable
        private ListState<Boolean> unionListState = null;
        private boolean hasFailedBefore = false;
        private boolean fail = false;

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            while (this.running && !this.hasFailedBefore) {
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)this.getRuntimeContext().getTaskInfo().getIndexOfThisSubtask());
                    Thread.sleep(5L);
                }
                if (!this.fail) continue;
                throw new FlinkException("Test failure.");
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            this.fail = true;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
            this.unionListState = context.getOperatorStateStore().getUnionListState(unionStateListDescriptor);
            for (Boolean previousState : (Iterable)this.unionListState.get()) {
                this.hasFailedBefore |= previousState.booleanValue();
            }
            this.unionListState.update(Collections.singletonList(true));
        }
    }

    private static final class DummySource
    extends RichParallelSourceFunction<Integer>
    implements CheckpointedFunction,
    CheckpointListener {
        private final StopWithSavepointTestBehavior behavior;
        private volatile boolean running = true;
        private static volatile CountDownLatch instancesRunning;

        public DummySource(StopWithSavepointTestBehavior behavior) {
            this.behavior = behavior;
        }

        private static void resetForParallelism(int para) {
            instancesRunning = new CountDownLatch(para);
        }

        private static void awaitRunning() throws InterruptedException {
            Preconditions.checkNotNull((Object)instancesRunning);
            instancesRunning.await();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void run(SourceFunction.SourceContext<Integer> ctx) throws Exception {
            Preconditions.checkNotNull((Object)instancesRunning);
            instancesRunning.countDown();
            int i = Integer.MIN_VALUE;
            while (this.running) {
                Thread.sleep(10L);
                Object object = ctx.getCheckpointLock();
                synchronized (object) {
                    ctx.collect((Object)i++);
                }
            }
        }

        public void cancel() {
            this.running = false;
        }

        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT) {
                throw new RuntimeException(this.behavior.name());
            }
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_FIRST_CHECKPOINT_ONLY && context.getCheckpointId() == 1L) {
                throw new RuntimeException(this.behavior.name());
            }
        }

        public void initializeState(FunctionInitializationContext context) throws Exception {
        }

        public void notifyCheckpointComplete(long checkpointId) throws Exception {
            if (this.behavior == StopWithSavepointTestBehavior.FAIL_ON_CHECKPOINT_COMPLETE) {
                throw new RuntimeException(this.behavior.name());
            }
        }
    }

    private static class FailingCoordinatorProvider
    implements OperatorCoordinator.Provider {
        private static final CountDownLatch JOB_RESTARTING = new CountDownLatch(1);
        private final OperatorID operatorId;
        private static final String globalExceptionMsg = "Global exception.";

        FailingCoordinatorProvider(OperatorID operatorId) {
            this.operatorId = operatorId;
        }

        public OperatorID getOperatorId() {
            return this.operatorId;
        }

        public OperatorCoordinator create(final OperatorCoordinator.Context context) {
            return new OperatorCoordinator(){
                @Nullable
                private Thread thread;

                public void start() {
                    this.thread = new Thread(() -> {
                        try {
                            JOB_RESTARTING.await();
                            context.failJob((Throwable)new Exception(FailingCoordinatorProvider.globalExceptionMsg));
                        }
                        catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                    this.thread.setName(AdaptiveSchedulerITCase.class + "_failing-coordinator");
                    this.thread.setDaemon(true);
                    this.thread.start();
                }

                public void close() throws Exception {
                    if (this.thread != null) {
                        this.thread.interrupt();
                        this.thread.join();
                    }
                }

                public void handleEventFromOperator(int subtask, int attemptNumber, OperatorEvent event) {
                }

                public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> resultFuture) {
                }

                public void notifyCheckpointComplete(long checkpointId) {
                }

                public void resetToCheckpoint(long checkpointId, @Nullable byte[] checkpointData) {
                }

                public void subtaskReset(int subtask, long checkpointId) {
                }

                public void executionAttemptFailed(int subtask, int attemptNumber, @Nullable Throwable reason) {
                }

                public void executionAttemptReady(int subtask, int attemptNumber, OperatorCoordinator.SubtaskGateway gateway) {
                }
            };
        }
    }

    public static class FailingInvokable
    extends AbstractInvokable {
        private static final String localExceptionMsg = "Local exception.";

        public FailingInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            throw new Exception(localExceptionMsg);
        }
    }

    private static enum StopWithSavepointTestBehavior {
        NO_FAILURE,
        FAIL_ON_CHECKPOINT,
        FAIL_ON_CHECKPOINT_COMPLETE,
        FAIL_ON_FIRST_CHECKPOINT_ONLY;

    }
}

