/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.Collection;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.RequiresStableInputIT;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.JobExecutor;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FlinkRequiresStableInputTest {
    @ClassRule
    public static @UnknownKeyFor @NonNull @Initialized TemporaryFolder tempFolder = new TemporaryFolder();
    private static @UnknownKeyFor @NonNull @Initialized CountDownLatch latch;
    private static final @UnknownKeyFor @NonNull @Initialized String VALUE = "value";
    private static final @UnknownKeyFor @NonNull @Initialized String VALUE_CHECKSUM = "f32b67c7e26342af42efabc674d441dca0a281c5";
    private static transient @UnknownKeyFor @NonNull @Initialized MiniCluster flinkCluster;

    @BeforeClass
    public static void beforeClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        boolean parallelism = true;
        Configuration config = new Configuration();
        config.setInteger(RestOptions.PORT, 0);
        config.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + tempFolder.getRoot().getAbsolutePath());
        config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "file://" + tempFolder.getRoot().getAbsolutePath());
        MiniClusterConfiguration clusterConfig = new MiniClusterConfiguration.Builder().setConfiguration(config).setNumTaskManagers(1).setNumSlotsPerTaskManager(1).build();
        flinkCluster = new MiniCluster(clusterConfig);
        flinkCluster.start();
        TestStreamEnvironment.setAsContext((JobExecutor)flinkCluster, (int)1);
    }

    @AfterClass
    public static void afterClass() throws @UnknownKeyFor @NonNull @Initialized Exception {
        TestStreamEnvironment.unsetAsContext();
        flinkCluster.close();
        flinkCluster = null;
    }

    @Test(timeout=30000L)
    public void testParDoRequiresStableInput() throws @UnknownKeyFor @NonNull @Initialized Exception {
        String savepointDir;
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setParallelism(Integer.valueOf(1));
        options.setCheckpointingInterval(Long.valueOf(Long.MAX_VALUE));
        options.setRunner(FlinkRunner.class);
        options.setStreaming(true);
        ResourceId outputDir = FileSystems.matchNewResource((String)tempFolder.getRoot().getAbsolutePath(), (boolean)true).resolve(String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        String singleOutputPrefix = outputDir.resolve("pardo-single-output", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("key-", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
        String multiOutputPrefix = outputDir.resolve("pardo-multi-output", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("key-", (ResolveOptions)ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
        Pipeline p = FlinkRequiresStableInputTest.createPipeline((PipelineOptions)options, singleOutputPrefix, multiOutputPrefix);
        latch = new CountDownLatch(2);
        JobID jobID = this.executePipeline(p);
        do {
            savepointDir = this.takeSavepoint(jobID);
        } while (!latch.await(100L, TimeUnit.MILLISECONDS));
        flinkCluster.cancelJob(jobID).get();
        options.setShutdownSourcesAfterIdleMs(Long.valueOf(0L));
        this.restoreFromSavepoint(p, savepointDir);
        this.waitUntilJobIsDone();
        MatcherAssert.assertThat((Object)new FilePatternMatchingShardedFile(singleOutputPrefix + "*"), (Matcher)FileChecksumMatcher.fileContentsHaveChecksum((String)VALUE_CHECKSUM));
        MatcherAssert.assertThat((Object)new FilePatternMatchingShardedFile(multiOutputPrefix + "*"), (Matcher)FileChecksumMatcher.fileContentsHaveChecksum((String)VALUE_CHECKSUM));
    }

    private @UnknownKeyFor @NonNull @Initialized JobGraph getJobGraph(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) {
        FlinkRunner flinkRunner = FlinkRunner.fromOptions((PipelineOptions)pipeline.getOptions());
        return flinkRunner.getJobGraph(pipeline);
    }

    private @UnknownKeyFor @NonNull @Initialized JobID executePipeline(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline) throws @UnknownKeyFor @NonNull @Initialized Exception {
        JobGraph jobGraph = this.getJobGraph(pipeline);
        flinkCluster.submitJob(jobGraph).get();
        return jobGraph.getJobID();
    }

    private @UnknownKeyFor @NonNull @Initialized String takeSavepoint(@UnknownKeyFor @NonNull @Initialized JobID jobID) throws @UnknownKeyFor @NonNull @Initialized Exception {
        Exception exception = null;
        for (int i = 0; i < 10; ++i) {
            try {
                return (String)flinkCluster.triggerSavepoint(jobID, null, false).get();
            }
            catch (Exception e) {
                exception = e;
                Thread.sleep(100L);
                continue;
            }
        }
        throw exception;
    }

    private @UnknownKeyFor @NonNull @Initialized JobID restoreFromSavepoint(@UnknownKeyFor @NonNull @Initialized Pipeline pipeline, @UnknownKeyFor @NonNull @Initialized String savepointDir) throws @UnknownKeyFor @NonNull @Initialized ExecutionException, @UnknownKeyFor @NonNull @Initialized InterruptedException {
        JobGraph jobGraph = this.getJobGraph(pipeline);
        SavepointRestoreSettings savepointSettings = SavepointRestoreSettings.forPath((String)savepointDir);
        jobGraph.setSavepointRestoreSettings(savepointSettings);
        return ((JobSubmissionResult)flinkCluster.submitJob(jobGraph).get()).getJobID();
    }

    private void waitUntilJobIsDone() throws @UnknownKeyFor @NonNull @Initialized InterruptedException, @UnknownKeyFor @NonNull @Initialized ExecutionException {
        while (((Collection)flinkCluster.listJobs().get()).stream().anyMatch(message -> message.getJobState().name().equals("RUNNING"))) {
            Thread.sleep(100L);
        }
    }

    private static @UnknownKeyFor @NonNull @Initialized Pipeline createPipeline(@UnknownKeyFor @NonNull @Initialized PipelineOptions options, @UnknownKeyFor @NonNull @Initialized String singleOutputPrefix, @UnknownKeyFor @NonNull @Initialized String multiOutputPrefix) {
        Pipeline p = Pipeline.create((PipelineOptions)options);
        SerializableFunction & Serializable firstTime = (SerializableFunction & Serializable)value -> {
            latch.countDown();
            return null;
        };
        PCollection impulse = (PCollection)p.apply("CreatePCollectionOfOneValue", (PTransform)Create.of((Object)VALUE, (Object[])new String[0]));
        ((PCollection)impulse.apply("Single-PairWithRandomKey", (PTransform)MapElements.via((SimpleFunction)new RequiresStableInputIT.PairWithRandomKeyFn()))).apply("Single-MakeSideEffectAndThenFail", (PTransform)ParDo.of((DoFn)new RequiresStableInputIT.MakeSideEffectAndThenFailFn(singleOutputPrefix, (SerializableFunction)firstTime)));
        ((PCollection)impulse.apply("Multi-PairWithRandomKey", (PTransform)MapElements.via((SimpleFunction)new RequiresStableInputIT.PairWithRandomKeyFn()))).apply("Multi-MakeSideEffectAndThenFail", (PTransform)ParDo.of((DoFn)new RequiresStableInputIT.MakeSideEffectAndThenFailFn(multiOutputPrefix, (SerializableFunction)firstTime)).withOutputTags(new TupleTag(), TupleTagList.empty()));
        return p;
    }
}

