package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.transformations.SourceTransformation;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.Projection;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.conversion.DataStructureConverter;
import org.apache.flink.table.data.conversion.DataStructureConverters;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;
import org.apache.flink.table.types.utils.TypeConversions;
import org.apache.flink.testutils.junit.extensions.parameterized.ParameterizedTestExtension;
import org.apache.flink.testutils.junit.extensions.parameterized.Parameters;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.CloseableIterator;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.WriteMode;
import org.apache.paimon.flink.sink.FlinkSinkBuilder;
import org.apache.paimon.flink.source.FlinkSourceBuilder;
import org.apache.paimon.flink.util.AbstractTestBase;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.paimon.utils.FailingFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Assumptions;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;

@ExtendWith({ParameterizedTestExtension.class})
/* loaded from: input_file:org/apache/paimon/flink/FileStoreITCase.class */
public class FileStoreITCase extends AbstractTestBase {
    public static final RowType TABLE_TYPE = new RowType(Arrays.asList(new RowType.RowField("v", new IntType()), new RowType.RowField("p", new VarCharType(10)), new RowType.RowField("_k", new IntType())));
    public static final ObjectIdentifier IDENTIFIER = ObjectIdentifier.of(FlinkTestBase.CURRENT_CATALOG, "db", "t");
    public static final DataStructureConverter<RowData, Row> CONVERTER = DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(TABLE_TYPE));
    private static final int NUM_BUCKET = 3;
    public static final List<RowData> SOURCE_DATA = Arrays.asList(wrap(GenericRowData.of(new Object[]{0, StringData.fromString("p1"), 1})), wrap(GenericRowData.of(new Object[]{0, StringData.fromString("p1"), 2})), wrap(GenericRowData.of(new Object[]{0, StringData.fromString("p1"), 1})), wrap(GenericRowData.of(new Object[]{5, StringData.fromString("p1"), 1})), wrap(GenericRowData.of(new Object[]{6, StringData.fromString("p2"), 1})), wrap(GenericRowData.of(new Object[]{Integer.valueOf(NUM_BUCKET), StringData.fromString("p2"), 5})), wrap(GenericRowData.of(new Object[]{5, StringData.fromString("p2"), 1})));
    private final boolean isBatch;
    private final StreamExecutionEnvironment env;

    public FileStoreITCase(boolean z) {
        this.isBatch = z;
        this.env = z ? buildBatchEnv() : buildStreamEnv();
    }

    @Parameters(name = "isBatch-{0}")
    public static List<Boolean> getVarSeg() {
        return Arrays.asList(true, false);
    }

    private static SerializableRowData wrap(RowData rowData) {
        return new SerializableRowData(rowData, InternalSerializers.create(TABLE_TYPE));
    }

    @TestTemplate
    public void testPartitioned() throws Exception {
        FileStoreTable buildFileStoreTable = buildFileStoreTable(new int[]{1}, new int[]{1, 2});
        new FlinkSinkBuilder(buildFileStoreTable).withInput(buildTestSource(this.env, this.isBatch)).build();
        this.env.execute();
        Assertions.assertThat(executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable).withEnv(this.env).build())).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{5, "p2", 1}), Row.of(new Object[]{Integer.valueOf(NUM_BUCKET), "p2", 5}), Row.of(new Object[]{5, "p1", 1}), Row.of(new Object[]{0, "p1", 2})});
    }

    @TestTemplate
    public void testNonPartitioned() throws Exception {
        FileStoreTable buildFileStoreTable = buildFileStoreTable(new int[0], new int[]{2});
        new FlinkSinkBuilder(buildFileStoreTable).withInput(buildTestSource(this.env, this.isBatch)).build();
        this.env.execute();
        Assertions.assertThat(executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable).withEnv(this.env).build())).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{5, "p2", 1}), Row.of(new Object[]{0, "p1", 2}), Row.of(new Object[]{Integer.valueOf(NUM_BUCKET), "p2", 5})});
    }

    @TestTemplate
    public void testOverwrite() throws Exception {
        Assumptions.assumeTrue(this.isBatch);
        FileStoreTable buildFileStoreTable = buildFileStoreTable(new int[]{1}, new int[]{1, 2});
        new FlinkSinkBuilder(buildFileStoreTable).withInput(buildTestSource(this.env, this.isBatch)).build();
        this.env.execute();
        DataStreamSource fromCollection = this.env.fromCollection(Collections.singletonList(wrap(GenericRowData.of(new Object[]{9, StringData.fromString("p2"), 5}))), InternalTypeInfo.of(TABLE_TYPE));
        HashMap hashMap = new HashMap();
        hashMap.put("p", "p2");
        new FlinkSinkBuilder(buildFileStoreTable).withInput(fromCollection).withOverwritePartition(hashMap).build();
        this.env.execute();
        Assertions.assertThat(executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable).withEnv(this.env).build())).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{9, "p2", 5}), Row.of(new Object[]{5, "p1", 1}), Row.of(new Object[]{0, "p1", 2})});
        new FlinkSinkBuilder(buildFileStoreTable).withInput(this.env.fromCollection(Collections.singletonList(wrap(GenericRowData.of(new Object[]{19, StringData.fromString("p2"), 6}))), InternalTypeInfo.of(TABLE_TYPE))).withOverwritePartition(new HashMap()).build();
        this.env.execute();
        Assertions.assertThat(executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable).withEnv(this.env).build())).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{19, "p2", 6}), Row.of(new Object[]{5, "p1", 1}), Row.of(new Object[]{0, "p1", 2})});
        new FlinkSinkBuilder(buildFileStoreTable.copy(Collections.singletonMap(CoreOptions.DYNAMIC_PARTITION_OVERWRITE.key(), "false"))).withInput(this.env.fromCollection(Collections.singletonList(wrap(GenericRowData.of(new Object[]{20, StringData.fromString("p2"), Integer.valueOf(NUM_BUCKET)}))), InternalTypeInfo.of(TABLE_TYPE))).withOverwritePartition(new HashMap()).build();
        this.env.execute();
        Assertions.assertThat(executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable).withEnv(this.env).build())).containsExactlyInAnyOrder(new Row[]{Row.of(new Object[]{20, "p2", Integer.valueOf(NUM_BUCKET)})});
    }

    @TestTemplate
    public void testPartitionedNonKey() throws Exception {
        FileStoreTable buildFileStoreTable = buildFileStoreTable(new int[]{1}, new int[0]);
        new FlinkSinkBuilder(buildFileStoreTable).withInput(buildTestSource(this.env, this.isBatch)).build();
        this.env.execute();
        List<Row> executeAndCollect = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable).withEnv(this.env).build());
        Stream<RowData> stream = this.isBatch ? SOURCE_DATA.stream() : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
        DataStructureConverter<RowData, Row> dataStructureConverter = CONVERTER;
        dataStructureConverter.getClass();
        Assertions.assertThat(executeAndCollect).containsExactlyInAnyOrder((Row[]) stream.map((v1) -> {
            return r1.toExternal(v1);
        }).toArray(i -> {
            return new Row[i];
        }));
    }

    @TestTemplate
    public void testKeyedProjection() throws Exception {
        testProjection(buildFileStoreTable(new int[0], new int[]{2}));
    }

    @TestTemplate
    public void testNonKeyedProjection() throws Exception {
        testProjection(buildFileStoreTable(new int[0], new int[0]));
    }

    private void testProjection(FileStoreTable fileStoreTable) throws Exception {
        new FlinkSinkBuilder(fileStoreTable).withInput(buildTestSource(this.env, this.isBatch)).build();
        this.env.execute();
        Projection of = Projection.of(new int[]{1, 2});
        List<Row> executeAndCollect = executeAndCollect(new FlinkSourceBuilder(IDENTIFIER, fileStoreTable).withProjection(of.toNestedIndexes()).withEnv(this.env).build(), DataStructureConverters.getConverter(TypeConversions.fromLogicalToDataType(of.project(TABLE_TYPE))));
        Row[] rowArr = {Row.of(new Object[]{"p2", 1}), Row.of(new Object[]{"p1", 2}), Row.of(new Object[]{"p2", 5})};
        if (fileStoreTable.schema().trimmedPrimaryKeys().isEmpty()) {
            Stream<RowData> stream = this.isBatch ? SOURCE_DATA.stream() : Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream());
            DataStructureConverter<RowData, Row> dataStructureConverter = CONVERTER;
            dataStructureConverter.getClass();
            rowArr = (Row[]) stream.map((v1) -> {
                return r1.toExternal(v1);
            }).map(row -> {
                return Row.of(new Object[]{row.getField(1), row.getField(2)});
            }).toArray(i -> {
                return new Row[i];
            });
        }
        Assertions.assertThat(executeAndCollect).containsExactlyInAnyOrder(rowArr);
    }

    @TestTemplate
    public void testContinuous() throws Exception {
        innerTestContinuous(buildFileStoreTable(new int[0], new int[]{2}));
    }

    @TestTemplate
    public void testContinuousWithoutPK() throws Exception {
        innerTestContinuous(buildFileStoreTable(new int[0], new int[0]));
    }

    @TestTemplate
    public void testContinuousBounded() throws Exception {
        SourceTransformation transformation = new FlinkSourceBuilder(IDENTIFIER, buildFileStoreTable(new int[0], new int[]{2}).copy(Collections.singletonMap(CoreOptions.SCAN_BOUNDED_WATERMARK.key(), "1024"))).withContinuousMode(true).withEnv(this.env).build().getTransformation();
        Assertions.assertThat(transformation).isInstanceOf(SourceTransformation.class);
        Assertions.assertThat(transformation.getSource().getBoundedness()).isEqualTo(Boundedness.BOUNDED);
    }

    private void innerTestContinuous(FileStoreTable fileStoreTable) throws Exception {
        Assumptions.assumeFalse(this.isBatch);
        CloseableIterator executeAndCollect = new FlinkSourceBuilder(IDENTIFIER, fileStoreTable).withContinuousMode(true).withEnv(this.env).build().executeAndCollect();
        DataStructureConverter<RowData, Row> dataStructureConverter = CONVERTER;
        dataStructureConverter.getClass();
        BlockingIterator<RowData, Row> of = BlockingIterator.of(executeAndCollect, (v1) -> {
            return r1.toExternal(v1);
        });
        Thread.sleep(ThreadLocalRandom.current().nextInt(1000));
        sinkAndValidate(fileStoreTable, Arrays.asList(srcRow(RowKind.INSERT, 1, "p1", 1), srcRow(RowKind.INSERT, 2, "p2", 2)), of, Row.ofKind(RowKind.INSERT, new Object[]{1, "p1", 1}), Row.ofKind(RowKind.INSERT, new Object[]{2, "p2", 2}));
        sinkAndValidate(fileStoreTable, Arrays.asList(srcRow(RowKind.DELETE, 1, "p1", 1), srcRow(RowKind.INSERT, NUM_BUCKET, "p3", NUM_BUCKET)), of, Row.ofKind(RowKind.DELETE, new Object[]{1, "p1", 1}), Row.ofKind(RowKind.INSERT, new Object[]{Integer.valueOf(NUM_BUCKET), "p3", Integer.valueOf(NUM_BUCKET)}));
    }

    private void sinkAndValidate(FileStoreTable fileStoreTable, List<RowData> list, BlockingIterator<RowData, Row> blockingIterator, Row... rowArr) throws Exception {
        if (this.isBatch) {
            throw new UnsupportedOperationException();
        }
        new FlinkSinkBuilder(fileStoreTable).withInput(this.env.addSource(new FiniteTestSource(list, true), InternalTypeInfo.of(TABLE_TYPE))).build();
        this.env.execute();
        Assertions.assertThat(blockingIterator.collect(rowArr.length)).containsExactlyInAnyOrder(rowArr);
    }

    public FileStoreTable buildFileStoreTable(int[] iArr, int[] iArr2) throws Exception {
        return buildFileStoreTable(this.isBatch, getTempDirPath(), iArr, iArr2);
    }

    private static RowData srcRow(RowKind rowKind, int i, String str, int i2) {
        return wrap(GenericRowData.ofKind(rowKind, new Object[]{Integer.valueOf(i), StringData.fromString(str), Integer.valueOf(i2)}));
    }

    public static StreamExecutionEnvironment buildStreamEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setParallelism(2);
        return executionEnvironment;
    }

    public static StreamExecutionEnvironment buildBatchEnv() {
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        executionEnvironment.setParallelism(2);
        return executionEnvironment;
    }

    public static FileStoreTable buildFileStoreTable(boolean z, String str, int[] iArr, int[] iArr2) throws Exception {
        Options buildConfiguration = buildConfiguration(z, str);
        Path path = new CoreOptions(buildConfiguration.toMap()).path();
        Schema schema = new Schema(LogicalTypeConversion.toDataType(TABLE_TYPE).getFields(), (List) Arrays.stream(iArr).mapToObj(i -> {
            return (String) TABLE_TYPE.getFieldNames().get(i);
        }).collect(Collectors.toList()), (List) Arrays.stream(iArr2).mapToObj(i2 -> {
            return (String) TABLE_TYPE.getFieldNames().get(i2);
        }).collect(Collectors.toList()), buildConfiguration.toMap(), "");
        return (FileStoreTable) FailingFileIO.retryArtificialException(() -> {
            new SchemaManager(LocalFileIO.create(), path).createTable(schema);
            return FileStoreTableFactory.create(LocalFileIO.create(), buildConfiguration);
        });
    }

    public static Options buildConfiguration(boolean z, String str) {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, Integer.valueOf(NUM_BUCKET));
        if (z) {
            options.set(CoreOptions.PATH, str);
        } else {
            String uuid = UUID.randomUUID().toString();
            FailingFileIO.reset(uuid, NUM_BUCKET, 100);
            options.set(CoreOptions.PATH, FailingFileIO.getFailingPath(uuid, str));
        }
        options.set(CoreOptions.FILE_FORMAT, CoreOptions.FileFormatType.AVRO);
        options.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
        return options;
    }

    public static DataStreamSource<RowData> buildTestSource(StreamExecutionEnvironment streamExecutionEnvironment, boolean z) {
        return z ? streamExecutionEnvironment.fromCollection(SOURCE_DATA, InternalTypeInfo.of(TABLE_TYPE)) : streamExecutionEnvironment.addSource(new FiniteTestSource(SOURCE_DATA, false), InternalTypeInfo.of(TABLE_TYPE));
    }

    public static List<Row> executeAndCollect(DataStream<RowData> dataStream) throws Exception {
        return executeAndCollect(dataStream, CONVERTER);
    }

    public static List<Row> executeAndCollect(DataStream<RowData> dataStream, DataStructureConverter<RowData, Row> dataStructureConverter) throws Exception {
        CloseableIterator executeAndCollect = dataStream.executeAndCollect();
        ArrayList arrayList = new ArrayList();
        while (executeAndCollect.hasNext()) {
            arrayList.add(dataStructureConverter.toExternal(executeAndCollect.next()));
        }
        executeAndCollect.close();
        return arrayList;
    }
}
