package org.apache.beam.runners.spark.translation.streaming;

import com.google.common.collect.ImmutableMap;
import java.io.IOException;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import kafka.serializer.StringDecoder;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.runners.spark.SparkStreamingPipelineOptions;
import org.apache.beam.runners.spark.io.KafkaIO;
import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
import org.apache.beam.runners.spark.translation.streaming.utils.PAssertStreaming;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.joda.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest.class */
public class KafkaStreamingTest {
    private static final String TOPIC = "kafka_dataflow_test_topic";
    private static final long TEST_TIMEOUT_MSEC = 1000;
    private static final EmbeddedKafkaCluster.EmbeddedZookeeper EMBEDDED_ZOOKEEPER = new EmbeddedKafkaCluster.EmbeddedZookeeper(17001);
    private static final EmbeddedKafkaCluster EMBEDDED_KAFKA_CLUSTER = new EmbeddedKafkaCluster(EMBEDDED_ZOOKEEPER.getConnection(), new Properties(), Collections.singletonList(6667));
    private static final Map<String, String> KAFKA_MESSAGES = ImmutableMap.of("k1", "v1", "k2", "v2", "k3", "v3", "k4", "v4");
    private static final String[] EXPECTED = {"k1,v1", "k2,v2", "k3,v3", "k4,v4"};

    /* loaded from: input_file:org/apache/beam/runners/spark/translation/streaming/KafkaStreamingTest$FormatKVFn.class */
    private static class FormatKVFn extends DoFn<KV<String, String>, String> {
        private FormatKVFn() {
        }

        public void processElement(DoFn<KV<String, String>, String>.ProcessContext processContext) {
            processContext.output(((String) ((KV) processContext.element()).getKey()) + "," + ((String) ((KV) processContext.element()).getValue()));
        }
    }

    @BeforeClass
    public static void init() throws IOException {
        EMBEDDED_ZOOKEEPER.startup();
        EMBEDDED_KAFKA_CLUSTER.startup();
        Properties properties = new Properties();
        properties.putAll(EMBEDDED_KAFKA_CLUSTER.getProps());
        properties.put("request.required.acks", 1);
        properties.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
        StringSerializer stringSerializer = new StringSerializer();
        KafkaProducer kafkaProducer = new KafkaProducer(properties, stringSerializer, stringSerializer);
        Throwable th = null;
        try {
            try {
                for (Map.Entry<String, String> entry : KAFKA_MESSAGES.entrySet()) {
                    kafkaProducer.send(new ProducerRecord(TOPIC, entry.getKey(), entry.getValue()));
                }
                if (kafkaProducer != null) {
                    if (0 == 0) {
                        kafkaProducer.close();
                        return;
                    }
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (kafkaProducer != null) {
                if (th != null) {
                    try {
                        kafkaProducer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaProducer.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testRun() throws Exception {
        SparkStreamingPipelineOptions as = PipelineOptionsFactory.as(SparkStreamingPipelineOptions.class);
        as.setRunner(SparkRunner.class);
        as.setStreaming(true);
        as.setTimeout(Long.valueOf(TEST_TIMEOUT_MSEC));
        Pipeline create = Pipeline.create(as);
        PAssertStreaming.assertContents(create.apply(KafkaIO.Read.from(StringDecoder.class, StringDecoder.class, String.class, String.class, Collections.singleton(TOPIC), ImmutableMap.of("metadata.broker.list", EMBEDDED_KAFKA_CLUSTER.getBrokerList(), "auto.offset.reset", "smallest"))).setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())).apply(Window.into(FixedWindows.of(Duration.standardSeconds(1L)))).apply(ParDo.of(new FormatKVFn())), EXPECTED);
        SparkRunner.create(as).run(create).close();
    }

    @AfterClass
    public static void tearDown() {
        EMBEDDED_KAFKA_CLUSTER.shutdown();
        EMBEDDED_ZOOKEEPER.shutdown();
    }
}
