package org.apache.beam.runners.spark;

import java.io.Serializable;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineStateTest.class */
/**
 * Checks the {@link PipelineResult.State} reported by {@link SparkPipelineResult} for pipelines
 * that are running, cancelled, failed, or whose wait timed out, in both batch and streaming mode.
 *
 * <p>The class is {@link Serializable} and its fields are {@code transient} because the anonymous
 * {@link DoFn} and {@link SimpleFunction} created in instance methods hold a reference to the
 * enclosing test instance.
 */
public class SparkPipelineStateTest implements Serializable {
    /** Message carried by the exception that is injected to make a pipeline fail. */
    private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";

    /** Options shared by the helpers below; reconfigured per test for batch or streaming. */
    private transient SparkPipelineOptions options = PipelineOptionsFactory.as(SparkPipelineOptions.class);

    /** Supplies the running test's method name, used to label printed elements. */
    @Rule
    public transient TestName testName = new TestName();

    /** Marker exception thrown from user code so the failure test can recognise its own fault. */
    public static class MyCustomException extends RuntimeException {
        MyCustomException(String str) {
            super(str);
        }
    }

    /**
     * Builds a {@link ParDo} that prints every element to standard output.
     *
     * @param str prefix printed before each element, separated by a single space
     * @return a transform that prints its input and emits nothing
     */
    private ParDo.SingleOutput<String, String> printParDo(final String str) {
        return ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext processContext) {
                System.out.println(str + " " + processContext.element());
            }
        });
    }

    /**
     * Picks the source transform matching the execution mode of the given options.
     *
     * @param sparkPipelineOptions options whose {@code isStreaming()} flag selects the source
     * @return a {@link CreateStream} with a single batch when streaming, otherwise a
     *     {@link Create}; both produce the elements {@code "one"} and {@code "two"}
     */
    private PTransform<PBegin, PCollection<String>> getValues(SparkPipelineOptions sparkPipelineOptions) {
        return sparkPipelineOptions.isStreaming()
                ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1L), false).nextBatch("one", "two")
                : Create.of("one", "two");
    }

    /** Configures the shared options for the Spark runner in streaming mode and returns them. */
    private SparkPipelineOptions getStreamingOptions() {
        this.options.setRunner(SparkRunner.class);
        this.options.setStreaming(true);
        return this.options;
    }

    /** Configures the shared options for the Spark runner in batch mode and returns them. */
    private SparkPipelineOptions getBatchOptions() {
        this.options.setRunner(SparkRunner.class);
        this.options.setStreaming(false);
        return this.options;
    }

    /**
     * Creates a pipeline that reads the test values and prints them, labelled with the current
     * test method name and the streaming flag.
     *
     * @param sparkPipelineOptions options used to create the pipeline and choose its source
     * @return the assembled, not yet started, pipeline
     */
    private Pipeline getPipeline(SparkPipelineOptions sparkPipelineOptions) {
        Pipeline pipeline = Pipeline.create(sparkPipelineOptions);
        String label = this.testName.getMethodName() + "(isStreaming=" + sparkPipelineOptions.isStreaming() + ")";
        pipeline.apply(getValues(sparkPipelineOptions)).setCoder(StringUtf8Coder.of()).apply(printParDo(label));
        return pipeline;
    }

    /**
     * Runs a pipeline whose map function always throws and verifies that the failure surfaces as a
     * {@link Pipeline.PipelineExecutionException} caused by {@link MyCustomException}, and that
     * the result reports {@link PipelineResult.State#FAILED}.
     *
     * @param sparkPipelineOptions batch or streaming options to run with
     * @throws Exception if cancelling the pipeline fails
     */
    private void testFailedPipeline(SparkPipelineOptions sparkPipelineOptions) throws Exception {
        SparkPipelineResult result = null;
        try {
            Pipeline pipeline = Pipeline.create(sparkPipelineOptions);
            pipeline.apply(getValues(sparkPipelineOptions)).setCoder(StringUtf8Coder.of())
                    .apply(MapElements.via(new SimpleFunction<String, String>() {
                        @Override
                        public String apply(String str) {
                            throw new MyCustomException(FAILED_THE_BATCH_INTENTIONALLY);
                        }
                    }));
            result = (SparkPipelineResult) pipeline.run();
            result.waitUntilFinish();
            // Assert.fail throws AssertionError, which the catch (Exception) below does not intercept.
            Assert.fail("An injected failure did not affect the pipeline as expected.");
        } catch (Exception e) {
            Assert.assertThat(e, CoreMatchers.instanceOf(Pipeline.PipelineExecutionException.class));
            Assert.assertThat(e.getCause(), CoreMatchers.instanceOf(MyCustomException.class));
            Assert.assertThat(e.getCause().getMessage(), Matchers.is(FAILED_THE_BATCH_INTENTIONALLY));
            // If run() itself threw there is no result; report that clearly instead of an NPE.
            Assert.assertNotNull(
                    "The pipeline failed before run() returned a result, so its state cannot be checked.",
                    result);
            Assert.assertThat(result.getState(), Matchers.is(PipelineResult.State.FAILED));
        } finally {
            // Always release the pipeline, even when an assertion above fails.
            if (result != null) {
                result.cancel();
            }
        }
    }

    /**
     * Waits only one millisecond for the pipeline and verifies that it is still reported as
     * {@link PipelineResult.State#RUNNING} afterwards.
     *
     * @param sparkPipelineOptions batch or streaming options to run with
     * @throws Exception if cancelling the pipeline fails
     */
    private void testTimeoutPipeline(SparkPipelineOptions sparkPipelineOptions) throws Exception {
        SparkPipelineResult result = (SparkPipelineResult) getPipeline(sparkPipelineOptions).run();
        try {
            result.waitUntilFinish(Duration.millis(1L));
            Assert.assertThat(result.getState(), Matchers.is(PipelineResult.State.RUNNING));
        } finally {
            // Cancel even if the assertion fails so the pipeline does not outlive this test.
            result.cancel();
        }
    }

    /**
     * Cancels a freshly started pipeline and verifies that it reports
     * {@link PipelineResult.State#CANCELLED}.
     *
     * @param sparkPipelineOptions batch or streaming options to run with
     * @throws Exception if cancelling the pipeline fails
     */
    private void testCanceledPipeline(SparkPipelineOptions sparkPipelineOptions) throws Exception {
        SparkPipelineResult result = (SparkPipelineResult) getPipeline(sparkPipelineOptions).run();
        result.cancel();
        Assert.assertThat(result.getState(), Matchers.is(PipelineResult.State.CANCELLED));
    }

    /**
     * Starts a pipeline and verifies that it reports {@link PipelineResult.State#RUNNING}
     * immediately after {@code run()} returns.
     *
     * @param sparkPipelineOptions batch or streaming options to run with
     * @throws Exception if cancelling the pipeline fails
     */
    private void testRunningPipeline(SparkPipelineOptions sparkPipelineOptions) throws Exception {
        SparkPipelineResult result = (SparkPipelineResult) getPipeline(sparkPipelineOptions).run();
        try {
            Assert.assertThat(result.getState(), Matchers.is(PipelineResult.State.RUNNING));
        } finally {
            // Cancel even if the assertion fails so the pipeline does not outlive this test.
            result.cancel();
        }
    }

    @Test
    public void testStreamingPipelineRunningState() throws Exception {
        testRunningPipeline(getStreamingOptions());
    }

    @Test
    public void testBatchPipelineRunningState() throws Exception {
        testRunningPipeline(getBatchOptions());
    }

    @Test
    public void testStreamingPipelineCanceledState() throws Exception {
        testCanceledPipeline(getStreamingOptions());
    }

    @Test
    public void testBatchPipelineCanceledState() throws Exception {
        testCanceledPipeline(getBatchOptions());
    }

    @Test
    public void testStreamingPipelineFailedState() throws Exception {
        testFailedPipeline(getStreamingOptions());
    }

    @Test
    public void testBatchPipelineFailedState() throws Exception {
        testFailedPipeline(getBatchOptions());
    }

    @Test
    public void testStreamingPipelineTimeoutState() throws Exception {
        testTimeoutPipeline(getStreamingOptions());
    }

    @Test
    public void testBatchPipelineTimeoutState() throws Exception {
        testTimeoutPipeline(getBatchOptions());
    }
}
