/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.net.URI;
import java.util.Collection;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.flink.FlinkJobInvoker;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.InferableFunction;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsIterableContaining;
import org.joda.time.Instant;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FlinkSavepointTest
implements Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(FlinkSavepointTest.class);
    private static transient @UnknownKeyFor @NonNull @Initialized MiniCluster flinkCluster;
    private static volatile @UnknownKeyFor @NonNull @Initialized CountDownLatch oneShotLatch;
    @ClassRule
    public static transient @UnknownKeyFor @NonNull @Initialized TemporaryFolder tempFolder;
    @Rule
    public @UnknownKeyFor @NonNull @Initialized Timeout timeout = new Timeout(2L, TimeUnit.MINUTES);

    @BeforeClass
    public static void beforeClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 0);
        config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        String savepointPath = "file://" + tempFolder.getRoot().getAbsolutePath();
        LOG.info("Savepoints will be written to {}", (Object)savepointPath);
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, savepointPath);
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointPath);
        MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder().setConfiguration(config).setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build();
        flinkCluster = new MiniCluster(clusterConfig);
        flinkCluster.start();
    }

    @AfterClass
    public static void afterClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        flinkCluster.close();
        flinkCluster = null;
    }

    @After
    public void afterTest() throws @UnknownKeyFor @NonNull @Initialized Exception {
        for (JobStatusMessage jobStatusMessage : (Collection)flinkCluster.listJobs().get()) {
            if (!jobStatusMessage.getJobState().name().equals("RUNNING")) continue;
            flinkCluster.cancelJob(jobStatusMessage.getJobId()).get();
        }
        this.ensureNoJobRunning();
    }

    @Test
    public void testSavepointRestoreLegacy() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.runSavepointAndRestore(false);
    }

    @Test
    public void testSavepointRestorePortable() throws @UnknownKeyFor @NonNull @Initialized Exception {
        this.runSavepointAndRestore(true);
    }

    private void runSavepointAndRestore(@UnknownKeyFor @NonNull @Initialized boolean isPortablePipeline) throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setStreaming(true);
        options.setParallelism(Integer.valueOf(2));
        options.setRunner(FlinkRunner.class);
        options.setShutdownSourcesAfterIdleMs(Long.valueOf(Long.MAX_VALUE));
        oneShotLatch = new CountDownLatch(1);
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        FlinkSavepointTest.createStreamingJob(pipeline, false, isPortablePipeline);
        JobID jobID = isPortablePipeline ? this.executePortable(pipeline) : this.executeLegacy(pipeline);
        oneShotLatch.await();
        String savepointDir = this.takeSavepointAndCancelJob(jobID);
        this.ensureNoJobRunning();
        oneShotLatch = new CountDownLatch(1);
        options.setParallelism(Integer.valueOf(4));
        pipeline = Pipeline.create((PipelineOptions)options);
        FlinkSavepointTest.createStreamingJob(pipeline, true, isPortablePipeline);
        if (isPortablePipeline) {
            this.restoreFromSavepointPortable(pipeline, savepointDir);
        } else {
            this.restoreFromSavepointLegacy(pipeline, savepointDir);
        }
        oneShotLatch.await();
    }

    private @UnknownKeyFor @NonNull @Initialized JobID executeLegacy(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) throws @UnknownKeyFor @NonNull @Initialized Exception {
        JobGraph jobGraph = this.getJobGraph(pipeline);
        flinkCluster.submitJob(jobGraph).get();
        return jobGraph.getJobID();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private @UnknownKeyFor @NonNull @Initialized JobID executePortable(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) throws @UnknownKeyFor @NonNull @Initialized Exception {
        ((PortablePipelineOptions)pipeline.getOptions().as(PortablePipelineOptions.class)).setDefaultEnvironmentType("EMBEDDED");
        ((FlinkPipelineOptions)pipeline.getOptions().as(FlinkPipelineOptions.class)).setFlinkMaster(this.getFlinkMaster());
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)pipeline);
        ListeningExecutorService executorService = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(1));
        FlinkPipelineOptions pipelineOptions = (FlinkPipelineOptions)pipeline.getOptions().as(FlinkPipelineOptions.class);
        try {
            JobInvocation jobInvocation = FlinkJobInvoker.create(null).createJobInvocation("id", "none", executorService, pipelineProto, pipelineOptions, (PortablePipelineRunner)new FlinkPipelineRunner(pipelineOptions, null, Collections.emptyList()));
            jobInvocation.start();
            JobID jobID = this.waitForJobToBeReady();
            return jobID;
        }
        finally {
            executorService.shutdown();
        }
    }

    private @UnknownKeyFor @NonNull @Initialized String getFlinkMaster() throws @UnknownKeyFor @NonNull @Initialized Exception {
        URI uri = (URI)flinkCluster.getRestAddress().get();
        return uri.getHost() + ":" + uri.getPort();
    }

    private void ensureNoJobRunning() throws @UnknownKeyFor @NonNull @Initialized Exception {
        while (!((Collection)flinkCluster.listJobs().get()).stream().allMatch(job -> job.getJobState().isTerminalState())) {
            Thread.sleep(50L);
        }
    }

    private @UnknownKeyFor @NonNull @Initialized JobID waitForJobToBeReady() throws @UnknownKeyFor @NonNull @Initialized InterruptedException, @UnknownKeyFor @NonNull @Initialized ExecutionException {
        JobStatusMessage jobStatus;
        while ((jobStatus = (JobStatusMessage)Iterables.getFirst((Iterable)((Iterable)flinkCluster.listJobs().get()), null)) == null || !jobStatus.getJobState().name().equals("RUNNING")) {
            Thread.sleep(100L);
        }
        return jobStatus.getJobId();
    }

    private @UnknownKeyFor @NonNull @Initialized String takeSavepointAndCancelJob(@UnknownKeyFor @NonNull @Initialized JobID jobID) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Exception exception = null;
        for (int i = 0; i < 10; ++i) {
            try {
                return (String)flinkCluster.triggerSavepoint(jobID, null, true).get();
            }
            catch (Exception e) {
                exception = e;
                Thread.sleep(100L);
                continue;
            }
        }
        throw exception;
    }

    private void restoreFromSavepointLegacy(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline, @UnknownKeyFor @NonNull @Initialized String savepointDir) throws @UnknownKeyFor @NonNull @Initialized ExecutionException, @UnknownKeyFor @NonNull @Initialized InterruptedException {
        JobGraph jobGraph = this.getJobGraph(pipeline);
        SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.forPath((String)savepointDir);
        jobGraph.setSavepointRestoreSettings(savepointSettings);
        flinkCluster.submitJob(jobGraph).get();
    }

    private void restoreFromSavepointPortable(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline, @UnknownKeyFor @NonNull @Initialized String savepointDir) throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions flinkOptions = (FlinkPipelineOptions)pipeline.getOptions().as(FlinkPipelineOptions.class);
        flinkOptions.setSavepointPath(savepointDir);
        this.executePortable(pipeline);
    }

    private @UnknownKeyFor @NonNull @Initialized JobGraph getJobGraph(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        FlinkRunner flinkRunner = FlinkRunner.fromOptions((PipelineOptions)pipeline.getOptions());
        return flinkRunner.getJobGraph(pipeline);
    }

    private static @UnknownKeyFor @NonNull @Initialized PCollection createStreamingJob(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline, @UnknownKeyFor @NonNull @Initialized boolean restored, @UnknownKeyFor @NonNull @Initialized boolean isPortablePipeline) {
        PCollection key = isPortablePipeline ? (PCollection)((PCollection)((PCollection)pipeline.apply("ImpulseStage", (PTransform)Impulse.create())).apply("KvMapperStage", (PTransform)MapElements.via((InferableFunction)new InferableFunction<byte[], KV<String, Void>>(){

            public @UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @Nullable @Initialized Void> apply(@UnknownKeyFor @NonNull @Initialized byte @UnknownKeyFor @NonNull @Initialized [] input) throws @UnknownKeyFor @NonNull @Initialized Exception {
                return KV.of((Object)"key", null);
            }
        }))).apply("TimerStage", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Void>, KV<String, Long>>(){
            @DoFn.StateId(value="nextInteger")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long>> valueStateSpec = StateSpecs.value();
            @DoFn.TimerId(value="timer")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec timer = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @Nullable @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.TimerId(value="timer") @UnknownKeyFor @NonNull @Initialized Timer timer) {
                timer.set(new Instant(0L));
            }

            @DoFn.OnTimer(value="timer")
            public void onTimer(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @Nullable @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized OnTimerContext context, @DoFn.StateId(value="nextInteger") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Long> nextInteger, @DoFn.TimerId(value="timer") @UnknownKeyFor @NonNull @Initialized Timer timer) {
                Long current = (Long)nextInteger.read();
                current = current != null ? current : 0L;
                context.output((Object)KV.of((Object)"key", (Object)current));
                LOG.debug("triggering timer {}", (Object)current);
                nextInteger.write((Object)(current + 1L));
                timer.withOutputTimestamp(new Instant(0L)).set(context.fireTimestamp());
            }
        })) : (PCollection)((PCollection)pipeline.apply("IdGeneratorStage", (PTransform)GenerateSequence.from((long)0L))).apply("KvMapperStage", (PTransform)ParDo.of((DoFn)new DoFn<Long, KV<String, Long>>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                context.output((Object)KV.of((Object)"key", (Object)((Long)context.element())));
            }
        }));
        if (restored) {
            return (PCollection)key.apply("VerificationStage", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Long>, String>(){
                @DoFn.StateId(value="valueState")
                private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Integer>> valueStateSpec = StateSpecs.value();
                @DoFn.StateId(value="bagState")
                private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized BagState<@UnknownKeyFor @NonNull @Initialized Integer>> bagStateSpec = StateSpecs.bag();

                @DoFn.ProcessElement
                public void processElement(/*
                 * Issues handling annotations - annotations may be inaccurate
                 */
                // Could not load outer class - annotation placement on inner may be incorrect
                @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.StateId(value="valueState") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Integer> intValueState, @DoFn.StateId(value="bagState") @UnknownKeyFor @NonNull @Initialized BagState<@UnknownKeyFor @NonNull @Initialized Integer> intBagState) {
                    MatcherAssert.assertThat((Object)((Integer)intValueState.read()), (Matcher)Matchers.is((Object)42));
                    MatcherAssert.assertThat((Object)intBagState.read(), (Matcher)IsIterableContaining.hasItems((Object[])new Integer[]{40, 1, 1}));
                    oneShotLatch.countDown();
                }
            }));
        }
        return (PCollection)key.apply("VerificationStage", (PTransform)ParDo.of((DoFn)new DoFn<KV<String, Long>, String>(){
            @DoFn.StateId(value="valueState")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Integer>> valueStateSpec = StateSpecs.value();
            @DoFn.StateId(value="bagState")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized BagState<@UnknownKeyFor @NonNull @Initialized Integer>> bagStateSpec = StateSpecs.bag();

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.StateId(value="valueState") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized Integer> intValueState, @DoFn.StateId(value="bagState") @UnknownKeyFor @NonNull @Initialized BagState<@UnknownKeyFor @NonNull @Initialized Integer> intBagState) {
                long value = Objects.requireNonNull((Long)((KV)context.element()).getValue());
                LOG.debug("value: {} timestamp: {}", (Object)value, (Object)context.timestamp().getMillis());
                if (value == 0L) {
                    intValueState.write((Object)42);
                    intBagState.add((Object)40);
                    intBagState.add((Object)1);
                    intBagState.add((Object)1);
                } else if (value >= 1L) {
                    oneShotLatch.countDown();
                }
            }
        }));
    }

    static {
        tempFolder = new TemporaryFolder();
    }
}

