package org.apache.pulsar.schema.compatibility;

import com.google.common.collect.Sets;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SchemaSerializationException;
import org.apache.pulsar.client.api.schema.SchemaDefinition;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.schema.Schemas;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Test(groups = {"schema"})
/* loaded from: input_file:org/apache/pulsar/schema/compatibility/SchemaCompatibilityCheckTest.class */
public class SchemaCompatibilityCheckTest extends MockedPulsarServiceBaseTest {
    private static final Logger log = LoggerFactory.getLogger(SchemaCompatibilityCheckTest.class);
    private static final String CLUSTER_NAME = "test";

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @BeforeMethod
    public void setup() throws Exception {
        super.internalSetup();
        this.admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(this.pulsar.getWebServiceAddress()).build());
        this.admin.tenants().createTenant("public", TenantInfo.builder().allowedClusters(Collections.singleton("test")).build());
    }

    @Override // org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest
    @AfterMethod(alwaysRun = true)
    public void cleanup() throws Exception {
        super.internalCleanup();
    }

    @DataProvider(name = "CanReadLastSchemaCompatibilityStrategy")
    public Object[] canReadLastSchemaCompatibilityStrategy() {
        return new Object[]{SchemaCompatibilityStrategy.BACKWARD, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE, SchemaCompatibilityStrategy.FORWARD, SchemaCompatibilityStrategy.FULL};
    }

    @DataProvider(name = "ReadAllCheckSchemaCompatibilityStrategy")
    public Object[] readAllCheckSchemaCompatibilityStrategy() {
        return new Object[]{SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE, SchemaCompatibilityStrategy.FULL_TRANSITIVE};
    }

    @DataProvider(name = "AllCheckSchemaCompatibilityStrategy")
    public Object[] allCheckSchemaCompatibilityStrategy() {
        return new Object[]{SchemaCompatibilityStrategy.BACKWARD, SchemaCompatibilityStrategy.FORWARD_TRANSITIVE, SchemaCompatibilityStrategy.FORWARD, SchemaCompatibilityStrategy.FULL, SchemaCompatibilityStrategy.BACKWARD_TRANSITIVE, SchemaCompatibilityStrategy.FULL_TRANSITIVE};
    }

    @Test(dataProvider = "CanReadLastSchemaCompatibilityStrategy")
    public void testConsumerCompatibilityCheckCanReadLastTest(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(topicName, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(topicName, Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonThree.class).build())).subscriptionName("test").topic(new String[]{topicName}).subscribe();
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(topicName).create();
        Schemas.PersonOne personOne = new Schemas.PersonOne();
        personOne.setId(1);
        create.send(personOne);
        Message message = null;
        try {
            message = subscribe.receive();
            message.getValue();
        } catch (Exception e) {
            Assert.assertTrue(e instanceof SchemaSerializationException);
            subscribe.acknowledge(message);
        }
        Producer create2 = this.pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).topic(topicName).create();
        Schemas.PersonTwo personTwo = new Schemas.PersonTwo();
        personTwo.setId(1);
        personTwo.setName("Jerry");
        create2.send(personTwo);
        Message receive = subscribe.receive();
        Schemas.PersonThree personThree = (Schemas.PersonThree) receive.getValue();
        subscribe.acknowledge(receive);
        Assert.assertEquals(personThree.getId(), 1);
        Assert.assertEquals(personThree.getName(), "Jerry");
        subscribe.close();
        create.close();
        create2.close();
    }

    @Test(dataProvider = "ReadAllCheckSchemaCompatibilityStrategy")
    public void testConsumerCompatibilityReadAllCheckTest(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(topicName, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(topicName, Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        try {
            this.pulsarClient.newConsumer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonThree.class).build())).subscriptionName("test").topic(new String[]{topicName}).subscribe();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Unable to read schema"));
        }
    }

    @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
    public void testBrokerAllowAutoUpdateSchemaDisabled(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        Assert.assertEquals(this.admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), SchemaCompatibilityStrategy.UNDEFINED);
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(topicName, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).topic(topicName);
        try {
            producerBuilder.create();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
        }
        this.pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(true);
        Assert.assertTrue(this.admin.namespaces().getPolicies(namespaceName.toString()).is_allow_auto_update_schema.booleanValue());
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).subscriptionName("test").topic(new String[]{topicName});
        Producer create = producerBuilder.create();
        Consumer subscribe = consumerBuilder.subscribe();
        create.send(new Schemas.PersonTwo(2, "Lucy"));
        Message receive = subscribe.receive();
        Schemas.PersonTwo personTwo = (Schemas.PersonTwo) receive.getValue();
        subscribe.acknowledge(receive);
        Assert.assertEquals(personTwo.getId(), 2);
        Assert.assertEquals(personTwo.getName(), "Lucy");
        create.close();
        subscribe.close();
        this.pulsar.getConfig().setAllowAutoUpdateSchemaEnabled(false);
        Producer create2 = producerBuilder.create();
        Consumer subscribe2 = consumerBuilder.subscribe();
        create2.send(new Schemas.PersonTwo(2, "Lucy"));
        Message receive2 = subscribe2.receive();
        Schemas.PersonTwo personTwo2 = (Schemas.PersonTwo) receive2.getValue();
        subscribe2.acknowledge(receive2);
        Assert.assertEquals(personTwo2.getId(), 2);
        Assert.assertEquals(personTwo2.getName(), "Lucy");
        subscribe2.close();
        create2.close();
    }

    @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
    public void testIsAutoUpdateSchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        Assert.assertEquals(this.admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), SchemaCompatibilityStrategy.UNDEFINED);
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(topicName, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
        ProducerBuilder producerBuilder = this.pulsarClient.newProducer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).topic(topicName);
        try {
            producerBuilder.create();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
        }
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), true);
        ConsumerBuilder consumerBuilder = this.pulsarClient.newConsumer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonTwo.class).build())).subscriptionName("test").topic(new String[]{topicName});
        Producer create = producerBuilder.create();
        Consumer subscribe = consumerBuilder.subscribe();
        create.send(new Schemas.PersonTwo(2, "Lucy"));
        Message receive = subscribe.receive();
        Schemas.PersonTwo personTwo = (Schemas.PersonTwo) receive.getValue();
        subscribe.acknowledge(receive);
        Assert.assertEquals(personTwo.getId(), 2);
        Assert.assertEquals(personTwo.getName(), "Lucy");
        create.close();
        subscribe.close();
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
        Producer create2 = producerBuilder.create();
        Consumer subscribe2 = consumerBuilder.subscribe();
        create2.send(new Schemas.PersonTwo(2, "Lucy"));
        Message receive2 = subscribe2.receive();
        Schemas.PersonTwo personTwo2 = (Schemas.PersonTwo) receive2.getValue();
        subscribe2.acknowledge(receive2);
        Assert.assertEquals(personTwo2.getId(), 2);
        Assert.assertEquals(personTwo2.getName(), "Lucy");
        subscribe2.close();
        create2.close();
    }

    @Test
    public void testSchemaComparison() throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-schema-comparison").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        Assert.assertEquals(this.admin.namespaces().getSchemaCompatibilityStrategy(namespaceName.toString()), SchemaCompatibilityStrategy.UNDEFINED);
        byte[] bytes = (new String(Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo().getSchema(), StandardCharsets.UTF_8) + "/n   /n   /n").getBytes();
        this.admin.schemas().createSchema(topicName, SchemaInfo.builder().type(SchemaType.AVRO).schema(bytes).build());
        this.admin.namespaces().setIsAllowAutoUpdateSchema(namespaceName.toString(), false);
        this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(topicName).create().close();
        Assert.assertEquals(bytes, this.admin.schemas().getSchemaInfo(topicName).getSchema());
        try {
            this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonThree.class)).topic(topicName).create();
            Assert.fail();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Schema not found and schema auto updating is disabled."));
        }
    }

    @Test(dataProvider = "AllCheckSchemaCompatibilityStrategy")
    public void testProducerSendWithOldSchemaAndConsumerCanRead(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(topicName, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(topicName, Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        Producer create = this.pulsarClient.newProducer(Schema.AVRO(Schemas.PersonOne.class)).topic(topicName).create();
        Schemas.PersonOne personOne = new Schemas.PersonOne(10);
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonOne.class).build())).subscriptionName("test").topic(new String[]{topicName}).subscribe();
        create.send(personOne);
        Assert.assertEquals(((Schemas.PersonOne) subscribe.receive().getValue()).getId(), 10);
        subscribe.close();
        create.close();
    }

    @Test
    public void testSchemaLedgerAutoRelease() throws Exception {
        String str = "persistent://" + "public/default" + "/tp";
        this.admin.namespaces().createNamespace("public/default", Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy("public/default", SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        for (int i = 0; i < 100; i++) {
            this.pulsarClient.newProducer(Schema.JSON(SchemaDefinition.builder().withJsonDef(String.format("{\n\t\"type\": \"record\",\n\t\"name\": \"Test_Pojo\",\n\t\"namespace\": \"org.apache.pulsar.schema.compatibility\",\n\t\"fields\": [{\n\t\t\"name\": \"prop_%s\",\n\t\t\"type\": [\"null\", \"string\"],\n\t\t\"default\": null\n\t}]\n}\n", Integer.valueOf(i))).build())).topic(str).create().close();
        }
        Assert.assertTrue(((List) this.mockBookKeeper.getLedgerMap().values().stream().filter(pulsarMockLedgerHandle -> {
            return !pulsarMockLedgerHandle.isFenced();
        }).collect(Collectors.toList())).size() < 20);
        this.admin.topics().delete(str, true);
    }

    @Test
    public void testAutoProduceSchemaAlwaysCompatible() throws Exception {
        String str = "topic" + randomName(16);
        String str2 = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str2, str).toString();
        NamespaceName namespaceName = NamespaceName.get("public", str2);
        this.admin.namespaces().createNamespace("public/" + str2, Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        Producer create = this.pulsarClient.newProducer(Schema.AUTO_PRODUCE_BYTES()).topic(topicName).create();
        Consumer subscribe = this.pulsarClient.newConsumer(Schema.STRING).subscriptionName("my-sub").topic(new String[]{topicName}).subscribe();
        create.close();
        subscribe.close();
    }

    @Test(dataProvider = "CanReadLastSchemaCompatibilityStrategy")
    public void testConsumerWithNotCompatibilitySchema(SchemaCompatibilityStrategy schemaCompatibilityStrategy) throws Exception {
        String str = "test-namespace-" + randomName(16);
        String topicName = TopicName.get(TopicDomain.persistent.value(), "public", str, "test-consumer-compatibility").toString();
        NamespaceName namespaceName = NamespaceName.get("public", str);
        this.admin.namespaces().createNamespace("public/" + str, Sets.newHashSet(new String[]{"test"}));
        this.admin.namespaces().setSchemaCompatibilityStrategy(namespaceName.toString(), schemaCompatibilityStrategy);
        this.admin.schemas().createSchema(topicName, Schema.AVRO(Schemas.PersonOne.class).getSchemaInfo());
        this.admin.schemas().createSchema(topicName, Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withPojo(Schemas.PersonTwo.class).build()).getSchemaInfo());
        try {
            this.pulsarClient.newConsumer(Schema.AVRO(SchemaDefinition.builder().withAlwaysAllowNull(false).withSupportSchemaVersioning(true).withPojo(Schemas.PersonFour.class).build())).subscriptionName("test").topic(new String[]{topicName}).subscribe();
        } catch (Exception e) {
            Assert.assertTrue(e.getMessage().contains("Unable to read schema"));
        }
    }

    public static String randomName(int i) {
        StringBuilder sb = new StringBuilder();
        for (int i2 = 0; i2 < i; i2++) {
            sb.append((char) (ThreadLocalRandom.current().nextInt(26) + 97));
        }
        return sb.toString();
    }
}
