package org.apache.flink.table.store.connector;

import java.nio.file.Path;
import java.util.Collections;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.sink.TableStoreSink;
import org.apache.flink.table.store.connector.source.TableStoreSource;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.kafka.KafkaLogStoreFactory;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/table/store/connector/ChangelogModeTest.class */
public class ChangelogModeTest {

    @TempDir
    Path temp;
    private final ObjectIdentifier identifier = ObjectIdentifier.of("c", "d", "t");
    private org.apache.flink.core.fs.Path path;

    @BeforeEach
    public void beforeEach() {
        this.path = new org.apache.flink.core.fs.Path(this.temp.toUri().toString());
    }

    private void test(Configuration configuration, ChangelogMode changelogMode, ChangelogMode changelogMode2) throws Exception {
        test(configuration, changelogMode, changelogMode2, null);
    }

    private void test(Configuration configuration, ChangelogMode changelogMode, ChangelogMode changelogMode2, @Nullable LogStoreTableFactory logStoreTableFactory) throws Exception {
        new SchemaManager(this.path).commitNewVersion(new UpdateSchema(RowType.of(new LogicalType[]{new IntType(), new IntType()}), Collections.emptyList(), Collections.singletonList("f0"), configuration.toMap(), ""));
        FileStoreTable create = FileStoreTableFactory.create(this.path);
        Assertions.assertThat(new TableStoreSource(this.identifier, create, true, (DynamicTableFactory.Context) null, logStoreTableFactory).getChangelogMode()).isEqualTo(changelogMode);
        Assertions.assertThat(new TableStoreSink(this.identifier, create, (DynamicTableFactory.Context) null, (LogStoreTableFactory) null).getChangelogMode(ChangelogMode.all())).isEqualTo(changelogMode2);
    }

    @Test
    public void testDefault() throws Exception {
        test(new Configuration(), ChangelogMode.upsert(), ChangelogMode.upsert());
    }

    @Test
    public void testInputChangelogProducer() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
        test(configuration, ChangelogMode.all(), ChangelogMode.all());
    }

    @Test
    public void testChangelogModeAll() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.LOG_CHANGELOG_MODE, CoreOptions.LogChangelogMode.ALL);
        test(configuration, ChangelogMode.all(), ChangelogMode.all());
    }

    @Test
    public void testInputChangelogProducerWithLog() throws Exception {
        Configuration configuration = new Configuration();
        configuration.set(CoreOptions.CHANGELOG_PRODUCER, CoreOptions.ChangelogProducer.INPUT);
        test(configuration, ChangelogMode.upsert(), ChangelogMode.all(), new KafkaLogStoreFactory());
    }
}
