/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sinks;

import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pulsar.tests.integration.io.sinks.SinkTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap;
import org.testng.Assert;

/**
 * {@link SinkTester} for the Kafka sink.
 *
 * <p>Starts a {@link KafkaContainer}, creates a uniquely named topic inside it, and validates the
 * sink output by consuming that topic with a {@link KafkaConsumer} and comparing the records, in
 * order, against the expected key/value pairs.
 */
public class KafkaSinkTester extends SinkTester<KafkaContainer> {
    private static final Logger log = LoggerFactory.getLogger(KafkaSinkTester.class);

    /** How long a single {@code poll} waits for records. */
    private static final Duration POLL_TIMEOUT = Duration.ofSeconds(1L);

    /**
     * Longest time {@link #validateSinkResult(Map)} tolerates without receiving any record before
     * failing the test. The clock restarts every time a non-empty batch arrives.
     */
    private static final Duration MAX_IDLE_WAIT = Duration.ofMinutes(2L);

    private final String kafkaTopicName;
    // Created in prepareSink().
    // NOTE(review): never closed in this class — confirm whether SinkTester offers a teardown hook.
    private KafkaConsumer<String, String> kafkaConsumer;
    private final String containerName;

    /**
     * Builds the tester and populates the sink configuration.
     *
     * @param containerName name given to the Kafka container; also used as its network alias
     */
    public KafkaSinkTester(String containerName) {
        super(containerName, SinkTester.SinkType.KAFKA);
        this.containerName = containerName;
        // Random suffix plus timestamp keeps the topic name unique.
        String suffix = PulsarTestBase.randomName(8) + "_" + System.currentTimeMillis();
        this.kafkaTopicName = "kafka_sink_topic_" + suffix;
        this.sinkConfig.put("bootstrapServers", this.networkAlias + ":9092");
        this.sinkConfig.put("acks", "all");
        this.sinkConfig.put("batchSize", 1L);
        this.sinkConfig.put("maxRequestSize", 1024L * 1024L);
        this.sinkConfig.put("topic", this.kafkaTopicName);
    }

    /**
     * Creates the Kafka container with an embedded ZooKeeper, using {@code containerName} as both
     * the network alias and the container name.
     *
     * @param cluster the Pulsar cluster whose name is used as the prefix of the container host name
     * @return the configured container
     */
    @Override
    protected KafkaContainer createSinkService(PulsarCluster cluster) {
        return new KafkaContainer()
                .withEmbeddedZookeeper()
                .withNetworkAliases(this.containerName)
                .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd
                        .withName(this.containerName)
                        .withHostName(cluster.getClusterName() + "-" + this.containerName));
    }

    /**
     * Creates the target topic inside the Kafka container and subscribes a fresh consumer to it.
     *
     * @throws Exception if executing the topic-creation command in the container fails
     */
    @Override
    public void prepareSink() throws Exception {
        Container.ExecResult execResult = this.serviceContainer.execInContainer(
                "/usr/bin/kafka-topics",
                "--create",
                "--zookeeper", "localhost:2181",
                "--partitions", "1",
                "--replication-factor", "1",
                "--topic", this.kafkaTopicName);
        // Use stdout as the failure message so the reason for a failed creation is visible.
        Assert.assertTrue(execResult.getStdout().contains("Created topic"), execResult.getStdout());

        // A random group id plus "earliest" makes the consumer read the topic from the beginning.
        Map<String, Object> consumerConfig = ImmutableMap.<String, Object>of(
                "bootstrap.servers", this.serviceContainer.getBootstrapServers(),
                "group.id", "sink-test-" + PulsarTestBase.randomName(8),
                "auto.offset.reset", "earliest");
        this.kafkaConsumer = new KafkaConsumer<>(
                consumerConfig, new StringDeserializer(), new StringDeserializer());
        this.kafkaConsumer.subscribe(Arrays.asList(this.kafkaTopicName));
        log.info("Successfully subscribe to kafka topic {}", this.kafkaTopicName);
    }

    /**
     * Consumes the Kafka topic and asserts that the records match {@code kvs} entry by entry, in
     * the map's iteration order.
     *
     * <p>Polling continues until every expected entry has been matched. If no record arrives for
     * longer than {@link #MAX_IDLE_WAIT}, the test fails instead of polling forever.
     *
     * @param kvs expected key/value pairs, in the order they should appear in the topic
     */
    @Override
    public void validateSinkResult(Map<String, String> kvs) {
        Iterator<Map.Entry<String, String>> expectedIter = kvs.entrySet().iterator();
        int matched = 0;
        long lastProgressNanos = System.nanoTime();
        while (expectedIter.hasNext()) {
            ConsumerRecords<String, String> records = this.kafkaConsumer.poll(POLL_TIMEOUT);
            log.info("Received {} records from kafka topic {}", records.count(), this.kafkaTopicName);
            if (records.isEmpty()) {
                // Without this bound a sink that never delivers would hang the test indefinitely.
                if (System.nanoTime() - lastProgressNanos > MAX_IDLE_WAIT.toNanos()) {
                    Assert.fail("No records received from kafka topic " + this.kafkaTopicName
                            + " for " + MAX_IDLE_WAIT + "; matched " + matched
                            + " of " + kvs.size() + " expected records");
                }
                continue;
            }
            lastProgressNanos = System.nanoTime();
            for (ConsumerRecord<String, String> actual : records) {
                if (!expectedIter.hasNext()) {
                    // Every expected entry is matched; surplus records in this batch are ignored.
                    break;
                }
                Map.Entry<String, String> expected = expectedIter.next();
                // TestNG argument order is (actual, expected).
                Assert.assertEquals(actual.key(), expected.getKey());
                Assert.assertEquals(actual.value(), expected.getValue());
                matched++;
            }
        }
    }
}

