/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.kafka.schema.confluent;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import io.confluent.kafka.serializers.subject.RecordNameStrategy;
import io.confluent.kafka.serializers.subject.TopicRecordNameStrategy;
import io.trino.plugin.kafka.schema.confluent.KafkaWithConfluentSchemaRegistryQueryRunner;
import io.trino.sql.query.QueryAssertions;
import io.trino.testing.AbstractTestQueryFramework;
import io.trino.testing.QueryRunner;
import io.trino.testing.kafka.TestingKafka;
import io.trino.testing.sql.TestTable;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.stream.IntStream;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.Policy;
import net.jodah.failsafe.RetryPolicy;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.LongSerializer;
import org.assertj.core.api.Assertions;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Minimal functional checks for the Kafka connector backed by a Confluent Schema Registry.
 *
 * <p>Each test produces Avro (or, for the negative case, JSON-schema) records to a freshly named
 * topic, waits until the topic is visible as a table and then queries it through the query runner
 * created by {@link #createQueryRunner()}.
 */
@Test(singleThreaded = true)
public class TestKafkaWithConfluentSchemaRegistryMinimalFunctionality
        extends AbstractTestQueryFramework {
    private static final String RECORD_NAME = "test_record";
    private static final int MESSAGE_COUNT = 100;

    // First version of the value schema: (col_1 long, col_2 string).
    private static final Schema INITIAL_SCHEMA = SchemaBuilder.record(RECORD_NAME)
            .fields()
            .name("col_1").type().longType().noDefault()
            .name("col_2").type().stringType().noDefault()
            .endRecord();

    // Second version of the value schema: adds an optional double column col_3.
    private static final Schema EVOLVED_SCHEMA = SchemaBuilder.record(RECORD_NAME)
            .fields()
            .name("col_1").type().longType().noDefault()
            .name("col_2").type().stringType().noDefault()
            .name("col_3").type().optional().doubleType()
            .endRecord();

    private TestingKafka testingKafka;

    /**
     * Starts a Kafka instance with a schema registry (closed after the class finishes) and builds a
     * query runner on top of it. The subjects cache refresh interval is set to {@code 1ms}.
     */
    protected QueryRunner createQueryRunner() throws Exception {
        testingKafka = closeAfterClass(TestingKafka.createWithSchemaRegistry());
        return KafkaWithConfluentSchemaRegistryQueryRunner.builder(testingKafka)
                .setExtraKafkaProperties(ImmutableMap.<String, String>builder()
                        .put("kafka.confluent-subjects-cache-refresh-interval", "1ms")
                        .buildOrThrow())
                .build();
    }

    /** Long keys without a key subject; only the value columns are selected. */
    @Test
    public void testBasicTopic() {
        String topic = "topic-basic-MixedCase-" + TestTable.randomTableSuffix();
        assertTopic(
                testingKafka,
                topic,
                String.format("SELECT col_1, col_2 FROM %s", toDoubleQuoted(topic)),
                String.format("SELECT col_1, col_2, col_3 FROM %s", toDoubleQuoted(topic)),
                false,
                schemaRegistryAwareProducer(testingKafka)
                        .put("key.serializer", LongSerializer.class.getName())
                        .put("value.serializer", KafkaAvroSerializer.class.getName())
                        .buildOrThrow());
    }

    /** Avro-serialized keys; the key is selected through the {@code "<topic>-key"} column. */
    @Test
    public void testTopicWithKeySubject() {
        String topic = "topic-Key-Subject-" + TestTable.randomTableSuffix();
        assertTopic(
                testingKafka,
                topic,
                String.format("SELECT \"%s-key\", col_1, col_2 FROM %s", topic, toDoubleQuoted(topic)),
                String.format("SELECT \"%s-key\", col_1, col_2, col_3 FROM %s", topic, toDoubleQuoted(topic)),
                true,
                schemaRegistryAwareProducer(testingKafka)
                        .put("key.serializer", KafkaAvroSerializer.class.getName())
                        .put("value.serializer", KafkaAvroSerializer.class.getName())
                        .buildOrThrow());
    }

    /** Values registered with {@link RecordNameStrategy}; the value subject is given in the table name. */
    @Test
    public void testTopicWithRecordNameStrategy() {
        String topic = "topic-Record-Name-Strategy-" + TestTable.randomTableSuffix();
        assertTopic(
                testingKafka,
                topic,
                String.format("SELECT \"%1$s-key\", col_1, col_2 FROM \"%1$s&value-subject=%2$s\"", topic, RECORD_NAME),
                String.format("SELECT \"%1$s-key\", col_1, col_2, col_3 FROM \"%1$s&value-subject=%2$s\"", topic, RECORD_NAME),
                true,
                schemaRegistryAwareProducer(testingKafka)
                        .put("key.serializer", KafkaAvroSerializer.class.getName())
                        .put("value.serializer", KafkaAvroSerializer.class.getName())
                        .put("value.subject.name.strategy", RecordNameStrategy.class.getName())
                        .buildOrThrow());
    }

    /** Values registered with {@link TopicRecordNameStrategy}; the value subject is given in the table name. */
    @Test
    public void testTopicWithTopicRecordNameStrategy() {
        String topic = "topic-Topic-Record-Name-Strategy-" + TestTable.randomTableSuffix();
        assertTopic(
                testingKafka,
                topic,
                String.format("SELECT \"%1$s-key\", col_1, col_2 FROM \"%1$s&value-subject=%1$s-%2$s\"", topic, RECORD_NAME),
                String.format("SELECT \"%1$s-key\", col_1, col_2, col_3 FROM \"%1$s&value-subject=%1$s-%2$s\"", topic, RECORD_NAME),
                true,
                schemaRegistryAwareProducer(testingKafka)
                        .put("key.serializer", KafkaAvroSerializer.class.getName())
                        .put("value.serializer", KafkaAvroSerializer.class.getName())
                        .put("value.subject.name.strategy", TopicRecordNameStrategy.class.getName())
                        .buildOrThrow());
    }

    /** An INSERT into a schema-registry backed table must fail with "Insert not supported". */
    @Test
    public void testUnsupportedInsert() {
        String topicName = "topic-unsupported-insert-" + TestTable.randomTableSuffix();
        assertNotExists(topicName);

        List<ProducerRecord<Long, GenericRecord>> messages = createMessages(topicName, MESSAGE_COUNT, true);
        testingKafka.sendMessages(
                messages.stream(),
                schemaRegistryAwareProducer(testingKafka)
                        .put("key.serializer", KafkaAvroSerializer.class.getName())
                        .put("value.serializer", KafkaAvroSerializer.class.getName())
                        .buildOrThrow());
        waitUntilTableExists(topicName);

        Assertions.assertThatThrownBy(() -> getQueryRunner().execute(
                        String.format("INSERT INTO %s VALUES(0, 0, '')", toDoubleQuoted(topicName))))
                .hasMessage("Insert not supported");
    }

    /** A topic whose value subject is a JSON schema must be listed but fail on any access. */
    @Test
    public void testUnsupportedFormat() {
        String topicName = "topic-unsupported-format-" + TestTable.randomTableSuffix();
        assertNotExists(topicName);

        testingKafka.sendMessages(
                IntStream.range(0, MESSAGE_COUNT)
                        // The key must be a Long: it is written with LongSerializer, and a boxed
                        // Integer would be rejected by the producer before anything is sent.
                        .mapToObj(id -> new ProducerRecord<>(topicName, (long) id, new JsonValue(id, "value_" + id))),
                schemaRegistryAwareProducer(testingKafka)
                        .put("key.serializer", LongSerializer.class.getName())
                        .put("value.serializer", KafkaJsonSchemaSerializer.class.getName())
                        .buildOrThrow());

        // Poll like the other tests do instead of checking once right after the send.
        waitUntilTableExists(topicName);

        String errorMessage = "Not supported schema: JSON";
        Assertions.assertThatThrownBy(() -> getQueryRunner().execute("SHOW COLUMNS FROM " + toDoubleQuoted(topicName)))
                .hasMessage(errorMessage);
        Assertions.assertThatThrownBy(() -> getQueryRunner().execute("SELECT * FROM " + toDoubleQuoted(topicName)))
                .hasMessage(errorMessage);
        Assertions.assertThatThrownBy(() -> getQueryRunner().execute(
                        String.format("INSERT INTO %s VALUES(0, 0, '')", toDoubleQuoted(topicName))))
                .hasMessage(errorMessage);
    }

    /**
     * Returns a producer-config builder pre-populated with the schema registry URL of the given
     * Kafka instance; callers add serializers and strategies before building.
     */
    private static ImmutableMap.Builder<String, String> schemaRegistryAwareProducer(TestingKafka testingKafka) {
        return ImmutableMap.<String, String>builder()
                .put("schema.registry.url", testingKafka.getSchemaRegistryConnectString());
    }

    /**
     * Shared scenario: produce {@link #MESSAGE_COUNT} records with the initial schema, verify them
     * with {@code initialQuery}, then produce the same number with the evolved schema and verify
     * the row count and {@code evolvedQuery}.
     *
     * @param testingKafka Kafka instance to produce to
     * @param topicName topic (and table) under test; must not exist as a table yet
     * @param initialQuery query selecting the columns of the initial schema
     * @param evolvedQuery query selecting the columns of the evolved schema
     * @param isKeyIncluded whether the queries select the message key as their first column
     * @param producerConfig producer properties used for both batches
     */
    private void assertTopic(TestingKafka testingKafka, String topicName, String initialQuery, String evolvedQuery, boolean isKeyIncluded, Map<String, String> producerConfig) {
        assertNotExists(topicName);

        List<ProducerRecord<Long, GenericRecord>> messages = createMessages(topicName, MESSAGE_COUNT, true);
        testingKafka.sendMessages(messages.stream(), producerConfig);

        waitUntilTableExists(topicName);
        assertCount(topicName, MESSAGE_COUNT);

        QueryAssertions queryAssertions = new QueryAssertions(getQueryRunner());
        queryAssertions.query(initialQuery)
                .assertThat()
                .containsAll(getExpectedValues(messages, INITIAL_SCHEMA, isKeyIncluded));

        List<ProducerRecord<Long, GenericRecord>> newMessages = createMessages(topicName, MESSAGE_COUNT, false);
        testingKafka.sendMessages(newMessages.stream(), producerConfig);
        assertCount(topicName, messages.size() + newMessages.size());

        // Only the initial-schema records are compared here: addExpectedColumns has no branch for a
        // non-null double, so it cannot render the records in newMessages. For the initial records
        // col_3 is expected as CAST(null AS double) (assuming the Avro record returns null for a
        // field its own schema does not declare).
        queryAssertions.query(evolvedQuery)
                .assertThat()
                .containsAll(getExpectedValues(messages, EVOLVED_SCHEMA, isKeyIncluded));
    }

    /**
     * Renders the given records as a SQL {@code VALUES} expression with one row per record.
     *
     * @param messages records to render
     * @param schema schema whose fields determine the value columns of every row
     * @param isKeyIncluded whether each row starts with the record key cast to bigint
     * @return text of the form {@code VALUES (..), (..)}
     */
    private static String getExpectedValues(List<ProducerRecord<Long, GenericRecord>> messages, Schema schema, boolean isKeyIncluded) {
        ImmutableList.Builder<String> rows = ImmutableList.builder();
        for (ProducerRecord<Long, GenericRecord> message : messages) {
            ImmutableList.Builder<String> columns = ImmutableList.builder();
            if (isKeyIncluded) {
                columns.add(String.format("CAST(%s as bigint)", message.key()));
            }
            addExpectedColumns(schema, message.value(), columns);
            rows.add(String.format("(%s)", String.join(", ", columns.build())));
        }
        return "VALUES " + String.join(", ", rows.build());
    }

    /**
     * Appends one SQL literal per field of {@code schema}, reading the values from {@code record}.
     *
     * <p>Supported: string and long fields, and a null value of a union that contains both null
     * and double.
     *
     * @throws IllegalArgumentException for any other field
     */
    private static void addExpectedColumns(Schema schema, GenericRecord record, ImmutableList.Builder<String> columnsBuilder) {
        for (Schema.Field field : schema.getFields()) {
            Schema fieldSchema = field.schema();
            Object value = record.get(field.name());

            // getTypes() is only meaningful for unions, so the type check has to come first.
            boolean nullInNullableUnion = value == null
                    && fieldSchema.getType() == Schema.Type.UNION
                    && fieldSchema.getTypes().contains(Schema.create(Schema.Type.NULL));
            if (nullInNullableUnion) {
                if (!fieldSchema.getTypes().contains(Schema.create(Schema.Type.DOUBLE))) {
                    throw new IllegalArgumentException("Unsupported field: " + field);
                }
                columnsBuilder.add("CAST(null AS double)");
            }
            else if (fieldSchema.getType() == Schema.Type.STRING) {
                columnsBuilder.add(String.format("VARCHAR '%s'", value));
            }
            else if (fieldSchema.getType() == Schema.Type.LONG) {
                columnsBuilder.add(String.format("CAST(%s AS bigint)", value));
            }
            else {
                throw new IllegalArgumentException("Unsupported field: " + field);
            }
        }
    }

    /**
     * Asserts that no table matching {@code tableName} is listed, if the session schema exists.
     *
     * <p>The pattern is lower-cased exactly as {@link #tableExists(String)} does, so that both
     * helpers look for the same name; with the raw mixed-case topic name the check would
     * presumably never match a listed table and would pass vacuously.
     */
    private void assertNotExists(String tableName) {
        if (schemaExists()) {
            String pattern = toSingleQuoted(tableName.toLowerCase(Locale.ENGLISH));
            Assertions.assertThat(getQueryRunner().execute("SHOW TABLES LIKE " + pattern).getRowCount()).isZero();
        }
    }

    /**
     * Waits, with up to 10 attempts 100 ms apart for each step, until first the session schema and
     * then the table are listed.
     */
    private void waitUntilTableExists(String tableName) {
        Failsafe.with(visibilityRetryPolicy()).run(() -> Assert.assertTrue(schemaExists()));
        Failsafe.with(visibilityRetryPolicy()).run(() -> Assert.assertTrue(tableExists(tableName)));
    }

    /** Creates the retry policy used while waiting for a schema or table to become visible. */
    private static RetryPolicy<Object> visibilityRetryPolicy() {
        return new RetryPolicy<>()
                .withMaxAttempts(10)
                .withDelay(Duration.ofMillis(100L));
    }

    /** Returns whether exactly one schema matching the session schema is listed in the session catalog. */
    private boolean schemaExists() {
        String sql = String.format(
                "SHOW SCHEMAS FROM %s LIKE '%s'",
                getSession().getCatalog().orElseThrow(),
                getSession().getSchema().orElseThrow());
        return getQueryRunner().execute(sql).getRowCount() == 1;
    }

    /** Returns whether exactly one table matching the lower-cased {@code tableName} is listed. */
    private boolean tableExists(String tableName) {
        String sql = String.format("SHOW TABLES LIKE '%s'", tableName.toLowerCase(Locale.ENGLISH));
        return getQueryRunner().execute(sql).getRowCount() == 1;
    }

    /** Asserts that {@code SELECT count(*)} on the double-quoted table returns {@code count}. */
    private void assertCount(String tableName, long count) {
        Object actual = getQueryRunner().execute("SELECT count(*) FROM " + toDoubleQuoted(tableName)).getOnlyValue();
        Assertions.assertThat(actual).isEqualTo(count);
    }

    /** Wraps the name in double quotes; embedded quotes are not escaped. */
    private static String toDoubleQuoted(String tableName) {
        return String.format("\"%s\"", tableName);
    }

    /** Wraps the non-null value in single quotes; embedded quotes are not escaped. */
    private static String toSingleQuoted(Object value) {
        Objects.requireNonNull(value, "value is null");
        return String.format("'%s'", value);
    }

    /**
     * Creates {@code messageCount} records for {@code topicName} with keys {@code 0..messageCount-1}.
     *
     * @param useInitialSchema {@code true} to build values with the initial schema, {@code false}
     *        to build them with the evolved schema
     */
    private static List<ProducerRecord<Long, GenericRecord>> createMessages(String topicName, int messageCount, boolean useInitialSchema) {
        ImmutableList.Builder<ProducerRecord<Long, GenericRecord>> records = ImmutableList.builder();
        for (long key = 0; key < messageCount; key++) {
            GenericRecord value = useInitialSchema
                    ? createRecordWithInitialSchema(key)
                    : createRecordWithEvolvedSchema(key);
            records.add(new ProducerRecord<>(topicName, key, value));
        }
        return records.build();
    }

    /** Builds an initial-schema record: {@code col_1 = key * 100}, {@code col_2 = "string-<key>"}. */
    private static GenericRecord createRecordWithInitialSchema(long key) {
        return new GenericRecordBuilder(INITIAL_SCHEMA)
                .set("col_1", Math.multiplyExact(key, 100))
                .set("col_2", String.format("string-%s", key))
                .build();
    }

    /** Builds an evolved-schema record; as the initial one plus {@code col_3 = (key + 10.1) / 10.0}. */
    private static GenericRecord createRecordWithEvolvedSchema(long key) {
        return new GenericRecordBuilder(EVOLVED_SCHEMA)
                .set("col_1", Math.multiplyExact(key, 100))
                .set("col_2", String.format("string-%s", key))
                .set("col_3", (key + 10.1) / 10.0)
                .build();
    }

    /** Simple JSON-serializable value used to register a JSON-schema subject. */
    private static class JsonValue {
        private final int id;
        private final String value;

        @JsonCreator
        public JsonValue(@JsonProperty("id") int id, @JsonProperty("value") String value) {
            this.id = id;
            this.value = Objects.requireNonNull(value, "value is null");
        }

        @JsonProperty("id")
        public int getId() {
            return id;
        }

        @JsonProperty("value")
        public String getValue() {
            return value;
        }
    }
}

