package org.apache.paimon.flink.kafka;

import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.Optional;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.flink.AbstractFlinkTableFactory;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.flink.sink.FlinkTableSink;
import org.apache.paimon.flink.source.DataTableSource;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/paimon/flink/kafka/KafkaLogStoreFactoryTest.class */
public class KafkaLogStoreFactoryTest {
    @EnumSource(CoreOptions.StartupMode.class)
    @ParameterizedTest
    public void testCreateKafkaLogStoreFactory(CoreOptions.StartupMode startupMode) {
        HashMap hashMap = new HashMap();
        hashMap.put(FlinkConnectorOptions.LOG_SYSTEM.key(), "kafka");
        hashMap.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
        if (startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT || startupMode == CoreOptions.StartupMode.FROM_SNAPSHOT_FULL) {
            hashMap.put(CoreOptions.SCAN_SNAPSHOT_ID.key(), "1");
        } else if (startupMode == CoreOptions.StartupMode.FROM_TIMESTAMP) {
            hashMap.put(CoreOptions.SCAN_TIMESTAMP_MILLIS.key(), String.valueOf(System.currentTimeMillis()));
        }
        hashMap.put(CoreOptions.SCAN_MODE.key(), startupMode.toString());
        try {
            Optional createOptionalLogStoreFactory = AbstractFlinkTableFactory.createOptionalLogStoreFactory(KafkaLogTestUtils.testContext("table", "", CoreOptions.LogChangelogMode.AUTO, CoreOptions.LogConsistency.TRANSACTIONAL, RowType.of(new LogicalType[]{new IntType(), new IntType()}), new int[]{0}, hashMap));
            Assertions.assertThat(startupMode).isNotIn(new Object[]{CoreOptions.StartupMode.FROM_SNAPSHOT, CoreOptions.StartupMode.FROM_SNAPSHOT_FULL});
            Assertions.assertThat(createOptionalLogStoreFactory.isPresent()).isTrue();
            Assertions.assertThat(createOptionalLogStoreFactory.get()).isInstanceOf(KafkaLogStoreFactory.class);
        } catch (ValidationException e) {
            Assertions.assertThat(startupMode).isIn(new Object[]{CoreOptions.StartupMode.FROM_SNAPSHOT, CoreOptions.StartupMode.FROM_SNAPSHOT_FULL});
        }
    }

    @Test
    public void testInputChangelogProducerWithKafkaLog(@TempDir Path path) throws Exception {
        Options options = new Options();
        options.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
        org.apache.paimon.fs.Path path2 = new org.apache.paimon.fs.Path(path.toUri().toString());
        new SchemaManager(LocalFileIO.create(), path2).createTable(new Schema(org.apache.paimon.types.RowType.of(new DataType[]{new org.apache.paimon.types.IntType(), new org.apache.paimon.types.IntType()}).getFields(), Collections.emptyList(), Collections.singletonList("f0"), options.toMap(), ""));
        FileStoreTable create = FileStoreTableFactory.create(LocalFileIO.create(), path2);
        ObjectIdentifier of = ObjectIdentifier.of("c", "d", "t");
        Assertions.assertThat(new DataTableSource(of, create, true, (DynamicTableFactory.Context) null, new KafkaLogStoreFactory()).getChangelogMode()).isEqualTo(ChangelogMode.upsert());
        Assertions.assertThat(new FlinkTableSink(of, create, (DynamicTableFactory.Context) null, (LogStoreTableFactory) null).getChangelogMode(ChangelogMode.all())).isEqualTo(ChangelogMode.all());
    }
}
