package org.apache.paimon.flink.source;

import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkConnectorOptions;
import org.apache.paimon.flink.PaimonDataStreamScanProvider;
import org.apache.paimon.flink.log.LogStoreTableFactory;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableMap;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.sink.TableWriteImpl;
import org.apache.paimon.table.system.ReadOptimizedTable;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.AssertionsForClassTypes;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/source/DataTableSourceTest.class */
class DataTableSourceTest {

    @TempDir
    Path path;

    DataTableSourceTest() {
    }

    @Test
    void testInferScanParallelism() throws Exception {
        FileStoreTable createTable = createTable(ImmutableMap.of("bucket", "1"));
        writeData(createTable);
        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(new DataTableSource(ObjectIdentifier.of("cat", "db", "table"), createTable, true, (DynamicTableFactory.Context) null, (LogStoreTableFactory) null));
        AssertionsForClassTypes.assertThat(runtimeProvider.produceDataStream(str -> {
            return Optional.empty();
        }, StreamExecutionEnvironment.createLocalEnvironment()).getParallelism()).isEqualTo(1);
        LocalStreamEnvironment createLocalEnvironment = StreamExecutionEnvironment.createLocalEnvironment(Configuration.fromMap(Collections.singletonMap(String.format("%s%s", "paimon.", FlinkConnectorOptions.INFER_SCAN_PARALLELISM.key()), "false")));
        DataStream produceDataStream = runtimeProvider.produceDataStream(str2 -> {
            return Optional.empty();
        }, createLocalEnvironment);
        AssertionsForClassTypes.assertThat(produceDataStream.getParallelism()).isNotEqualTo(1);
        AssertionsForClassTypes.assertThat(produceDataStream.getParallelism()).isEqualTo(createLocalEnvironment.getParallelism());
    }

    @Test
    public void testInferStreamParallelism() throws Exception {
        AssertionsForClassTypes.assertThat(runtimeProvider(new DataTableSource(ObjectIdentifier.of("cat", "db", "table"), createTable(ImmutableMap.of("bucket", "-1")), true, (DynamicTableFactory.Context) null, (LogStoreTableFactory) null)).produceDataStream(str -> {
            return Optional.empty();
        }, StreamExecutionEnvironment.createLocalEnvironment()).getParallelism()).isEqualTo(1);
    }

    @Test
    public void testSystemTableParallelism() throws Exception {
        PaimonDataStreamScanProvider runtimeProvider = runtimeProvider(new SystemTableSource(new ReadOptimizedTable(createTable(ImmutableMap.of("bucket", "1", "scan.parallelism", "3"))), false, ObjectIdentifier.of("cat", "db", "table")));
        new Configuration().set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
        AssertionsForClassTypes.assertThat(runtimeProvider.produceDataStream(str -> {
            return Optional.empty();
        }, StreamExecutionEnvironment.createLocalEnvironment()).getParallelism()).isEqualTo(3);
    }

    private PaimonDataStreamScanProvider runtimeProvider(FlinkTableSource flinkTableSource) {
        return flinkTableSource.getScanRuntimeProvider(new ScanTableSource.ScanContext() { // from class: org.apache.paimon.flink.source.DataTableSourceTest.1
            public <T> TypeInformation<T> createTypeInformation(DataType dataType) {
                throw new UnsupportedOperationException();
            }

            public <T> TypeInformation<T> createTypeInformation(LogicalType logicalType) {
                throw new UnsupportedOperationException();
            }

            public DynamicTableSource.DataStructureConverter createDataStructureConverter(DataType dataType) {
                throw new UnsupportedOperationException();
            }
        });
    }

    private FileStoreTable createTable(Map<String, String> map) throws Exception {
        LocalFileIO create = LocalFileIO.create();
        org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(this.path.toString());
        return FileStoreTableFactory.create(create, path, new SchemaManager(create, path).createTable(Schema.newBuilder().column("a", DataTypes.INT()).column("b", DataTypes.BIGINT()).options(map).build()));
    }

    private void writeData(FileStoreTable fileStoreTable) throws Exception {
        TableWriteImpl newWrite = fileStoreTable.newWrite("test");
        TableCommitImpl newCommit = fileStoreTable.newCommit("test");
        newWrite.write(GenericRow.of(new Object[]{1, 2L}));
        newCommit.commit(newWrite.prepareCommit());
        newCommit.close();
        newWrite.close();
    }
}
