package org.apache.beam.runners.spark.coders;

import com.esotericsoftware.kryo.Kryo;
import org.apache.beam.runners.spark.SparkContextOptions;
import org.apache.beam.runners.spark.SparkPipelineOptions;
import org.apache.beam.runners.spark.TestSparkPipelineOptions;
import org.apache.beam.runners.spark.TestSparkRunner;
import org.apache.beam.runners.spark.io.MicrobatchSource;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest.class */
public class BeamSparkRunnerRegistratorTest {

    /* loaded from: input_file:org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest$KryoRegistratorIsNotCalled.class */
    public static class KryoRegistratorIsNotCalled extends BeamSparkRunnerRegistrator {
        public void registerClasses(Kryo kryo) {
            Assert.fail("Default spark.serializer is JavaSerializer so spark.kryo.registrator shouldn't be called");
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/coders/BeamSparkRunnerRegistratorTest$WrapperKryoRegistrator.class */
    public static class WrapperKryoRegistrator extends BeamSparkRunnerRegistrator {
        static boolean wasInitiated = false;

        public WrapperKryoRegistrator() {
            wasInitiated = true;
        }

        public void registerClasses(Kryo kryo) {
            super.registerClasses(kryo);
            Assert.assertTrue(kryo.getRegistration(MicrobatchSource.class).getSerializer() instanceof StatelessJavaSerializer);
        }
    }

    @Test
    public void testKryoRegistration() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.set("spark.kryo.registrator", WrapperKryoRegistrator.class.getName());
        runSimplePipelineWithSparkContext(sparkConf);
        Assert.assertTrue("WrapperKryoRegistrator wasn't initiated, probably KryoSerializer is not set", WrapperKryoRegistrator.wasInitiated);
    }

    @Test
    public void testDefaultSerializerNotCallingKryo() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.kryo.registrator", KryoRegistratorIsNotCalled.class.getName());
        runSimplePipelineWithSparkContext(sparkConf);
    }

    private void runSimplePipelineWithSparkContext(SparkConf sparkConf) {
        SparkPipelineOptions as = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
        as.setRunner(TestSparkRunner.class);
        sparkConf.set("spark.master", "local");
        sparkConf.setAppName("test");
        JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
        as.setUsesProvidedSparkContext(true);
        as.as(SparkContextOptions.class).setProvidedSparkContext(javaSparkContext);
        Pipeline create = Pipeline.create(as);
        create.apply(Create.of("a", new String[0]));
        create.run().waitUntilFinish();
        javaSparkContext.stop();
    }
}
