package org.apache.flink.streaming.connectors.kafka.table;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Consumer;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.SinkFunctionProvider;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestFormatFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
import org.apache.flink.table.types.DataType;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;

/**
 * Abstract base for tests of the Kafka dynamic table factories.
 *
 * <p>The tests build catalog tables from string options, run them through
 * {@link FactoryUtil#createTableSource} / {@link FactoryUtil#createTableSink}, and compare the
 * result with the source/sink that the concrete subclass declares as expected. Subclasses supply
 * the connector identifier and the expected source, sink, consumer and producer types through the
 * abstract hooks at the bottom of this class.
 */
public abstract class KafkaDynamicTableFactoryTestBase extends TestLogger {

    /** JUnit rule used by the negative tests to declare the expected validation failure. */
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    private static final String TOPIC = "myTopic";
    private static final int PARTITION_0 = 0;
    private static final long OFFSET_0 = 100;
    private static final int PARTITION_1 = 1;
    private static final long OFFSET_1 = 123;
    private static final String NAME = "name";
    private static final String COUNT = "count";
    private static final String TIME = "time";
    private static final String WATERMARK_EXPRESSION = "time - INTERVAL '5' SECOND";
    private static final String COMPUTED_COLUMN_NAME = "computed-column";
    private static final String COMPUTED_COLUMN_EXPRESSION = "count + 1.0";

    /** Identifier of the format used by every option map in this class. */
    private static final String FORMAT_IDENTIFIER = "test-format";

    private static final DataType WATERMARK_DATATYPE = DataTypes.TIMESTAMP(3);
    private static final DataType COMPUTED_COLUMN_DATATYPE = DataTypes.DECIMAL(10, 3);

    /** Value of the 'scan.startup.specific-offsets' option matching the two partitions above. */
    private static final String PROPS_SCAN_OFFSETS =
            String.format(
                    "partition:%d,offset:%d;partition:%d,offset:%d",
                    PARTITION_0, OFFSET_0, PARTITION_1, OFFSET_1);

    /** Source schema: three physical columns plus a computed column and a watermark. */
    private static final TableSchema SOURCE_SCHEMA =
            TableSchema.builder()
                    .field(NAME, DataTypes.STRING())
                    .field(COUNT, DataTypes.DECIMAL(38, 18))
                    .field(TIME, DataTypes.TIMESTAMP(3))
                    .field(COMPUTED_COLUMN_NAME, COMPUTED_COLUMN_DATATYPE, COMPUTED_COLUMN_EXPRESSION)
                    .watermark(TIME, WATERMARK_EXPRESSION, WATERMARK_DATATYPE)
                    .build();

    /** Sink schema: only the three physical columns. */
    private static final TableSchema SINK_SCHEMA =
            TableSchema.builder()
                    .field(NAME, DataTypes.STRING())
                    .field(COUNT, DataTypes.DECIMAL(38, 18))
                    .field(TIME, DataTypes.TIMESTAMP(3))
                    .build();

    private static final ObjectIdentifier SCAN_TABLE_IDENTIFIER =
            ObjectIdentifier.of("default", "default", "scanTable");
    private static final ObjectIdentifier SINK_TABLE_IDENTIFIER =
            ObjectIdentifier.of("default", "default", "sinkTable");

    /** Kafka client properties corresponding to the 'properties.*' entries of the option maps. */
    private static final Properties KAFKA_PROPERTIES = new Properties();

    static {
        KAFKA_PROPERTIES.setProperty("group.id", "dummy");
        KAFKA_PROPERTIES.setProperty("bootstrap.servers", "dummy");
    }

    /**
     * Creates a source from the full source options and checks that it equals the expected scan
     * source, that its runtime provider yields the expected consumer class, and that the consumer
     * commits offsets on checkpoints.
     */
    @Test
    public void testTableSource() {
        final DataType producedDataType = SOURCE_SCHEMA.toPhysicalRowDataType();

        final Map<KafkaTopicPartition, Long> specificOffsets = new HashMap<>();
        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_0), OFFSET_0);
        specificOffsets.put(new KafkaTopicPartition(TOPIC, PARTITION_1), OFFSET_1);

        final TestFormatFactory.DecodingFormatMock decodingFormat =
                new TestFormatFactory.DecodingFormatMock(",", true);

        final KafkaDynamicSourceBase actualSource =
                (KafkaDynamicSourceBase)
                        FactoryUtil.createTableSource(
                                (Catalog) null,
                                SCAN_TABLE_IDENTIFIER,
                                createKafkaSourceCatalogTable(),
                                new Configuration(),
                                Thread.currentThread().getContextClassLoader());

        final KafkaDynamicSourceBase expectedSource =
                getExpectedScanSource(
                        producedDataType,
                        TOPIC,
                        KAFKA_PROPERTIES,
                        decodingFormat,
                        StartupMode.SPECIFIC_OFFSETS,
                        specificOffsets,
                        0L);
        // JUnit convention is (expected, actual) so that failure messages read correctly.
        Assert.assertEquals(expectedSource, actualSource);

        // Keep the provider untyped until the instanceOf assertion has run, so that a wrong
        // provider type is reported as an assertion failure rather than a ClassCastException.
        final Object provider =
                actualSource.getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        Assert.assertThat(provider, CoreMatchers.instanceOf(SourceFunctionProvider.class));

        final Object sourceFunction = ((SourceFunctionProvider) provider).createSourceFunction();
        Assert.assertThat(sourceFunction, CoreMatchers.instanceOf(getExpectedConsumerClass()));
        Assert.assertTrue(
                ((FlinkKafkaConsumerBase<?>) sourceFunction).getEnableCommitOnCheckpoints());
    }

    /**
     * Creates a source from options that lack 'properties.group.id' and checks that the resulting
     * consumer reports commit-on-checkpoints as disabled.
     */
    @Test
    public void testTableSourceCommitOnCheckpointsDisabled() {
        final Map<String, String> optionsWithoutGroupId = getFullSourceOptions();
        optionsWithoutGroupId.remove("properties.group.id");

        final Object actualSource =
                FactoryUtil.createTableSource(
                        (Catalog) null,
                        SCAN_TABLE_IDENTIFIER,
                        createKafkaSourceCatalogTable(optionsWithoutGroupId),
                        new Configuration(),
                        Thread.currentThread().getContextClassLoader());
        Assert.assertThat(actualSource, CoreMatchers.instanceOf(KafkaDynamicSourceBase.class));

        final Object provider =
                ((KafkaDynamicSourceBase) actualSource)
                        .getScanRuntimeProvider(ScanRuntimeProviderContext.INSTANCE);
        Assert.assertThat(provider, CoreMatchers.instanceOf(SourceFunctionProvider.class));

        final Object sourceFunction = ((SourceFunctionProvider) provider).createSourceFunction();
        Assert.assertFalse(
                ((FlinkKafkaConsumerBase<?>) sourceFunction).getEnableCommitOnCheckpoints());
    }

    /**
     * Creates a sink from the full sink options and checks that it equals the expected sink, that
     * it carries the expected encoding format, and that its runtime provider yields the expected
     * producer class.
     */
    @Test
    public void testTableSink() {
        final DataType consumedDataType = SINK_SCHEMA.toPhysicalRowDataType();
        final TestFormatFactory.EncodingFormatMock encodingFormat =
                new TestFormatFactory.EncodingFormatMock(",");

        final KafkaDynamicSinkBase actualSink =
                (KafkaDynamicSinkBase)
                        FactoryUtil.createTableSink(
                                (Catalog) null,
                                SINK_TABLE_IDENTIFIER,
                                createKafkaSinkCatalogTable(),
                                new Configuration(),
                                Thread.currentThread().getContextClassLoader());

        final KafkaDynamicSinkBase expectedSink =
                getExpectedSink(
                        consumedDataType,
                        TOPIC,
                        KAFKA_PROPERTIES,
                        Optional.of(new FlinkFixedPartitioner<>()),
                        encodingFormat);
        Assert.assertEquals(expectedSink, actualSink);
        Assert.assertEquals(encodingFormat, actualSink.encodingFormat);

        final Object provider =
                actualSink.getSinkRuntimeProvider(new SinkRuntimeProviderContext(false));
        Assert.assertThat(provider, CoreMatchers.instanceOf(SinkFunctionProvider.class));

        final Object sinkFunction = ((SinkFunctionProvider) provider).createSinkFunction();
        Assert.assertThat(sinkFunction, CoreMatchers.instanceOf(getExpectedProducerClass()));
    }

    /** An unknown 'scan.startup.mode' value must be rejected with a {@link ValidationException}. */
    @Test
    public void testInvalidScanStartupMode() {
        final CatalogTable table =
                createKafkaSourceCatalogTable(
                        getModifiedOptions(
                                getFullSourceOptions(),
                                options -> options.put("scan.startup.mode", "abc")));

        expectValidationFailure(
                "Invalid value for option 'scan.startup.mode'. Supported values are [earliest-offset, latest-offset, group-offsets, specific-offsets, timestamp], but was: abc");
        FactoryUtil.createTableSource(
                (Catalog) null,
                SCAN_TABLE_IDENTIFIER,
                table,
                new Configuration(),
                Thread.currentThread().getContextClassLoader());
    }

    /** Startup mode 'timestamp' without 'scan.startup.timestamp-millis' must be rejected. */
    @Test
    public void testMissingStartupTimestamp() {
        final CatalogTable table =
                createKafkaSourceCatalogTable(
                        getModifiedOptions(
                                getFullSourceOptions(),
                                options -> options.put("scan.startup.mode", "timestamp")));

        expectValidationFailure(
                "'scan.startup.timestamp-millis' is required in 'timestamp' startup mode but missing.");
        FactoryUtil.createTableSource(
                (Catalog) null,
                SCAN_TABLE_IDENTIFIER,
                table,
                new Configuration(),
                Thread.currentThread().getContextClassLoader());
    }

    /** Startup mode 'specific-offsets' without 'scan.startup.specific-offsets' must be rejected. */
    @Test
    public void testMissingSpecificOffsets() {
        final CatalogTable table =
                createKafkaSourceCatalogTable(
                        getModifiedOptions(
                                getFullSourceOptions(),
                                options -> options.remove("scan.startup.specific-offsets")));

        expectValidationFailure(
                "'scan.startup.specific-offsets' is required in 'specific-offsets' startup mode but missing.");
        FactoryUtil.createTableSource(
                (Catalog) null,
                SCAN_TABLE_IDENTIFIER,
                table,
                new Configuration(),
                Thread.currentThread().getContextClassLoader());
    }

    /** A 'sink.partitioner' value that names no loadable class must be rejected. */
    @Test
    public void testInvalidSinkPartitioner() {
        // Start from the sink options (not the source options) so that the table under test is a
        // genuine sink definition and only the partitioner entry is invalid.
        final CatalogTable table =
                createKafkaSinkCatalogTable(
                        getModifiedOptions(
                                getFullSinkOptions(),
                                options -> options.put("sink.partitioner", "abc")));

        expectValidationFailure("Could not find and instantiate partitioner class 'abc'");
        FactoryUtil.createTableSink(
                (Catalog) null,
                SINK_TABLE_IDENTIFIER,
                table,
                new Configuration(),
                Thread.currentThread().getContextClassLoader());
    }

    /**
     * Registers the expectation that the remainder of the current test throws a
     * {@link ValidationException} whose cause chain contains a {@link ValidationException} with
     * the given message.
     *
     * @param message exact message of the expected validation failure
     */
    private void expectValidationFailure(String message) {
        thrown.expect(ValidationException.class);
        thrown.expect(
                org.apache.flink.util.CoreMatchers.containsCause(new ValidationException(message)));
    }

    /** Returns a source catalog table built from {@link #getFullSourceOptions()}. */
    private CatalogTable createKafkaSourceCatalogTable() {
        return createKafkaSourceCatalogTable(getFullSourceOptions());
    }

    /** Returns a sink catalog table built from {@link #getFullSinkOptions()}. */
    private CatalogTable createKafkaSinkCatalogTable() {
        return createKafkaSinkCatalogTable(getFullSinkOptions());
    }

    /**
     * Returns a catalog table with the source schema and the given options.
     *
     * @param map table options
     */
    private CatalogTable createKafkaSourceCatalogTable(Map<String, String> map) {
        return new CatalogTableImpl(SOURCE_SCHEMA, map, "scanTable");
    }

    /**
     * Returns a catalog table with the sink schema and the given options.
     *
     * @param map table options
     */
    private CatalogTable createKafkaSinkCatalogTable(Map<String, String> map) {
        return new CatalogTableImpl(SINK_SCHEMA, map, "sinkTable");
    }

    /**
     * Applies {@code consumer} to {@code map} and returns the same map instance.
     *
     * <p>The map is modified in place; callers in this class always pass a freshly built map.
     *
     * @param map options to modify
     * @param consumer modification to apply
     * @return {@code map}, after modification
     */
    private static Map<String, String> getModifiedOptions(
            Map<String, String> map, Consumer<Map<String, String>> consumer) {
        consumer.accept(map);
        return map;
    }

    /**
     * Builds a new mutable option map describing a source that starts from specific offsets and
     * uses the test format with delimiter {@code ","} and fail-on-missing {@code "true"}.
     */
    private Map<String, String> getFullSourceOptions() {
        final Map<String, String> options = new HashMap<>();
        options.put("connector", factoryIdentifier());
        options.put("topic", TOPIC);
        options.put("properties.group.id", "dummy");
        options.put("properties.bootstrap.servers", "dummy");
        options.put("scan.startup.mode", "specific-offsets");
        options.put("scan.startup.specific-offsets", PROPS_SCAN_OFFSETS);
        options.put("format", FORMAT_IDENTIFIER);
        options.put(
                String.format("%s.%s", FORMAT_IDENTIFIER, TestFormatFactory.DELIMITER.key()), ",");
        options.put(
                String.format("%s.%s", FORMAT_IDENTIFIER, TestFormatFactory.FAIL_ON_MISSING.key()),
                "true");
        return options;
    }

    /**
     * Builds a new mutable option map describing a sink with the 'fixed' partitioner and the test
     * format with delimiter {@code ","}.
     */
    private Map<String, String> getFullSinkOptions() {
        final Map<String, String> options = new HashMap<>();
        options.put("connector", factoryIdentifier());
        options.put("topic", TOPIC);
        options.put("properties.group.id", "dummy");
        options.put("properties.bootstrap.servers", "dummy");
        options.put("sink.partitioner", "fixed");
        options.put("format", FORMAT_IDENTIFIER);
        options.put(
                String.format("%s.%s", FORMAT_IDENTIFIER, TestFormatFactory.DELIMITER.key()), ",");
        return options;
    }

    /** Returns the value placed under the 'connector' option to select the factory under test. */
    protected abstract String factoryIdentifier();

    /** Returns the class the created source function is expected to be an instance of. */
    protected abstract Class<?> getExpectedConsumerClass();

    /** Returns the class the created sink function is expected to be an instance of. */
    protected abstract Class<?> getExpectedProducerClass();

    /**
     * Builds the source that {@link #testTableSource()} expects the factory to create.
     *
     * @param dataType physical row data type of the source schema
     * @param str topic name
     * @param properties Kafka client properties
     * @param decodingFormat decoding format of the source
     * @param startupMode startup mode of the consumer
     * @param map specific start offsets per topic partition
     * @param j the test passes {@code 0L}; presumably the startup timestamp in milliseconds —
     *     TODO confirm against the implementations
     * @return the expected source
     */
    protected abstract KafkaDynamicSourceBase getExpectedScanSource(DataType dataType, String str, Properties properties, DecodingFormat<DeserializationSchema<RowData>> decodingFormat, StartupMode startupMode, Map<KafkaTopicPartition, Long> map, long j);

    /**
     * Builds the sink that {@link #testTableSink()} expects the factory to create.
     *
     * @param dataType physical row data type of the sink schema
     * @param str topic name
     * @param properties Kafka client properties
     * @param optional partitioner of the sink; the test passes a fixed partitioner
     * @param encodingFormat encoding format of the sink
     * @return the expected sink
     */
    protected abstract KafkaDynamicSinkBase getExpectedSink(DataType dataType, String str, Properties properties, Optional<FlinkKafkaPartitioner<RowData>> optional, EncodingFormat<SerializationSchema<RowData>> encodingFormat);
}
