package org.apache.beam.runners.flink;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.RequiresStableInputIT;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.SerializableMatcher;
import org.apache.beam.sdk.testing.SerializableMatchers;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.hamcrest.MatcherAssert;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkRequiresStableInputTest.class */
public class FlinkRequiresStableInputTest {

    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    private static CountDownLatch latch;
    private static final String VALUE = "value";
    private static final String VALUE_CHECKSUM = "f32b67c7e26342af42efabc674d441dca0a281c5";
    private static transient MiniCluster flinkCluster;

    @BeforeClass
    public static void beforeClass() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(RestOptions.PORT, 0);
        configuration.setString(CheckpointingOptions.STATE_BACKEND, "filesystem");
        configuration.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "file://" + tempFolder.getRoot().getAbsolutePath());
        configuration.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, "file://" + tempFolder.getRoot().getAbsolutePath());
        flinkCluster = new MiniCluster(new MiniClusterConfiguration.Builder().setConfiguration(configuration).setNumTaskManagers(1).setNumSlotsPerTaskManager(1).build());
        flinkCluster.start();
        TestStreamEnvironment.setAsContext(flinkCluster, 1);
    }

    @AfterClass
    public static void afterClass() throws Exception {
        TestStreamEnvironment.unsetAsContext();
        flinkCluster.close();
        flinkCluster = null;
    }

    @Test(timeout = 30000)
    public void testParDoRequiresStableInput() throws Exception {
        String takeSavepoint;
        FlinkPipelineOptions as = PipelineOptionsFactory.as(FlinkPipelineOptions.class);
        as.setParallelism(1);
        as.setCheckpointingInterval(Long.MAX_VALUE);
        as.setRunner(FlinkRunner.class);
        as.setStreaming(true);
        ResourceId resolve = FileSystems.matchNewResource(tempFolder.getRoot().getAbsolutePath(), true).resolve(String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", new Date()), ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        String resourceId = resolve.resolve("pardo-single-output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("key-", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
        String resourceId2 = resolve.resolve("pardo-multi-output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY).resolve("key-", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();
        Pipeline createPipeline = createPipeline(as, resourceId, resourceId2);
        latch = new CountDownLatch(2);
        JobID executePipeline = executePipeline(createPipeline);
        do {
            takeSavepoint = takeSavepoint(executePipeline);
        } while (!latch.await(100L, TimeUnit.MILLISECONDS));
        flinkCluster.cancelJob(executePipeline).get();
        as.setShutdownSourcesOnFinalWatermark(true);
        restoreFromSavepoint(createPipeline, takeSavepoint);
        waitUntilJobIsDone();
        MatcherAssert.assertThat(new FlinkRunnerResult(Collections.emptyMap(), 1L), SerializableMatchers.allOf(new SerializableMatcher[]{new FileChecksumMatcher(VALUE_CHECKSUM, new FilePatternMatchingShardedFile(resourceId + "*")), new FileChecksumMatcher(VALUE_CHECKSUM, new FilePatternMatchingShardedFile(resourceId2 + "*"))}));
    }

    private JobGraph getJobGraph(Pipeline pipeline) {
        return FlinkRunner.fromOptions(pipeline.getOptions()).getJobGraph(pipeline);
    }

    private JobID executePipeline(Pipeline pipeline) throws Exception {
        JobGraph jobGraph = getJobGraph(pipeline);
        flinkCluster.submitJob(jobGraph).get();
        return jobGraph.getJobID();
    }

    private String takeSavepoint(JobID jobID) throws Exception {
        Exception exc = null;
        for (int i = 0; i < 10; i++) {
            try {
                return (String) flinkCluster.triggerSavepoint(jobID, (String) null, false).get();
            } catch (Exception e) {
                exc = e;
                Thread.sleep(100L);
            }
        }
        throw exc;
    }

    private JobID restoreFromSavepoint(Pipeline pipeline, String str) throws ExecutionException, InterruptedException {
        JobGraph jobGraph = getJobGraph(pipeline);
        jobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(str));
        return ((JobSubmissionResult) flinkCluster.submitJob(jobGraph).get()).getJobID();
    }

    private void waitUntilJobIsDone() throws InterruptedException, ExecutionException {
        while (((Collection) flinkCluster.listJobs().get()).stream().anyMatch(jobStatusMessage -> {
            return jobStatusMessage.getJobState() == JobStatus.RUNNING;
        })) {
            Thread.sleep(100L);
        }
    }

    private static Pipeline createPipeline(PipelineOptions pipelineOptions, String str, String str2) {
        Pipeline create = Pipeline.create(pipelineOptions);
        SerializableFunction serializableFunction = r2 -> {
            latch.countDown();
            return null;
        };
        PCollection apply = create.apply("CreatePCollectionOfOneValue", Create.of(VALUE, new String[0]));
        apply.apply("Single-PairWithRandomKey", MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn())).apply("Single-MakeSideEffectAndThenFail", ParDo.of(new RequiresStableInputIT.MakeSideEffectAndThenFailFn(str, serializableFunction)));
        apply.apply("Multi-PairWithRandomKey", MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn())).apply("Multi-MakeSideEffectAndThenFail", ParDo.of(new RequiresStableInputIT.MakeSideEffectAndThenFailFn(str2, serializableFunction)).withOutputTags(new TupleTag(), TupleTagList.empty()));
        return create;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 861089530:
                if (implMethodName.equals("lambda$createPipeline$687ec69$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/beam/sdk/transforms/SerializableFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("org/apache/beam/runners/flink/FlinkRequiresStableInputTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Void;)Ljava/lang/Void;")) {
                    return r2 -> {
                        latch.countDown();
                        return null;
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
