/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources;

import com.google.common.collect.ImmutableMap;
import com.google.gson.Gson;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.schema.Field;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.common.policies.data.SourceStatusUtil;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.functions.PulsarFunctionsTestBase;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.KafkaContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.containers.wait.strategy.WaitStrategy;
import org.testcontainers.images.builder.Transferable;
import org.testcontainers.utility.DockerImageName;
import org.testng.Assert;
import org.testng.annotations.Test;

public class AvroKafkaSourceTest
extends PulsarFunctionsTestBase {
    private static final Logger log = LoggerFactory.getLogger(AvroKafkaSourceTest.class);
    public static final String CONFLUENT_PLATFORM_VERSION = System.getProperty("confluent.version", "6.2.8");
    private static final String SOURCE_TYPE = "kafka";
    private final String kafkaTopicName = "kafkasourcetopic";
    private EnhancedKafkaContainer kafkaContainer;
    private SchemaRegistryContainer schemaRegistryContainer;
    protected final Map<String, Object> sourceConfig = new HashMap<String, Object>();
    protected final String kafkaContainerName = "kafkacontainer";
    protected final String schemaRegistryContainerName = "schemaregistry";

    @Test(groups={"source"})
    public void test() throws Exception {
        this.startKafkaContainers(this.pulsarCluster);
        try {
            this.testSource();
        }
        finally {
            this.stopKafkaContainers();
        }
    }

    private String getBootstrapServersOnDockerNetwork() {
        return "kafkacontainer:9093";
    }

    public void startKafkaContainers(PulsarCluster cluster) throws Exception {
        this.kafkaContainer = this.createKafkaContainer(cluster);
        cluster.startService("kafkacontainer", (GenericContainer<?>)this.kafkaContainer);
        log.info("creating schema registry kafka {}", (Object)this.getBootstrapServersOnDockerNetwork());
        this.schemaRegistryContainer = new SchemaRegistryContainer(this.getBootstrapServersOnDockerNetwork());
        cluster.startService("schemaregistry", this.schemaRegistryContainer);
        this.sourceConfig.put("bootstrapServers", this.getBootstrapServersOnDockerNetwork());
        this.sourceConfig.put("groupId", "test-source-group");
        this.sourceConfig.put("fetchMinBytes", 1L);
        this.sourceConfig.put("autoCommitIntervalMs", 10L);
        this.sourceConfig.put("sessionTimeoutMs", 10000L);
        this.sourceConfig.put("heartbeatIntervalMs", 5000L);
        this.sourceConfig.put("topic", "kafkasourcetopic");
        this.sourceConfig.put("valueDeserializationClass", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        this.sourceConfig.put("consumerConfigProperties", ImmutableMap.of((Object)"schema.registry.url", (Object)this.getRegistryAddressInDockerNetwork()));
    }

    protected EnhancedKafkaContainer createKafkaContainer(PulsarCluster cluster) {
        return (EnhancedKafkaContainer)new EnhancedKafkaContainer(DockerImageName.parse((String)("confluentinc/cp-kafka:" + CONFLUENT_PLATFORM_VERSION))).withEmbeddedZookeeper().withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd.withName("kafkacontainer"));
    }

    public void stopKafkaContainers() {
        if (null != this.schemaRegistryContainer) {
            PulsarCluster.stopService("schemaregistry", this.schemaRegistryContainer);
        }
        if (null != this.kafkaContainer) {
            PulsarCluster.stopService("kafkacontainer", this.kafkaContainer);
        }
    }

    public void prepareSource() throws Exception {
        log.info("creating topic");
        Container.ExecResult execResult = this.kafkaContainer.execInContainer(new String[]{"/usr/bin/kafka-topics", "--create", "--zookeeper", this.getZooKeeperAddressInDockerNetwork(), "--partitions", "1", "--replication-factor", "1", "--topic", "kafkasourcetopic"});
        Assert.assertTrue((boolean)execResult.getStdout().contains("Created topic"), (String)execResult.getStdout());
    }

    private String getZooKeeperAddressInDockerNetwork() {
        return "kafkacontainer:2181";
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testSource() throws Exception {
        String tenant = "public";
        String namespace = "default";
        String outputTopicName = "test-source-connector-" + this.functionRuntimeType + "-output-topic-" + AvroKafkaSourceTest.randomName(8);
        String sourceName = "test-source-connector-" + this.functionRuntimeType + "-name-" + AvroKafkaSourceTest.randomName(8);
        int numMessages = 10;
        PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
        try {
            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build();
            try {
                admin.topics().createNonPartitionedTopic(outputTopicName);
                Consumer consumer = client.newConsumer(Schema.AUTO_CONSUME()).topic(new String[]{outputTopicName}).subscriptionName("sourcetester").subscribe();
                try {
                    this.prepareSource();
                    this.submitSourceConnector("public", "default", sourceName, outputTopicName);
                    this.getSourceInfoSuccess("public", "default", sourceName);
                    Awaitility.with().timeout(Duration.ofMinutes(1L)).pollInterval(Duration.ofSeconds(10L)).until(() -> {
                        try {
                            this.getSourceStatus("public", "default", sourceName);
                            return true;
                        }
                        catch (Throwable ex) {
                            log.error("Error while getting source status, will retry", ex);
                            return false;
                        }
                    });
                    List<MyBean> messages = this.produceSourceMessages(10);
                    Awaitility.with().timeout(Duration.ofMinutes(1L)).pollInterval(Duration.ofSeconds(10L)).until(() -> {
                        try {
                            this.waitForProcessingSourceMessages("public", "default", sourceName, 10);
                            return true;
                        }
                        catch (Throwable ex) {
                            log.error("Error while processing source messages, will retry", ex);
                            return false;
                        }
                    });
                    this.validateSourceResultAvro((Consumer<GenericRecord>)consumer, messages);
                    this.deleteSource("public", "default", sourceName);
                    this.getSourceInfoNotFound("public", "default", sourceName);
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(admin).get(0) != null) {
                    admin.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    public void validateSourceResultAvro(Consumer<GenericRecord> consumer, List<MyBean> beans) throws Exception {
        int recordsNumber = 0;
        Message msg = consumer.receive(10, TimeUnit.SECONDS);
        while (msg != null) {
            GenericRecord valueRecord = (GenericRecord)msg.getValue();
            Assert.assertNotNull((Object)valueRecord.getFields());
            Assert.assertTrue((valueRecord.getFields().size() > 0 ? 1 : 0) != 0);
            for (Field field : valueRecord.getFields()) {
                log.info("field {} value {}", (Object)field, valueRecord.getField(field));
            }
            Assert.assertEquals((Object)beans.get((int)recordsNumber).field, (Object)valueRecord.getField("field"));
            consumer.acknowledge(msg);
            ++recordsNumber;
            msg = consumer.receive(10, TimeUnit.SECONDS);
        }
        Assert.assertEquals((int)recordsNumber, (int)beans.size());
        log.info("Stop {} server container. topic: {} has {} records.", new Object[]{"kafkacontainer", consumer.getTopic(), recordsNumber});
    }

    protected void getSourceInfoSuccess(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "get", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        log.info("Get source info : {}", (Object)result.getStdout());
        Assert.assertTrue((boolean)result.getStdout().contains("\"archive\": \"builtin://kafka\""), (String)result.getStdout());
    }

    protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        log.info("Get source status : {}", (Object)result.getStdout());
        Assert.assertEquals((long)result.getExitCode(), (long)0L);
        SourceStatus sourceStatus = SourceStatusUtil.decode((String)result.getStdout());
        Assert.assertEquals((int)sourceStatus.getNumInstances(), (int)1);
        Assert.assertEquals((int)sourceStatus.getNumRunning(), (int)1);
        Assert.assertEquals((int)sourceStatus.getInstances().size(), (int)1);
        Assert.assertEquals((boolean)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().isRunning(), (boolean)true);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumRestarts(), (long)0L);
        Assert.assertEquals((int)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getLatestSystemExceptions().size(), (int)0);
        Assert.assertTrue((boolean)result.getStdout().contains("\"running\" : true"));
    }

    protected void submitSourceConnector(String tenant, String namespace, String sourceName, String outputTopicName) throws Exception {
        Object[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "create", "--tenant", tenant, "--namespace", namespace, "--name", sourceName, "--source-type", SOURCE_TYPE, "--sourceConfig", new Gson().toJson(this.sourceConfig), "--destinationTopicName", outputTopicName};
        log.info("Run command : {}", (Object)StringUtils.join((Object[])commands, (char)' '));
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd((String[])commands);
        Assert.assertTrue((boolean)result.getStdout().contains("Created successfully"), (String)result.getStdout());
    }

    public List<MyBean> produceSourceMessages(int numMessages) throws Exception {
        org.apache.avro.Schema schema = ReflectData.get().getSchema(MyBean.class);
        String schemaDef = schema.toString(false);
        log.info("schema {}", (Object)schemaDef);
        ArrayList<MyBean> written = new ArrayList<MyBean>();
        StringBuilder payload = new StringBuilder();
        for (int i = 0; i < numMessages; ++i) {
            MyBean bean = new MyBean();
            bean.setField("value" + i);
            String serialized = AvroKafkaSourceTest.serializeBeanUsingAvro(schema, bean);
            payload.append(serialized);
            if (i != numMessages - 1) {
                payload.append("\n");
            }
            written.add(bean);
        }
        String bashFileTemplate = "echo '" + payload + "' | /usr/bin/kafka-avro-console-producer --broker-list " + this.getBootstrapServersOnDockerNetwork() + " --property 'value.schema=" + schemaDef + "' --property schema.registry.url=" + this.getRegistryAddressInDockerNetwork() + " --topic kafkasourcetopic";
        String file = "/home/appuser/produceRecords.sh";
        this.schemaRegistryContainer.copyFileToContainer(Transferable.of((byte[])bashFileTemplate.getBytes(StandardCharsets.UTF_8), (int)511), file);
        Container.ExecResult cat = this.schemaRegistryContainer.execInContainer(new String[]{"cat", file});
        log.info("cat results: " + cat.getStdout());
        log.info("cat stderr: " + cat.getStderr());
        Container.ExecResult execResult = this.schemaRegistryContainer.execInContainer(new String[]{"/bin/bash", file});
        log.info("script results: " + execResult.getStdout());
        log.info("script stderr: " + execResult.getStderr());
        Assert.assertTrue((boolean)execResult.getStdout().contains("Closing the Kafka producer"), (String)(execResult.getStdout() + " " + execResult.getStderr()));
        Assert.assertTrue((boolean)execResult.getStderr().isEmpty(), (String)execResult.getStderr());
        log.info("Successfully produced {} messages to kafka topic {}", (Object)numMessages, (Object)"kafkasourcetopic");
        return written;
    }

    private static String serializeBeanUsingAvro(org.apache.avro.Schema schema, MyBean bean) throws IOException {
        ReflectDatumWriter userDatumWriter = new ReflectDatumWriter(schema);
        ByteArrayOutputStream stream = new ByteArrayOutputStream();
        JsonEncoder encoder = EncoderFactory.get().jsonEncoder(schema, (OutputStream)stream);
        userDatumWriter.write((Object)bean, (Encoder)encoder);
        encoder.flush();
        String serialized = new String(stream.toByteArray(), StandardCharsets.UTF_8);
        return serialized;
    }

    protected void waitForProcessingSourceMessages(String tenant, String namespace, String sourceName, int numMessages) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        log.info("Get source status : {}", (Object)result.getStdout());
        Assert.assertEquals((long)result.getExitCode(), (long)0L);
        SourceStatus sourceStatus = SourceStatusUtil.decode((String)result.getStdout());
        Assert.assertEquals((int)sourceStatus.getNumInstances(), (int)1);
        Assert.assertEquals((int)sourceStatus.getNumRunning(), (int)1);
        Assert.assertEquals((int)sourceStatus.getInstances().size(), (int)1);
        Assert.assertEquals((int)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getInstanceId(), (int)0);
        Assert.assertEquals((boolean)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().isRunning(), (boolean)true);
        Assert.assertTrue((((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getLastReceivedTime() > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumReceivedFromSource(), (long)numMessages);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumWritten(), (long)numMessages);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumRestarts(), (long)0L);
        Assert.assertEquals((int)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getLatestSystemExceptions().size(), (int)0);
    }

    protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "delete", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        Assert.assertTrue((boolean)result.getStdout().contains("Delete source successfully"), (String)result.getStdout());
        Assert.assertTrue((boolean)result.getStderr().isEmpty(), (String)result.getStderr());
    }

    protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "get", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        try {
            this.pulsarCluster.getAnyWorker().execCmd(commands);
            Assert.fail((String)"Command should have exited with non-zero");
        }
        catch (ContainerExecException e) {
            Assert.assertTrue((boolean)e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
        }
    }

    private String getRegistryAddressInDockerNetwork() {
        return "http://schemaregistry:8081";
    }

    private class EnhancedKafkaContainer
    extends KafkaContainer {
        public EnhancedKafkaContainer(DockerImageName dockerImageName) {
            super(dockerImageName);
        }

        public String getBootstrapServers() {
            return "PLAINTEXT://kafkacontainer:9093";
        }
    }

    public class SchemaRegistryContainer
    extends GenericContainer<SchemaRegistryContainer> {
        private static final int SCHEMA_REGISTRY_INTERNAL_PORT = 8081;

        public SchemaRegistryContainer(String boostrapServers) throws Exception {
            super("confluentinc/cp-schema-registry:" + CONFLUENT_PLATFORM_VERSION);
            this.addEnv("SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", boostrapServers);
            this.addEnv("SCHEMA_REGISTRY_HOST_NAME", "schemaregistry");
            this.withExposedPorts(new Integer[]{8081});
            this.withLogConsumer(o -> log.info("schemaregistry> {}", (Object)o.getUtf8String()));
            this.waitingFor((WaitStrategy)Wait.forHttp((String)"/subjects"));
        }
    }

    public static final class MyBean {
        private String field;

        public String getField() {
            return this.field;
        }

        public void setField(String field) {
            this.field = field;
        }

        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof MyBean)) {
                return false;
            }
            MyBean other = (MyBean)o;
            String this$field = this.getField();
            String other$field = other.getField();
            return !(this$field == null ? other$field != null : !this$field.equals(other$field));
        }

        public int hashCode() {
            int PRIME = 59;
            int result = 1;
            String $field = this.getField();
            result = result * 59 + ($field == null ? 43 : $field.hashCode());
            return result;
        }

        public String toString() {
            return "AvroKafkaSourceTest.MyBean(field=" + this.getField() + ")";
        }
    }
}

