package org.apache.beam.runners.spark;

import java.util.Collections;
import org.apache.beam.runners.spark.examples.WordCount;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.Distinct;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.transforms.WithKeys;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.joda.time.Duration;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/runners/spark/SparkRunnerDebuggerTest.class */
public class SparkRunnerDebuggerTest {

    /* loaded from: input_file:org/apache/beam/runners/spark/SparkRunnerDebuggerTest$ArbitraryKeyFunction.class */
    private static class ArbitraryKeyFunction implements SerializableFunction<String, String> {
        private ArbitraryKeyFunction() {
        }

        public String apply(String str) {
            return "someKey";
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/SparkRunnerDebuggerTest$FormatKVFn.class */
    private static class FormatKVFn extends DoFn<KV<String, String>, String> {
        private FormatKVFn() {
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext) {
            processContext.output(((String) ((KV) processContext.element()).getKey()) + "," + ((String) ((KV) processContext.element()).getValue()));
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/spark/SparkRunnerDebuggerTest$PlusOne.class */
    private static class PlusOne extends SimpleFunction<KV<String, Long>, KV<String, Long>> {
        private PlusOne() {
        }

        public KV<String, Long> apply(KV<String, Long> kv) {
            return KV.of((String) kv.getKey(), Long.valueOf(((Long) kv.getValue()).longValue() + 1));
        }
    }

    @Test
    public void debugBatchPipeline() {
        PipelineOptions as = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
        as.setRunner(SparkRunnerDebugger.class);
        Pipeline create = Pipeline.create(as);
        PCollection apply = create.apply(Create.of(Collections.emptyList()).withCoder(StringUtf8Coder.of())).apply(new WordCount.CountWords());
        apply.apply(GroupByKey.create()).apply(Combine.groupedValues(Sum.ofLongs()));
        PCollectionList.of(apply).and(apply.apply(MapElements.via(new PlusOne()))).apply(Flatten.pCollections());
        apply.apply(MapElements.via(new WordCount.FormatAsTextFn())).apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
        MatcherAssert.assertThat("Debug pipeline did not equal expected", create.run().getDebugString(), Matchers.equalTo("_.<org.apache.beam.sdk.io.Read$Bounded>\n_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n_.combineByKey(..., new org.apache.beam.sdk.transforms.Count$CountFn(), ...)\n_.groupByKey()\n_.map(new org.apache.beam.sdk.transforms.Sum$SumLongFn())\n_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\nsparkContext.union(...)\n_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n_.<org.apache.beam.sdk.io.TextIO$Write>"));
    }

    @Test
    public void debugStreamingPipeline() {
        TestSparkPipelineOptions as = PipelineOptionsFactory.create().as(TestSparkPipelineOptions.class);
        as.setForceStreaming(true);
        as.setRunner(SparkRunnerDebugger.class);
        Pipeline create = Pipeline.create(as);
        KafkaIO.Read withValueDeserializer = KafkaIO.read().withBootstrapServers("mykafka:9092").withTopics(Collections.singletonList("my_input_topic")).withKeyDeserializer(StringDeserializer.class).withValueDeserializer(StringDeserializer.class);
        create.apply(withValueDeserializer.withoutMetadata()).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply(Window.into(FixedWindows.of(Duration.standardSeconds(5L)))).apply(ParDo.of(new FormatKVFn())).apply(Distinct.create()).apply(WithKeys.of(new ArbitraryKeyFunction())).apply(KafkaIO.write().withBootstrapServers("myotherkafka:9092").withTopic("my_output_topic").withKeySerializer(StringSerializer.class).withValueSerializer(StringSerializer.class));
        MatcherAssert.assertThat("Debug pipeline did not equal expected", create.run().getDebugString(), Matchers.equalTo("KafkaUtils.createDirectStream(...)\n_.map(new org.apache.beam.sdk.transforms.windowing.FixedWindows())\n_.mapPartitions(new org.apache.beam.runners.spark.SparkRunnerDebuggerTest$FormatKVFn())\n_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n_.groupByKey()\n_.map(new org.apache.beam.sdk.transforms.Combine$IterableCombineFn())\n_.mapPartitions(new org.apache.beam.sdk.transforms.Distinct$3())\n_.mapPartitions(new org.apache.beam.sdk.transforms.Contextful())\n_.<org.apache.beam.sdk.io.kafka.AutoValue_KafkaIO_Write>"));
    }
}
