package org.apache.beam.runners.spark;

import java.io.File;
import java.io.Serializable;
import java.nio.file.FileSystems;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.beam.model.jobmanagement.v1.JobApi;
import org.apache.beam.runners.core.construction.JavaReadViaImpulse;
import org.apache.beam.runners.core.construction.PipelineTranslation;
import org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.testing.CrashingRunner;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.ListeningExecutorService;
import org.apache.beam.vendor.guava.v20_0.com.google.common.util.concurrent.MoreExecutors;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkPortableExecutionTest.class */
/**
 * Tests that build small pipelines, translate them to their portable proto form and run them
 * through {@link SparkJobInvoker}, asserting that each job ends in the {@code DONE} state.
 *
 * <p>The class is {@link Serializable} because the anonymous {@link DoFn}s declared in
 * {@link #testExecution()} are created inside an instance method and therefore capture the
 * enclosing test instance.
 */
public class SparkPortableExecutionTest implements Serializable {

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static final Logger LOG = LoggerFactory.getLogger(SparkPortableExecutionTest.class);
    private static ListeningExecutorService sparkJobExecutor;

    /**
     * A non-idempotent {@link DoFn}: every processed element tries to create the same uniquely
     * named file under {@link #temporaryFolder}, and the assertion fails if the file already
     * exists.
     *
     * <p>Declared {@code static} so that instances do not hold (and serialize) a hidden reference
     * to the enclosing test instance; only the static {@link #temporaryFolder} is needed.
     *
     * @param <InputT> element type of the input collection (the element value itself is ignored)
     */
    private static class DoFnWithSideEffect<InputT> extends DoFn<InputT, KV<String, String>> {
        private final String name;
        private final File file;

        /**
         * @param name label used as output key/value and as the prefix of the marker file name
         */
        DoFnWithSideEffect(String name) {
            this.name = name;
            // A random suffix keeps marker files of different instances from colliding.
            String fileName = String.format("%s-%s", name, UUID.randomUUID().toString());
            String rootDir = temporaryFolder.getRoot().getAbsolutePath();
            this.file = new File(FileSystems.getDefault().getPath(rootDir, fileName).toString());
        }

        /**
         * Emits {@code KV(name, name)} and then asserts that the marker file could be newly
         * created.
         */
        @DoFn.ProcessElement
        public void process(ProcessContext context) throws Exception {
            context.output(KV.of(name, name));
            Assert.assertTrue(
                    String.format(
                            "Create file %s failed (DoFn %s should only have been run once).",
                            file.getAbsolutePath(), name),
                    file.createNewFile());
        }
    }

    /** Creates the executor shared by all tests for running Spark jobs. */
    @BeforeClass
    public static void setup() {
        sparkJobExecutor = MoreExecutors.newDirectExecutorService();
    }

    /**
     * Shuts the shared executor down and logs a warning if it has not terminated within ten
     * seconds.
     *
     * @throws InterruptedException if interrupted while waiting for termination
     */
    @AfterClass
    public static void tearDown() throws InterruptedException {
        sparkJobExecutor.shutdown();
        // awaitTermination reports whether the executor actually terminated; isShutdown() would
        // always be true after shutdown() and could never trigger the warning.
        if (!sparkJobExecutor.awaitTermination(10L, TimeUnit.SECONDS)) {
            LOG.warn("Could not shut down Spark job executor");
        }
        sparkJobExecutor = null;
    }

    /**
     * Runs a pipeline with a singleton side input and two GroupByKeys and checks its output via
     * {@link PAssert}.
     */
    @Test(timeout = 120000)
    public void testExecution() throws Exception {
        PipelineOptions options = createPortableOptions();
        Pipeline pipeline = Pipeline.create(options);

        final PCollectionView<Long> view =
                pipeline
                        .apply("impulse23", Impulse.create())
                        .apply(
                                "create23",
                                ParDo.of(
                                        new DoFn<byte[], Long>() {
                                            @DoFn.ProcessElement
                                            public void process(ProcessContext context) {
                                                context.output(23L);
                                            }
                                        }))
                        .apply(View.asSingleton());

        PCollection<KV<String, Iterable<Long>>> result =
                pipeline
                        .apply("impulse", Impulse.create())
                        .apply(
                                "create",
                                ParDo.of(
                                        new DoFn<byte[], String>() {
                                            @DoFn.ProcessElement
                                            public void process(ProcessContext context) {
                                                context.output("zero");
                                                context.output("one");
                                                context.output("two");
                                            }
                                        }))
                        .apply(
                                "len",
                                ParDo.of(
                                        new DoFn<String, Long>() {
                                            @DoFn.ProcessElement
                                            public void process(ProcessContext context) {
                                                context.output(Long.valueOf(context.element().length()));
                                            }
                                        }))
                        .apply("addKeys", WithKeys.of("foo"))
                        .apply("gbk", GroupByKey.create())
                        .apply(
                                "print",
                                ParDo.of(
                                                new DoFn<KV<String, Iterable<Long>>, KV<String, Long>>() {
                                                    @DoFn.ProcessElement
                                                    public void process(ProcessContext context) {
                                                        // One extra record carrying the side input value.
                                                        context.output(KV.of("bar", context.sideInput(view)));
                                                        for (Long value : context.element().getValue()) {
                                                            context.output(KV.of(context.element().getKey(), value));
                                                        }
                                                    }
                                                })
                                        .withSideInputs(view))
                        .apply("gbk", GroupByKey.create());

        PAssert.that(result)
                .containsInAnyOrder(
                        KV.of("foo", ImmutableList.of(4L, 3L, 3L)),
                        KV.of("bar", ImmutableList.of(23L)));

        pipeline.replaceAll(Collections.singletonList(JavaReadViaImpulse.boundedOverride()));
        runAndAssertDone("fakeId", "fakeRetrievalToken", pipeline, options);
    }

    /**
     * Runs a pipeline in which the output of DoFn "A" feeds two further DoFns ("B" and "C"), each
     * followed by a GroupByKey. {@link DoFnWithSideEffect} fails if any of them processes an
     * element more than once.
     */
    @Test(timeout = 120000)
    public void testExecStageWithMultipleOutputs() throws Exception {
        PipelineOptions options = createPortableOptions();
        Pipeline pipeline = Pipeline.create(options);

        PCollection<KV<String, String>> a =
                pipeline
                        .apply("impulse", Impulse.create())
                        .apply("A", ParDo.of(new DoFnWithSideEffect<byte[]>("A")));
        PCollection<KV<String, String>> b =
                a.apply("B", ParDo.of(new DoFnWithSideEffect<KV<String, String>>("B")));
        PCollection<KV<String, String>> c =
                a.apply("C", ParDo.of(new DoFnWithSideEffect<KV<String, String>>("C")));
        b.apply(GroupByKey.create());
        c.apply(GroupByKey.create());

        runAndAssertDone(
                "testExecStageWithMultipleOutputs",
                "testExecStageWithMultipleOutputsRetrievalToken",
                pipeline,
                options);
    }

    /**
     * Runs a pipeline in which the grouped output of DoFn "F" is consumed by two DoFns ("G" and
     * "H"). {@link DoFnWithSideEffect} fails if any of them processes an element more than once.
     */
    @Test(timeout = 120000)
    public void testExecStageWithMultipleConsumers() throws Exception {
        PipelineOptions options = createPortableOptions();
        Pipeline pipeline = Pipeline.create(options);

        PCollection<KV<String, Iterable<String>>> grouped =
                pipeline
                        .apply("impulse", Impulse.create())
                        .apply("F", ParDo.of(new DoFnWithSideEffect<byte[]>("F")))
                        .apply(GroupByKey.create());
        grouped.apply("G", ParDo.of(new DoFnWithSideEffect<KV<String, Iterable<String>>>("G")));
        grouped.apply("H", ParDo.of(new DoFnWithSideEffect<KV<String, Iterable<String>>>("H")));

        runAndAssertDone(
                "testExecStageWithMultipleConsumers",
                "testExecStageWithMultipleConsumersRetrievalToken",
                pipeline,
                options);
    }

    /** Builds the options shared by every test: {@link CrashingRunner} and the "EMBEDDED" environment. */
    private static PipelineOptions createPortableOptions() {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(CrashingRunner.class);
        options.as(PortablePipelineOptions.class).setDefaultEnvironmentType("EMBEDDED");
        return options;
    }

    /**
     * Translates {@code pipeline} to its proto form, starts it as a Spark job on the shared
     * executor and asserts that the job state is {@code DONE} right after {@code start()}.
     *
     * <p>NOTE(review): the immediate state check presumably relies on the direct executor from
     * {@link #setup()} running the job on the calling thread - confirm before changing executors.
     */
    private static void runAndAssertDone(
            String jobId, String retrievalToken, Pipeline pipeline, PipelineOptions options)
            throws Exception {
        JobInvocation invocation =
                SparkJobInvoker.createJobInvocation(
                        jobId,
                        retrievalToken,
                        sparkJobExecutor,
                        PipelineTranslation.toProto(pipeline),
                        options.as(SparkPipelineOptions.class));
        invocation.start();
        Assert.assertEquals(JobApi.JobState.Enum.DONE, invocation.getState());
    }
}
