/*
 * Decompiled with CFR 0.152.
 */
package io.trino.plugin.kafka;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.trino.plugin.kafka.KafkaQueryRunner;
import io.trino.plugin.kafka.KafkaTopicDescription;
import io.trino.plugin.kafka.KafkaTopicFieldDescription;
import io.trino.plugin.kafka.KafkaTopicFieldGroup;
import io.trino.plugin.kafka.encoder.json.format.DateTimeFormat;
import io.trino.spi.connector.SchemaTableName;
import io.trino.spi.type.ArrayType;
import io.trino.spi.type.BigintType;
import io.trino.spi.type.BooleanType;
import io.trino.spi.type.DateType;
import io.trino.spi.type.DoubleType;
import io.trino.spi.type.TimeType;
import io.trino.spi.type.TimeWithTimeZoneType;
import io.trino.spi.type.TimestampType;
import io.trino.spi.type.TimestampWithTimeZoneType;
import io.trino.spi.type.Type;
import io.trino.spi.type.VarcharType;
import io.trino.testing.BaseConnectorTest;
import io.trino.testing.DataProviders;
import io.trino.testing.DistributedQueryRunner;
import io.trino.testing.QueryRunner;
import io.trino.testing.TestingConnectorBehavior;
import io.trino.testing.assertions.Assert;
import io.trino.testing.kafka.TestingKafka;
import io.trino.testing.sql.TestTable;
import io.trino.tpch.TpchTable;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.assertj.core.api.Assertions;
import org.testng.SkipException;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

public class TestKafkaConnectorTest
extends BaseConnectorTest {
    // Kafka test instance; assigned in createQueryRunner().
    private TestingKafka testingKafka;
    // Randomly named topics (UUID-based suffix) assigned in createQueryRunner():
    // one described with the "raw" data format, one described without key/message field groups.
    private String rawFormatTopic;
    private String headersTopic;
    // Topic/table names used by the JSON date-time round-trip test cases (see jsonDateTimeFormatsData()).
    private static final String JSON_CUSTOM_DATE_TIME_TABLE_NAME = "custom_date_time_table";
    private static final String JSON_ISO8601_TABLE_NAME = "iso8601_table";
    private static final String JSON_RFC2822_TABLE_NAME = "rfc2822_table";
    private static final String JSON_MILLISECONDS_TABLE_NAME = "milliseconds_since_epoch_table";
    private static final String JSON_SECONDS_TABLE_NAME = "seconds_since_epoch_table";
    // Tables in the "write_test" schema targeted by the insert tests; each name carries a
    // suffix from TestTable.randomTableSuffix(). They are registered in createQueryRunner().
    private static final SchemaTableName TABLE_INSERT_NEGATIVE_DATE = new SchemaTableName("write_test", "test_insert_negative_date_" + TestTable.randomTableSuffix());
    private static final SchemaTableName TABLE_INSERT_CUSTOMER = new SchemaTableName("write_test", "test_insert_customer_" + TestTable.randomTableSuffix());
    private static final SchemaTableName TABLE_INSERT_ARRAY = new SchemaTableName("write_test", "test_insert_array_" + TestTable.randomTableSuffix());
    private static final SchemaTableName TABLE_INSERT_UNICODE_1 = new SchemaTableName("write_test", "test_unicode_1_" + TestTable.randomTableSuffix());
    private static final SchemaTableName TABLE_INSERT_UNICODE_2 = new SchemaTableName("write_test", "test_unicode_2_" + TestTable.randomTableSuffix());
    private static final SchemaTableName TABLE_INSERT_UNICODE_3 = new SchemaTableName("write_test", "test_unicode_3_" + TestTable.randomTableSuffix());
    private static final SchemaTableName TABLE_INSERT_HIGHEST_UNICODE = new SchemaTableName("write_test", "test_highest_unicode_" + TestTable.randomTableSuffix());

    /**
     * Builds the query runner shared by the tests in this class.
     *
     * <p>Creates the {@link TestingKafka} instance (handed to {@code closeAfterClass}), picks
     * unique names for the raw-format and headers topics, and registers the extra topic
     * descriptions the tests query: the raw topic, the headers topic, the JSON date-time
     * topics from {@link #createJsonDateTimeTestTopic()}, and the {@code write_test} insert tables.
     *
     * <p>The mappings of the raw topic are byte offsets into the 48-byte message that
     * {@code testColumnReferencedTwice} writes.
     *
     * @return the runner produced by {@code KafkaQueryRunner.builder(...)} with the TPC-H tables
     *         and the extra topic descriptions
     * @throws Exception if creating Kafka or the query runner fails
     */
    protected QueryRunner createQueryRunner() throws Exception {
        testingKafka = closeAfterClass(TestingKafka.create());
        rawFormatTopic = "test_raw_" + UUID.randomUUID().toString().replaceAll("-", "_");
        headersTopic = "test_header_" + UUID.randomUUID().toString().replaceAll("-", "_");

        // Explicit type witness: without it the builder is inferred as <Object, Object>.
        Map<SchemaTableName, KafkaTopicDescription> extraTopicDescriptions = ImmutableMap.<SchemaTableName, KafkaTopicDescription>builder()
                .put(new SchemaTableName("default", rawFormatTopic),
                        createDescription(rawFormatTopic, "default", rawFormatTopic,
                                createFieldGroup("raw", ImmutableList.of(
                                        createOneFieldDescription("bigint_long", BigintType.BIGINT, "0", "LONG"),
                                        createOneFieldDescription("bigint_int", BigintType.BIGINT, "8", "INT"),
                                        createOneFieldDescription("bigint_short", BigintType.BIGINT, "12", "SHORT"),
                                        createOneFieldDescription("bigint_byte", BigintType.BIGINT, "14", "BYTE"),
                                        createOneFieldDescription("double_double", DoubleType.DOUBLE, "15", "DOUBLE"),
                                        createOneFieldDescription("double_float", DoubleType.DOUBLE, "23", "FLOAT"),
                                        createOneFieldDescription("varchar_byte", VarcharType.createVarcharType(6), "27:33", "BYTE"),
                                        createOneFieldDescription("boolean_long", BooleanType.BOOLEAN, "33", "LONG"),
                                        createOneFieldDescription("boolean_int", BooleanType.BOOLEAN, "41", "INT"),
                                        createOneFieldDescription("boolean_short", BooleanType.BOOLEAN, "45", "SHORT"),
                                        createOneFieldDescription("boolean_byte", BooleanType.BOOLEAN, "47", "BYTE")))))
                .put(new SchemaTableName("default", headersTopic),
                        new KafkaTopicDescription(headersTopic, Optional.empty(), headersTopic, Optional.empty(), Optional.empty()))
                .putAll(createJsonDateTimeTestTopic())
                .put(TABLE_INSERT_NEGATIVE_DATE,
                        createDescription(
                                TABLE_INSERT_NEGATIVE_DATE,
                                createOneFieldDescription("key", BigintType.BIGINT),
                                ImmutableList.of(createOneFieldDescription("dt", DateType.DATE, DateTimeFormat.ISO8601.toString()))))
                .put(TABLE_INSERT_CUSTOMER,
                        createDescription(
                                TABLE_INSERT_CUSTOMER,
                                createOneFieldDescription("phone", VarcharType.createVarcharType(15)),
                                ImmutableList.of(
                                        createOneFieldDescription("custkey", BigintType.BIGINT),
                                        createOneFieldDescription("acctbal", DoubleType.DOUBLE))))
                .put(TABLE_INSERT_ARRAY,
                        createDescription(
                                TABLE_INSERT_ARRAY,
                                createOneFieldDescription("a", new ArrayType(DoubleType.DOUBLE)),
                                ImmutableList.of(createOneFieldDescription("b", new ArrayType(DoubleType.DOUBLE)))))
                .put(TABLE_INSERT_UNICODE_1,
                        createDescription(
                                TABLE_INSERT_UNICODE_1,
                                createOneFieldDescription("key", BigintType.BIGINT),
                                ImmutableList.of(createOneFieldDescription("test", VarcharType.createVarcharType(50)))))
                .put(TABLE_INSERT_UNICODE_2,
                        createDescription(
                                TABLE_INSERT_UNICODE_2,
                                createOneFieldDescription("key", BigintType.BIGINT),
                                ImmutableList.of(createOneFieldDescription("test", VarcharType.createVarcharType(50)))))
                .put(TABLE_INSERT_UNICODE_3,
                        createDescription(
                                TABLE_INSERT_UNICODE_3,
                                createOneFieldDescription("key", BigintType.BIGINT),
                                ImmutableList.of(createOneFieldDescription("test", VarcharType.createVarcharType(50)))))
                .put(TABLE_INSERT_HIGHEST_UNICODE,
                        createDescription(
                                TABLE_INSERT_HIGHEST_UNICODE,
                                createOneFieldDescription("key", BigintType.BIGINT),
                                ImmutableList.of(createOneFieldDescription("test", VarcharType.createVarcharType(50)))))
                .buildOrThrow();

        return KafkaQueryRunner.builder(testingKafka)
                .setTables(TpchTable.getTables())
                .setExtraTopicDescription(extraTopicDescriptions)
                .build();
    }

    /**
     * Reports which optional connector behaviors the tests may rely on.
     *
     * @param connectorBehavior the behavior being queried
     * @return {@code false} for the behaviors listed below; otherwise whatever the superclass reports
     */
    protected boolean hasBehavior(TestingConnectorBehavior connectorBehavior) {
        switch (connectorBehavior) {
            // schema / table DDL
            case SUPPORTS_CREATE_SCHEMA:
            case SUPPORTS_CREATE_TABLE:
            case SUPPORTS_CREATE_TABLE_WITH_DATA:
            case SUPPORTS_RENAME_TABLE:
            // column DDL
            case SUPPORTS_ADD_COLUMN:
            case SUPPORTS_DROP_COLUMN:
            case SUPPORTS_RENAME_COLUMN:
            // comments
            case SUPPORTS_COMMENT_ON_TABLE:
            case SUPPORTS_COMMENT_ON_COLUMN:
            // DML and pushdown
            case SUPPORTS_DELETE:
            case SUPPORTS_TOPN_PUSHDOWN:
                return false;
            default:
                return super.hasBehavior(connectorBehavior);
        }
    }

    /**
     * Always skips: no table with default column values is created for this connector.
     *
     * @return never returns normally
     * @throws SkipException always
     */
    protected TestTable createTableWithDefaultColumns() {
        String reason = "Kafka connector does not support column default values";
        throw new SkipException(reason);
    }

    /**
     * Writes one 48-byte raw message to the raw-format topic and checks that every mapped
     * column can appear both in the select list and in the WHERE clause, once with equality
     * predicates and once with range / inequality predicates.
     */
    @Test
    public void testColumnReferencedTwice() {
        // Layout: long, int, short, byte, double, float, 6 text bytes, then long, int, short, byte again.
        byte[] message = ByteBuffer.allocate(48)
                .putLong(1234567890123L)
                .putInt(123456789)
                .putShort((short) 12345)
                .put((byte) 127)
                .putDouble(1.23456789123E8)
                .putFloat(123456.79f)
                .put("abcdef".getBytes(StandardCharsets.UTF_8))
                .putLong(1234567890123L)
                .putInt(123456789)
                .putShort((short) 12345)
                .put((byte) 127)
                .array();
        insertData(rawFormatTopic, message);

        String selectAllColumns = "SELECT bigint_long, bigint_int, bigint_short, bigint_byte, double_double, double_float, varchar_byte, boolean_long, boolean_int, boolean_short, boolean_byte FROM default." + rawFormatTopic;
        String expectedRow = "VALUES (1234567890123, 123456789, 12345, 127, 123456789.123, 123456.789, 'abcdef', TRUE, TRUE, TRUE, TRUE)";

        assertQuery(
                selectAllColumns + " WHERE bigint_long = 1234567890123 AND bigint_int = 123456789 AND bigint_short = 12345 AND bigint_byte = 127 AND double_double = 123456789.123 AND double_float != 1.0 AND varchar_byte = 'abcdef' AND boolean_long = TRUE AND boolean_int = TRUE AND boolean_short = TRUE AND boolean_byte = TRUE",
                expectedRow);
        assertQuery(
                selectAllColumns + " WHERE bigint_long < 1234567890124 AND bigint_int < 123456790 AND bigint_short < 12346 AND bigint_byte < 128 AND double_double < 123456789.124 AND double_float > 2 AND varchar_byte <= 'abcdef' AND boolean_long != FALSE AND boolean_int != FALSE AND boolean_short != FALSE AND boolean_byte != FALSE",
                expectedRow);
    }

    /**
     * Sends a single key-less record carrying {@code data} to {@code topic}.
     *
     * @param topic name of the Kafka topic to write to
     * @param data  raw message value
     */
    private void insertData(String topic, byte[] data) {
        Map<String, String> producerProperties = getProducerProperties();
        testingKafka.sendMessages(Stream.of(new ProducerRecord<>(topic, data)), producerProperties);
    }

    /**
     * Sends four key-less records to {@code topicName}: two without headers, one with a single
     * {@code notfoo} header, and one with three {@code foo} headers (the second having a null value).
     *
     * @param topicName name of the Kafka topic to write to
     */
    private void createMessagesWithHeader(String topicName) {
        testingKafka.sendMessages(
                Stream.of(
                        // records without headers
                        new ProducerRecord<>(topicName, null, "1".getBytes(StandardCharsets.UTF_8)),
                        new ProducerRecord<>(topicName, null, "2".getBytes(StandardCharsets.UTF_8)),
                        // record with one header
                        setHeader(new ProducerRecord<>(topicName, null, "3".getBytes(StandardCharsets.UTF_8)), "notfoo", "some value"),
                        // record with several headers sharing the same key
                        setHeader(
                                setHeader(
                                        setHeader(new ProducerRecord<>(topicName, null, "4".getBytes(StandardCharsets.UTF_8)), "foo", "bar"),
                                        "foo",
                                        null),
                                "foo",
                                "baz")),
                getProducerProperties());
    }

    /**
     * Appends a header to {@code record} and returns the same record so calls can be nested.
     *
     * @param record the record to modify
     * @param key    header key
     * @param value  header value, encoded as UTF-8; {@code null} adds a header with a null value
     * @return {@code record}
     */
    private static <K, V> ProducerRecord<K, V> setHeader(ProducerRecord<K, V> record, String key, String value) {
        byte[] encodedValue = null;
        if (value != null) {
            encodedValue = value.getBytes(StandardCharsets.UTF_8);
        }
        record.headers().add(key, encodedValue);
        return record;
    }

    /**
     * Producer configuration used when writing test records: the connect string of the testing
     * Kafka instance, {@code acks=all}, and byte-array serializers for keys and values.
     *
     * @return an immutable map of producer properties
     */
    private Map<String, String> getProducerProperties() {
        // Explicit type witness: a bare builder() is inferred as <Object, Object>, which is
        // not assignable to the declared Map<String, String> return type.
        return ImmutableMap.<String, String>builder()
                .put("bootstrap.servers", testingKafka.getConnectString())
                .put("acks", "all")
                .put("key.serializer", ByteArraySerializer.class.getName())
                .put("value.serializer", ByteArraySerializer.class.getName())
                .buildOrThrow();
    }

    @Test
    public void testReadAllDataTypes() {
        String json = "{\"j_varchar\"                              : \"ala ma kota\"                    ,\"j_bigint\"                               : \"9223372036854775807\"            ,\"j_integer\"                              : \"2147483647\"                     ,\"j_smallint\"                             : \"32767\"                          ,\"j_tinyint\"                              : \"127\"                            ,\"j_double\"                               : \"1234567890.123456789\"           ,\"j_boolean\"                              : \"true\"                           ,\"j_timestamp_milliseconds_since_epoch\"   : \"1518182116000\"                  ,\"j_timestamp_seconds_since_epoch\"        : \"1518182117\"                     ,\"j_timestamp_iso8601\"                    : \"2018-02-09T13:15:18\"            ,\"j_timestamp_rfc2822\"                    : \"Fri Feb 09 13:15:19 Z 2018\"     ,\"j_timestamp_custom\"                     : \"02/2018/09 13:15:20\"            ,\"j_date_iso8601\"                         : \"2018-02-11\"                     ,\"j_date_custom\"                          : \"2018/13/02\"                     ,\"j_time_milliseconds_since_epoch\"        : \"47716000\"                       ,\"j_time_seconds_since_epoch\"             : \"47717\"                          ,\"j_time_iso8601\"                         : \"13:15:18\"                       ,\"j_time_custom\"                          : \"15:13:20\"                       ,\"j_timestamptz_milliseconds_since_epoch\" : \"1518182116000\"                  ,\"j_timestamptz_seconds_since_epoch\"      : \"1518182117\"                     ,\"j_timestamptz_iso8601\"                  : \"2018-02-09T13:15:18Z\"           ,\"j_timestamptz_rfc2822\"                  : \"Fri Feb 09 13:15:19 Z 2018\"     ,\"j_timestamptz_custom\"                   : \"02/2018/09 13:15:20\"            ,\"j_timetz_milliseconds_since_epoch\"      : \"47716000\"                       
,\"j_timetz_seconds_since_epoch\"           : \"47717\"                          ,\"j_timetz_iso8601\"                       : \"13:15:18+00:00\"                 ,\"j_timetz_custom\"                        : \"15:13:20\"                       }";
        this.insertData("read_test.all_datatypes_json", json.getBytes(StandardCharsets.UTF_8));
        this.assertQuery("SELECT   c_varchar , c_bigint , c_integer , c_smallint , c_tinyint , c_double , c_boolean , c_timestamp_milliseconds_since_epoch , c_timestamp_seconds_since_epoch , c_timestamp_iso8601 , c_timestamp_rfc2822 , c_timestamp_custom , c_date_iso8601 , c_date_custom , c_time_milliseconds_since_epoch , c_time_seconds_since_epoch , c_time_iso8601 , c_time_custom , cast(c_timestamptz_milliseconds_since_epoch as VARCHAR) , cast(c_timestamptz_seconds_since_epoch as VARCHAR) , cast(c_timestamptz_iso8601 as VARCHAR) , cast(c_timestamptz_rfc2822 as VARCHAR) , cast(c_timestamptz_custom as VARCHAR) , cast(c_timetz_milliseconds_since_epoch as VARCHAR) , cast(c_timetz_seconds_since_epoch as VARCHAR) , cast(c_timetz_iso8601 as VARCHAR) , cast(c_timetz_custom as VARCHAR) FROM read_test.all_datatypes_json ", "VALUES (  'ala ma kota', 9223372036854775807, 2147483647, 32767, 127, 1234567890.123456789, true, TIMESTAMP '2018-02-09 13:15:16', TIMESTAMP '2018-02-09 13:15:17', TIMESTAMP '2018-02-09 13:15:18', TIMESTAMP '2018-02-09 13:15:19', TIMESTAMP '2018-02-09 13:15:20', DATE '2018-02-11', DATE '2018-02-13', TIME '13:15:16', TIME '13:15:17', TIME '13:15:18', TIME '13:15:20', '2018-02-09 13:15:16.000 UTC', '2018-02-09 13:15:17.000 UTC', '2018-02-09 13:15:18.000 UTC', '2018-02-09 13:15:19.000 UTC', '2018-02-09 13:15:20.000 UTC', '13:15:16.000+00:00', '13:15:17.000+00:00', '13:15:18.000+00:00', '13:15:20.000+00:00')");
    }

    /**
     * Exercises INSERT against the pre-registered customer table: a bulk insert from
     * {@code customer}, single-row inserts with partial column lists, and an insert fed by a
     * UNION ALL query.
     */
    @Test
    public void testInsert() {
        org.testng.Assert.assertFalse(hasBehavior(TestingConnectorBehavior.SUPPORTS_CREATE_TABLE_WITH_DATA));

        String table = TABLE_INSERT_CUSTOMER.toString();
        String insertInto = "INSERT INTO " + table;
        String selectAll = "SELECT * FROM " + table;
        String customerRows = "SELECT phone, custkey, acctbal FROM customer";

        assertQuery("SELECT count(*) FROM " + table, "SELECT 0");

        assertUpdate(insertInto + " " + customerRows, "SELECT count(*) FROM customer");
        assertQuery(selectAll, customerRows);

        assertUpdate(insertInto + " (custkey) VALUES (-1)", 1L);
        assertUpdate(insertInto + " (custkey) VALUES (null)", 1L);
        assertUpdate(insertInto + " (phone) VALUES ('3283-2001-01-01')", 1L);
        assertUpdate(insertInto + " (custkey, phone) VALUES (-2, '3283-2001-01-02')", 1L);
        assertUpdate(insertInto + " (phone, custkey) VALUES ('3283-2001-01-03', -3)", 1L);
        assertUpdate(insertInto + " (acctbal) VALUES (1234)", 1L);

        assertQuery(
                selectAll,
                customerRows +
                        " UNION ALL SELECT null, -1, null" +
                        " UNION ALL SELECT null, null, null" +
                        " UNION ALL SELECT '3283-2001-01-01', null, null" +
                        " UNION ALL SELECT '3283-2001-01-02', -2, null" +
                        " UNION ALL SELECT '3283-2001-01-03', -3, null" +
                        " UNION ALL SELECT null, null, 1234");

        // insert fed by a UNION ALL query
        assertUpdate(
                insertInto + " (custkey, phone, acctbal) " +
                        "SELECT custkey, phone, acctbal FROM customer " +
                        "UNION ALL " +
                        "SELECT custkey, phone, acctbal FROM customer",
                "SELECT 2 * count(*) FROM customer");
    }

    /**
     * Inserts a date in year -0001 into the pre-registered table and reads it back, both
     * unfiltered and through an equality predicate.
     */
    @Test
    public void testInsertNegativeDate() {
        String table = TABLE_INSERT_NEGATIVE_DATE.toString();
        String expected = "VALUES date '-0001-01-01'";

        assertQueryReturnsEmptyResult("SELECT dt FROM " + table);
        assertUpdate("INSERT INTO " + table + " (dt) VALUES (DATE '-0001-01-01')", 1L);
        assertQuery("SELECT dt FROM " + table, expected);
        assertQuery("SELECT dt FROM " + table + " WHERE dt = date '-0001-01-01'", expected);
    }

    /**
     * Checks that inserting into an array column is rejected with the expected message, then
     * reports the test as skipped.
     */
    @Test
    public void testInsertArray() {
        Assertions.assertThatThrownBy(() -> query("INSERT INTO " + TABLE_INSERT_ARRAY + " (a) VALUES (ARRAY[null])"))
                .hasMessage("Unsupported column type 'array(double)' for column 'a'");

        String reason = "not supported";
        throw new SkipException(reason);
    }

    /**
     * Inserts strings containing non-ASCII characters into three pre-registered tables and
     * checks that they read back unchanged and compare as expected in predicates.
     */
    @Test
    public void testInsertUnicode() {
        // Table 1: SQL unicode escape literal round trip.
        String first = TABLE_INSERT_UNICODE_1.toString();
        assertUpdate("INSERT INTO " + first + "(test) VALUES 'Hello', U&'hello\\6d4B\\8Bd5world\\7F16\\7801' ", 2L);
        Assertions.assertThat(computeActual("SELECT test FROM " + first).getOnlyColumnAsSet())
                .containsExactlyInAnyOrder("Hello", "hello\u6d4b\u8bd5world\u7f16\u7801");

        // Table 2: two-character values.
        String selectSecond = "SELECT test FROM " + TABLE_INSERT_UNICODE_2;
        assertUpdate("INSERT INTO " + TABLE_INSERT_UNICODE_2 + "(test) VALUES 'aa', 'b\u00e9'", 2L);
        assertQuery(selectSecond, "VALUES 'aa', 'b\u00e9'");
        assertQuery(selectSecond + " WHERE test = 'aa'", "VALUES 'aa'");
        assertQuery(selectSecond + " WHERE test > 'ba'", "VALUES 'b\u00e9'");
        assertQuery(selectSecond + " WHERE test < 'ba'", "VALUES 'aa'");
        assertQueryReturnsEmptyResult(selectSecond + " WHERE test = 'ba'");

        // Table 3: single-character values.
        String selectThird = "SELECT test FROM " + TABLE_INSERT_UNICODE_3;
        assertUpdate("INSERT INTO " + TABLE_INSERT_UNICODE_3 + "(test) VALUES 'a', '\u00e9'", 2L);
        assertQuery(selectThird, "VALUES 'a', '\u00e9'");
        assertQuery(selectThird + " WHERE test = 'a'", "VALUES 'a'");
        assertQuery(selectThird + " WHERE test > 'b'", "VALUES '\u00e9'");
        assertQuery(selectThird + " WHERE test < 'b'", "VALUES 'a'");
        assertQueryReturnsEmptyResult(selectThird + " WHERE test = 'b'");
    }

    /**
     * Inserts a string containing code point U+10FFFF and checks that it reads back unchanged.
     */
    @Test
    public void testInsertHighestUnicodeCharacter() {
        String table = TABLE_INSERT_HIGHEST_UNICODE.toString();
        assertUpdate("INSERT INTO " + table + "(test) VALUES 'Hello', U&'hello\\6d4B\\8Bd5\\+10FFFFworld\\7F16\\7801' ", 2L);
        Assertions.assertThat(computeActual("SELECT test FROM " + table).getOnlyColumnAsSet())
                .containsExactlyInAnyOrder("Hello", "hello\u6d4b\u8bd5\udbff\udfffworld\u7f16\u7801");
    }

    /**
     * Always skipped; see the TODO in the skip message.
     *
     * @throws SkipException always
     */
    public void testInsertRowConcurrently() {
        String reason = "TODO Prepare a topic in Kafka and enable this test";
        throw new SkipException(reason);
    }

    /**
     * Describes a JSON topic whose table name and topic name are both the table part of
     * {@code schemaTableName}, with a single-field key group and the given message fields.
     *
     * @param schemaTableName schema and table (also used as the topic name)
     * @param key             the only field of the key group
     * @param fields          fields of the message group
     * @return the topic description
     */
    private static KafkaTopicDescription createDescription(SchemaTableName schemaTableName, KafkaTopicFieldDescription key, List<KafkaTopicFieldDescription> fields) {
        String tableName = schemaTableName.getTableName();
        return new KafkaTopicDescription(
                tableName,
                Optional.of(schemaTableName.getSchemaName()),
                tableName,
                createFieldGroup("json", ImmutableList.of(key)),
                createFieldGroup("json", fields));
    }

    /**
     * Describes a topic that has no key field group.
     *
     * @param name    table name
     * @param schema  schema name
     * @param topic   Kafka topic name
     * @param message optional message field group
     * @return the topic description
     */
    private static KafkaTopicDescription createDescription(String name, String schema, String topic, Optional<KafkaTopicFieldGroup> message) {
        Optional<KafkaTopicFieldGroup> noKey = Optional.empty();
        return new KafkaTopicDescription(name, Optional.of(schema), topic, noKey, message);
    }

    /**
     * Wraps {@code fields} in a field group of the given data format; the group's two optional
     * constructor arguments are left empty.
     *
     * @param dataFormat data format name of the group (for example {@code "raw"} or {@code "json"})
     * @param fields     field descriptions of the group
     * @return the field group, wrapped in a present {@link Optional}
     */
    private static Optional<KafkaTopicFieldGroup> createFieldGroup(String dataFormat, List<KafkaTopicFieldDescription> fields) {
        KafkaTopicFieldGroup group = new KafkaTopicFieldGroup(dataFormat, Optional.empty(), Optional.empty(), fields);
        return Optional.of(group);
    }

    /**
     * Describes a field mapped to its own name, with no data format.
     *
     * @param name field name, also used as the mapping
     * @param type field type
     * @return the field description
     */
    private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type) {
        // The three-argument overload with a null data format builds the same description.
        return createOneFieldDescription(name, type, null);
    }

    /**
     * Describes a field mapped to its own name, with the given data format.
     *
     * @param name       field name, also used as the mapping
     * @param type       field type
     * @param dataFormat data format of the field
     * @return the field description
     */
    private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat) {
        // The (name, type, mapping, dataFormat) overload with mapping == name builds the same description.
        String mapping = name;
        return createOneFieldDescription(name, type, mapping, dataFormat);
    }

    /**
     * Describes a field mapped to its own name, with a data format and an optional format hint.
     *
     * @param name       field name, also used as the mapping
     * @param type       field type
     * @param dataFormat data format of the field
     * @param formatHint format hint; when empty, {@code null} is passed in its place
     * @return the field description
     */
    private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String dataFormat, Optional<String> formatHint) {
        String hintOrNull = formatHint.orElse(null);
        return new KafkaTopicFieldDescription(name, type, name, null, dataFormat, hintOrNull, false);
    }

    /**
     * Describes a field with an explicit mapping and data format.
     *
     * @param name       field name
     * @param type       field type
     * @param mapping    mapping of the field within the message
     * @param dataFormat data format of the field
     * @return the field description
     */
    private static KafkaTopicFieldDescription createOneFieldDescription(String name, Type type, String mapping, String dataFormat) {
        return new KafkaTopicFieldDescription(
                name,
                type,
                mapping,
                null,
                dataFormat,
                null, // format hint position (see the Optional<String> overload)
                false);
    }

    /**
     * Writes records with and without headers to the headers topic and checks the
     * {@code _headers} column: empty for header-less records, and all values in order for a
     * repeated header key.
     */
    @Test
    public void testKafkaHeaders() {
        createMessagesWithHeader(headersTopic);

        String headersTable = "default." + headersTopic;
        assertQuery(
                "SELECT _message FROM " + headersTable + " WHERE cardinality(_headers) = 0",
                "VALUES ('1'),('2')");
        assertQuery(
                "SELECT from_utf8(value) FROM " + headersTable + " CROSS JOIN UNNEST(_headers['foo']) AS arr (value) WHERE _message = '4'",
                "VALUES ('bar'), (null), ('baz')");
    }

    /**
     * Inserts one row holding every field of {@code testCase}, then reads each field back and
     * compares it with the value of the SQL literal that was inserted.
     *
     * @param testCase topic name plus the fields to round-trip
     */
    @Test(dataProvider="jsonDateTimeFormatsDataProvider")
    public void testJsonDateTimeFormatsRoundTrip(JsonDateTimeTestCase testCase) {
        String table = "write_test." + testCase.getTopicName();
        assertUpdate("INSERT into " + table + " (" + testCase.getFieldNames() + ") VALUES " + testCase.getFieldValues(), 1L);

        for (JsonDateTimeTestCase.Field field : testCase.getFields()) {
            Object readBack = computeScalar("SELECT " + field.getFieldName() + " FROM " + table);
            Object inserted = computeScalar("SELECT " + field.getFieldValue());
            try {
                Assert.assertEquals(readBack, inserted, "Equality assertion failed for field: " + field.getFieldName());
            }
            catch (AssertionError failure) {
                // Rethrow with the field name up front, keeping the original failure as the cause.
                String message = String.format("Equality assertion failed for field '%s'\n%s", field.getFieldName(), failure.getMessage());
                throw new AssertionError(message, failure);
            }
        }
    }

    /**
     * TestNG data provider exposing the cases from {@link #jsonDateTimeFormatsData()}.
     *
     * @return the test cases collected with {@code DataProviders.toDataProvider()}
     */
    @DataProvider
    public static Object[][] jsonDateTimeFormatsDataProvider() {
        List<JsonDateTimeTestCase> testCases = jsonDateTimeFormatsData();
        return testCases.stream().collect(DataProviders.toDataProvider());
    }

    /**
     * Test cases for the JSON date-time round-trip test: one topic per date-time format, each
     * listing the column types it covers and the SQL literal inserted for that column.
     *
     * <p>Custom-format fields also carry a format pattern; the other formats use the
     * three-argument {@code addField} without one.
     *
     * @return the immutable list of test cases
     */
    private static List<JsonDateTimeTestCase> jsonDateTimeFormatsData() {
        String customDateTime = DateTimeFormat.CUSTOM_DATE_TIME.toString();
        String iso8601 = DateTimeFormat.ISO8601.toString();
        String rfc2822 = DateTimeFormat.RFC2822.toString();
        String millisecondsSinceEpoch = DateTimeFormat.MILLISECONDS_SINCE_EPOCH.toString();
        String secondsSinceEpoch = DateTimeFormat.SECONDS_SINCE_EPOCH.toString();

        // Explicit type witness: a bare builder() would produce ImmutableList<Object>, which is
        // not assignable to the declared return type.
        return ImmutableList.<JsonDateTimeTestCase>builder()
                .add(JsonDateTimeTestCase.builder()
                        .setTopicName(JSON_CUSTOM_DATE_TIME_TABLE_NAME)
                        .addField(DateType.DATE, customDateTime, "yyyy-MM-dd", "DATE '2020-07-15'")
                        .addField(TimeType.TIME, customDateTime, "HH:mm:ss.SSS", "TIME '01:02:03.456'")
                        .addField(TimeWithTimeZoneType.TIME_WITH_TIME_ZONE, customDateTime, "HH:mm:ss.SSS Z", "TIME '01:02:03.456 -04:00'")
                        .addField(TimestampType.TIMESTAMP, customDateTime, "yyyy-dd-MM HH:mm:ss.SSS", "TIMESTAMP '2020-07-15 01:02:03.456'")
                        .addField(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE, customDateTime, "yyyy-dd-MM HH:mm:ss.SSS Z", "TIMESTAMP '2020-07-15 01:02:03.456 -04:00'")
                        .build())
                .add(JsonDateTimeTestCase.builder()
                        .setTopicName(JSON_ISO8601_TABLE_NAME)
                        .addField(DateType.DATE, iso8601, "DATE '2020-07-15'")
                        .addField(TimeType.TIME, iso8601, "TIME '01:02:03.456'")
                        .addField(TimeWithTimeZoneType.TIME_WITH_TIME_ZONE, iso8601, "TIME '01:02:03.456 -04:00'")
                        .addField(TimestampType.TIMESTAMP, iso8601, "TIMESTAMP '2020-07-15 01:02:03.456'")
                        .addField(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE, iso8601, "TIMESTAMP '2020-07-15 01:02:03.456 -04:00'")
                        .build())
                .add(JsonDateTimeTestCase.builder()
                        .setTopicName(JSON_RFC2822_TABLE_NAME)
                        .addField(TimestampType.TIMESTAMP, rfc2822, "TIMESTAMP '2020-07-15 01:02:03'")
                        .addField(TimestampWithTimeZoneType.TIMESTAMP_WITH_TIME_ZONE, rfc2822, "TIMESTAMP '2020-07-15 01:02:03 -04:00'")
                        .build())
                .add(JsonDateTimeTestCase.builder()
                        .setTopicName(JSON_MILLISECONDS_TABLE_NAME)
                        .addField(TimeType.TIME, millisecondsSinceEpoch, "TIME '01:02:03.456'")
                        .addField(TimestampType.TIMESTAMP, millisecondsSinceEpoch, "TIMESTAMP '2020-07-15 01:02:03.456'")
                        .build())
                .add(JsonDateTimeTestCase.builder()
                        .setTopicName(JSON_SECONDS_TABLE_NAME)
                        .addField(TimeType.TIME, secondsSinceEpoch, "TIME '01:02:03'")
                        .addField(TimestampType.TIMESTAMP, secondsSinceEpoch, "TIMESTAMP '2020-07-15 01:02:03'")
                        .build())
                .build();
    }

    /**
     * Builds one JSON topic description per case in {@link #jsonDateTimeFormatsData()}, keyed by
     * its {@code write_test} table name. Each topic has a single bigint {@code key} field in
     * the key group and one message field per field of the test case.
     *
     * @return immutable map from table name to topic description
     */
    private static Map<SchemaTableName, KafkaTopicDescription> createJsonDateTimeTestTopic() {
        return jsonDateTimeFormatsData().stream().collect(ImmutableMap.toImmutableMap(
                testCase -> new SchemaTableName("write_test", testCase.getTopicName()),
                testCase -> {
                    List<KafkaTopicFieldDescription> messageFields = testCase.getFields().stream()
                            .map(field -> createOneFieldDescription(field.getFieldName(), field.getType(), field.getDataFormat(), field.getFormatHint()))
                            .collect(ImmutableList.toImmutableList());
                    // The (String) cast picks the String overload over the Optional<String> one.
                    List<KafkaTopicFieldDescription> keyFields = ImmutableList.of(createOneFieldDescription("key", BigintType.BIGINT, "key", (String) null));
                    return new KafkaTopicDescription(
                            testCase.getTopicName(),
                            Optional.of("write_test"),
                            testCase.getTopicName(),
                            Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), keyFields)),
                            Optional.of(new KafkaTopicFieldGroup("json", Optional.empty(), Optional.empty(), messageFields)));
                }));
    }

    /**
     * Inserts the rows of {@code testCase} into its {@code write_test} table and checks that
     * selecting the same columns (filtered by {@code f_bigint > 1}) returns exactly those rows.
     *
     * @param testCase table name, column list and rows to round-trip
     */
    @Test(dataProvider="roundTripAllFormatsDataProvider")
    public void testRoundTripAllFormats(RoundTripTestCase testCase) {
        String table = "write_test." + testCase.getTableName();
        String columns = testCase.getFieldNames();
        String rows = testCase.getRowValues();

        assertUpdate("INSERT into " + table + " (" + columns + ") VALUES " + rows, testCase.getNumRows());
        assertQuery("SELECT " + columns + " FROM " + table + " WHERE f_bigint > 1", "VALUES " + rows);
    }

    /**
     * TestNG data provider exposing the cases from {@link #roundTripAllFormatsData()}.
     *
     * @return the test cases collected with {@code DataProviders.toDataProvider()}
     */
    @DataProvider
    public static Object[][] roundTripAllFormatsDataProvider() {
        List<RoundTripTestCase> testCases = roundTripAllFormatsData();
        return testCases.stream().collect(DataProviders.toDataProvider());
    }

    /**
     * Test cases for the insert/select round trip: one per table ({@code all_datatypes_avro},
     * {@code all_datatypes_csv}, {@code all_datatypes_raw}, {@code all_datatypes_json}), each
     * with its column list and two rows of values. Varchar values carry their own single
     * quotes because rows are rendered into SQL via {@code toString()}.
     *
     * @return the immutable list of test cases
     */
    private static List<RoundTripTestCase> roundTripAllFormatsData() {
        // Explicit type witness: a bare builder() would produce ImmutableList<Object>, which is
        // not assignable to the declared return type.
        return ImmutableList.<RoundTripTestCase>builder()
                .add(new RoundTripTestCase(
                        "all_datatypes_avro",
                        ImmutableList.of("f_bigint", "f_float", "f_double", "f_boolean", "f_varchar"),
                        ImmutableList.of(
                                ImmutableList.of(100000, 999.999f, 1000.001, true, "'test'"),
                                ImmutableList.of(123456, -123.456f, 1234.123, false, "'abcd'"))))
                .add(new RoundTripTestCase(
                        "all_datatypes_csv",
                        ImmutableList.of("f_bigint", "f_int", "f_smallint", "f_tinyint", "f_double", "f_boolean", "f_varchar"),
                        ImmutableList.of(
                                ImmutableList.of(100000, 1000, 100, 10, 1000.001, true, "'test'"),
                                ImmutableList.of(123456, 1234, 123, 12, 12345.123, false, "'abcd'"))))
                .add(new RoundTripTestCase(
                        "all_datatypes_raw",
                        ImmutableList.of("kafka_key", "f_varchar", "f_bigint", "f_int", "f_smallint", "f_tinyint", "f_double", "f_boolean"),
                        ImmutableList.of(
                                ImmutableList.of(1, "'test'", 100000, 1000, 100, 10, 1000.001, true),
                                ImmutableList.of(1, "'abcd'", 123456, 1234, 123, 12, 12345.123, false))))
                .add(new RoundTripTestCase(
                        "all_datatypes_json",
                        ImmutableList.of("f_bigint", "f_int", "f_smallint", "f_tinyint", "f_double", "f_boolean", "f_varchar"),
                        ImmutableList.of(
                                ImmutableList.of(100000, 1000, 100, 10, 1000.001, true, "'test'"),
                                ImmutableList.of(123748, 1234, 123, 12, 12345.123, false, "'abcd'"))))
                .build();
    }

    /**
     * Immutable description of one insert/select round trip: the target table, the columns
     * written, and the rows of values. Every row must have one value per column.
     */
    private static final class RoundTripTestCase {
        private final String tableName;
        private final List<String> fieldNames;
        private final List<List<Object>> rowValues;
        private final int numRows;

        /**
         * @param tableName  table name; must not be null
         * @param fieldNames column names, in insertion order
         * @param rowValues  rows of values, each with as many entries as {@code fieldNames}
         * @throws IllegalArgumentException if a row's size differs from the number of field names
         */
        public RoundTripTestCase(String tableName, List<String> fieldNames, List<List<Object>> rowValues) {
            rowValues.forEach(row ->
                    Preconditions.checkArgument(row.size() == fieldNames.size(), "sizes of fieldNames and rowValues are not equal"));
            this.tableName = Objects.requireNonNull(tableName, "tableName is null");
            this.fieldNames = ImmutableList.copyOf(fieldNames);
            this.rowValues = ImmutableList.copyOf(rowValues);
            this.numRows = this.rowValues.size();
        }

        /** Returns the table name. */
        public String getTableName() {
            return tableName;
        }

        /** Returns the column names joined with {@code ", "}. */
        public String getFieldNames() {
            return fieldNames.stream().collect(Collectors.joining(", "));
        }

        /**
         * Renders the rows as a SQL VALUES list body: each row as {@code (v1, v2, ...)} using
         * {@code toString()} of each value, rows joined with {@code ", "}.
         */
        public String getRowValues() {
            return rowValues.stream()
                    .map(row -> row.stream()
                            .map(Object::toString)
                            .collect(Collectors.joining(", ", "(", ")")))
                    .collect(Collectors.joining(", "));
        }

        /** Returns the number of rows. */
        public int getNumRows() {
            return numRows;
        }

        /** Returns the table name. */
        public String toString() {
            return tableName;
        }
    }

    /**
     * Immutable test case pairing a topic name with an ordered list of {@link Field}s.
     * Instances can be assembled through {@link #builder()}.
     */
    private static final class JsonDateTimeTestCase {
        private final String topicName;
        private final List<Field> fields;

        /**
         * @param topicName name of the topic; must not be null
         * @param fields fields of the test case; must not be null, copied into an immutable list
         */
        public JsonDateTimeTestCase(String topicName, List<Field> fields) {
            this.topicName = Objects.requireNonNull(topicName, "topicName is null");
            this.fields = ImmutableList.copyOf(Objects.requireNonNull(fields, "fields is null"));
        }

        /** Creates a new, empty {@link Builder}. */
        public static Builder builder() {
            return new Builder();
        }

        /** Returns the topic name. */
        public String getTopicName() {
            return this.topicName;
        }

        /** Returns the names of all fields, in order, joined with {@code ", "}. */
        public String getFieldNames() {
            List<String> names = this.fields.stream()
                    .map(Field::getFieldName)
                    .collect(Collectors.toList());
            return String.join(", ", names);
        }

        /** Returns the values of all fields, in order, joined with {@code ", "} and wrapped in parentheses. */
        public String getFieldValues() {
            List<String> values = this.fields.stream()
                    .map(Field::getFieldValue)
                    .collect(Collectors.toList());
            return "(" + String.join(", ", values) + ")";
        }

        /** Returns the immutable list of fields. */
        public List<Field> getFields() {
            return this.fields;
        }

        /** Returns the topic name, so the test case is identified by it when printed. */
        @Override
        public String toString() {
            return this.topicName;
        }

        /**
         * A single field of the test case: its name, type, data format, optional format hint
         * and value text. All components are required to be non-null.
         */
        public static class Field {
            private final String fieldName;
            private final Type type;
            private final String dataFormat;
            private final Optional<String> formatHint;
            private final String fieldValue;

            /**
             * @throws NullPointerException if any argument is null
             */
            public Field(String fieldName, Type type, String dataFormat, Optional<String> formatHint, String fieldValue) {
                this.fieldName = Objects.requireNonNull(fieldName, "fieldName is null");
                this.type = Objects.requireNonNull(type, "type is null");
                this.dataFormat = Objects.requireNonNull(dataFormat, "dataFormat is null");
                this.formatHint = Objects.requireNonNull(formatHint, "formatHint is null");
                this.fieldValue = Objects.requireNonNull(fieldValue, "fieldValue is null");
            }

            /** Returns the field name. */
            public String getFieldName() {
                return this.fieldName;
            }

            /** Returns the field type. */
            public Type getType() {
                return this.type;
            }

            /** Returns the data format. */
            public String getDataFormat() {
                return this.dataFormat;
            }

            /** Returns the format hint, empty when none was supplied. */
            public Optional<String> getFormatHint() {
                return this.formatHint;
            }

            /** Returns the field value text. */
            public String getFieldValue() {
                return this.fieldValue;
            }
        }

        /**
         * Fluent builder for {@link JsonDateTimeTestCase}. The topic name defaults to the empty
         * string; field names are derived from each field's data format and type.
         */
        public static class Builder {
            private String topicName = "";
            private final ImmutableList.Builder<Field> fields = ImmutableList.builder();

            /** Sets the topic name and returns this builder. */
            public Builder setTopicName(String topicName) {
                this.topicName = topicName;
                return this;
            }

            /** Adds a field without a format hint and returns this builder. */
            public Builder addField(Type type, String dataFormat, String fieldValue) {
                return this.append(Builder.getFieldName(type, dataFormat), type, dataFormat, Optional.empty(), fieldValue);
            }

            /** Adds a field with the given (non-null) format hint and returns this builder. */
            public Builder addField(Type type, String dataFormat, String formatHint, String fieldValue) {
                return this.append(Builder.getFieldName(type, dataFormat), type, dataFormat, Optional.of(formatHint), fieldValue);
            }

            /** Appends a fully specified field; shared by both {@code addField} overloads. */
            private Builder append(String fieldName, Type type, String dataFormat, Optional<String> formatHint, String fieldValue) {
                this.fields.add(new Field(fieldName, type, dataFormat, formatHint, fieldValue));
                return this;
            }

            /**
             * Derives a field name of the form {@code <dataFormat>_<typeDisplayName>}, with every
             * {@code '-'} in the data format and every whitespace character or parenthesis in the
             * type's display name replaced by {@code '_'}.
             */
            private static String getFieldName(Type type, String dataFormat) {
                String formatPart = dataFormat.replaceAll("-", "_");
                String typePart = type.getDisplayName().replaceAll("\\s|[(]|[)]", "_");
                return formatPart + "_" + typePart;
            }

            /** Builds the test case from the topic name and the fields added so far. */
            public JsonDateTimeTestCase build() {
                return new JsonDateTimeTestCase(this.topicName, this.fields.build());
            }
        }
    }
}

