/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.kafka.schema.confluent;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.KafkaTopicFieldDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.plugin.kafka.schema.TableDescriptionSupplier;
import io.trino.plugin.kafka.schema.confluent.AvroSchemaParser;
import io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryConfig;
import io.trino.plugin.kafka.schema.confluent.ConfluentSchemaRegistryTableDescriptionSupplier;
import io.trino.plugin.kafka.schema.confluent.ConfluentSessionProperties;
import io.trino.spi.TrinoException;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.TestingTypeManager;
import io.trino.spi.type.Type;
import io.trino.spi.type.TypeManager;
import io.trino.spi.type.VarcharType;
import io.trino.testing.TestingConnectorSession;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestConfluentSchemaRegistryTableDescriptionSupplier {
    private static final String DEFAULT_NAME = "tests";
    private static final SchemaRegistryClient SCHEMA_REGISTRY_CLIENT = new MockSchemaRegistryClient();

    @Test
    public void testTopicDescription() throws Exception {
        TableDescriptionSupplier tableDescriptionSupplier = this.createTableDescriptionSupplier();
        String topicName = "simple_topic";
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, topicName);
        String subject = topicName + "-value";
        SCHEMA_REGISTRY_CLIENT.register(subject, TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), ""));
        Assert.assertTrue((boolean)tableDescriptionSupplier.listTables().contains(schemaTableName));
        Assertions.assertThat((Object)this.getKafkaTopicDescription(tableDescriptionSupplier, schemaTableName)).isEqualTo((Object)new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), schemaTableName.getTableName(), Optional.empty(), Optional.of(TestConfluentSchemaRegistryTableDescriptionSupplier.getTopicFieldGroup(subject, (List<KafkaTopicFieldDescription>)ImmutableList.of((Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("col1", (Type)IntegerType.INTEGER), (Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("col2", (Type)VarcharType.createUnboundedVarcharType()))))));
    }

    @Test
    public void testCaseInsensitiveSubjectMapping() throws Exception {
        TableDescriptionSupplier tableDescriptionSupplier = this.createTableDescriptionSupplier();
        String topicName = "camelCase_Topic";
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, topicName);
        String subject = topicName + "-key";
        SCHEMA_REGISTRY_CLIENT.register(subject, TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), ""));
        Assert.assertTrue((boolean)tableDescriptionSupplier.listTables().contains(schemaTableName));
        Assertions.assertThat((Object)this.getKafkaTopicDescription(tableDescriptionSupplier, schemaTableName)).isEqualTo((Object)new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, Optional.of(TestConfluentSchemaRegistryTableDescriptionSupplier.getTopicFieldGroup(subject, (List<KafkaTopicFieldDescription>)ImmutableList.of((Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("col1", (Type)IntegerType.INTEGER), (Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("col2", (Type)VarcharType.createUnboundedVarcharType())))), Optional.empty()));
    }

    @Test
    public void testAmbiguousSubject() throws Exception {
        TableDescriptionSupplier tableDescriptionSupplier = this.createTableDescriptionSupplier();
        String topicName = "topic_one";
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, topicName);
        SCHEMA_REGISTRY_CLIENT.register(topicName + "-key", TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), ""));
        SCHEMA_REGISTRY_CLIENT.register(topicName.toUpperCase(Locale.ENGLISH) + "-key", TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), ""));
        Assert.assertTrue((boolean)tableDescriptionSupplier.listTables().contains(schemaTableName));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> tableDescriptionSupplier.getTopicDescription((ConnectorSession)TestingConnectorSession.builder().setPropertyMetadata(new ConfluentSessionProperties(new ConfluentSchemaRegistryConfig()).getSessionProperties()).build(), schemaTableName)).isInstanceOf(TrinoException.class)).hasMessage("Unable to access 'topic_one' table. Subject is ambiguous, and may refer to one of the following: TOPIC_ONE, topic_one");
    }

    @Test
    public void testOverriddenSubject() throws Exception {
        TableDescriptionSupplier tableDescriptionSupplier = this.createTableDescriptionSupplier();
        String topicName = "base_Topic";
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, topicName);
        String subject = topicName + "-value";
        SCHEMA_REGISTRY_CLIENT.register(subject, TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), ""));
        String overriddenSubject = "overriddenSubject";
        SCHEMA_REGISTRY_CLIENT.register(overriddenSubject, TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), "overridden_"));
        Assert.assertTrue((boolean)tableDescriptionSupplier.listTables().contains(schemaTableName));
        Assertions.assertThat((Object)this.getKafkaTopicDescription(tableDescriptionSupplier, schemaTableName)).isEqualTo((Object)new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, Optional.empty(), Optional.of(TestConfluentSchemaRegistryTableDescriptionSupplier.getTopicFieldGroup(subject, (List<KafkaTopicFieldDescription>)ImmutableList.of((Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("col1", (Type)IntegerType.INTEGER), (Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("col2", (Type)VarcharType.createUnboundedVarcharType()))))));
        Assertions.assertThat((Object)this.getKafkaTopicDescription(tableDescriptionSupplier, new SchemaTableName(DEFAULT_NAME, String.format("%s&value-subject=%s", topicName, overriddenSubject)))).isEqualTo((Object)new KafkaTopicDescription(schemaTableName.getTableName(), Optional.of(schemaTableName.getSchemaName()), topicName, Optional.empty(), Optional.of(TestConfluentSchemaRegistryTableDescriptionSupplier.getTopicFieldGroup(overriddenSubject, (List<KafkaTopicFieldDescription>)ImmutableList.of((Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("overridden_col1", (Type)IntegerType.INTEGER), (Object)TestConfluentSchemaRegistryTableDescriptionSupplier.getFieldDescription("overridden_col2", (Type)VarcharType.createUnboundedVarcharType()))))));
    }

    @Test
    public void testAmbiguousOverriddenSubject() throws Exception {
        TableDescriptionSupplier tableDescriptionSupplier = this.createTableDescriptionSupplier();
        String topicName = "base_Topic";
        SchemaTableName schemaTableName = new SchemaTableName(DEFAULT_NAME, topicName);
        String overriddenSubject = "ambiguousOverriddenSubject";
        SCHEMA_REGISTRY_CLIENT.register(overriddenSubject, TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), "overridden_"));
        SCHEMA_REGISTRY_CLIENT.register(overriddenSubject.toUpperCase(Locale.ENGLISH), TestConfluentSchemaRegistryTableDescriptionSupplier.getAvroSchema(schemaTableName.getTableName(), "overridden_"));
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> tableDescriptionSupplier.getTopicDescription((ConnectorSession)TestingConnectorSession.builder().setPropertyMetadata(new ConfluentSessionProperties(new ConfluentSchemaRegistryConfig()).getSessionProperties()).build(), new SchemaTableName(DEFAULT_NAME, String.format("%s&value-subject=%s", topicName, overriddenSubject)))).isInstanceOf(TrinoException.class)).hasMessage("Subject 'ambiguousoverriddensubject' is ambiguous, and may refer to one of the following: ambiguousOverriddenSubject, AMBIGUOUSOVERRIDDENSUBJECT");
    }

    private KafkaTopicDescription getKafkaTopicDescription(TableDescriptionSupplier tableDescriptionSupplier, SchemaTableName schemaTableName) {
        return (KafkaTopicDescription)tableDescriptionSupplier.getTopicDescription((ConnectorSession)TestingConnectorSession.builder().setPropertyMetadata(new ConfluentSessionProperties(new ConfluentSchemaRegistryConfig()).getSessionProperties()).build(), schemaTableName).orElseThrow();
    }

    private TableDescriptionSupplier createTableDescriptionSupplier() {
        return new ConfluentSchemaRegistryTableDescriptionSupplier(SCHEMA_REGISTRY_CLIENT, (Map)ImmutableMap.of((Object)"AVRO", (Object)new AvroSchemaParser((TypeManager)new TestingTypeManager())), DEFAULT_NAME, new Duration(1.0, TimeUnit.SECONDS));
    }

    private static Schema getAvroSchema(String topicName, String columnNamePrefix) {
        return (Schema)SchemaBuilder.record((String)topicName).fields().name(columnNamePrefix + "col1").type().intType().noDefault().name(columnNamePrefix + "col2").type().stringType().noDefault().endRecord();
    }

    private static KafkaTopicFieldGroup getTopicFieldGroup(String topicName, List<KafkaTopicFieldDescription> fieldDescriptions) {
        return new KafkaTopicFieldGroup("avro", Optional.empty(), Optional.of(topicName), fieldDescriptions);
    }

    private static KafkaTopicFieldDescription getFieldDescription(String name, Type type) {
        return new KafkaTopicFieldDescription(name, type, name, null, null, null, false);
    }
}

