package org.apache.beam.runners.spark;

import com.google.common.collect.ImmutableSet;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/ProvidedSparkContextTest.class */
/**
 * Tests that run a small word-count pipeline with {@link TestSparkRunner} while handing the
 * runner an externally created {@link JavaSparkContext} through {@link SparkContextOptions}.
 *
 * <p>Three situations are covered: a live context (the pipeline must succeed and the context
 * must still be running afterwards), a {@code null} context, and a context that was already
 * stopped (both of which must fail with {@link #PROVIDED_CONTEXT_EXCEPTION}).
 */
public class ProvidedSparkContextTest {
    /** Input lines fed to the word count; includes an empty line on purpose. */
    private static final String[] WORDS_ARRAY = {"hi there", "hi", "hi sue bob", "hi sue", "", "bob hi"};
    private static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
    /** Formatted {@code word: count} results expected for {@link #WORDS}. */
    private static final Set<String> EXPECTED_COUNT_SET =
            ImmutableSet.of("hi: 5", "there: 1", "sue: 2", "bob: 2");
    /** Fragment that the failure message must contain when the provided context is unusable. */
    private static final String PROVIDED_CONTEXT_EXCEPTION = "The provided Spark context was not created or was stopped";

    /**
     * Runs the pipeline on a live provided context and checks that the context is left running
     * once the pipeline has finished.
     *
     * @throws Exception if the pipeline run fails
     */
    @Test
    public void testWithProvidedContext() throws Exception {
        JavaSparkContext javaSparkContext = new JavaSparkContext("local[*]", "Existing_Context");
        try {
            testWithValidProvidedContext(javaSparkContext);
            // The context was supplied by the caller, so it must still be usable here.
            Assert.assertFalse(javaSparkContext.sc().isStopped());
        } finally {
            // Always release the local context, even when the pipeline or the assertion fails,
            // so that a failure here does not leave a live context behind for later tests.
            javaSparkContext.stop();
        }
    }

    /**
     * Supplies {@code null} as the provided context and expects the run to be rejected.
     *
     * @throws Exception declared for symmetry with the other tests
     */
    @Test
    public void testWithNullContext() throws Exception {
        testWithInvalidContext(null);
    }

    /**
     * Supplies a context that has already been stopped and expects the run to be rejected.
     *
     * @throws Exception declared for symmetry with the other tests
     */
    @Test
    public void testWithStoppedProvidedContext() throws Exception {
        JavaSparkContext javaSparkContext = new JavaSparkContext("local[*]", "Existing_Context");
        javaSparkContext.stop();
        testWithInvalidContext(javaSparkContext);
    }

    /**
     * Builds the word-count pipeline on top of the given context and runs it to completion.
     *
     * @param javaSparkContext the live context handed to the runner
     * @throws Exception if the pipeline run fails
     */
    private void testWithValidProvidedContext(JavaSparkContext javaSparkContext) throws Exception {
        Pipeline pipeline = createWordCountPipeline(javaSparkContext);
        pipeline.run().waitUntilFinish();
    }

    /**
     * Builds the word-count pipeline on top of the given context and checks that running it
     * fails with a {@link RuntimeException} whose message contains
     * {@link #PROVIDED_CONTEXT_EXCEPTION}.
     *
     * @param javaSparkContext a {@code null} or stopped context
     */
    private void testWithInvalidContext(JavaSparkContext javaSparkContext) {
        Pipeline pipeline = createWordCountPipeline(javaSparkContext);
        try {
            pipeline.run().waitUntilFinish();
        } catch (RuntimeException e) {
            // Checked with JUnit rather than the `assert` keyword so the verification also runs
            // when the JVM is started without -ea; a null message is reported as a failure
            // instead of surfacing as a NullPointerException.
            String message = e.getMessage();
            Assert.assertTrue(
                    "Unexpected exception message: " + message,
                    message != null && message.contains(PROVIDED_CONTEXT_EXCEPTION));
            return;
        }
        Assert.fail("Should throw an exception when The provided Spark context is null or stopped");
    }

    /**
     * Creates a pipeline bound to the given context that counts the words in {@link #WORDS}
     * and registers a {@link PAssert} expecting {@link #EXPECTED_COUNT_SET}.
     *
     * @param javaSparkContext the context to place into the pipeline options; may be {@code null}
     * @return the configured, not yet executed pipeline
     */
    private static Pipeline createWordCountPipeline(JavaSparkContext javaSparkContext) {
        Pipeline pipeline = Pipeline.create(getSparkContextOptions(javaSparkContext));
        PAssert.that(
                        pipeline
                                .apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()))
                                .apply(new WordCount.CountWords())
                                .apply(MapElements.via(new WordCount.FormatAsTextFn())))
                .containsInAnyOrder(EXPECTED_COUNT_SET);
        return pipeline;
    }

    /**
     * Creates options that select {@link TestSparkRunner}, mark the Spark context as
     * externally provided, attach the given context, and turn Spark metric sinks off.
     *
     * @param javaSparkContext the context to attach; may be {@code null}
     * @return the populated options
     */
    private static SparkContextOptions getSparkContextOptions(JavaSparkContext javaSparkContext) {
        SparkContextOptions options = PipelineOptionsFactory.as(SparkContextOptions.class);
        options.setRunner(TestSparkRunner.class);
        options.setUsesProvidedSparkContext(true);
        options.setProvidedSparkContext(javaSparkContext);
        options.setEnableSparkMetricSinks(false);
        return options;
    }
}
