/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.flink.FlinkJobInvoker;
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkPipelineRunner;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(value=Parameterized.class)
public class PortableTimersExecutionTest
implements Serializable {
    private static final @UnknownKeyFor @NonNull @Initialized Logger LOG = LoggerFactory.getLogger(PortableTimersExecutionTest.class);
    @Parameterized.Parameter
    public @UnknownKeyFor @NonNull @Initialized boolean isStreaming;
    private static @UnknownKeyFor @NonNull @Initialized ListeningExecutorService flinkJobExecutor;

    @Parameterized.Parameters(name="streaming: {0}")
    public static @UnknownKeyFor @NonNull @Initialized Object @UnknownKeyFor @NonNull @Initialized [] testModes() {
        return new Object[]{true, false};
    }

    @BeforeClass
    public static void setup() {
        flinkJobExecutor = MoreExecutors.listeningDecorator((ExecutorService)Executors.newFixedThreadPool(1));
    }

    @AfterClass
    public static void tearDown() throws @UnknownKeyFor @NonNull @Initialized InterruptedException {
        flinkJobExecutor.shutdown();
        flinkJobExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        if (!flinkJobExecutor.isShutdown()) {
            LOG.warn("Could not shutdown Flink job executor");
        }
        flinkJobExecutor = null;
    }

    @Test(timeout=120000L)
    public void testTimerExecution() throws @UnknownKeyFor @NonNull @Initialized Exception {
        FlinkPipelineOptions options = (FlinkPipelineOptions)PipelineOptionsFactory.fromArgs((String[])new String[]{"--experiments=beam_fn_api"}).as(FlinkPipelineOptions.class);
        options.setRunner(CrashingRunner.class);
        options.setFlinkMaster("[local]");
        options.setStreaming(this.isStreaming);
        options.setParallelism(Integer.valueOf(2));
        ((PortablePipelineOptions)options.as(PortablePipelineOptions.class)).setDefaultEnvironmentType("EMBEDDED");
        String timerId = "foo";
        String stateId = "sizzle";
        int offset = 5000;
        int timerOutput = 4093;
        int numKeys = 50;
        int numDuplicateTimers = 15;
        final ArrayList<KV> input = new ArrayList<KV>();
        ArrayList<KV> expectedOutput = new ArrayList<KV>();
        Integer key = 0;
        while (key < numKeys) {
            expectedOutput.add(KV.of((Object)key.toString(), (Object)4093));
            for (int i = 0; i < numDuplicateTimers; ++i) {
                input.add(KV.of((Object)key.toString(), (Object)i));
                expectedOutput.add(KV.of((Object)key.toString(), (Object)(i + 5000)));
            }
            key = key + 1;
        }
        Collections.shuffle(input);
        DoFn<byte[], KV<String, Integer>> inputFn = new DoFn<byte[], KV<String, Integer>>(){

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context) {
                for (KV stringIntegerKV : input) {
                    context.output((Object)stringIntegerKV);
                }
            }
        };
        DoFn<KV<String, Integer>, KV<String, Integer>> testFn = new DoFn<KV<String, Integer>, KV<String, Integer>>(){
            @DoFn.TimerId(value="foo")
            private final @UnknownKeyFor @NonNull @Initialized TimerSpec spec = TimerSpecs.timer((TimeDomain)TimeDomain.EVENT_TIME);
            @DoFn.StateId(value="sizzle")
            private final @UnknownKeyFor @NonNull @Initialized StateSpec<@UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String>> stateSpec = StateSpecs.value((Coder)StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(/*
             * Issues handling annotations - annotations may be inaccurate
             */
            // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @UnknownKeyFor @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized @NonNull @Initialized DoFn. @UnknownKeyFor @NonNull @Initialized ProcessContext context, @DoFn.TimerId(value="foo") @UnknownKeyFor @NonNull @Initialized Timer timer, @DoFn.StateId(value="sizzle") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> state, @UnknownKeyFor @NonNull @Initialized BoundedWindow window) {
                timer.set(window.maxTimestamp());
                state.write((Object)((String)((KV)context.element()).getKey()));
                context.output((Object)KV.of((Object)((String)((KV)context.element()).getKey()), (Object)((Integer)((KV)context.element()).getValue() + 5000)));
            }

            @DoFn.OnTimer(value="foo")
            public void onTimer(@DoFn.StateId(value="sizzle") @UnknownKeyFor @NonNull @Initialized ValueState<@UnknownKeyFor @NonNull @Initialized String> state, // Could not load outer class - annotation placement on inner may be incorrect
            @UnknownKeyFor @NonNull @Initialized DoFn.OutputReceiver<@UnknownKeyFor @NonNull @Initialized KV<@UnknownKeyFor @NonNull @Initialized String, @UnknownKeyFor @NonNull @Initialized Integer>> r) {
                String read = Objects.requireNonNull((String)state.read(), "State must not be null");
                KV of = KV.of((Object)read, (Object)4093);
                r.output((Object)of);
            }
        };
        Pipeline pipeline = Pipeline.create((PipelineOptions)options);
        PCollection output = (PCollection)((PCollection)((PCollection)pipeline.apply("Impulse", (PTransform)Impulse.create())).apply("Input", (PTransform)ParDo.of((DoFn)inputFn))).apply("Timers", (PTransform)ParDo.of((DoFn)testFn));
        PAssert.that((PCollection)output).containsInAnyOrder(expectedOutput);
        RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto((Pipeline)pipeline);
        JobInvocation jobInvocation = FlinkJobInvoker.create(null).createJobInvocation("id", "none", flinkJobExecutor, pipelineProto, options, (PortablePipelineRunner)new FlinkPipelineRunner(options, null, Collections.emptyList()));
        jobInvocation.start();
        while (jobInvocation.getState() != JobApi.JobState.Enum.DONE) {
            Thread.sleep(1000L);
        }
        MatcherAssert.assertThat((Object)jobInvocation.getState(), (Matcher)Is.is((Object)JobApi.JobState.Enum.DONE));
    }
}

