/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.kafka.schema.confluent;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
import io.trino.decoder.DecoderColumnHandle;
import io.trino.decoder.FieldValueProvider;
import io.trino.decoder.RowDecoder;
import io.trino.decoder.avro.AvroBytesDeserializer;
import io.trino.decoder.avro.AvroDeserializer;
import io.trino.decoder.avro.AvroReaderSupplier;
import io.trino.decoder.avro.AvroRowDecoderFactory;
import io.trino.plugin.kafka.KafkaColumnHandle;
import io.trino.plugin.kafka.schema.confluent.ConfluentAvroReaderSupplier;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.IntegerType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarbinaryType;
import io.trino.spi.type.VarcharType;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.testng.Assert;
import org.testng.annotations.Test;

public class TestAvroConfluentRowDecoder {
    private static final String TOPIC = "test";

    @Test
    public void testDecodingRows() throws Exception {
        MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
        Schema initialSchema = (Schema)SchemaBuilder.record((String)TOPIC).fields().name("col1").type().intType().noDefault().name("col2").type().stringType().noDefault().name("col3").type().intType().intDefault(42).name("col4").type().nullable().intType().noDefault().name("col5").type().nullable().bytesType().noDefault().endRecord();
        Schema evolvedSchema = (Schema)((SchemaBuilder.FieldAssembler)SchemaBuilder.record((String)TOPIC).fields().name("col1").type().intType().noDefault().name("col2").type().stringType().noDefault().name("col3").type().intType().intDefault(3).name("col4").type().nullable().intType().noDefault().name("col5").type().nullable().bytesType().noDefault().name("col6").type().optional().longType()).endRecord();
        mockSchemaRegistryClient.register("test-value", initialSchema);
        mockSchemaRegistryClient.register("test-value", evolvedSchema);
        ImmutableSet columnHandles = ImmutableSet.builder().add((Object)new KafkaColumnHandle("col1", (Type)IntegerType.INTEGER, "col1", null, null, false, false, false)).add((Object)new KafkaColumnHandle("col2", (Type)VarcharType.VARCHAR, "col2", null, null, false, false, false)).add((Object)new KafkaColumnHandle("col3", (Type)IntegerType.INTEGER, "col3", null, null, false, false, false)).add((Object)new KafkaColumnHandle("col4", (Type)IntegerType.INTEGER, "col4", null, null, false, false, false)).add((Object)new KafkaColumnHandle("col5", (Type)VarbinaryType.VARBINARY, "col5", null, null, false, false, false)).add((Object)new KafkaColumnHandle("col6", (Type)BigintType.BIGINT, "col6", null, null, false, false, false)).build();
        RowDecoder rowDecoder = TestAvroConfluentRowDecoder.getRowDecoder((SchemaRegistryClient)mockSchemaRegistryClient, (Set<DecoderColumnHandle>)columnHandles, evolvedSchema);
        TestAvroConfluentRowDecoder.testRow(rowDecoder, TestAvroConfluentRowDecoder.generateRecord(initialSchema, Arrays.asList(3, "string-3", 30, 300, ByteBuffer.wrap(new byte[]{1, 2, 3}))), 1);
        TestAvroConfluentRowDecoder.testRow(rowDecoder, TestAvroConfluentRowDecoder.generateRecord(initialSchema, Arrays.asList(3, "", 30, null, null)), 1);
        TestAvroConfluentRowDecoder.testRow(rowDecoder, TestAvroConfluentRowDecoder.generateRecord(initialSchema, Arrays.asList(3, "\u0394\u66f4\u6539", 30, null, ByteBuffer.wrap(new byte[]{1, 2, 3}))), 1);
        TestAvroConfluentRowDecoder.testRow(rowDecoder, TestAvroConfluentRowDecoder.generateRecord(evolvedSchema, Arrays.asList(4, "string-4", 40, 400, null, 4L)), 2);
        TestAvroConfluentRowDecoder.testRow(rowDecoder, TestAvroConfluentRowDecoder.generateRecord(evolvedSchema, Arrays.asList(5, "string-5", 50, 500, ByteBuffer.wrap(new byte[]{1, 2, 3}), null)), 2);
    }

    @Test
    public void testSingleValueRow() throws Exception {
        MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();
        Schema schema = Schema.create((Schema.Type)Schema.Type.LONG);
        mockSchemaRegistryClient.register(String.format("%s-key", TOPIC), schema);
        ImmutableSet columnHandles = ImmutableSet.builder().add((Object)new KafkaColumnHandle("col1", (Type)BigintType.BIGINT, "col1", null, null, false, false, false)).build();
        RowDecoder rowDecoder = TestAvroConfluentRowDecoder.getRowDecoder((SchemaRegistryClient)mockSchemaRegistryClient, (Set<DecoderColumnHandle>)columnHandles, schema);
        TestAvroConfluentRowDecoder.testSingleValueRow(rowDecoder, 3L, schema, 1);
    }

    private static void testRow(RowDecoder rowDecoder, GenericRecord record, int schemaId) {
        byte[] serializedRecord = TestAvroConfluentRowDecoder.serializeRecord(record, record.getSchema(), schemaId);
        Optional decodedRow = rowDecoder.decodeRow(serializedRecord);
        TestAvroConfluentRowDecoder.assertRowsAreEqual(decodedRow, record);
    }

    private static void testSingleValueRow(RowDecoder rowDecoder, Object value, Schema schema, int schemaId) {
        byte[] serializedRecord = TestAvroConfluentRowDecoder.serializeRecord(value, schema, schemaId);
        Optional decodedRow = rowDecoder.decodeRow(serializedRecord);
        Preconditions.checkState((boolean)decodedRow.isPresent(), (Object)"decodedRow is not present");
        Map.Entry entry = (Map.Entry)Iterables.getOnlyElement(((Map)decodedRow.get()).entrySet());
        TestAvroConfluentRowDecoder.assertValuesAreEqual((FieldValueProvider)entry.getValue(), value, schema);
    }

    private static byte[] serializeRecord(Object record, Schema schema, int schemaId) {
        try {
            ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
            outputStream.write(0);
            outputStream.write(ByteBuffer.allocate(4).putInt(schemaId).array());
            BinaryEncoder encoder = EncoderFactory.get().directBinaryEncoder((OutputStream)outputStream, null);
            GenericDatumWriter avroRecordWriter = new GenericDatumWriter(schema);
            avroRecordWriter.write(record, (Encoder)encoder);
            encoder.flush();
            byte[] serializedRecord = outputStream.toByteArray();
            outputStream.close();
            return serializedRecord;
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static RowDecoder getRowDecoder(SchemaRegistryClient schemaRegistryClient, Set<DecoderColumnHandle> columnHandles, Schema schema) {
        ImmutableMap decoderParams = ImmutableMap.builder().put((Object)"dataSchema", (Object)schema.toString()).buildOrThrow();
        return TestAvroConfluentRowDecoder.getAvroRowDecoderyFactory(schemaRegistryClient).create((Map)decoderParams, columnHandles);
    }

    public static AvroRowDecoderFactory getAvroRowDecoderyFactory(SchemaRegistryClient schemaRegistryClient) {
        return new AvroRowDecoderFactory((AvroReaderSupplier.Factory)new ConfluentAvroReaderSupplier.Factory(schemaRegistryClient), (AvroDeserializer.Factory)new AvroBytesDeserializer.Factory());
    }

    private static void assertRowsAreEqual(Optional<Map<DecoderColumnHandle, FieldValueProvider>> decodedRow, GenericRecord expected) {
        Preconditions.checkState((boolean)decodedRow.isPresent(), (Object)"decoded row is not present");
        for (Map.Entry<DecoderColumnHandle, FieldValueProvider> entry : decodedRow.get().entrySet()) {
            String columnName = entry.getKey().getName();
            if (expected.get(columnName) == null) {
                Assert.assertTrue((boolean)entry.getValue().isNull());
                continue;
            }
            TestAvroConfluentRowDecoder.assertValuesAreEqual(entry.getValue(), expected.get(columnName), expected.getSchema().getField(columnName).schema());
        }
    }

    private static void assertValuesAreEqual(FieldValueProvider actual, Object expected, Schema schema) {
        if (actual.isNull()) {
            Assert.assertNull((Object)expected);
        } else {
            switch (schema.getType()) {
                case INT: 
                case LONG: {
                    Assert.assertEquals((long)actual.getLong(), (long)((Number)expected).longValue());
                    break;
                }
                case STRING: {
                    Assert.assertEquals((Object)actual.getSlice().toStringUtf8(), (Object)expected);
                    break;
                }
                case BYTES: {
                    Assert.assertEquals((Object)actual.getSlice().getBytes(), (Object)((ByteBuffer)expected).array());
                    break;
                }
                case UNION: {
                    Optional<Schema> nonNullSchema = schema.getTypes().stream().filter(type -> type.getType() != Schema.Type.NULL).findFirst();
                    Assert.assertTrue((boolean)nonNullSchema.isPresent());
                    if (expected == null) {
                        expected = ((Schema.Field)Iterables.getOnlyElement((Iterable)schema.getFields())).defaultVal();
                    }
                    TestAvroConfluentRowDecoder.assertValuesAreEqual(actual, expected, nonNullSchema.get());
                    break;
                }
                default: {
                    throw new IllegalStateException("Unexpected type");
                }
            }
        }
    }

    private static GenericRecord generateRecord(Schema schema, List<Object> values) {
        GenericRecordBuilder recordBuilder = new GenericRecordBuilder(schema);
        for (int i = 0; i < values.size(); ++i) {
            recordBuilder.set((Schema.Field)schema.getFields().get(i), values.get(i));
        }
        return recordBuilder.build();
    }
}

