package org.apache.beam.runners.flink;

import java.lang.invoke.SerializedLambda;
import java.util.Collections;
import java.util.Date;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.RequiresStableInputIT;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.FileChecksumMatcher;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.util.FilePatternMatchingShardedFile;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Instant;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/beam/runners/flink/FlinkRequiresStableInputTest.class */
public class FlinkRequiresStableInputTest {

    /** The single element fed into the side-effect pipeline built by {@code createPipeline}. */
    private static final String VALUE = "value";

    /**
     * Checksum the files written by the side-effect pipeline are expected to have (checked in
     * {@code runTest}). NOTE(review): presumably the SHA-1 of {@link #VALUE} -- confirm.
     */
    private static final String VALUE_CHECKSUM = "f32b67c7e26342af42efabc674d441dca0a281c5";

    /** Parallelism configured on the pipeline options; also sizes the job executor's pool. */
    private static final int PARALLELISM = 1;

    /** Value handed to {@code setCheckpointingInterval}; presumably milliseconds -- confirm. */
    private static final long CHECKPOINT_INTERVAL = 2_000L;

    /** Value handed to {@code setShutdownSourcesAfterIdleMs}. */
    private static final long FINISH_SOURCE_INTERVAL = 6_000L;

    /** Class-scoped temporary directory under which {@code runTest} places its output files. */
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();

    /** Executor given to the job invocation on the portable path; created in {@code setup}. */
    private static ListeningExecutorService flinkJobExecutor;

    /**
     * Keyed {@link DoFn} whose {@code @ProcessElement} method is annotated with
     * {@code @RequiresStableInput} and which additionally declares a bag state and an event-time
     * timer.
     *
     * <p>{@link #process} forwards each element's value directly and neither writes to the bag
     * nor sets the timer; {@link #flush} emits whatever the bag holds and then clears it.
     *
     * <p>NOTE(review): decompiler metadata states the access modifier was changed from
     * {@code private}; the class is only instantiated from within the enclosing test class.
     */
    public static class StableDoFn extends DoFn<KV<Void, Integer>, Integer> {

        /** Bag state registered under the id {@code "state"}. */
        @DoFn.StateId("state")
        final StateSpec<BagState<Integer>> stateSpec = StateSpecs.bag();

        /** Event-time timer registered under the id {@code "flush"}. */
        @DoFn.TimerId("flush")
        final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

        private StableDoFn() {
            // Both specs are initialised at their declarations.
        }

        /**
         * Emits the value of the incoming key/value pair unchanged.
         *
         * @param kv the input element
         * @param bagState the declared bag state (not accessed here)
         * @param timer the declared flush timer (not accessed here)
         * @param outputReceiver receiver for the forwarded value
         */
        @DoFn.ProcessElement
        @DoFn.RequiresStableInput
        public void process(@DoFn.Element KV<Void, Integer> kv, @DoFn.StateId("state") BagState<Integer> bagState, @DoFn.TimerId("flush") Timer timer, DoFn.OutputReceiver<Integer> outputReceiver) {
            outputReceiver.output(kv.getValue());
        }

        /**
         * Timer callback: outputs every buffered value with the timer's timestamp, then clears
         * the bag.
         *
         * @param instant timestamp attached to each emitted value
         * @param bagState the bag to drain
         * @param outputReceiver receiver for the drained values
         */
        @DoFn.OnTimer("flush")
        public void flush(@DoFn.Timestamp Instant instant, @DoFn.StateId("state") BagState<Integer> bagState, DoFn.OutputReceiver<Integer> outputReceiver) {
            Iterable<Integer> buffered = bagState.read();
            // A null read is tolerated and treated as "nothing buffered".
            if (buffered != null) {
                for (Integer value : buffered) {
                    outputReceiver.outputWithTimestamp(value, instant);
                }
            }
            bagState.clear();
        }
    }

    /**
     * Creates the executor that is handed to the job invocation on the portable path. The pool
     * is sized by {@code PARALLELISM}.
     */
    @BeforeClass
    public static void setup() {
        flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(PARALLELISM));
    }

    /**
     * Shuts down the executor created in {@link #setup()} so its worker threads do not outlive
     * the test class. Waits briefly for an orderly shutdown and forces it otherwise.
     *
     * @throws InterruptedException if interrupted while waiting for the executor to terminate
     */
    @AfterClass
    public static void tearDown() throws InterruptedException {
        if (flinkJobExecutor == null) {
            // setup() did not complete; nothing to release.
            return;
        }
        flinkJobExecutor.shutdown();
        if (!flinkJobExecutor.awaitTermination(10, TimeUnit.SECONDS)) {
            flinkJobExecutor.shutdownNow();
        }
        flinkJobExecutor = null;
    }

    /**
     * Runs the side-effect pipeline checked by {@code runTest} on the non-portable path.
     *
     * @throws Exception if pipeline execution or verification fails
     */
    @Test(timeout = 30_000)
    public void testParDoRequiresStableInput() throws Exception {
        final boolean portable = false;
        runTest(portable);
    }

    /**
     * Runs the side-effect pipeline checked by {@code runTest} on the portable path.
     *
     * @throws Exception if pipeline execution or verification fails
     */
    @Test(timeout = 30_000)
    public void testParDoRequiresStableInputPortable() throws Exception {
        final boolean portable = true;
        runTest(portable);
    }

    /**
     * Runs the stateful {@code StableDoFn} pipeline on the non-portable path.
     *
     * @throws Exception if pipeline execution or verification fails
     */
    @Test(timeout = 30_000)
    public void testParDoRequiresStableInputStateful() throws Exception {
        final boolean portable = false;
        testParDoRequiresStableInputStateful(portable);
    }

    /**
     * Runs the stateful {@code StableDoFn} pipeline on the portable path.
     *
     * @throws Exception if pipeline execution or verification fails
     */
    @Test(timeout = 30_000)
    public void testParDoRequiresStableInputStatefulPortable() throws Exception {
        final boolean portable = true;
        testParDoRequiresStableInputStateful(portable);
    }

    /**
     * Builds a pipeline that sends the integers 1 through 4, all under a {@code null} key,
     * through {@link StableDoFn}, asserts that exactly those four values come out, and executes
     * it with execution retries set to zero.
     *
     * @param portable {@code true} to execute through the portable job invocation, {@code false}
     *     to execute directly with the Flink runner
     * @throws Exception if pipeline execution fails
     */
    private void testParDoRequiresStableInputStateful(boolean portable) throws Exception {
        FlinkPipelineOptions options = getFlinkOptions(portable);
        options.setShutdownSourcesAfterIdleMs(FINISH_SOURCE_INTERVAL);
        options.setNumberOfExecutionRetries(0);

        Pipeline pipeline = Pipeline.create(options);
        PCollection<Integer> output =
                pipeline
                        .apply(Create.of(1, 2, 3, 4))
                        .apply(WithKeys.of((Void) null))
                        .apply(ParDo.of(new StableDoFn()));
        PAssert.that(output).containsInAnyOrder(1, 2, 3, 4);

        executePipeline(pipeline, portable);
    }

    /**
     * Executes the side-effect pipeline from {@code createPipeline} and verifies that the files
     * written under both output prefixes have the checksum {@code VALUE_CHECKSUM}.
     *
     * <p>Output goes to a timestamp-named directory below {@code tempFolder}, with one
     * sub-directory for the single-output ParDo and one for the multi-output ParDo.
     *
     * @param portable {@code true} to execute through the portable job invocation, {@code false}
     *     to execute directly with the Flink runner
     * @throws Exception if pipeline execution or file verification fails
     */
    private void runTest(boolean portable) throws Exception {
        FlinkPipelineOptions options = getFlinkOptions(portable);

        ResourceId tempRoot = FileSystems.matchNewResource(tempFolder.getRoot().getAbsolutePath(), true);
        String runDirName = String.format("requires-stable-input-%tF-%<tH-%<tM-%<tS-%<tL", new Date());
        ResourceId runDir = tempRoot.resolve(runDirName, ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);

        ResourceId singleOutputDir = runDir.resolve("pardo-single-output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        String singleOutputPrefix = singleOutputDir.resolve("key-", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();

        ResourceId multiOutputDir = runDir.resolve("pardo-multi-output", ResolveOptions.StandardResolveOptions.RESOLVE_DIRECTORY);
        String multiOutputPrefix = multiOutputDir.resolve("key-", ResolveOptions.StandardResolveOptions.RESOLVE_FILE).toString();

        Pipeline pipeline = createPipeline(options, singleOutputPrefix, multiOutputPrefix);
        executePipeline(pipeline, portable);

        // Every file matching "<prefix>*" must carry the expected checksum.
        MatcherAssert.assertThat(
                new FilePatternMatchingShardedFile(singleOutputPrefix + "*"),
                FileChecksumMatcher.fileContentsHaveChecksum(VALUE_CHECKSUM));
        MatcherAssert.assertThat(
                new FilePatternMatchingShardedFile(multiOutputPrefix + "*"),
                FileChecksumMatcher.fileContentsHaveChecksum(VALUE_CHECKSUM));
    }

    /**
     * Executes {@code pipeline} and asserts that it finishes in the DONE state.
     *
     * <p>When {@code portable} is {@code false} this delegates to
     * {@link #executePipelineLegacy(Pipeline)}. Otherwise the pipeline is translated to its proto
     * form, submitted as a job invocation on {@code flinkJobExecutor}, and polled once per second
     * until it reaches a terminal state.
     *
     * <p>Polling stops on any terminal state (not only DONE/FAILED), so a cancelled or drained
     * job fails the assertion immediately instead of spinning until the JUnit timeout. The state
     * that ended the loop is the one asserted on.
     *
     * @param pipeline the pipeline to execute
     * @param portable {@code true} to execute through the portable job invocation
     * @throws Exception if translation or submission fails, or the wait is interrupted
     */
    private void executePipeline(Pipeline pipeline, boolean portable) throws Exception {
        if (!portable) {
            executePipelineLegacy(pipeline);
            return;
        }

        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(pipeline);
        FlinkPipelineOptions options = pipeline.getOptions().as(FlinkPipelineOptions.class);

        // No server configuration and no Flink conf dir are supplied; nothing is staged.
        FlinkJobInvoker invoker = FlinkJobInvoker.create((FlinkJobServerDriver.FlinkServerConfiguration) null);
        FlinkPipelineRunner runner = new FlinkPipelineRunner(options, (String) null, Collections.emptyList());
        JobInvocation invocation =
                invoker.createJobInvocation("fakeId", "fakeRetrievalToken", flinkJobExecutor, pipelineProto, options, runner);

        invocation.start();
        JobApi.JobState.Enum state = invocation.getState();
        while (!JobInvocation.isTerminated(state)) {
            Thread.sleep(1000L);
            state = invocation.getState();
        }
        MatcherAssert.assertThat(state, Matchers.equalTo(JobApi.JobState.Enum.DONE));
    }

    /**
     * Runs {@code pipeline} with a {@link FlinkRunner} built from the pipeline's own options,
     * waits for it to finish, and asserts that the final state is DONE.
     *
     * @param pipeline the pipeline to execute
     */
    private void executePipelineLegacy(Pipeline pipeline) {
        FlinkRunner runner = FlinkRunner.fromOptions(pipeline.getOptions());
        PipelineResult result = runner.run(pipeline);
        PipelineResult.State finalState = result.waitUntilFinish();
        MatcherAssert.assertThat(finalState, Matchers.equalTo(PipelineResult.State.DONE));
    }

    /**
     * Builds the side-effect pipeline: a single {@code VALUE} element is fanned out into two
     * branches, each of which pairs it with a key, reshuffles, and applies
     * {@code MakeSideEffectAndThenFailFn} with its own output prefix.
     *
     * <p>The first branch uses a plain single-output ParDo; the second one declares output tags
     * (a main tag and an empty additional-tag list). Both branches share a callback that always
     * throws {@link IllegalStateException}.
     *
     * @param pipelineOptions options the pipeline is created with
     * @param singleOutputPrefix output prefix passed to the single-output branch
     * @param multiOutputPrefix output prefix passed to the multi-output branch
     * @return the constructed, not yet executed pipeline
     */
    private static Pipeline createPipeline(PipelineOptions pipelineOptions, String singleOutputPrefix, String multiOutputPrefix) {
        Pipeline pipeline = Pipeline.create(pipelineOptions);

        // Handed to MakeSideEffectAndThenFailFn; it unconditionally fails the caller.
        SerializableFunction<Void, Void> failingCallback =
                ignored -> {
                    throw new IllegalStateException("Failing job to test @RequiresStableInput");
                };

        PCollection<String> singleValue = pipeline.apply("CreatePCollectionOfOneValue", Create.of(VALUE));

        singleValue
                .apply("Single-PairWithRandomKey", MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
                .apply(Reshuffle.of())
                .apply(
                        "Single-MakeSideEffectAndThenFail",
                        ParDo.of(new RequiresStableInputIT.MakeSideEffectAndThenFailFn(singleOutputPrefix, failingCallback)));

        singleValue
                .apply("Multi-PairWithRandomKey", MapElements.via(new RequiresStableInputIT.PairWithRandomKeyFn()))
                .apply(Reshuffle.of())
                .apply(
                        "Multi-MakeSideEffectAndThenFail",
                        ParDo.of(new RequiresStableInputIT.MakeSideEffectAndThenFailFn(multiOutputPrefix, failingCallback))
                                .withOutputTags(new TupleTag<>(), TupleTagList.empty()));

        return pipeline;
    }

    /**
     * Creates the Flink pipeline options shared by all tests in this class: streaming mode,
     * parallelism {@code PARALLELISM}, checkpointing every {@code CHECKPOINT_INTERVAL}, idle
     * source shutdown after {@code FINISH_SOURCE_INTERVAL}, bundles finished before
     * checkpointing, and a maximum bundle time of 100.
     *
     * @param portable when {@code true}, configures {@link CrashingRunner} as the runner and the
     *     {@code "EMBEDDED"} default environment type; when {@code false}, configures
     *     {@link FlinkRunner}
     * @return the configured options
     */
    private FlinkPipelineOptions getFlinkOptions(boolean portable) {
        FlinkPipelineOptions options = FlinkPipelineOptions.defaults();
        options.setParallelism(PARALLELISM);
        options.setCheckpointingInterval(CHECKPOINT_INTERVAL);
        options.setShutdownSourcesAfterIdleMs(FINISH_SOURCE_INTERVAL);
        options.setFinishBundleBeforeCheckpointing(true);
        options.setMaxBundleTimeMills(100L);
        options.setStreaming(true);

        if (!portable) {
            options.setRunner(FlinkRunner.class);
            return options;
        }

        // The portable path in executePipeline submits the pipeline through a job invocation
        // and never goes through the configured runner class.
        options.setRunner(CrashingRunner.class);
        options.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        return options;
    }

    /*
     * The decompiled copy of the synthetic $deserializeLambda$(SerializedLambda) method was
     * removed from this source file.
     *
     * javac generates that method automatically for every class containing a serializable
     * lambda (here: the SerializableFunction callback in createPipeline), so a hand-written
     * declaration collides with the compiler-synthesized one. The decompiler output was also
     * not valid Java: it assigned -1 to a boolean and switched on a boolean selector.
     */
}
