package org.apache.paimon.flink.sink;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.utils.InternalRowTypeSerializer;
import org.apache.paimon.flink.utils.InternalTypeInfo;
import org.apache.paimon.flink.utils.TestingMetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.reader.RecordReader;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.TableCommitImpl;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/sink/WriterOperatorTest.class */
public class WriterOperatorTest {

    /** Temporary directory injected by JUnit through {@link TempDir}. */
    @TempDir
    public Path tempDir;

    /** Paimon file-system path of the table; rebuilt from {@link #tempDir} before each test. */
    private org.apache.paimon.fs.Path tablePath;

    /** Points {@link #tablePath} at the temporary directory provided for the current test. */
    @BeforeEach
    public void before() {
        String tempLocation = tempDir.toString();
        tablePath = new org.apache.paimon.fs.Path(tempLocation);
    }

    /**
     * Runs the shared writer-buffer metric checks of {@code testMetricsImpl} against a two-column
     * INT table created with a single bucket, a 256-byte write buffer and 32-byte pages.
     *
     * <p>Column {@code a} is handed to {@code createFileStoreTable} as its first key list; the
     * second key list is empty.
     */
    @Test
    public void testPrimaryKeyTableMetrics() throws Exception {
        DataType[] columnTypes = {DataTypes.INT(), DataTypes.INT()};
        String[] columnNames = {"a", "b"};
        RowType rowType = RowType.of(columnTypes, columnNames);

        Options tableOptions = new Options();
        tableOptions.set("bucket", "1");
        tableOptions.set("write-buffer-size", "256 b");
        tableOptions.set("page-size", "32 b");

        List<String> firstKeyList = Collections.singletonList("a");
        List<String> secondKeyList = Collections.emptyList();
        FileStoreTable table =
                createFileStoreTable(rowType, firstKeyList, secondKeyList, tableOptions);
        testMetricsImpl(table);
    }

    /**
     * Runs the shared writer-buffer metric checks of {@code testMetricsImpl} against a two-column
     * INT table created without any key columns.
     *
     * <p>The table is configured with {@code write-buffer-for-append=true}, a 256-byte write
     * buffer, 32-byte pages and {@code write-buffer-spillable=false}.
     */
    @Test
    public void testAppendOnlyTableMetrics() throws Exception {
        DataType[] columnTypes = {DataTypes.INT(), DataTypes.INT()};
        String[] columnNames = {"a", "b"};
        RowType rowType = RowType.of(columnTypes, columnNames);

        Options tableOptions = new Options();
        tableOptions.set("write-buffer-for-append", "true");
        tableOptions.set("write-buffer-size", "256 b");
        tableOptions.set("page-size", "32 b");
        tableOptions.set("write-buffer-spillable", "false");

        List<String> firstKeyList = Collections.emptyList();
        List<String> secondKeyList = Collections.emptyList();
        FileStoreTable table =
                createFileStoreTable(rowType, firstKeyList, secondKeyList, tableOptions);
        testMetricsImpl(table);
    }

    /**
     * Shared body of the metric tests: drives a {@link RowDataStoreWriteOperator} backed by
     * {@code StoreSinkWriteImpl} and verifies the gauges registered under the
     * {@code paimon / table=<table directory name> / writerBuffer} metric group.
     *
     * <p>Sequence: write ten identical rows, complete checkpoint 1, assert
     * {@code bufferPreemptCount == 0} and {@code totalWriteBufferSizeByte == 256} (both callers
     * configure a 256-byte write buffer), then write one more row and assert
     * {@code usedWriteBufferSizeByte > 0}.
     *
     * <p>The harness is managed by try-with-resources so the operator is closed even when an
     * assertion fails part-way through.
     *
     * @param fileStoreTable table the operator writes to
     * @throws Exception if the harness or the operator fails
     */
    private void testMetricsImpl(FileStoreTable fileStoreTable) throws Exception {
        String tableName = tablePath.getName();
        RowDataStoreWriteOperator operator =
                new RowDataStoreWriteOperator(
                        fileStoreTable,
                        (LogSinkFunction) null,
                        (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
                                new StoreSinkWriteImpl(
                                        table,
                                        commitUser,
                                        state,
                                        ioManager,
                                        false,
                                        false,
                                        true,
                                        memoryPool,
                                        metricGroup),
                        "test");

        try (OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                createHarness(operator)) {
            harness.setup(new CommittableTypeInfo().createSerializer(new ExecutionConfig()));
            harness.open();

            for (int written = 0; written < 10; written++) {
                harness.processElement(GenericRow.of(1, 1), 1L);
            }
            harness.prepareSnapshotPreBarrier(1L);
            harness.snapshot(1L, 2L);
            harness.notifyOfCompletedCheckpoint(1L);

            MetricGroup writerBufferGroup =
                    operator.getMetricGroup()
                            .addGroup("paimon")
                            .addGroup("table", tableName)
                            .addGroup("writerBuffer");

            Long preemptCount =
                    (Long) TestingMetricUtils.getGauge(writerBufferGroup, "bufferPreemptCount").getValue();
            Assertions.assertThat(preemptCount).isEqualTo(0L);

            Long totalBufferBytes =
                    (Long) TestingMetricUtils.getGauge(writerBufferGroup, "totalWriteBufferSizeByte").getValue();
            Assertions.assertThat(totalBufferBytes).isEqualTo(256L);

            // The used-size gauge is read only after a further write following the checkpoint.
            harness.processElement(GenericRow.of(1, 1), 1L);
            Long usedBufferBytes =
                    (Long) TestingMetricUtils.getGauge(writerBufferGroup, "usedWriteBufferSizeByte").getValue();
            Assertions.assertThat(usedBufferBytes).isGreaterThan(0L);
        }
    }

    /**
     * Exercises {@code AsyncLookupSinkWrite} across an operator restart.
     *
     * <p>Scenario on a {@code (pt, k, v)} INT table with {@code bucket=1} and
     * {@code changelog-producer=lookup}:
     *
     * <ol>
     *   <li>a first operator (flag {@code false}) writes three rows and commits checkpoint 1;
     *   <li>it then overwrites two of them and commits checkpoint 2, whose operator state is kept;
     *   <li>the first operator is closed and a second one (flag {@code true}) is initialized from
     *       that state, takes checkpoint 3 without new input and commits it;
     *   <li>a stream scan of the table must return exactly the latest value of each key as
     *       {@code +I} rows.
     * </ol>
     *
     * <p>Both harnesses and the table commit are scoped with try-with-resources so nothing stays
     * open if a step throws, and all collections, readers and serializers carry their element
     * types instead of being raw.
     *
     * @throws Exception if the harness, the commit or the read fails
     */
    @Test
    public void testAsyncLookupWithFailure() throws Exception {
        RowType rowType =
                RowType.of(
                        new DataType[] {DataTypes.INT(), DataTypes.INT(), DataTypes.INT()},
                        new String[] {"pt", "k", "v"});
        Options options = new Options();
        options.set("bucket", "1");
        options.set("changelog-producer", "lookup");
        FileStoreTable table =
                createFileStoreTable(
                        rowType, Arrays.asList("pt", "k"), Collections.singletonList("k"), options);
        TypeSerializer<Committable> committableSerializer =
                new CommittableTypeInfo().createSerializer(new ExecutionConfig());

        try (TableCommitImpl commit = table.newCommit("test")) {
            OperatorSubtaskState checkpointState;

            try (OneInputStreamOperatorTestHarness<InternalRow, Committable> harness =
                    createHarness(getAsyncLookupWriteOperator(table, false))) {
                harness.setup(committableSerializer);
                harness.open();

                // Checkpoint 1: initial values for three keys.
                harness.processElement(GenericRow.of(1, 10, 100), 1L);
                harness.processElement(GenericRow.of(2, 20, 200), 2L);
                harness.processElement(GenericRow.of(3, 30, 300), 3L);
                harness.prepareSnapshotPreBarrier(1L);
                harness.snapshot(1L, 10L);
                harness.notifyOfCompletedCheckpoint(1L);
                commitAll(harness, commit, 1L);

                // Checkpoint 2: new values for two of the keys; keep the state for the restart.
                harness.processElement(GenericRow.of(1, 10, 101), 11L);
                harness.processElement(GenericRow.of(3, 30, 301), 13L);
                harness.prepareSnapshotPreBarrier(2L);
                checkpointState = harness.snapshot(2L, 20L);
                harness.notifyOfCompletedCheckpoint(2L);
                commitAll(harness, commit, 2L);
            }

            // Restart: a fresh operator restored from the checkpoint-2 state, no new input.
            try (OneInputStreamOperatorTestHarness<InternalRow, Committable> restoredHarness =
                    createHarness(getAsyncLookupWriteOperator(table, true))) {
                restoredHarness.setup(committableSerializer);
                restoredHarness.initializeState(checkpointState);
                restoredHarness.open();
                restoredHarness.prepareSnapshotPreBarrier(3L);
                restoredHarness.snapshot(3L, 30L);
                restoredHarness.notifyOfCompletedCheckpoint(3L);
                commitAll(restoredHarness, commit, 3L);
            }
        }

        ReadBuilder readBuilder = table.newReadBuilder();
        RecordReader<InternalRow> reader =
                readBuilder.newRead().createReader(readBuilder.newStreamScan().plan().splits());
        List<String> actualRows = new ArrayList<>();
        reader.forEachRemaining(
                row ->
                        actualRows.add(
                                String.format(
                                        "%s[%d, %d, %d]",
                                        row.getRowKind().shortString(),
                                        row.getInt(0),
                                        row.getInt(1),
                                        row.getInt(2))));
        Assertions.assertThat(actualRows)
                .containsExactlyInAnyOrder("+I[1, 10, 101]", "+I[2, 20, 200]", "+I[3, 30, 301]");
    }

    /**
     * Builds a {@link RowDataStoreWriteOperator} for {@code fileStoreTable} whose sink write is an
     * {@code AsyncLookupSinkWrite}, with no log sink function and {@code "test"} as the trailing
     * constructor argument.
     *
     * @param fileStoreTable table the operator writes to
     * @param z value forwarded as the sixth constructor argument of {@code AsyncLookupSinkWrite};
     *     the fifth and seventh are fixed to {@code false} and {@code true}
     * @return the configured write operator
     */
    private RowDataStoreWriteOperator getAsyncLookupWriteOperator(FileStoreTable fileStoreTable, boolean z) {
        return new RowDataStoreWriteOperator(
                fileStoreTable,
                (LogSinkFunction) null,
                (table, commitUser, state, ioManager, memoryPool, metricGroup) ->
                        new AsyncLookupSinkWrite(
                                table,
                                commitUser,
                                state,
                                ioManager,
                                false,
                                z,
                                true,
                                memoryPool,
                                metricGroup),
                "test");
    }

    /**
     * Drains everything the harness has emitted so far and commits it to the table in one call.
     *
     * <p>Every emitted record must wrap a {@link Committable} of kind
     * {@code Committable.Kind.FILE}; its wrapped payload is collected as a {@link CommitMessage}.
     * The commit is issued even when nothing was emitted, in which case the message list is empty.
     *
     * @param oneInputStreamOperatorTestHarness harness whose output queue is emptied
     * @param tableCommitImpl commit used to publish the collected messages
     * @param j commit identifier passed to {@code TableCommitImpl#commit}
     */
    private void commitAll(OneInputStreamOperatorTestHarness<InternalRow, Committable> oneInputStreamOperatorTestHarness, TableCommitImpl tableCommitImpl, long j) {
        List<CommitMessage> commitMessages = new ArrayList<>();
        Object emitted;
        // poll() returns null once the output queue is exhausted.
        while ((emitted = oneInputStreamOperatorTestHarness.getOutput().poll()) != null) {
            Committable committable = (Committable) ((StreamRecord<?>) emitted).getValue();
            Assertions.assertThat(committable.kind()).isEqualTo(Committable.Kind.FILE);
            commitMessages.add((CommitMessage) committable.wrappedCommittable());
        }
        tableCommitImpl.commit(j, commitMessages);
    }

    /**
     * Creates the table schema under {@link #tablePath} on the local file system and opens the
     * resulting table.
     *
     * <p>The given {@code options} instance is modified: its {@code CoreOptions.PATH} entry is set
     * to the table path before the schema is created.
     *
     * @param rowType row type whose fields become the table columns
     * @param list key columns passed as the third {@link Schema} constructor argument
     *     (presumably the primary keys; verify against the {@code Schema} constructor)
     * @param list2 key columns passed as the second {@link Schema} constructor argument
     *     (presumably the partition keys; verify against the {@code Schema} constructor)
     * @param options table options, stored in the schema and used to open the table
     * @return the table created from {@code options}
     * @throws Exception if the schema cannot be created
     */
    private FileStoreTable createFileStoreTable(RowType rowType, List<String> list, List<String> list2, Options options) throws Exception {
        String tableLocation = tablePath.toString();
        options.set(CoreOptions.PATH, tableLocation);

        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), tablePath);
        Schema schema = new Schema(rowType.getFields(), list2, list, options.toMap(), "");
        schemaManager.createTable(schema);

        return FileStoreTableFactory.create(LocalFileIO.create(), options);
    }

    /**
     * Wraps the operator in a one-input test harness whose input serializer is derived from an
     * {@link InternalRowTypeSerializer} built over an empty {@link RowType}.
     *
     * @param rowDataStoreWriteOperator operator under test
     * @return a harness that has been constructed but neither set up nor opened
     * @throws Exception if the harness cannot be constructed
     */
    private OneInputStreamOperatorTestHarness<InternalRow, Committable> createHarness(RowDataStoreWriteOperator rowDataStoreWriteOperator) throws Exception {
        RowType emptyRowType = RowType.builder().build();
        InternalTypeInfo<InternalRow> inputTypeInfo =
                new InternalTypeInfo<>(new InternalRowTypeSerializer(emptyRowType));
        TypeSerializer<InternalRow> inputSerializer =
                inputTypeInfo.createSerializer(new ExecutionConfig());
        return new OneInputStreamOperatorTestHarness<>(rowDataStoreWriteOperator, inputSerializer);
    }

    /**
     * Synthetic hook that rebuilds one of this class's two serialized lambdas from its
     * {@link SerializedLambda} description.
     *
     * <p>The implementation method name selects the candidate lambda; the candidate is returned
     * only if the implementation method kind, functional interface class, method name and
     * signature, implementing class and implementation signature all match the expected values.
     *
     * <p>NOTE(review): this method is marked synthetic and reads like decompiler output.
     * {@code boolean z = -1} and {@code switch (z)} over a boolean are not valid Java source;
     * confirm whether the method should appear in source form at all.
     *
     * @param serializedLambda serialized form of the lambda to rebuild
     * @return a lambda equivalent to the one created in {@code testMetricsImpl} or
     *     {@code getAsyncLookupWriteOperator}
     * @throws IllegalArgumentException if the description matches neither lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // Stage 1: map the implementation method name to a selector (hash first, then equals).
        switch (implMethodName.hashCode()) {
            case -2019166761:
                if (implMethodName.equals("lambda$testMetricsImpl$cb3bdbf$1")) {
                    z = false;
                    break;
                }
                break;
            case 122306912:
                if (implMethodName.equals("lambda$getAsyncLookupWriteOperator$75e6f71f$1")) {
                    z = true;
                    break;
                }
                break;
        }
        // Stage 2: validate the full description and rebuild the selected lambda.
        switch (z) {
            case false:
                // Lambda from testMetricsImpl: captures nothing and creates a StoreSinkWriteImpl.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/WriterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    return (fileStoreTable2, str, storeSinkWriteState, iOManager, memorySegmentPool, metricGroup) -> {
                        return new StoreSinkWriteImpl(fileStoreTable2, str, storeSinkWriteState, iOManager, false, false, true, memorySegmentPool, metricGroup);
                    };
                }
                break;
            case true:
                // Lambda from getAsyncLookupWriteOperator: its implementation signature has a
                // leading boolean, which is restored from captured argument 0.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/WriterOperatorTest") && serializedLambda.getImplMethodSignature().equals("(ZLorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    boolean booleanValue = ((Boolean) serializedLambda.getCapturedArg(0)).booleanValue();
                    return (fileStoreTable22, str2, storeSinkWriteState2, iOManager2, memorySegmentPool2, metricGroup2) -> {
                        return new AsyncLookupSinkWrite(fileStoreTable22, str2, storeSinkWriteState2, iOManager2, false, booleanValue, true, memorySegmentPool2, metricGroup2);
                    };
                }
                break;
        }
        // Reached when the name is unknown or any of the validation checks fails.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
