/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.tests.integration.cli;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.HashMap;
import java.util.UUID;
import org.apache.avro.reflect.AvroAlias;
import org.apache.avro.reflect.AvroDefault;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.apache.pulsar.tests.integration.suites.PulsarTestSuite;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class SchemaUpdateStrategyTest
extends PulsarTestSuite {
    private static final Logger log = LoggerFactory.getLogger(SchemaUpdateStrategyTest.class);

    /**
     * Checks producer/consumer behaviour on {@code topicName} after switching
     * {@code namespace} to the {@code BACKWARD} schema auto-update strategy.
     *
     * <p>Steps: assert the namespace initially reports {@code FULL}; set {@code BACKWARD};
     * publish a {@link V1Data}; expect creation of a {@link V3Data} producer to fail with a
     * message mentioning {@code IncompatibleSchemaException}; publish a {@link V2Data}; finally
     * read both messages back through an {@code AUTO_CONSUME} consumer and compare them.
     *
     * @param namespace namespace whose auto-update strategy is inspected and changed
     * @param topicName topic to produce to and consume from
     * @throws Exception if an admin command or client operation fails unexpectedly
     */
    private void testAutoUpdateBackward(String namespace, String topicName) throws Exception {
        ContainerExecResult result = this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-schema-autoupdate-strategy", namespace);
        Assert.assertEquals(result.getStdout().trim(), "FULL");
        this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "set-schema-autoupdate-strategy", "--compatibility", "BACKWARD", namespace);

        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                .build()) {
            V1Data v1Data = new V1Data("test1", 1);
            try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v1Data);
            }

            log.info("try with forward compat, should fail");
            // The resource is declared only so that an unexpectedly created producer gets closed
            // before the failure propagates.
            try (Producer<V3Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V3Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Forward compat schema should be rejected");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("try with backward compat, should succeed");
            V2Data v2Data = new V2Data("test2");
            try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v2Data);
            }

            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
                    .topic(topicName)
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscriptionName("sub")
                    .subscribe()) {
                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
                Message<GenericRecord> msg1 = consumer.receive();
                v1Data.assertEqualToRecord(msg1.getValue());
                Message<GenericRecord> msg2 = consumer.receive();
                v2Data.assertEqualToRecord(msg2.getValue());
            }
        }
    }

    /**
     * Checks producer/consumer behaviour on {@code topicName} after switching
     * {@code namespace} to the {@code NONE} schema compatibility setting.
     *
     * <p>Steps: assert the namespace initially reports {@code FULL}; set {@code NONE}; publish a
     * {@link V1Data}, a {@link V3Data} and a {@link V2Data} in that order, each through its own
     * producer; then read the three messages back through an {@code AUTO_CONSUME} consumer and
     * compare them in the same order.
     *
     * @param namespace namespace whose auto-update strategy is inspected and changed
     * @param topicName topic to produce to and consume from
     * @throws Exception if an admin command or client operation fails
     */
    private void testNone(String namespace, String topicName) throws Exception {
        ContainerExecResult result = this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-schema-autoupdate-strategy", namespace);
        Assert.assertEquals(result.getStdout().trim(), "FULL");
        this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "set-schema-autoupdate-strategy", "--compatibility", "NONE", namespace);

        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                .build()) {
            V1Data v1Data = new V1Data("test1", 1);
            try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v1Data);
            }

            log.info("try with forward compat, should succeed");
            V3Data v3Data = new V3Data("test3", 1, 2L);
            try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v3Data);
            }

            log.info("try with backward compat, should succeed");
            V2Data v2Data = new V2Data("test2");
            try (Producer<V2Data> p = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v2Data);
            }

            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
                    .topic(topicName)
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
                Message<GenericRecord> msg1 = consumer.receive();
                v1Data.assertEqualToRecord(msg1.getValue());
                Message<GenericRecord> msg2 = consumer.receive();
                v3Data.assertEqualToRecord(msg2.getValue());
                Message<GenericRecord> msg3 = consumer.receive();
                v2Data.assertEqualToRecord(msg3.getValue());
            }
        }
    }

    /**
     * Checks producer/consumer behaviour on {@code topicName} after switching
     * {@code namespace} to the {@code FORWARD} schema auto-update strategy.
     *
     * <p>Steps: assert the namespace initially reports {@code FULL}; set {@code FORWARD};
     * publish a {@link V1Data}; expect creation of a {@link V2Data} producer to fail with a
     * message mentioning {@code IncompatibleSchemaException}; publish a {@link V3Data}; finally
     * read both messages back through an {@code AUTO_CONSUME} consumer and compare them.
     *
     * @param namespace namespace whose auto-update strategy is inspected and changed
     * @param topicName topic to produce to and consume from
     * @throws Exception if an admin command or client operation fails unexpectedly
     */
    private void testAutoUpdateForward(String namespace, String topicName) throws Exception {
        ContainerExecResult result = this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-schema-autoupdate-strategy", namespace);
        Assert.assertEquals(result.getStdout().trim(), "FULL");
        this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "set-schema-autoupdate-strategy", "--compatibility", "FORWARD", namespace);

        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                .build()) {
            V1Data v1Data = new V1Data("test1", 1);
            try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v1Data);
            }

            log.info("try with backward compat, should fail");
            // The resource is declared only so that an unexpectedly created producer gets closed
            // before the failure propagates.
            try (Producer<V2Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Backward compat schema should be rejected");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("try with forward compat, should succeed");
            V3Data v3Data = new V3Data("test2", 1, 2L);
            try (Producer<V3Data> p = pulsarClient.newProducer(Schema.AVRO(V3Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v3Data);
            }

            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
                    .topic(topicName)
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
                Message<GenericRecord> msg1 = consumer.receive();
                v1Data.assertEqualToRecord(msg1.getValue());
                Message<GenericRecord> msg2 = consumer.receive();
                v3Data.assertEqualToRecord(msg2.getValue());
            }
        }
    }

    /**
     * Checks producer/consumer behaviour on {@code topicName} while {@code namespace} keeps the
     * {@code FULL} schema auto-update strategy it reports initially (no strategy is set here).
     *
     * <p>Steps: assert the namespace reports {@code FULL}; publish a {@link V1Data}; expect
     * creation of a {@link V2Data} producer and of a {@link V3Data} producer to fail with a
     * message mentioning {@code IncompatibleSchemaException}; publish a {@link V4Data}; finally
     * read both messages back through an {@code AUTO_CONSUME} consumer and compare them.
     *
     * @param namespace namespace whose auto-update strategy is inspected
     * @param topicName topic to produce to and consume from
     * @throws Exception if an admin command or client operation fails unexpectedly
     */
    private void testAutoUpdateFull(String namespace, String topicName) throws Exception {
        ContainerExecResult result = this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-schema-autoupdate-strategy", namespace);
        Assert.assertEquals(result.getStdout().trim(), "FULL");

        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                .build()) {
            V1Data v1Data = new V1Data("test1", 1);
            try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v1Data);
            }

            log.info("try with backward compat only, should fail");
            // Resources in the two rejection checks exist only so that an unexpectedly created
            // producer gets closed before the failure propagates.
            try (Producer<V2Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Backward compat only schema should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("try with forward compat only, should fail");
            try (Producer<V3Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V3Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Forward compat only schema should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("try with fully compat");
            // The V4Data constructor takes a short, so the literal needs an explicit narrowing cast.
            V4Data v4Data = new V4Data("test2", 1, (short) 100);
            try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(v4Data);
            }

            Schema<GenericRecord> schema = Schema.AUTO_CONSUME();
            try (Consumer<GenericRecord> consumer = pulsarClient.newConsumer(schema)
                    .topic(topicName)
                    .subscriptionName("sub")
                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
                    .subscribe()) {
                log.info("Schema Info : {}", schema.getSchemaInfo().getSchemaDefinition());
                Message<GenericRecord> msg1 = consumer.receive();
                v1Data.assertEqualToRecord(msg1.getValue());
                Message<GenericRecord> msg2 = consumer.receive();
                v4Data.assertEqualToRecord(msg2.getValue());
            }
        }
    }

    /**
     * Checks producer behaviour on {@code topicName} after schema auto-update has been disabled
     * for {@code namespace}.
     *
     * <p>Steps: assert the namespace initially reports {@code FULL}; disable auto-update; publish
     * a {@link V1Data}; expect creation of {@link V2Data}, {@link V3Data} and {@link V4Data}
     * producers to fail with a message mentioning {@code IncompatibleSchemaException}; upload the
     * {@link V4Data} schema through {@code pulsar-admin schemas upload} on a broker; then retry
     * (up to 50 attempts, 100 ms apart) until a {@link V4Data} producer can publish.
     *
     * @param namespace namespace whose auto-update strategy is inspected and disabled
     * @param topicName topic to produce to and upload the schema for
     * @throws Exception if an admin command or client operation fails unexpectedly
     */
    private void testAutoUpdateDisabled(String namespace, String topicName) throws Exception {
        ContainerExecResult result = this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "get-schema-autoupdate-strategy", namespace);
        Assert.assertEquals(result.getStdout().trim(), "FULL");
        this.pulsarCluster.runAdminCommandOnAnyBroker(
                "namespaces", "set-schema-autoupdate-strategy", "--disabled", namespace);

        try (PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(this.pulsarCluster.getPlainTextServiceUrl())
                .build()) {
            try (Producer<V1Data> p = pulsarClient.newProducer(Schema.AVRO(V1Data.class))
                    .topic(topicName)
                    .create()) {
                p.send(new V1Data("test1", 1));
            }

            log.info("try with backward compat only, should fail");
            // Resources in the three rejection checks exist only so that an unexpectedly created
            // producer gets closed before the failure propagates.
            try (Producer<V2Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V2Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Backward compat only schema should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("try with forward compat only, should fail");
            try (Producer<V3Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V3Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Forward compat only schema should fail");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("try with fully compat, should fail");
            try (Producer<V4Data> unexpected = pulsarClient.newProducer(Schema.AVRO(V4Data.class))
                    .topic(topicName)
                    .create()) {
                Assert.fail("Fully compat schema should fail, autoupdate disabled");
            } catch (PulsarClientException e) {
                Assert.assertTrue(e.getMessage().contains("IncompatibleSchemaException"));
            }

            log.info("Manually set new schema");
            ObjectMapper mapper = new ObjectMapper();
            HashMap<String, String> schema = new HashMap<>();
            schema.put("type", "AVRO");
            schema.put("schema", Schema.AVRO(V4Data.class).getSchemaInfo().getSchemaDefinition());

            // Write the schema payload into a uniquely named file on a broker and upload it from there.
            BrokerContainer b = this.pulsarCluster.getAnyBroker();
            String schemaFile = String.format("/tmp/schema-%s", UUID.randomUUID().toString());
            b.putFile(schemaFile, mapper.writeValueAsBytes(schema));
            b.execCmd("/pulsar/bin/pulsar-admin", "schemas", "upload", "-f", schemaFile, topicName);

            // Presumably the uploaded schema is not usable by producers immediately, hence the
            // bounded retry; client failures are logged instead of being silently dropped.
            boolean success = false;
            for (int attempt = 1; attempt <= 50; ++attempt) {
                try (Producer<V4Data> p = pulsarClient.newProducer(Schema.AVRO(V4Data.class))
                        .topic(topicName)
                        .create()) {
                    // The V4Data constructor takes a short, so the literal needs an explicit cast.
                    p.send(new V4Data("test2", 1, (short) 100));
                    success = true;
                    break;
                } catch (PulsarClientException e) {
                    log.info("New schema not usable yet (attempt {}): {}", attempt, e.getMessage());
                    Thread.sleep(100L);
                }
            }
            Assert.assertTrue(success, "Should have been able to use new schema");
        }
    }

    /**
     * Creates the {@code public/bw-p-v2} and {@code public/bw-np-v2} namespaces and runs the
     * BACKWARD scenario against a persistent and a non-persistent topic respectively.
     */
    @Test
    public void testBackwardV2() throws Exception {
        final String cluster = this.pulsarCluster.getClusterName();
        final String persistentNs = "public/bw-p-v2";
        final String nonPersistentNs = "public/bw-np-v2";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, persistentNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, nonPersistentNs);

        this.testAutoUpdateBackward(persistentNs, "persistent://" + persistentNs + "/topic1");
        this.testAutoUpdateBackward(nonPersistentNs, "non-persistent://" + nonPersistentNs + "/topic1");
    }

    /**
     * Creates the {@code public/fw-p-v2} and {@code public/fw-np-v2} namespaces and runs the
     * FORWARD scenario against a persistent and a non-persistent topic respectively.
     */
    @Test
    public void testForwardV2() throws Exception {
        final String cluster = this.pulsarCluster.getClusterName();
        final String persistentNs = "public/fw-p-v2";
        final String nonPersistentNs = "public/fw-np-v2";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, persistentNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, nonPersistentNs);

        this.testAutoUpdateForward(persistentNs, "persistent://" + persistentNs + "/topic1");
        this.testAutoUpdateForward(nonPersistentNs, "non-persistent://" + nonPersistentNs + "/topic1");
    }

    /**
     * Creates the {@code public/full-p-v2} and {@code public/full-np-v2} namespaces and runs the
     * FULL scenario against a persistent and a non-persistent topic respectively.
     */
    @Test
    public void testFullV2() throws Exception {
        final String cluster = this.pulsarCluster.getClusterName();
        final String persistentNs = "public/full-p-v2";
        final String nonPersistentNs = "public/full-np-v2";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, persistentNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, nonPersistentNs);

        this.testAutoUpdateFull(persistentNs, "persistent://" + persistentNs + "/topic1");
        this.testAutoUpdateFull(nonPersistentNs, "non-persistent://" + nonPersistentNs + "/topic1");
    }

    /**
     * Creates the {@code public/none-p-v2} and {@code public/none-np-v2} namespaces and runs the
     * NONE scenario against a persistent and a non-persistent topic respectively.
     */
    @Test
    public void testNoneV2() throws Exception {
        final String cluster = this.pulsarCluster.getClusterName();
        final String persistentNs = "public/none-p-v2";
        final String nonPersistentNs = "public/none-np-v2";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, persistentNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, nonPersistentNs);

        this.testNone(persistentNs, "persistent://" + persistentNs + "/topic1");
        this.testNone(nonPersistentNs, "non-persistent://" + nonPersistentNs + "/topic1");
    }

    /**
     * Creates the {@code public/dis-p-v2} and {@code public/dis-np-v2} namespaces and runs the
     * auto-update-disabled scenario against a persistent and a non-persistent topic respectively.
     */
    @Test
    public void testDisabledV2() throws Exception {
        final String cluster = this.pulsarCluster.getClusterName();
        final String persistentNs = "public/dis-p-v2";
        final String nonPersistentNs = "public/dis-np-v2";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, persistentNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", "-c", cluster, nonPersistentNs);

        this.testAutoUpdateDisabled(persistentNs, "persistent://" + persistentNs + "/topic1");
        this.testAutoUpdateDisabled(nonPersistentNs, "non-persistent://" + nonPersistentNs + "/topic1");
    }

    /**
     * Creates the cluster-scoped namespaces {@code public/<cluster>/b-p-v1} and
     * {@code public/<cluster>/b-np-v1} and runs the BACKWARD scenario on {@code topic1} in each.
     */
    @Test
    public void testBackwardV1() throws Exception {
        final String nsPrefix = "public/" + this.pulsarCluster.getClusterName() + "/";
        final String firstNs = nsPrefix + "b-p-v1";
        final String secondNs = nsPrefix + "b-np-v1";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", firstNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", secondNs);

        this.testAutoUpdateBackward(firstNs, "persistent://" + firstNs + "/topic1");
        // NOTE(review): the "np" namespace is exercised with a persistent:// topic here, whereas
        // testBackwardV2 uses non-persistent:// - confirm whether this is intentional.
        this.testAutoUpdateBackward(secondNs, "persistent://" + secondNs + "/topic1");
    }

    /**
     * Creates the cluster-scoped namespaces {@code public/<cluster>/f-p-v1} and
     * {@code public/<cluster>/f-np-v1} and runs the FORWARD scenario on {@code topic1} in each.
     */
    @Test
    public void testForwardV1() throws Exception {
        final String nsPrefix = "public/" + this.pulsarCluster.getClusterName() + "/";
        final String firstNs = nsPrefix + "f-p-v1";
        final String secondNs = nsPrefix + "f-np-v1";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", firstNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", secondNs);

        this.testAutoUpdateForward(firstNs, "persistent://" + firstNs + "/topic1");
        // NOTE(review): the "np" namespace is exercised with a persistent:// topic here, whereas
        // testForwardV2 uses non-persistent:// - confirm whether this is intentional.
        this.testAutoUpdateForward(secondNs, "persistent://" + secondNs + "/topic1");
    }

    /**
     * Creates the cluster-scoped namespaces {@code public/<cluster>/full-p-v1} and
     * {@code public/<cluster>/full-np-v1} and runs the FULL scenario on {@code topic1} in each.
     */
    @Test
    public void testFullV1() throws Exception {
        final String nsPrefix = "public/" + this.pulsarCluster.getClusterName() + "/";
        final String firstNs = nsPrefix + "full-p-v1";
        final String secondNs = nsPrefix + "full-np-v1";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", firstNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", secondNs);

        this.testAutoUpdateFull(firstNs, "persistent://" + firstNs + "/topic1");
        // NOTE(review): the "np" namespace is exercised with a persistent:// topic here, whereas
        // testFullV2 uses non-persistent:// - confirm whether this is intentional.
        this.testAutoUpdateFull(secondNs, "persistent://" + secondNs + "/topic1");
    }

    /**
     * Creates the cluster-scoped namespaces {@code public/<cluster>/none-p-v1} and
     * {@code public/<cluster>/none-np-v1} and runs the NONE scenario on {@code topic1} in each.
     */
    @Test
    public void testNoneV1() throws Exception {
        final String nsPrefix = "public/" + this.pulsarCluster.getClusterName() + "/";
        final String firstNs = nsPrefix + "none-p-v1";
        final String secondNs = nsPrefix + "none-np-v1";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", firstNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", secondNs);

        this.testNone(firstNs, "persistent://" + firstNs + "/topic1");
        // NOTE(review): the "np" namespace is exercised with a persistent:// topic here, whereas
        // testNoneV2 uses non-persistent:// - confirm whether this is intentional.
        this.testNone(secondNs, "persistent://" + secondNs + "/topic1");
    }

    /**
     * Creates the cluster-scoped namespaces {@code public/<cluster>/dis-p-v1} and
     * {@code public/<cluster>/dis-np-v1} and runs the auto-update-disabled scenario on
     * {@code topic1} in each.
     */
    @Test
    public void testDisabledV1() throws Exception {
        final String nsPrefix = "public/" + this.pulsarCluster.getClusterName() + "/";
        final String firstNs = nsPrefix + "dis-p-v1";
        final String secondNs = nsPrefix + "dis-np-v1";

        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", firstNs);
        this.pulsarCluster.runAdminCommandOnAnyBroker("namespaces", "create", secondNs);

        this.testAutoUpdateDisabled(firstNs, "persistent://" + firstNs + "/topic1");
        // NOTE(review): the "np" namespace is exercised with a persistent:// topic here, whereas
        // testDisabledV2 uses non-persistent:// - confirm whether this is intentional.
        this.testAutoUpdateDisabled(secondNs, "persistent://" + secondNs + "/topic1");
    }

    /**
     * Test payload with two fields, {@code foo} and {@code bar}; used as the first schema
     * published in every scenario of this test class.
     */
    @AvroAlias(space="blah", alias="data")
    static class V1Data {
        String foo;
        int bar;

        V1Data(String foo, int bar) {
            this.foo = foo;
            this.bar = bar;
        }

        /**
         * Asserts that {@code record} has exactly two fields and that its {@code foo} and
         * {@code bar} values equal this instance's.
         *
         * @param record the consumed record to compare against this instance
         */
        void assertEqualToRecord(GenericRecord record) {
            // TestNG's argument order is (actual, expected); the observed value goes first so
            // that failure messages are not inverted.
            Assert.assertEquals(record.getFields().size(), 2, record.getFields().size() + " fields in found : " + record.getFields());
            Assert.assertEquals(record.getField("foo"), (Object) this.foo);
            Assert.assertEquals(record.getField("bar"), (Object) this.bar);
        }
    }

    /**
     * Test payload with three fields: {@code foo}, {@code bar} and an additional {@code baz}
     * that has no default annotation.
     */
    @AvroAlias(space="blah", alias="data")
    static class V3Data {
        String foo;
        int bar;
        long baz;

        V3Data(String foo, int bar, long baz) {
            this.foo = foo;
            this.bar = bar;
            this.baz = baz;
        }

        /**
         * Asserts that {@code record} has exactly three fields and that its {@code foo},
         * {@code bar} and {@code baz} values equal this instance's.
         *
         * @param record the consumed record to compare against this instance
         */
        void assertEqualToRecord(GenericRecord record) {
            // TestNG's argument order is (actual, expected); the observed value goes first so
            // that failure messages are not inverted.
            Assert.assertEquals(record.getFields().size(), 3, record.getFields().size() + " fields in found : " + record.getFields());
            Assert.assertEquals(record.getField("foo"), (Object) this.foo);
            Assert.assertEquals(record.getField("bar"), (Object) this.bar);
            Assert.assertEquals(record.getField("baz"), (Object) this.baz);
        }
    }

    /**
     * Test payload with a single field, {@code foo}.
     */
    @AvroAlias(space="blah", alias="data")
    static class V2Data {
        String foo;

        V2Data(String foo) {
            this.foo = foo;
        }

        /**
         * Asserts that {@code record} has exactly one field and that its {@code foo} value
         * equals this instance's.
         *
         * @param record the consumed record to compare against this instance
         */
        void assertEqualToRecord(GenericRecord record) {
            // TestNG's argument order is (actual, expected); the observed value goes first so
            // that failure messages are not inverted.
            Assert.assertEquals(record.getFields().size(), 1, record.getFields().size() + " fields in found : " + record.getFields());
            Assert.assertEquals(record.getField("foo"), (Object) this.foo);
        }
    }

    /**
     * Test payload with three fields: {@code foo}, {@code bar} and an additional {@code blah}
     * that carries an Avro default of {@code 10}.
     */
    @AvroAlias(space="blah", alias="data")
    static class V4Data {
        String foo;
        int bar;
        @AvroDefault(value="10")
        short blah;

        V4Data(String foo, int bar, short blah) {
            this.foo = foo;
            this.bar = bar;
            this.blah = blah;
        }

        /**
         * Asserts that {@code record} has exactly three fields and that its {@code foo},
         * {@code bar} and {@code blah} values equal this instance's.
         *
         * @param record the consumed record to compare against this instance
         */
        void assertEqualToRecord(GenericRecord record) {
            // TestNG's argument order is (actual, expected); the observed value goes first so
            // that failure messages are not inverted.
            Assert.assertEquals(record.getFields().size(), 3, record.getFields().size() + " fields in found : " + record.getFields());
            Assert.assertEquals(record.getField("foo"), (Object) this.foo);
            Assert.assertEquals(record.getField("bar"), (Object) this.bar);
            // NOTE(review): blah is boxed to Short for this comparison - confirm that
            // getField("blah") yields a Short rather than an Integer.
            Assert.assertEquals(record.getField("blah"), (Object) this.blah);
        }
    }
}

