package org.apache.paimon.flink.sink.cdc;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.UUID;
import java.util.function.Predicate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.flink.sink.Committable;
import org.apache.paimon.flink.sink.CommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest.class */
public class CdcDynamicBucketWriteOperatorTest {

    @TempDir
    Path tempDir;
    private org.apache.paimon.fs.Path tablePath;
    private String commitUser;

    @BeforeEach
    public void before() {
        this.tablePath = new org.apache.paimon.fs.Path("traceable://" + this.tempDir.toString());
        this.commitUser = UUID.randomUUID().toString();
    }

    @AfterEach
    public void after() {
        Predicate predicate = path -> {
            return path.toString().contains(this.tempDir.toString());
        };
        Assertions.assertThat(TraceableFileIO.openInputStreams(predicate)).isEmpty();
        Assertions.assertThat(TraceableFileIO.openOutputStreams(predicate)).isEmpty();
    }

    @Test
    public void testCompactionMetrics() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable(RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT()}, new String[]{"pk", "col1"}), Collections.emptyList(), Collections.singletonList("pk"));
        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> createTestHarness = createTestHarness(createFileStoreTable);
        CdcDynamicBucketWriteOperator oneInputOperator = createTestHarness.getOneInputOperator();
        createTestHarness.open();
        MetricGroup addGroup = oneInputOperator.getMetricGroup().addGroup("paimon").addGroup("table", createFileStoreTable.name()).addGroup("partition", "_").addGroup("bucket", "0").addGroup("compaction");
        HashMap hashMap = new HashMap();
        hashMap.put("pk", "1");
        hashMap.put("col1", "2");
        createTestHarness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT, hashMap), 0), 0L);
        oneInputOperator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
        oneInputOperator.getWrite().prepareCommit(true, 1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedBefore").getValue()).isEqualTo(1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedAfter").getValue()).isEqualTo(1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastChangelogFilesCompacted").getValue()).isEqualTo(0L);
        hashMap.put("pk", "1");
        hashMap.put("col1", "3");
        createTestHarness.processElement(Tuple2.of(new CdcRecord(RowKind.INSERT, hashMap), 0), 0 + 1);
        oneInputOperator.getWrite().compact(BinaryRow.EMPTY_ROW, 0, true);
        oneInputOperator.getWrite().prepareCommit(true, 1 + 1);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedBefore").getValue()).isEqualTo(2L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedAfter").getValue()).isEqualTo(1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastChangelogFilesCompacted").getValue()).isEqualTo(0L);
        createTestHarness.close();
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedBefore")).isNull();
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedAfter")).isNull();
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastChangelogFilesCompacted")).isNull();
    }

    private OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> createTestHarness(FileStoreTable fileStoreTable) throws Exception {
        CdcDynamicBucketWriteOperator cdcDynamicBucketWriteOperator = new CdcDynamicBucketWriteOperator(fileStoreTable, (fileStoreTable2, str, storeSinkWriteState, iOManager, memorySegmentPool, metricGroup) -> {
            return new StoreSinkWriteImpl(fileStoreTable2, str, storeSinkWriteState, iOManager, false, false, true, memorySegmentPool, metricGroup);
        }, this.commitUser);
        JavaSerializer javaSerializer = new JavaSerializer();
        TypeSerializer createSerializer = new CommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<Tuple2<CdcRecord, Integer>, Committable> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(cdcDynamicBucketWriteOperator, javaSerializer);
        oneInputStreamOperatorTestHarness.setup(createSerializer);
        return oneInputStreamOperatorTestHarness;
    }

    private FileStoreTable createFileStoreTable(RowType rowType, List<String> list, List<String> list2) throws Exception {
        Options options = new Options();
        options.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10L));
        return FileStoreTableFactory.create(LocalFileIO.create(), this.tablePath, SchemaUtils.forceCommit(new SchemaManager(LocalFileIO.create(), this.tablePath), new Schema(rowType.getFields(), list, list2, options.toMap(), "")));
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 732603299:
                if (implMethodName.equals("lambda$createTestHarness$fffd59a6$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$Provider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/CdcDynamicBucketWriteOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemorySegmentPool;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    return (fileStoreTable2, str, storeSinkWriteState, iOManager, memorySegmentPool, metricGroup) -> {
                        return new StoreSinkWriteImpl(fileStoreTable2, str, storeSinkWriteState, iOManager, false, false, true, memorySegmentPool, metricGroup);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
