package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/PortableTimersExecutionTest.class */
public class PortableTimersExecutionTest implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(PortableTimersExecutionTest.class);

    @Parameterized.Parameter
    public boolean isStreaming;
    private static ListeningExecutorService flinkJobExecutor;

    @Parameterized.Parameters(name = "streaming: {0}")
    public static Object[] testModes() {
        return new Object[]{true, false};
    }

    @BeforeClass
    public static void setup() {
        flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
    }

    @AfterClass
    public static void tearDown() throws InterruptedException {
        flinkJobExecutor.shutdown();
        flinkJobExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        if (!flinkJobExecutor.isShutdown()) {
            LOG.warn("Could not shutdown Flink job executor");
        }
        flinkJobExecutor = null;
    }

    @Test(timeout = 120000)
    public void testTimerExecution() throws Exception {
        FlinkPipelineOptions as = PipelineOptionsFactory.fromArgs(new String[]{"--experiments=beam_fn_api"}).as(FlinkPipelineOptions.class);
        as.setRunner(CrashingRunner.class);
        as.setFlinkMaster("[local]");
        as.setStreaming(this.isStreaming);
        as.setParallelism(2);
        as.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        final ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        while (true) {
            Integer num = i;
            if (num.intValue() >= 50) {
                break;
            }
            arrayList2.add(KV.of(num.toString(), 4093));
            for (int i2 = 0; i2 < 15; i2++) {
                arrayList.add(KV.of(num.toString(), Integer.valueOf(i2)));
                arrayList2.add(KV.of(num.toString(), Integer.valueOf(i2 + 5000)));
            }
            i = Integer.valueOf(num.intValue() + 1);
        }
        Collections.shuffle(arrayList);
        DoFn<byte[], KV<String, Integer>> doFn = new DoFn<byte[], KV<String, Integer>>() { // from class: org.apache.beam.runners.flink.PortableTimersExecutionTest.1
            @DoFn.ProcessElement
            public void processElement(DoFn<byte[], KV<String, Integer>>.ProcessContext processContext) {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    processContext.output((KV) it.next());
                }
            }
        };
        DoFn<KV<String, Integer>, KV<String, Integer>> doFn2 = new DoFn<KV<String, Integer>, KV<String, Integer>>() { // from class: org.apache.beam.runners.flink.PortableTimersExecutionTest.2

            @DoFn.TimerId("foo")
            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.StateId("sizzle")
            private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, KV<String, Integer>>.ProcessContext processContext, @DoFn.TimerId("foo") Timer timer, @DoFn.StateId("sizzle") ValueState<String> valueState, BoundedWindow boundedWindow) {
                timer.set(boundedWindow.maxTimestamp());
                valueState.write((String) ((KV) processContext.element()).getKey());
                processContext.output(KV.of((String) ((KV) processContext.element()).getKey(), Integer.valueOf(((Integer) ((KV) processContext.element()).getValue()).intValue() + 5000)));
            }

            @DoFn.OnTimer("foo")
            public void onTimer(@DoFn.StateId("sizzle") ValueState<String> valueState, DoFn.OutputReceiver<KV<String, Integer>> outputReceiver) {
                outputReceiver.output(KV.of((String) Objects.requireNonNull((String) valueState.read(), "State must not be null"), 4093));
            }
        };
        Pipeline create = Pipeline.create(as);
        PAssert.that(create.apply("Impulse", Impulse.create()).apply("Input", ParDo.of(doFn)).apply("Timers", ParDo.of(doFn2))).containsInAnyOrder(arrayList2);
        JobInvocation createJobInvocation = FlinkJobInvoker.create((FlinkJobServerDriver.FlinkServerConfiguration) null).createJobInvocation("id", "none", flinkJobExecutor, PipelineTranslation.toProto(create), as, new FlinkPipelineRunner(as, (String) null, Collections.emptyList()));
        createJobInvocation.start();
        while (createJobInvocation.getState() != JobApi.JobState.Enum.DONE) {
            Thread.sleep(1000L);
        }
        MatcherAssert.assertThat(createJobInvocation.getState(), Is.is(JobApi.JobState.Enum.DONE));
    }
}
