/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.io;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.policies.data.SinkStatus;
import org.apache.pulsar.common.schema.KeyValue;
import org.apache.pulsar.common.schema.KeyValueEncodingType;
import org.apache.pulsar.tests.integration.docker.ContainerExecException;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarStandaloneTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class PulsarGenericObjectSinkTest
extends PulsarStandaloneTestSuite {
    private static final Logger log = LoggerFactory.getLogger(PulsarGenericObjectSinkTest.class);

    /**
     * Runs one generic-object sink over several input topics, each written with a different
     * schema (STRING, AVRO, JSON and three KeyValue variants), and checks that every record
     * reaches the sink without sink or system exceptions.
     *
     * <p>Steps: create the sink through the {@code pulsar-admin} CLI, publish
     * {@code numRecordsPerTopic} messages to every topic, poll the sink status for up to
     * 120 one-second attempts, assert on the final counters, then delete the sink.
     *
     * <p>The client and admin handles are managed by try-with-resources: they are closed in
     * reverse order of creation, and an exception thrown while closing is attached as a
     * suppressed exception instead of replacing a failure raised by the test body.
     *
     * @throws Exception if a container command, an admin call or a message send fails
     */
    @Test(groups={"sink"})
    public void testGenericObjectSink() throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.container.getPlainTextServiceUrl()).build();
             PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.container.getHttpServiceUrl()).build()) {
            final int numRecordsPerTopic = 2;
            final String sinkName = "genericobject-sink";

            // One spec per schema flavour; a single sink subscribes to all of the topics.
            List<SinkSpec<?>> specs = Arrays.asList(
                    new SinkSpec<String>(
                            "test-kv-sink-input-string-" + randomName(8),
                            Schema.STRING,
                            "foo"),
                    new SinkSpec<Pojo>(
                            "test-kv-sink-input-avro-" + randomName(8),
                            Schema.AVRO(Pojo.class),
                            Pojo.builder().field1("a").field2(2).build()),
                    new SinkSpec<Pojo>(
                            "test-kv-sink-input-json-" + randomName(8),
                            Schema.JSON(Pojo.class),
                            Pojo.builder().field1("a").field2(2).build()),
                    new SinkSpec<KeyValue<String, Integer>>(
                            "test-kv-sink-input-kv-string-int-" + randomName(8),
                            Schema.KeyValue(Schema.STRING, Schema.INT32),
                            new KeyValue<>("foo", 123)),
                    new SinkSpec<KeyValue<PojoKey, Pojo>>(
                            "test-kv-sink-input-kv-avro-json-inl-" + randomName(8),
                            Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.INLINE),
                            new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())),
                    new SinkSpec<KeyValue<PojoKey, Pojo>>(
                            "test-kv-sink-input-kv-avro-json-sep-" + randomName(8),
                            Schema.KeyValue(Schema.AVRO(PojoKey.class), Schema.JSON(Pojo.class), KeyValueEncodingType.SEPARATED),
                            new KeyValue<>(PojoKey.builder().field1("a").build(), Pojo.builder().field1("a").field2(2).build())));

            // The CLI takes the input topics as one comma-separated argument.
            String topicNames = specs.stream().map(SinkSpec::getOutputTopicName).collect(Collectors.joining(","));
            this.submitSinkConnector(sinkName, topicNames, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", "/pulsar/examples/java-test-functions.jar");
            this.getSinkInfoSuccess(sinkName);
            this.getSinkStatus(sinkName);

            for (SinkSpec<?> spec : specs) {
                this.sendMessages(client, numRecordsPerTopic, spec);
            }

            // Total number of records the sink has to report as written.
            final long expectedWrites = (long) numRecordsPerTopic * specs.size();
            try {
                log.info("waiting for sink {}", sinkName);
                for (int attempt = 0; attempt < 120; ++attempt) {
                    SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
                    log.info("sink {} status {}", sinkName, status);
                    Assert.assertEquals(status.getInstances().size(), 1);
                    SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
                    // Stop polling once everything is written, or as soon as the sink reports trouble.
                    if (instance.getStatus().numWrittenToSink >= expectedWrites
                            || instance.getStatus().numSinkExceptions > 0L
                            || instance.getStatus().numSystemExceptions > 0L
                            || instance.getStatus().numRestarts > 0L) {
                        break;
                    }
                    Thread.sleep(1000L);
                }

                // Re-read the status so the assertions run against a fresh snapshot.
                SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
                log.info("sink {} status {}", sinkName, status);
                Assert.assertEquals(status.getInstances().size(), 1);
                Assert.assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= expectedWrites);
                Assert.assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0L);
                Assert.assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0L);
                log.info("sink {} is okay", sinkName);
            } finally {
                // Always dump the sink logs, whether the checks passed or not.
                this.dumpFunctionLogs(sinkName);
            }

            this.deleteSink(sinkName);
            this.getSinkInfoNotFound(sinkName);
        }
    }

    /**
     * Publishes {@code numRecordsPerTopic} copies of the spec's test value to the spec's topic.
     *
     * <p>Every message carries two properties: {@code expectedType} (the schema type name)
     * and {@code recordNumber} (the zero-based index of the message). The producer is opened
     * in a try-with-resources block, so it is closed even when a send fails and a close
     * failure cannot hide the original send error.
     *
     * @param client             client used to create the producer
     * @param numRecordsPerTopic number of messages to send
     * @param spec               topic, schema and payload to use
     * @param <T>                payload type described by the spec's schema
     * @throws PulsarClientException if the producer cannot be created, a send fails, or the
     *                               producer cannot be closed
     */
    private <T> void sendMessages(PulsarClient client, int numRecordsPerTopic, SinkSpec<T> spec) throws PulsarClientException {
        try (Producer<T> producer = client.newProducer(spec.schema).topic(spec.outputTopicName).create()) {
            // The schema type is the same for every message, so look it up once.
            final String schemaType = spec.schema.getSchemaInfo().getType().toString();
            for (int i = 0; i < numRecordsPerTopic; ++i) {
                MessageId messageId = producer.newMessage()
                        .value(spec.testValue)
                        .property("expectedType", schemaType)
                        .property("recordNumber", "" + i)
                        .send();
                log.info("sent message {} {}  with ID {}", spec.testValue, schemaType, messageId);
            }
        }
    }

    /**
     * Checks that the generic-object sink keeps working when the schema of its input topic
     * changes between messages: one record is sent with the AVRO schema of {@link Pojo},
     * the next one with the AVRO schema of {@link PojoV2}.
     *
     * <p>Each message carries {@code expectedType}, {@code expectedSchemaDefinition} and
     * {@code recordNumber} properties. After sending, the sink status is polled for up to
     * 120 one-second attempts and the final counters are asserted.
     *
     * <p>Client, admin and producer are managed by try-with-resources: they are closed in
     * reverse order of creation, and an exception thrown while closing is attached as a
     * suppressed exception instead of replacing a failure raised by the test body.
     *
     * @throws Exception if a container command, an admin call or a message send fails
     */
    @Test(groups={"sink"})
    public void testGenericObjectSinkWithSchemaChange() throws Exception {
        try (PulsarClient client = PulsarClient.builder().serviceUrl(this.container.getPlainTextServiceUrl()).build();
             PulsarAdmin admin = PulsarAdmin.builder().serviceHttpUrl(this.container.getHttpServiceUrl()).build()) {
            final int numRecords = 2;
            final String sinkName = "genericobject-sink";
            final String topicName = "test-genericobject-sink-schema-change";

            this.submitSinkConnector(sinkName, topicName, "org.apache.pulsar.tests.integration.io.TestGenericObjectSink", "/pulsar/examples/java-test-functions.jar");
            this.getSinkInfoSuccess(sinkName);
            this.getSinkStatus(sinkName);

            // The producer has no fixed schema; each message supplies its own.
            try (Producer<byte[]> producer = client.newProducer().topic(topicName).create()) {
                Schema<Pojo> schemav1 = Schema.AVRO(Pojo.class);
                Pojo record1 = Pojo.builder().field1("foo").field2(23).build();
                producer.newMessage(schemav1)
                        .value(record1)
                        .property("expectedType", schemav1.getSchemaInfo().getType().toString())
                        .property("expectedSchemaDefinition", schemav1.getSchemaInfo().getSchemaDefinition())
                        .property("recordNumber", "1")
                        .send();

                Schema<PojoV2> schemav2 = Schema.AVRO(PojoV2.class);
                PojoV2 record2 = PojoV2.builder().field1("foo").field2(23).field3(42.5).build();
                producer.newMessage(schemav2)
                        .value(record2)
                        .property("expectedType", schemav2.getSchemaInfo().getType().toString())
                        .property("expectedSchemaDefinition", schemav2.getSchemaInfo().getSchemaDefinition())
                        .property("recordNumber", "2")
                        .send();

                try {
                    log.info("waiting for sink {}", sinkName);
                    for (int attempt = 0; attempt < 120; ++attempt) {
                        SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
                        log.info("sink {} status {}", sinkName, status);
                        Assert.assertEquals(status.getInstances().size(), 1);
                        SinkStatus.SinkInstanceStatus instance = status.getInstances().get(0);
                        // Stop polling once everything is written, or as soon as the sink reports trouble.
                        if (instance.getStatus().numWrittenToSink >= (long) numRecords
                                || instance.getStatus().numSinkExceptions > 0L
                                || instance.getStatus().numSystemExceptions > 0L
                                || instance.getStatus().numRestarts > 0L) {
                            break;
                        }
                        Thread.sleep(1000L);
                    }

                    // Re-read the status so the assertions run against a fresh snapshot.
                    SinkStatus status = admin.sinks().getSinkStatus("public", "default", sinkName);
                    log.info("sink {} status {}", sinkName, status);
                    Assert.assertEquals(status.getInstances().size(), 1);
                    Assert.assertTrue(status.getInstances().get(0).getStatus().numWrittenToSink >= (long) numRecords);
                    Assert.assertTrue(status.getInstances().get(0).getStatus().numSinkExceptions == 0L);
                    Assert.assertTrue(status.getInstances().get(0).getStatus().numSystemExceptions == 0L);
                    log.info("sink {} is okay", sinkName);
                } finally {
                    // Always dump the sink logs, whether the checks passed or not.
                    this.dumpFunctionLogs(sinkName);
                }

                this.deleteSink(sinkName);
                this.getSinkInfoNotFound(sinkName);
            }
        }
    }

    /**
     * Creates a sink by running {@code pulsar-admin sinks create} inside the container and
     * asserts that the command's stdout contains {@code "Created successfully"}.
     *
     * @param sinkName       value passed to {@code --name}
     * @param inputTopicName value passed to {@code -i}
     * @param className      value passed to {@code --classname}
     * @param archive        value passed to {@code --archive}
     * @throws Exception if executing the command in the container fails
     */
    private void submitSinkConnector(String sinkName, String inputTopicName, String className, String archive) throws Exception {
        final String[] createCommand = {
            "/pulsar/bin/pulsar-admin", "sinks", "create",
            "--name", sinkName,
            "-i", inputTopicName,
            "--archive", archive,
            "--classname", className
        };
        log.info("Run command : {}", StringUtils.join(createCommand, ' '));
        final ContainerExecResult execResult = this.container.execCmd(createCommand);
        // The captured stdout doubles as the assertion message to ease debugging.
        Assert.assertTrue(execResult.getStdout().contains("Created successfully"), execResult.getStdout());
    }

    /**
     * Runs {@code pulsar-admin sinks get} for the sink in {@code public/default} and asserts
     * that stdout contains a {@code "name": "<sinkName>"} entry.
     *
     * @param sinkName name of the sink to look up
     * @throws Exception if executing the command in the container fails
     */
    private void getSinkInfoSuccess(String sinkName) throws Exception {
        final String stdout = this.container.execCmd(
                "/pulsar/bin/pulsar-admin", "sinks", "get",
                "--tenant", "public",
                "--namespace", "default",
                "--name", sinkName).getStdout();
        final String expectedNameEntry = "\"name\": \"" + sinkName + "\"";
        Assert.assertTrue(stdout.contains(expectedNameEntry));
    }

    /**
     * Runs {@code pulsar-admin sinks status} for the sink in {@code public/default}, logs
     * the command's stdout and stderr, and asserts that stdout contains
     * {@code "running" : true}.
     *
     * @param sinkName name of the sink to inspect
     * @throws Exception if executing the command in the container fails
     */
    private void getSinkStatus(String sinkName) throws Exception {
        final ContainerExecResult statusResult = this.container.execCmd(
                "/pulsar/bin/pulsar-admin", "sinks", "status",
                "--tenant", "public",
                "--namespace", "default",
                "--name", sinkName);
        final String stdout = statusResult.getStdout();
        log.info(stdout);
        log.info(statusResult.getStderr());
        Assert.assertTrue(stdout.contains("\"running\" : true"));
    }

    /**
     * Runs {@code pulsar-admin sinks delete} for the sink in {@code public/default}, asserts
     * that stdout contains {@code "successfully"}, and then requires an empty stderr.
     *
     * @param sinkName name of the sink to delete
     * @throws Exception if executing the command in the container fails
     */
    private void deleteSink(String sinkName) throws Exception {
        final ContainerExecResult deleteResult = this.container.execCmd(
                "/pulsar/bin/pulsar-admin", "sinks", "delete",
                "--tenant", "public",
                "--namespace", "default",
                "--name", sinkName);
        final boolean reportedSuccess = deleteResult.getStdout().contains("successfully");
        Assert.assertTrue(reportedSuccess);
        deleteResult.assertNoStderr();
    }

    /**
     * Verifies that the sink no longer exists: {@code pulsar-admin sinks get} must fail with
     * a {@link ContainerExecException} whose stderr contains {@code "<sinkName> doesn't exist"}.
     * If the command completes without that exception, the test fails.
     *
     * @param sinkName name of the sink expected to be absent
     * @throws Exception if executing the command fails in any other way
     */
    private void getSinkInfoNotFound(String sinkName) throws Exception {
        ContainerExecException lookupFailure = null;
        try {
            this.container.execCmd(
                    "/pulsar/bin/pulsar-admin", "sinks", "get",
                    "--tenant", "public",
                    "--namespace", "default",
                    "--name", sinkName);
        } catch (ContainerExecException e) {
            lookupFailure = e;
        }
        if (lookupFailure == null) {
            // The lookup succeeded, which means the sink is still registered.
            Assert.fail("Command should have exited with non-zero");
        }
        final String stderr = lookupFailure.getResult().getStderr();
        Assert.assertTrue(stderr.contains(sinkName + " doesn't exist"));
    }

    /**
     * Describes one sink input used by the tests: the topic to publish to, the schema the
     * producer is created with, and the value sent as the payload. All fields are final
     * and assigned once in the constructor.
     *
     * @param <T> Java type of the payload described by {@link #schema}
     */
    private static final class SinkSpec<T> {
        /** Topic the test publishes to. */
        final String outputTopicName;
        /** Schema used when producing to the topic. */
        final Schema<T> schema;
        /** Payload sent with each message. */
        final T testValue;

        /**
         * @param outputTopicName topic the test publishes to
         * @param schema          schema used when producing to the topic
         * @param testValue       payload sent with each message
         */
        public SinkSpec(String outputTopicName, Schema<T> schema, T testValue) {
            this.testValue = testValue;
            this.schema = schema;
            this.outputTopicName = outputTopicName;
        }

        /** @return the topic the test publishes to */
        public String getOutputTopicName() {
            return outputTopicName;
        }

        /** @return the schema used when producing to the topic */
        public Schema<T> getSchema() {
            return schema;
        }

        /** @return the payload sent with each message */
        public T getTestValue() {
            return testValue;
        }
    }

    /**
     * Mutable two-field value type ({@code field1}: String, {@code field2}: int) used as a
     * message payload in the tests. Instances are created through {@link #builder()}.
     *
     * <p>{@code equals}/{@code hashCode} are value-based over both fields; subclasses can
     * opt out of equality with the base type by overriding {@link #canEqual(Object)}.
     */
    public static class Pojo {
        private String field1;
        private int field2;

        Pojo(String field1, int field2) {
            this.field1 = field1;
            this.field2 = field2;
        }

        /** @return a new builder with {@code field1 == null} and {@code field2 == 0} */
        public static PojoBuilder builder() {
            return new PojoBuilder();
        }

        public String getField1() {
            return this.field1;
        }

        public int getField2() {
            return this.field2;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public void setField2(int field2) {
            this.field2 = field2;
        }

        /**
         * Value equality: same {@code field2} and null-safe equal {@code field1}, provided
         * the other object is a {@code Pojo} that accepts this instance via {@code canEqual}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof Pojo)) {
                return false;
            }
            Pojo other = (Pojo) o;
            return other.canEqual(this)
                    && this.getField2() == other.getField2()
                    && Objects.equals(this.getField1(), other.getField1());
        }

        /** Hook that lets a subclass refuse equality with instances of another type. */
        protected boolean canEqual(Object other) {
            return other instanceof Pojo;
        }

        /**
         * Hash over {@code field2} then {@code field1}. The multiplier 59 and the null
         * placeholder 43 are kept as-is so hash values stay the same as before.
         */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + this.getField2();
            String f1 = this.getField1();
            result = result * prime + (f1 == null ? 43 : f1.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "PulsarGenericObjectSinkTest.Pojo(field1=" + this.getField1() + ", field2=" + this.getField2() + ")";
        }

        /** Fluent builder for {@link Pojo}; unset fields keep their Java defaults. */
        public static class PojoBuilder {
            private String field1;
            private int field2;

            PojoBuilder() {
            }

            public PojoBuilder field1(String field1) {
                this.field1 = field1;
                return this;
            }

            public PojoBuilder field2(int field2) {
                this.field2 = field2;
                return this;
            }

            /** @return a new {@link Pojo} carrying the values set so far */
            public Pojo build() {
                return new Pojo(this.field1, this.field2);
            }

            @Override
            public String toString() {
                return "PulsarGenericObjectSinkTest.Pojo.PojoBuilder(field1=" + this.field1 + ", field2=" + this.field2 + ")";
            }
        }
    }

    /**
     * Mutable single-field value type ({@code field1}: String) used as the key half of
     * KeyValue payloads in the tests. Instances are created through {@link #builder()}.
     */
    public static final class PojoKey {
        private String field1;

        PojoKey(String field1) {
            this.field1 = field1;
        }

        /** @return a new builder with {@code field1 == null} */
        public static PojoKeyBuilder builder() {
            return new PojoKeyBuilder();
        }

        public String getField1() {
            return this.field1;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        /** Value equality: the other object is a {@code PojoKey} with a null-safe equal {@code field1}. */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PojoKey)) {
                return false;
            }
            PojoKey other = (PojoKey) o;
            return Objects.equals(this.getField1(), other.getField1());
        }

        /**
         * Hash over {@code field1}. The multiplier 59 and the null placeholder 43 are kept
         * as-is so hash values stay the same as before.
         */
        @Override
        public int hashCode() {
            final int prime = 59;
            String f1 = this.getField1();
            return prime + (f1 == null ? 43 : f1.hashCode());
        }

        @Override
        public String toString() {
            return "PulsarGenericObjectSinkTest.PojoKey(field1=" + this.getField1() + ")";
        }

        /** Fluent builder for {@link PojoKey}; an unset field stays {@code null}. */
        public static class PojoKeyBuilder {
            private String field1;

            PojoKeyBuilder() {
            }

            public PojoKeyBuilder field1(String field1) {
                this.field1 = field1;
                return this;
            }

            /** @return a new {@link PojoKey} carrying the value set so far */
            public PojoKey build() {
                return new PojoKey(this.field1);
            }

            @Override
            public String toString() {
                return "PulsarGenericObjectSinkTest.PojoKey.PojoKeyBuilder(field1=" + this.field1 + ")";
            }
        }
    }

    /**
     * Mutable three-field value type: the two fields of {@code Pojo} plus a nullable
     * {@code Double field3}. The schema-change test sends it after a {@code Pojo} record
     * on the same topic. Instances are created through {@link #builder()}.
     *
     * <p>{@code equals}/{@code hashCode} are value-based over all three fields; subclasses
     * can opt out of equality with the base type by overriding {@link #canEqual(Object)}.
     */
    public static class PojoV2 {
        private String field1;
        private int field2;
        private Double field3;

        PojoV2(String field1, int field2, Double field3) {
            this.field1 = field1;
            this.field2 = field2;
            this.field3 = field3;
        }

        /** @return a new builder with null {@code field1}/{@code field3} and {@code field2 == 0} */
        public static PojoV2Builder builder() {
            return new PojoV2Builder();
        }

        public String getField1() {
            return this.field1;
        }

        public int getField2() {
            return this.field2;
        }

        public Double getField3() {
            return this.field3;
        }

        public void setField1(String field1) {
            this.field1 = field1;
        }

        public void setField2(int field2) {
            this.field2 = field2;
        }

        public void setField3(Double field3) {
            this.field3 = field3;
        }

        /**
         * Value equality: same {@code field2} and null-safe equal {@code field3} and
         * {@code field1}, provided the other object is a {@code PojoV2} that accepts this
         * instance via {@code canEqual}.
         */
        @Override
        public boolean equals(Object o) {
            if (o == this) {
                return true;
            }
            if (!(o instanceof PojoV2)) {
                return false;
            }
            PojoV2 other = (PojoV2) o;
            return other.canEqual(this)
                    && this.getField2() == other.getField2()
                    && Objects.equals(this.getField3(), other.getField3())
                    && Objects.equals(this.getField1(), other.getField1());
        }

        /** Hook that lets a subclass refuse equality with instances of another type. */
        protected boolean canEqual(Object other) {
            return other instanceof PojoV2;
        }

        /**
         * Hash over {@code field2}, {@code field3}, then {@code field1}. The multiplier 59
         * and the null placeholder 43 are kept as-is so hash values stay the same as before.
         */
        @Override
        public int hashCode() {
            final int prime = 59;
            int result = 1;
            result = result * prime + this.getField2();
            Double f3 = this.getField3();
            result = result * prime + (f3 == null ? 43 : f3.hashCode());
            String f1 = this.getField1();
            result = result * prime + (f1 == null ? 43 : f1.hashCode());
            return result;
        }

        @Override
        public String toString() {
            return "PulsarGenericObjectSinkTest.PojoV2(field1=" + this.getField1() + ", field2=" + this.getField2() + ", field3=" + this.getField3() + ")";
        }

        /** Fluent builder for {@link PojoV2}; unset fields keep their Java defaults. */
        public static class PojoV2Builder {
            private String field1;
            private int field2;
            private Double field3;

            PojoV2Builder() {
            }

            public PojoV2Builder field1(String field1) {
                this.field1 = field1;
                return this;
            }

            public PojoV2Builder field2(int field2) {
                this.field2 = field2;
                return this;
            }

            public PojoV2Builder field3(Double field3) {
                this.field3 = field3;
                return this;
            }

            /** @return a new {@link PojoV2} carrying the values set so far */
            public PojoV2 build() {
                return new PojoV2(this.field1, this.field2, this.field3);
            }

            @Override
            public String toString() {
                return "PulsarGenericObjectSinkTest.PojoV2.PojoV2Builder(field1=" + this.field1 + ", field2=" + this.field2 + ", field3=" + this.field3 + ")";
            }
        }
    }
}

