/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io.sources;

import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.common.policies.data.SourceStatus;
import org.apache.pulsar.tests.integration.containers.StandaloneContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Integration test for a source connector that emits {@link GenericRecord} values.
 *
 * <p>The test submits the {@code GenericRecordSource} connector through {@code pulsar-admin},
 * waits until it reports itself as running and as having written the expected number of
 * messages, reads the output topic both with {@code Schema.AUTO_CONSUME()} and with an Avro
 * schema derived from {@link MyBean}, and finally deletes the source again.
 */
public class GenericRecordSourceTest extends PulsarStandaloneTestSuite {
    private static final Logger log = LoggerFactory.getLogger(GenericRecordSourceTest.class);

    /** Path of the admin CLI inside the standalone container. */
    private static final String PULSAR_ADMIN = "/pulsar/bin/pulsar-admin";
    /** Tenant the source is looked up under. */
    private static final String TENANT = "public";
    /** Namespace the source is looked up under. */
    private static final String NAMESPACE = "default";
    /** Fragment of the {@code sources status} output that marks an instance as running. */
    private static final String RUNNING_MARKER = "\"running\" : true";

    /**
     * Runs the full create / verify / consume / delete cycle for the generic record source.
     *
     * <p>Function logs are dumped in a {@code finally} block so they are available whether the
     * test passes or fails.
     *
     * @throws Exception if any container command, admin call or client call fails
     */
    @Test(groups = {"source"})
    public void testGenericRecordSource() throws Exception {
        String outputTopicName = "test-state-source-output-" + randomName(8);
        String sourceName = "test-state-source-" + randomName(8);
        int numMessages = 10;
        try {
            submitSourceConnector(sourceName, outputTopicName,
                    "org.apache.pulsar.tests.integration.io.GenericRecordSource",
                    "/pulsar/examples/java-test-functions.jar");
            getSourceInfoSuccess(this.container, sourceName);
            getSourceStatus(this.container, sourceName);

            try (PulsarAdmin admin = PulsarAdmin.builder()
                    .serviceHttpUrl(this.container.getHttpServiceUrl())
                    .build()) {
                // Poll until the first instance reports at least numMessages written records.
                PulsarTestSuite.retryStrategically(ignored -> {
                    try {
                        SourceStatus polled = admin.sources().getSourceStatus(TENANT, NAMESPACE, sourceName);
                        return polled.getInstances().size() > 0
                                && polled.getInstances().get(0).getStatus().numWritten >= numMessages;
                    } catch (PulsarAdminException e) {
                        // Treated as "not ready yet"; the assertions below run on a fresh status call.
                        return false;
                    }
                }, 10, 200L);

                SourceStatus status = admin.sources().getSourceStatus(TENANT, NAMESPACE, sourceName);
                Assert.assertEquals(status.getInstances().size(), 1);
                Assert.assertTrue(status.getInstances().get(0).getStatus().numWritten >= numMessages);
            }

            consumeMessages(this.container, outputTopicName, numMessages);
            deleteSource(this.container, sourceName);
            getSourceInfoNotFound(this.container, sourceName);
        } finally {
            this.dumpFunctionLogs(sourceName);
        }
    }

    /**
     * Creates the source connector with {@code pulsar-admin sources create} and asserts that the
     * command output contains {@code "Created successfully"}.
     *
     * @param sourceName name to register the source under
     * @param outputTopicName topic passed as {@code --destinationTopicName}
     * @param className connector class passed as {@code --classname}
     * @param archive archive path passed as {@code --archive}
     * @throws Exception if the command cannot be executed in the container
     */
    private void submitSourceConnector(String sourceName, String outputTopicName, String className, String archive) throws Exception {
        String[] commands = {
            PULSAR_ADMIN, "sources", "create",
            "--name", sourceName,
            "--destinationTopicName", outputTopicName,
            "--archive", archive,
            "--classname", className
        };
        log.info("Run command : {}", StringUtils.join(commands, ' '));
        ContainerExecResult result = this.container.execCmd(commands);
        Assert.assertTrue(result.getStdout().contains("\"Created successfully\""), result.getStdout());
    }

    /**
     * Asserts that {@code pulsar-admin sources get} prints the given source name.
     *
     * @param container container to run the command in
     * @param sourceName name of the source expected to exist
     * @throws Exception if the command cannot be executed in the container
     */
    private static void getSourceInfoSuccess(StandaloneContainer container, String sourceName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sources", "get",
                "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
        Assert.assertTrue(result.getStdout().contains("\"name\": \"" + sourceName + "\""));
    }

    /**
     * Polls {@code pulsar-admin sources status} until the output reports a running instance, then
     * asserts the same condition on one final status call.
     *
     * @param container container to run the command in
     * @param sourceName name of the source to inspect
     * @throws Exception if the final status command cannot be executed
     */
    private static void getSourceStatus(StandaloneContainer container, String sourceName) throws Exception {
        PulsarTestSuite.retryStrategically(ignored -> {
            try {
                return fetchSourceStatus(container, sourceName).contains(RUNNING_MARKER);
            } catch (Exception e) {
                log.error("Encountered error when getting source status", e);
                return false;
            }
        }, 10, 200L);
        Assert.assertTrue(fetchSourceStatus(container, sourceName).contains(RUNNING_MARKER));
    }

    /** Runs {@code pulsar-admin sources status} for the source and returns the command's stdout. */
    private static String fetchSourceStatus(StandaloneContainer container, String sourceName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sources", "status",
                "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
        return result.getStdout();
    }

    /**
     * Reads {@code numMessages} messages from the output topic twice: first as
     * {@link GenericRecord} via {@code Schema.AUTO_CONSUME()}, then as {@link MyBean} via an Avro
     * schema. For every message it asserts that the {@code text} field equals
     * {@code "value-" + number}.
     *
     * <p>The typed consumer is opened while the generic consumer is still open; resources are
     * closed in reverse order of creation (typed consumer, generic consumer, client).
     *
     * @param container container providing the broker service URL
     * @param outputTopic topic the source writes to
     * @param numMessages number of messages each consumer must receive
     * @throws Exception if the client or a consumer fails
     */
    private static void consumeMessages(StandaloneContainer container, String outputTopic, int numMessages) throws Exception {
        try (PulsarClient client = PulsarClient.builder()
                     .serviceUrl(container.getPlainTextServiceUrl())
                     .build();
             Consumer<GenericRecord> consumer = client.newConsumer(Schema.AUTO_CONSUME())
                     .topic(outputTopic)
                     .subscriptionType(SubscriptionType.Exclusive)
                     .subscriptionName("test-sub")
                     .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                     .startMessageIdInclusive()
                     .subscribe()) {

            // First pass: schema-agnostic read through the GenericRecord abstraction.
            for (int i = 0; i < numMessages; ++i) {
                Message<GenericRecord> msg = consumer.receive(10, TimeUnit.SECONDS);
                if (msg == null) {
                    // Assert.fail always throws, so nothing below runs with a null message.
                    Assert.fail("message " + i + " not received in time");
                }
                GenericRecord record = msg.getValue();
                log.info("received {}", record);
                record.getFields().forEach(f -> log.info("field {} {}", f, msg.getValue().getField(f)));
                String text = (String) record.getField("text");
                int number = (Integer) record.getField("number");
                Assert.assertEquals(text, "value-" + number);
            }

            // Second pass: the same topic decoded into a typed bean on a separate subscription.
            try (Consumer<MyBean> typedConsumer = client.newConsumer(Schema.AVRO(MyBean.class))
                    .topic(outputTopic)
                    .subscriptionType(SubscriptionType.Exclusive)
                    .subscriptionName("test-sub-typed")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .startMessageIdInclusive()
                    .subscribe()) {
                for (int i = 0; i < numMessages; ++i) {
                    Message<MyBean> msg = typedConsumer.receive(10, TimeUnit.SECONDS);
                    if (msg == null) {
                        Assert.fail("message " + i + " not received in time");
                    }
                    MyBean bean = msg.getValue();
                    log.info("received {}", bean);
                    Assert.assertEquals(bean.getText(), "value-" + bean.getNumber());
                }
            }
        }
    }

    /**
     * Deletes the source with {@code pulsar-admin sources delete} and asserts that stdout reports
     * success and stderr is empty.
     *
     * @param container container to run the command in
     * @param sourceName name of the source to delete
     * @throws Exception if the command cannot be executed in the container
     */
    private static void deleteSource(StandaloneContainer container, String sourceName) throws Exception {
        ContainerExecResult result = container.execCmd(
                PULSAR_ADMIN, "sources", "delete",
                "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
        Assert.assertTrue(result.getStdout().contains("Delete source successfully"));
        Assert.assertTrue(result.getStderr().isEmpty());
    }

    /**
     * Asserts that {@code pulsar-admin sources get} fails for the given source and that stderr
     * carries the "doesn't exist" reason.
     *
     * @param container container to run the command in
     * @param sourceName name of the source expected to be absent
     * @throws Exception if the command fails with anything other than {@link ContainerExecException}
     */
    private static void getSourceInfoNotFound(StandaloneContainer container, String sourceName) throws Exception {
        try {
            container.execCmd(
                    PULSAR_ADMIN, "sources", "get",
                    "--tenant", TENANT, "--namespace", NAMESPACE, "--name", sourceName);
            Assert.fail("Command should have exited with non-zero");
        } catch (ContainerExecException e) {
            Assert.assertTrue(e.getResult().getStderr().contains("Reason: Source " + sourceName + " doesn't exist"));
        }
    }

    /**
     * Bean with a {@code text} and a {@code number} property, used to derive the Avro schema for
     * the typed consumer in {@code consumeMessages}.
     */
    public static class MyBean {
        String text;
        int number;

        /** Returns the {@code text} property. */
        public String getText() {
            return this.text;
        }

        /** Returns the {@code number} property. */
        public int getNumber() {
            return this.number;
        }

        /** Sets the {@code text} property. */
        public void setText(String text) {
            this.text = text;
        }

        /** Sets the {@code number} property. */
        public void setNumber(int number) {
            this.number = number;
        }

        /**
         * Two beans are equal when both accept each other through {@link #canEqual(Object)} and
         * their {@code number} and {@code text} properties match (a {@code null} text only equals
         * another {@code null} text).
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof MyBean)) {
                return false;
            }
            MyBean other = (MyBean) o;
            if (!other.canEqual(this)) {
                return false;
            }
            if (this.getNumber() != other.getNumber()) {
                return false;
            }
            String thisText = this.getText();
            String otherText = other.getText();
            return thisText == null ? otherText == null : thisText.equals(otherText);
        }

        /** Hook that lets subclasses opt out of equality with plain {@code MyBean} instances. */
        protected boolean canEqual(Object other) {
            return other instanceof MyBean;
        }

        /** Combines {@code number} and {@code text} with multiplier 59; a {@code null} text contributes 43. */
        @Override
        public int hashCode() {
            int result = 1;
            result = result * 59 + this.getNumber();
            String currentText = this.getText();
            result = result * 59 + (currentText == null ? 43 : currentText.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "GenericRecordSourceTest.MyBean(text=" + this.getText() + ", number=" + this.getNumber() + ")";
        }
    }
}

