/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources;

import com.google.gson.Gson;
import java.util.Collections;
import java.util.Map;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.Policy;
import net.jodah.failsafe.RetryPolicy;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.common.policies.data.SourceStatusUtil;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.io.PulsarIOTestRunner;
import org.apache.pulsar.tests.integration.io.sources.SourceTester;
import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
import org.apache.pulsar.tests.integration.topologies.PulsarTestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.GenericContainer;
import org.testng.Assert;

public class PulsarIOSourceRunner
extends PulsarIOTestRunner {
    private static final Logger log = LoggerFactory.getLogger(PulsarIOSourceRunner.class);

    public PulsarIOSourceRunner(PulsarCluster cluster, String functionRuntimeType) {
        super(cluster, functionRuntimeType);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public <T extends GenericContainer> void testSource(SourceTester<T> tester) throws Exception {
        String tenant = "public";
        String namespace = "default";
        String outputTopicName = "test-source-connector-" + this.functionRuntimeType + "-output-topic-" + PulsarTestBase.randomName(8);
        String sourceName = "test-source-connector-" + this.functionRuntimeType + "-name-" + PulsarTestBase.randomName(8);
        int numMessages = 20;
        PulsarClient client = PulsarClient.builder().serviceUrl(this.pulsarCluster.getPlainTextServiceUrl()).build();
        try {
            PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.pulsarCluster.getHttpServiceUrl()).build();
            try {
                admin.topics().createNonPartitionedTopic(outputTopicName);
                Consumer consumer = client.newConsumer(Schema.STRING).topic(new String[]{outputTopicName}).subscriptionName("source-tester").subscriptionType(SubscriptionType.Exclusive).subscribe();
                try {
                    this.prepareSource(tester);
                    this.submitSourceConnector(tester, "public", "default", sourceName, outputTopicName);
                    this.getSourceInfoSuccess(tester, "public", "default", sourceName);
                    Failsafe.with((Policy[])new RetryPolicy[]{this.statusRetryPolicy}).run(() -> this.getSourceStatus("public", "default", sourceName));
                    Map<String, String> kvs = tester.produceSourceMessages(20);
                    Failsafe.with((Policy[])new RetryPolicy[]{this.statusRetryPolicy}).run(() -> this.waitForProcessingSourceMessages("public", "default", sourceName, 20));
                    this.validateSourceResult((Consumer<String>)consumer, kvs);
                    this.updateSourceConnector(tester, "public", "default", sourceName, outputTopicName);
                    this.deleteSource("public", "default", sourceName);
                    this.getSourceInfoNotFound("public", "default", sourceName);
                }
                finally {
                    if (Collections.singletonList(consumer).get(0) != null) {
                        consumer.close();
                    }
                }
            }
            finally {
                if (Collections.singletonList(admin).get(0) != null) {
                    admin.close();
                }
            }
        }
        finally {
            if (Collections.singletonList(client).get(0) != null) {
                client.close();
            }
        }
    }

    protected void prepareSource(SourceTester tester) throws Exception {
        tester.prepareSource();
    }

    protected void submitSourceConnector(SourceTester tester, String tenant, String namespace, String sourceName, String outputTopicName) throws Exception {
        Object[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "create", "--tenant", tenant, "--namespace", namespace, "--name", sourceName, "--source-type", tester.sourceType(), "--sourceConfig", new Gson().toJson(tester.sourceConfig()), "--destinationTopicName", outputTopicName};
        log.info("Run command : {}", (Object)StringUtils.join((Object[])commands, (char)' '));
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd((String[])commands);
        Assert.assertTrue((boolean)result.getStdout().contains("\"Created successfully\""), (String)result.getStdout());
    }

    protected void updateSourceConnector(SourceTester tester, String tenant, String namespace, String sourceName, String outputTopicName) throws Exception {
        Object[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "update", "--tenant", tenant, "--namespace", namespace, "--name", sourceName, "--source-type", tester.sourceType(), "--sourceConfig", new Gson().toJson(tester.sourceConfig()), "--destinationTopicName", outputTopicName, "--parallelism", "2"};
        log.info("Run command : {}", (Object)StringUtils.join((Object[])commands, (char)' '));
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd((String[])commands);
        Assert.assertTrue((boolean)result.getStdout().contains("\"Updated successfully\""), (String)result.getStdout());
    }

    protected void getSourceInfoSuccess(SourceTester tester, String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "get", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        log.info("Get source info : {}", (Object)result.getStdout());
        Assert.assertTrue((boolean)result.getStdout().contains("\"archive\": \"builtin://" + tester.getSourceType() + "\""), (String)result.getStdout());
    }

    protected void getSourceStatus(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        log.info("Get source status : {}", (Object)result.getStdout());
        Assert.assertEquals((int)result.getExitCode(), (int)0);
        SourceStatus sourceStatus = SourceStatusUtil.decode((String)result.getStdout());
        Assert.assertEquals((int)sourceStatus.getNumInstances(), (int)1);
        Assert.assertEquals((int)sourceStatus.getNumRunning(), (int)1);
        Assert.assertEquals((int)sourceStatus.getInstances().size(), (int)1);
        Assert.assertEquals((boolean)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().isRunning(), (boolean)true);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumRestarts(), (long)0L);
        Assert.assertEquals((int)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getLatestSystemExceptions().size(), (int)0);
        Assert.assertTrue((boolean)result.getStdout().contains("\"running\" : true"));
    }

    protected void validateSourceResult(Consumer<String> consumer, Map<String, String> kvs) throws Exception {
        for (Map.Entry<String, String> kv : kvs.entrySet()) {
            Message msg = consumer.receive();
            Assert.assertEquals((String)kv.getKey(), (String)msg.getKey());
            Assert.assertEquals((String)kv.getValue(), (String)((String)msg.getValue()));
        }
    }

    protected void waitForProcessingSourceMessages(String tenant, String namespace, String sourceName, int numMessages) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "status", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        log.info("Get source status : {}", (Object)result.getStdout());
        Assert.assertEquals((int)result.getExitCode(), (int)0);
        SourceStatus sourceStatus = SourceStatusUtil.decode((String)result.getStdout());
        Assert.assertEquals((int)sourceStatus.getNumInstances(), (int)1);
        Assert.assertEquals((int)sourceStatus.getNumRunning(), (int)1);
        Assert.assertEquals((int)sourceStatus.getInstances().size(), (int)1);
        Assert.assertEquals((int)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getInstanceId(), (int)0);
        Assert.assertEquals((boolean)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().isRunning(), (boolean)true);
        Assert.assertTrue((((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getLastReceivedTime() > 0L ? 1 : 0) != 0);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumReceivedFromSource(), (long)numMessages);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumWritten(), (long)numMessages);
        Assert.assertEquals((long)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getNumRestarts(), (long)0L);
        Assert.assertEquals((int)((SourceStatus.SourceInstanceStatus)sourceStatus.getInstances().get(0)).getStatus().getLatestSystemExceptions().size(), (int)0);
    }

    protected void deleteSource(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "delete", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        ContainerExecResult result = this.pulsarCluster.getAnyWorker().execCmd(commands);
        Assert.assertTrue((boolean)result.getStdout().contains("Delete source successfully"), (String)result.getStdout());
        result.assertNoStderr();
    }

    protected void getSourceInfoNotFound(String tenant, String namespace, String sourceName) throws Exception {
        String[] commands = new String[]{"/pulsar/bin/pulsar-admin", "source", "get", "--tenant", tenant, "--namespace", namespace, "--name", sourceName};
        try {
            this.pulsarCluster.getAnyWorker().execCmd(commands);
            Assert.fail((String)"Command should have exited with non-zero");
        }
        catch (ContainerExecException e) {
            Assert.assertTrue((boolean)e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
        }
    }
}

