package org.apache.beam.runners.flink;

import java.io.Serializable;
import java.util.Collections;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.flink.FlinkJobServerDriver;
import org.apache.beam.runners.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.VarIntCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/beam/runners/flink/PortableStateExecutionTest.class */
public class PortableStateExecutionTest implements Serializable {
    private static final Logger LOG = LoggerFactory.getLogger(PortableStateExecutionTest.class);

    @Parameterized.Parameter
    public boolean isStreaming;
    private static ListeningExecutorService flinkJobExecutor;
    private static final int CLEAR_STATE = -1;
    private static final int WRITE_STATE = -2;

    @Parameterized.Parameters(name = "streaming: {0}")
    public static Object[] data() {
        return new Object[]{true, false};
    }

    @BeforeClass
    public static void setup() {
        flinkJobExecutor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(1));
    }

    @AfterClass
    public static void tearDown() throws InterruptedException {
        flinkJobExecutor.shutdown();
        flinkJobExecutor.awaitTermination(10L, TimeUnit.SECONDS);
        if (!flinkJobExecutor.isShutdown()) {
            LOG.warn("Could not shutdown Flink job executor");
        }
        flinkJobExecutor = null;
    }

    @Test(timeout = 120000)
    public void testExecution() throws Exception {
        PipelineOptions create = PipelineOptionsFactory.fromArgs(new String[]{"--experiments=beam_fn_api"}).create();
        create.setRunner(CrashingRunner.class);
        create.as(FlinkPipelineOptions.class).setFlinkMaster("[local]");
        create.as(FlinkPipelineOptions.class).setStreaming(this.isStreaming);
        create.as(FlinkPipelineOptions.class).setParallelism(2);
        create.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        Pipeline create2 = Pipeline.create(create);
        PAssert.that(create2.apply(Impulse.create()).apply(ParDo.of(new DoFn<byte[], KV<String, Integer>>() { // from class: org.apache.beam.runners.flink.PortableStateExecutionTest.1
            @DoFn.ProcessElement
            public void process(DoFn<byte[], KV<String, Integer>>.ProcessContext processContext) {
                processContext.output(KV.of("clearedState", 1));
                processContext.output(KV.of("clearedState", Integer.valueOf(PortableStateExecutionTest.CLEAR_STATE)));
                processContext.output(KV.of("bla1", 42));
                processContext.output(KV.of("bla", 23));
                processContext.output(KV.of("bla2", 64));
                processContext.output(KV.of("bla", 1));
                processContext.output(KV.of("bla", 1));
                processContext.output(KV.of("bla", Integer.valueOf(PortableStateExecutionTest.WRITE_STATE)));
                processContext.output(KV.of("bla1", Integer.valueOf(PortableStateExecutionTest.WRITE_STATE)));
                processContext.output(KV.of("bla2", Integer.valueOf(PortableStateExecutionTest.WRITE_STATE)));
                processContext.output(KV.of("clearedState", Integer.valueOf(PortableStateExecutionTest.WRITE_STATE)));
            }
        })).apply("statefulDoFn", ParDo.of(new DoFn<KV<String, Integer>, KV<String, String>>() { // from class: org.apache.beam.runners.flink.PortableStateExecutionTest.2

            @DoFn.StateId("valueState")
            private final StateSpec<ValueState<Integer>> valueStateSpec = StateSpecs.value(VarIntCoder.of());

            @DoFn.StateId("valueState2")
            private final StateSpec<ValueState<Integer>> valueStateSpec2 = StateSpecs.value(VarIntCoder.of());

            @DoFn.ProcessElement
            public void process(DoFn<KV<String, Integer>, KV<String, String>>.ProcessContext processContext, @DoFn.StateId("valueState") ValueState<Integer> valueState, @DoFn.StateId("valueState2") ValueState<Integer> valueState2) {
                performStateUpdates(processContext, valueState);
                performStateUpdates(processContext, valueState2);
            }

            private void performStateUpdates(DoFn<KV<String, Integer>, KV<String, String>>.ProcessContext processContext, ValueState<Integer> valueState) {
                Integer num = (Integer) ((KV) processContext.element()).getValue();
                if (num == null) {
                    throw new IllegalStateException();
                }
                switch (num.intValue()) {
                    case PortableStateExecutionTest.WRITE_STATE /* -2 */:
                        Integer num2 = (Integer) valueState.read();
                        processContext.output(KV.of((String) ((KV) processContext.element()).getKey(), num2 == null ? "null" : num2.toString()));
                        return;
                    case PortableStateExecutionTest.CLEAR_STATE /* -1 */:
                        valueState.clear();
                        return;
                    default:
                        Integer num3 = (Integer) valueState.read();
                        valueState.write(num3 == null ? num : Integer.valueOf(num3.intValue() + num.intValue()));
                        return;
                }
            }
        }))).containsInAnyOrder(new KV[]{KV.of("bla", "25"), KV.of("bla1", "42"), KV.of("bla2", "64"), KV.of("clearedState", "null"), KV.of("bla", "25"), KV.of("bla1", "42"), KV.of("bla2", "64"), KV.of("clearedState", "null")});
        JobInvocation createJobInvocation = FlinkJobInvoker.create((FlinkJobServerDriver.FlinkServerConfiguration) null).createJobInvocation("id", "none", flinkJobExecutor, PipelineTranslation.toProto(create2), create.as(FlinkPipelineOptions.class), new FlinkPipelineRunner(create.as(FlinkPipelineOptions.class), (String) null, Collections.emptyList()));
        createJobInvocation.start();
        while (createJobInvocation.getState() != JobApi.JobState.Enum.DONE) {
            Thread.sleep(1000L);
        }
    }
}
