package org.apache.paimon.flink.sink;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.operators.collect.utils.MockOperatorStateStore;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.fs.FileIOFinder;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.lineage.LineageMeta;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/sink/FlinkSinkTest.class */
public class FlinkSinkTest {

    @TempDir
    Path tempPath;
    protected static final RowType ROW_TYPE = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT()}, new String[]{"pk", "pt0"});

    @Test
    public void testOptimizeKeyValueWriterForBatch() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        Assertions.assertThat(testSpillable(executionEnvironment, createFileStoreTable)).isTrue();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        Assertions.assertThat(testSpillable(executionEnvironment, createFileStoreTable)).isFalse();
    }

    private boolean testSpillable(StreamExecutionEnvironment streamExecutionEnvironment, FileStoreTable fileStoreTable) throws Exception {
        RowDataStoreWriteOperator operator = new FileStoreSink(fileStoreTable, (Map) null, (LogSinkFunction) null).doWrite(streamExecutionEnvironment.fromCollection(Collections.singletonList(new FlinkRowData(GenericRow.of(new Object[]{1, 1})))), "123", 1).getTransformation().getOperatorFactory().getOperator();
        operator.initStateAndWriter(new StateInitializationContextImpl((Long) null, new MockOperatorStateStore() { // from class: org.apache.paimon.flink.sink.FlinkSinkTest.1
            public <S> ListState<S> getUnionListState(ListStateDescriptor<S> listStateDescriptor) throws Exception {
                return getListState(listStateDescriptor);
            }
        }, (KeyedStateStore) null, (Iterable) null, (Iterable) null), (str, binaryRow, i) -> {
            return true;
        }, new IOManagerAsync(), "123");
        return operator.write.write.getWrite().bufferSpillable();
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        org.apache.paimon.fs.Path path = new org.apache.paimon.fs.Path(this.tempPath.toString());
        Options options = new Options();
        options.set(CoreOptions.PATH, path.toString());
        return FileStoreTableFactory.create(FileIOFinder.find(path), path, SchemaUtils.forceCommit(new SchemaManager(LocalFileIO.create(), path), new Schema(ROW_TYPE.getFields(), Collections.emptyList(), Arrays.asList("pk"), options.toMap(), "")), options, new CatalogEnvironment(Lock.emptyFactory(), (MetastoreClient.Factory) null, (LineageMeta) null));
    }
}
