package org.apache.beam.runners.spark;

import java.io.Serializable;
import javax.annotation.Nullable;
import org.apache.beam.runners.spark.io.CreateStream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineStateTest.class */
public class SparkPipelineStateTest implements Serializable {

    @ClassRule
    public static SparkContextRule contextRule = new SparkContextRule(new KV[0]);
    private static final String FAILED_THE_BATCH_INTENTIONALLY = "Failed the batch intentionally";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineStateTest$CustomException.class */
    public static class CustomException extends RuntimeException {
        CustomException(String str) {
            super(str);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/spark/SparkPipelineStateTest$FailAlways.class */
    public static class FailAlways extends SimpleFunction<String, String> {
        private FailAlways() {
        }

        public String apply(String str) {
            throw new CustomException(SparkPipelineStateTest.FAILED_THE_BATCH_INTENTIONALLY);
        }
    }

    private Pipeline createPipeline(boolean z, @Nullable SimpleFunction<String, String> simpleFunction) {
        SparkContextOptions createPipelineOptions = contextRule.createPipelineOptions();
        createPipelineOptions.setRunner(SparkRunner.class);
        createPipelineOptions.setStreaming(z);
        Pipeline create = Pipeline.create(createPipelineOptions);
        PCollection coder = create.apply(z ? CreateStream.of(StringUtf8Coder.of(), Duration.millis(1L), false).nextBatch(new String[]{"one", "two"}) : Create.of("one", new String[]{"two"})).setCoder(StringUtf8Coder.of());
        if (simpleFunction != null) {
            coder.apply(MapElements.via(simpleFunction));
        }
        return create;
    }

    private void testFailedPipeline(boolean z) throws Exception {
        SparkPipelineResult run = createPipeline(z, new FailAlways()).run();
        Pipeline.PipelineExecutionException assertThrows = Assert.assertThrows(Pipeline.PipelineExecutionException.class, () -> {
            run.waitUntilFinish();
        });
        MatcherAssert.assertThat(assertThrows.getCause(), CoreMatchers.instanceOf(CustomException.class));
        MatcherAssert.assertThat(assertThrows.getCause().getMessage(), Matchers.is(FAILED_THE_BATCH_INTENTIONALLY));
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.FAILED));
        run.cancel();
    }

    private void testWaitUntilFinishedTimeout(boolean z) throws Exception {
        SparkPipelineResult run = createPipeline(z, null).run();
        run.waitUntilFinish(Duration.millis(1L));
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.RUNNING));
        run.cancel();
    }

    private void testCanceledPipeline(boolean z) throws Exception {
        SparkPipelineResult run = createPipeline(z, null).run();
        run.cancel();
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.CANCELLED));
    }

    private void testRunningPipeline(boolean z) throws Exception {
        SparkPipelineResult run = createPipeline(z, null).run();
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.RUNNING));
        run.cancel();
    }

    @Test
    public void testStreamingPipelineRunningState() throws Exception {
        testRunningPipeline(true);
    }

    @Test
    public void testBatchPipelineRunningState() throws Exception {
        testRunningPipeline(false);
    }

    @Test
    public void testStreamingPipelineCanceledState() throws Exception {
        testCanceledPipeline(true);
    }

    @Test
    public void testBatchPipelineCanceledState() throws Exception {
        testCanceledPipeline(false);
    }

    @Test
    public void testStreamingPipelineFailedState() throws Exception {
        testFailedPipeline(true);
    }

    @Test
    public void testBatchPipelineFailedState() throws Exception {
        testFailedPipeline(false);
    }

    @Test
    public void testStreamingPipelineWaitTimeout() throws Exception {
        testWaitUntilFinishedTimeout(true);
    }

    @Test
    public void testBatchPipelineWaitTimeout() throws Exception {
        testWaitUntilFinishedTimeout(false);
    }
}
