package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.repackaged.beam_runners_flink_2.p00011.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.values.KV;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/PortableTimersExecutionTest.class */
public class PortableTimersExecutionTest implements Serializable {

    @Parameterized.Parameter
    public boolean isStreaming;
    private transient ListeningExecutorService flinkJobExecutor;
    private static List<KV<String, Integer>> results = new ArrayList();

    @Parameterized.Parameters
    public static Object[] testModes() {
        return new Object[]{true, false};
    }

    @Before
    public void setup() {
        results.clear();
        this.flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
    }

    @After
    public void tearDown() {
        this.flinkJobExecutor.shutdown();
    }

    @Test(timeout = 60000)
    public void testTimerExecution() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.create();
        create.setRunner(CrashingRunner.class);
        create.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
        create.as(FlinkPipelineOptions.class).setStreaming(this.isStreaming);
        create.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        final ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        while (true) {
            Integer num = i;
            if (num.intValue() >= 50) {
                break;
            }
            arrayList2.add(KV.of(num.toString(), 4093));
            for (int i2 = 0; i2 < 15; i2++) {
                arrayList.add(KV.of(num.toString(), Integer.valueOf(i2)));
                arrayList2.add(KV.of(num.toString(), Integer.valueOf(i2 + 5000)));
            }
            i = Integer.valueOf(num.intValue() + 1);
        }
        Collections.shuffle(arrayList);
        DoFn<byte[], KV<String, Integer>> doFn = new DoFn<byte[], KV<String, Integer>>() { // from class: org.apache.beam.runners.flink.PortableTimersExecutionTest.1
            @DoFn.ProcessElement
            public void processElement(DoFn<byte[], KV<String, Integer>>.ProcessContext processContext) {
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    processContext.output((KV) it.next());
                }
            }
        };
        DoFn<KV<String, Integer>, KV<String, Integer>> doFn2 = new DoFn<KV<String, Integer>, KV<String, Integer>>() { // from class: org.apache.beam.runners.flink.PortableTimersExecutionTest.2

            @DoFn.TimerId("foo")
            private final TimerSpec spec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

            @DoFn.StateId("sizzle")
            private final StateSpec<ValueState<String>> stateSpec = StateSpecs.value(StringUtf8Coder.of());

            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, KV<String, Integer>>.ProcessContext processContext, @DoFn.TimerId("foo") Timer timer, @DoFn.StateId("sizzle") ValueState<String> valueState, BoundedWindow boundedWindow) {
                timer.set(boundedWindow.maxTimestamp());
                valueState.write((String) ((KV) processContext.element()).getKey());
                processContext.output(KV.of((String) ((KV) processContext.element()).getKey(), Integer.valueOf(((Integer) ((KV) processContext.element()).getValue()).intValue() + 5000)));
            }

            @DoFn.OnTimer("foo")
            public void onTimer(@DoFn.StateId("sizzle") ValueState<String> valueState, DoFn.OutputReceiver<KV<String, Integer>> outputReceiver) {
                outputReceiver.output(KV.of((String) valueState.read(), 4093));
            }
        };
        DoFn<KV<String, Integer>, Void> doFn3 = new DoFn<KV<String, Integer>, Void>() { // from class: org.apache.beam.runners.flink.PortableTimersExecutionTest.3
            @DoFn.ProcessElement
            public void processElement(DoFn<KV<String, Integer>, Void>.ProcessContext processContext) {
                PortableTimersExecutionTest.results.add((KV) processContext.element());
            }
        };
        Pipeline create2 = Pipeline.create(create);
        create2.apply(Impulse.create()).apply(ParDo.of(doFn)).apply(ParDo.of(doFn2)).apply(ParDo.of(doFn3));
        FlinkJobInvocation create3 = FlinkJobInvocation.create("id", "none", this.flinkJobExecutor, PipelineTranslation.toProto(create2), create.as(FlinkPipelineOptions.class), Collections.emptyList());
        create3.start();
        long currentTimeMillis = System.currentTimeMillis() + 60000;
        while (create3.getState() != JobApi.JobState.Enum.DONE && System.currentTimeMillis() < currentTimeMillis) {
            Thread.sleep(1000L);
        }
        Assert.assertThat(create3.getState(), Is.is(JobApi.JobState.Enum.DONE));
        Assert.assertThat(results, Matchers.containsInAnyOrder(arrayList2.toArray()));
    }
}
