/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testng.Assert;

/**
 * {@link SourceTester} implementation that drives the {@code kafka} source connector
 * against a {@link KafkaContainer}.
 *
 * <p>The tester creates a dedicated Kafka topic inside the container, subscribes a
 * verification consumer to it and publishes string key/value records that the source
 * under test is expected to pick up.
 */
public class KafkaSourceTester extends SourceTester<KafkaContainer> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSourceTester.class);
    private static final String SOURCE_TYPE = "kafka";

    /** Name of the Kafka topic the source reads from; made unique per tester instance. */
    private final String kafkaTopicName;
    /** Kafka service container; injected through {@link #setServiceContainer(KafkaContainer)}. */
    private KafkaContainer kafkaContainer;
    /**
     * Consumer subscribed to {@link #kafkaTopicName} in {@link #prepareSource()}.
     * NOTE(review): nothing visible in this class polls or closes it -- confirm whether the
     * base class or the test harness is responsible for its lifecycle.
     */
    private KafkaConsumer<String, String> kafkaConsumer;

    /**
     * Builds the source configuration pointing at the Kafka broker.
     *
     * @param containerName host name of the Kafka container; {@code ":9092"} is appended
     *                      to form the {@code bootstrapServers} setting of the source
     */
    public KafkaSourceTester(String containerName) {
        super(SOURCE_TYPE);
        // Random name plus a timestamp keeps the topic name distinct across tester instances.
        String suffix = PulsarClusterTestBase.randomName(8) + "_" + System.currentTimeMillis();
        this.kafkaTopicName = "kafka_source_topic_" + suffix;
        this.sourceConfig.put("bootstrapServers", containerName + ":9092");
        this.sourceConfig.put("groupId", "test-source-group");
        this.sourceConfig.put("fetchMinBytes", 1L);
        this.sourceConfig.put("autoCommitIntervalMs", 10L);
        this.sourceConfig.put("sessionTimeoutMs", 10000L);
        this.sourceConfig.put("heartbeatIntervalMs", 5000L);
        this.sourceConfig.put("topic", this.kafkaTopicName);
        this.sourceConfig.put("valueDeserializationClass", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    }

    /**
     * Stores the Kafka container used by {@link #prepareSource()} and
     * {@link #produceSourceMessages(int)}.
     *
     * @param container the Kafka service container
     */
    @Override
    public void setServiceContainer(KafkaContainer container) {
        this.kafkaContainer = container;
    }

    /**
     * Creates the test topic (one partition, replication factor one) with the
     * {@code kafka-topics} CLI inside the container and subscribes the verification
     * consumer to it.
     *
     * @throws Exception if the command cannot be executed in the container; an assertion
     *                   failure is raised when its output lacks {@code "Created topic"}
     */
    @Override
    public void prepareSource() throws Exception {
        Container.ExecResult execResult = this.kafkaContainer.execInContainer(
                "/usr/bin/kafka-topics",
                "--create",
                "--zookeeper", "localhost:2181",
                "--partitions", "1",
                "--replication-factor", "1",
                "--topic", this.kafkaTopicName);
        // The command's stdout doubles as the failure message to ease debugging.
        Assert.assertTrue(execResult.getStdout().contains("Created topic"), execResult.getStdout());

        Map<String, Object> consumerConfig = ImmutableMap.of(
                "bootstrap.servers", this.kafkaContainer.getBootstrapServers(),
                "group.id", "source-test-" + PulsarClusterTestBase.randomName(8),
                "auto.offset.reset", "earliest");
        this.kafkaConsumer = new KafkaConsumer<>(consumerConfig, new StringDeserializer(), new StringDeserializer());
        this.kafkaConsumer.subscribe(Arrays.asList(this.kafkaTopicName));
        log.info("Successfully subscribe to kafka topic {}", this.kafkaTopicName);
    }

    /** No-op: this tester performs no preparation for insert events. */
    @Override
    public void prepareInsertEvent() throws Exception {
    }

    /** No-op: this tester performs no preparation for delete events. */
    @Override
    public void prepareDeleteEvent() throws Exception {
    }

    /** No-op: this tester performs no preparation for update events. */
    @Override
    public void prepareUpdateEvent() throws Exception {
    }

    /**
     * Publishes {@code numMessages} records of the form {@code key-i -> value-i} to the
     * test topic, waiting for each send to complete before issuing the next one.
     *
     * @param numMessages number of records to publish
     * @return the published key/value pairs in publication order
     * @throws Exception if any send fails or the waiting thread is interrupted
     */
    @Override
    public Map<String, String> produceSourceMessages(int numMessages) throws Exception {
        Map<String, Object> producerConfig = ImmutableMap.of(
                "bootstrap.servers", this.kafkaContainer.getBootstrapServers(),
                "client.id", UUID.randomUUID().toString());
        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
        // try-with-resources ensures the producer is closed even when a send fails;
        // previously the producer was never closed.
        try (KafkaProducer<String, String> producer =
                new KafkaProducer<>(producerConfig, new StringSerializer(), new StringSerializer())) {
            for (int i = 0; i < numMessages; ++i) {
                String key = "key-" + i;
                String value = "value-" + i;
                ProducerRecord<String, String> message = new ProducerRecord<>(this.kafkaTopicName, key, value);
                kvs.put(key, value);
                // Block on the returned future so a failed send surfaces immediately.
                producer.send(message).get();
            }
        }
        log.info("Successfully produced {} messages to kafka topic {}", numMessages, this.kafkaTopicName);
        return kvs;
    }
}

