package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.repackaged.beam_runners_flink_1.p0006.com.google.common.collect.Iterables;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.InferableFunction;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.joda.time.Duration;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkSavepointTest.class */
public class FlinkSavepointTest implements Serializable {
    private static CountDownLatch oneShotLatch;
    private static transient MiniCluster flinkCluster;
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSavepointTest.class);

    @ClassRule
    public static transient TemporaryFolder tempFolder = new TemporaryFolder();

    @BeforeClass
    public static void beforeClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 0);
        configuration.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + tempFolder.getRoot().getAbsolutePath());
        configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "file://" + tempFolder.getRoot().getAbsolutePath());
        flinkCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumTaskManagers(2).setNumSlotsPerTaskManager(2).build());
        flinkCluster.start();
        TestStreamEnvironment.setAsContext(flinkCluster, 4);
    }

    @AfterClass
    public static void afterClass() throws Exception {
        TestStreamEnvironment.unsetAsContext();
        flinkCluster.close();
    }

    @After
    public void afterTest() throws Exception {
        for (JobStatusMessage jobStatusMessage : (Collection) flinkCluster.listJobs().get()) {
            if (jobStatusMessage.getJobState() == JobStatus.RUNNING) {
                flinkCluster.cancelJob(jobStatusMessage.getJobId()).get();
            }
        }
    }

    @Test(timeout = 60000)
    public void testSavepointRestoreLegacy() throws Exception {
        runSavepointAndRestore(false);
    }

    @Test(timeout = 60000)
    public void testSavepointRestorePortable() throws Exception {
        runSavepointAndRestore(true);
    }

    private void runSavepointAndRestore(boolean z) throws Exception {
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setStreaming(true);
        as.setParallelism(1);
        as.setRunner(FlinkRunner.class);
        oneShotLatch = new CountDownLatch(1);
        Pipeline create = Pipeline.create(as);
        createStreamingJob(create, false, z);
        JobID executePortable = z ? executePortable(create) : executeLegacy(create);
        oneShotLatch.await();
        String takeSavepointAndCancelJob = takeSavepointAndCancelJob(executePortable);
        oneShotLatch = new CountDownLatch(1);
        Pipeline create2 = Pipeline.create(as);
        createStreamingJob(create2, true, z);
        if (z) {
            restoreFromSavepointPortable(create2, takeSavepointAndCancelJob);
        } else {
            restoreFromSavepointLegacy(create2, takeSavepointAndCancelJob);
        }
        oneShotLatch.await();
    }

    private JobID executeLegacy(Pipeline pipeline) throws Exception {
        JobGraph jobGraph = getJobGraph(pipeline);
        flinkCluster.submitJob(jobGraph).get();
        return jobGraph.getJobID();
    }

    private JobID executePortable(Pipeline pipeline) throws Exception {
        pipeline.getOptions().as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        pipeline.getOptions().as(FlinkPipelineOptions.class).setFlinkMaster(flinkCluster.getRestAddress().getHost() + ":" + flinkCluster.getRestAddress().getPort());
        RunnerApi.Pipeline proto = PipelineTranslation.toProto(pipeline);
        ListeningExecutorService listeningDecorator = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
        try {
            FlinkJobInvocation.create("id", "none", listeningDecorator, proto, pipeline.getOptions().as(FlinkPipelineOptions.class), (String) null, Collections.emptyList()).start();
            JobID waitForJobToBeReady = waitForJobToBeReady();
            listeningDecorator.shutdown();
            return waitForJobToBeReady;
        } catch (Throwable th) {
            listeningDecorator.shutdown();
            throw th;
        }
    }

    private JobID waitForJobToBeReady() throws InterruptedException, ExecutionException {
        while (true) {
            JobStatusMessage jobStatusMessage = (JobStatusMessage) Iterables.getFirst((Iterable) flinkCluster.listJobs().get(), null);
            if (jobStatusMessage != null && jobStatusMessage.getJobState() == JobStatus.RUNNING) {
                return jobStatusMessage.getJobId();
            }
            Thread.sleep(100L);
        }
    }

    private String takeSavepointAndCancelJob(JobID jobID) throws Exception {
        Exception exc = null;
        for (int i = 0; i < 10; i++) {
            try {
                return (String) flinkCluster.triggerSavepoint(jobID, (String) null, true).get();
            } catch (Exception e) {
                exc = e;
                Thread.sleep(100L);
            }
        }
        throw exc;
    }

    private void restoreFromSavepointLegacy(Pipeline pipeline, String str) throws ExecutionException, InterruptedException {
        JobGraph jobGraph = getJobGraph(pipeline);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        flinkCluster.submitJob(jobGraph).get();
    }

    private void restoreFromSavepointPortable(Pipeline pipeline, String str) throws Exception {
        pipeline.getOptions().as(FlinkPipelineOptions.class).setSavepointPath(str);
        executePortable(pipeline);
    }

    private JobGraph getJobGraph(Pipeline pipeline) {
        return FlinkRunner.fromOptions(pipeline.getOptions()).getJobGraph(pipeline);
    }

    private static PCollection createStreamingJob(Pipeline pipeline, boolean z, boolean z2) {
        PCollection apply = z2 ? (PCollection) pipeline.apply(Impulse.create()).apply(MapElements.via(new InferableFunction<byte[], KV<String, Void>>() { // from class: org.apache.beam.runners.flink.FlinkSavepointTest.1
            public KV<String, Void> apply(byte[] bArr) throws Exception {
                return KV.of("key", (Object) null);
            }
        })).apply(ParDo.of(new DoFn<KV<String, Void>, KV<String, Long>>() { // from class: org.apache.beam.runners.flink.FlinkSavepointTest.2

            @DoFn.StateId("valueState")
            private final StateSpec<ValueState<Long>> valueStateSpec = StateSpecs.value();

            @DoFn.TimerId("timer")
            private final TimerSpec timer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Void>, KV<String, Long>>.ProcessContext processContext, @DoFn.TimerId("timer") Timer timer) {
                timer.offset(Duration.ZERO).setRelative();
            }

            @DoFn.OnTimer("timer")
            public void onTimer(DoFn<KV<String, Void>, KV<String, Long>>.OnTimerContext onTimerContext, @DoFn.StateId("valueState") ValueState<Long> valueState, @DoFn.TimerId("timer") Timer timer) {
                Long l = (Long) valueState.read();
                if (l == null) {
                    l = -1L;
                }
                long longValue = l.longValue() + 1;
                onTimerContext.output(KV.of("key", Long.valueOf(longValue)));
                valueState.write(Long.valueOf(longValue));
                timer.offset(Duration.millis(100L)).setRelative();
            }
        })) : pipeline.apply(GenerateSequence.from(0L)).apply(ParDo.of(new DoFn<Long, KV<String, Long>>() { // from class: org.apache.beam.runners.flink.FlinkSavepointTest.3
            @DoFn.ProcessElement
            public void processElement(DoFn<Long, KV<String, Long>>.ProcessContext processContext) {
                processContext.output(KV.of("key", (Long) processContext.element()));
            }
        }));
        return z ? apply.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { // from class: org.apache.beam.runners.flink.FlinkSavepointTest.4

            @DoFn.StateId("valueState")
            private final StateSpec<ValueState<Integer>> valueStateSpec = StateSpecs.value();

            @DoFn.StateId("bagState")
            private final StateSpec<BagState<Integer>> bagStateSpec = StateSpecs.bag();

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Long>, String>.ProcessContext processContext, @DoFn.StateId("valueState") ValueState<Integer> valueState, @DoFn.StateId("bagState") BagState<Integer> bagState) {
                Integer num = (Integer) valueState.read();
                Assert.assertNotNull(num);
                if (num.intValue() == 42) {
                    valueState.write(0);
                    FlinkSavepointTest.oneShotLatch.countDown();
                }
            }
        })) : apply.apply(ParDo.of(new DoFn<KV<String, Long>, String>() { // from class: org.apache.beam.runners.flink.FlinkSavepointTest.5

            @DoFn.StateId("valueState")
            private final StateSpec<ValueState<Integer>> valueStateSpec = StateSpecs.value();

            @DoFn.StateId("bagState")
            private final StateSpec<BagState<Integer>> bagStateSpec = StateSpecs.bag();

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Long>, String>.ProcessContext processContext, @DoFn.StateId("valueState") ValueState<Integer> valueState, @DoFn.StateId("bagState") BagState<Integer> bagState) {
                Long l = (Long) ((KV) processContext.element()).getValue();
                Assert.assertNotNull(l);
                if (l.longValue() == 0) {
                    valueState.write(42);
                    bagState.add(40);
                    bagState.add(1);
                    bagState.add(1);
                    FlinkSavepointTest.oneShotLatch.countDown();
                }
            }
        }));
    }
}
