package org.apache.paimon.flink.sink.cdc;

import java.lang.invoke.SerializedLambda;
import java.nio.file.Path;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Predicate;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.JavaSerializer;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.CatalogFactory;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.MultiTableCommittable;
import org.apache.paimon.flink.sink.MultiTableCommittableTypeInfo;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.StoreSinkWriteImpl;
import org.apache.paimon.flink.utils.MetricUtils;
import org.apache.paimon.io.DataFileTestUtils;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.CommonTestUtils;
import org.apache.paimon.utils.TraceableFileIO;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest.class */
public class CdcRecordStoreMultiWriteOperatorTest {

    @TempDir
    Path tempDir;
    private String commitUser;
    private org.apache.paimon.fs.Path warehouse;
    private String databaseName;
    private Identifier firstTable;
    private Catalog catalog;
    private Identifier secondTable;
    private Catalog.Loader catalogLoader;
    private Schema firstTableSchema;

    /* loaded from: input_file:org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest$Runner.class */
    private static class Runner implements Runnable {
        private final OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> harness;
        private final BlockingQueue<CdcMultiplexRecord> toProcess;
        private final BlockingQueue<CdcMultiplexRecord> processed;
        private final AtomicBoolean running;

        private Runner(OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> oneInputStreamOperatorTestHarness) {
            this.toProcess = new LinkedBlockingQueue();
            this.processed = new LinkedBlockingQueue();
            this.running = new AtomicBoolean(true);
            this.harness = oneInputStreamOperatorTestHarness;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void offer(CdcMultiplexRecord cdcMultiplexRecord) {
            this.toProcess.offer(cdcMultiplexRecord);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CdcMultiplexRecord take() throws Exception {
            return this.processed.take();
        }

        /* JADX INFO: Access modifiers changed from: private */
        public CdcMultiplexRecord poll(long j) throws Exception {
            return this.processed.poll(j, TimeUnit.SECONDS);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void stop() {
            this.running.set(false);
        }

        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness<org.apache.paimon.flink.sink.cdc.CdcMultiplexRecord, org.apache.paimon.flink.sink.MultiTableCommittable>, org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness] */
        @Override // java.lang.Runnable
        public void run() {
            long j = 0;
            while (this.running.get()) {
                try {
                    if (this.toProcess.isEmpty()) {
                        Thread.sleep(10L);
                    } else {
                        CdcMultiplexRecord poll = this.toProcess.poll();
                        ?? r0 = this.harness;
                        long j2 = j + 1;
                        j = r0;
                        r0.processElement(poll, j2);
                        this.processed.offer(poll);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }

    @BeforeEach
    public void before() throws Exception {
        this.warehouse = new org.apache.paimon.fs.Path("traceable://" + this.tempDir.toString());
        this.commitUser = UUID.randomUUID().toString();
        this.databaseName = "test_db";
        this.firstTable = Identifier.create(this.databaseName, "test_table1");
        this.secondTable = Identifier.create(this.databaseName, "test_table2");
        this.catalogLoader = createCatalogLoader();
        this.catalog = this.catalogLoader.load();
        this.catalog.createDatabase(this.databaseName, true);
        Options options = new Options();
        options.set(CdcRecordStoreWriteOperator.RETRY_SLEEP_TIME, Duration.ofMillis(10L));
        RowType of = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.STRING()}, new String[]{"pt", "k", "v"});
        RowType of2 = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.INT(), DataTypes.FLOAT(), DataTypes.VARCHAR(5), DataTypes.VARBINARY(5)}, new String[]{"k", "v1", "v2", "v3", "v4"});
        this.firstTableSchema = new Schema(of.getFields(), Collections.singletonList("pt"), Arrays.asList("pt", "k"), options.toMap(), "");
        createTestTables(this.catalog, Tuple2.of(this.firstTable, this.firstTableSchema), Tuple2.of(this.secondTable, new Schema(of2.getFields(), Collections.emptyList(), Collections.singletonList("k"), options.toMap(), "")));
    }

    private void createTestTables(Catalog catalog, Tuple2<Identifier, Schema>... tuple2Arr) throws Exception {
        for (Tuple2<Identifier, Schema> tuple2 : tuple2Arr) {
            catalog.createTable((Identifier) tuple2.f0, (Schema) tuple2.f1, false);
        }
    }

    @AfterEach
    public void after() {
        Predicate predicate = path -> {
            return path.toString().contains(this.tempDir.toString());
        };
        Assertions.assertThat(TraceableFileIO.openInputStreams(predicate)).isEmpty();
        Assertions.assertThat(TraceableFileIO.openOutputStreams(predicate)).isEmpty();
    }

    @Timeout(30)
    @Test
    public void testAsyncTableCreate() throws Exception {
        Identifier create = Identifier.create(this.databaseName, "async_new_table");
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        Runner runner = new Runner(createTestHarness);
        Thread thread = new Thread(runner);
        thread.start();
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "0");
        hashMap.put("k", "1");
        hashMap.put("v", "10");
        CdcMultiplexRecord fromCdcRecord = CdcMultiplexRecord.fromCdcRecord(this.databaseName, create.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap));
        runner.offer(fromCdcRecord);
        Assertions.assertThat(runner.poll(1L)).isNull();
        this.catalog.createTable(create, this.firstTableSchema, true);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("pt", "0");
        hashMap2.put("k", "3");
        hashMap2.put("v", "30");
        CdcMultiplexRecord fromCdcRecord2 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, create.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap2));
        runner.offer(fromCdcRecord2);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord2);
        runner.stop();
        thread.join();
        createTestHarness.close();
    }

    @Timeout(30)
    @Test
    public void testInitializeState() throws Exception {
        Identifier create = Identifier.create(this.databaseName, "async_new_table");
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        Runner runner = new Runner(createTestHarness);
        Thread thread = new Thread(runner);
        thread.start();
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "0");
        hashMap.put("k", "1");
        hashMap.put("v", "10");
        CdcMultiplexRecord fromCdcRecord = CdcMultiplexRecord.fromCdcRecord(this.databaseName, create.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap));
        runner.offer(fromCdcRecord);
        Assertions.assertThat(runner.poll(1L)).isNull();
        CdcRecordStoreMultiWriteOperator operator = createTestHarness.getOperator();
        Assertions.assertThat(operator.tables().size()).isEqualTo(0);
        Assertions.assertThat(operator.writes().size()).isEqualTo(0);
        this.catalog.createTable(create, this.firstTableSchema, true);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord);
        Assertions.assertThat(operator.tables().size()).isEqualTo(1);
        Assertions.assertThat(operator.writes().size()).isEqualTo(1);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("pt", "0");
        hashMap2.put("k", "3");
        hashMap2.put("v", "30");
        CdcMultiplexRecord fromCdcRecord2 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, create.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap2));
        runner.offer(fromCdcRecord2);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord2);
        long j = 1 + 1;
        OperatorSubtaskState snapshot = createTestHarness.snapshot(0L, 1L);
        String str = this.commitUser;
        this.commitUser = UUID.randomUUID().toString();
        createTestHarness.close();
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness2 = createTestHarness(this.catalogLoader);
        createTestHarness2.initializeState(snapshot);
        Assertions.assertThat(createTestHarness2.getOperator().commitUser()).isEqualTo(str);
        runner.stop();
        thread.join();
        createTestHarness2.close();
    }

    @Timeout(30)
    @Test
    public void testSingleTableAddColumn() throws Exception {
        Identifier identifier = this.firstTable;
        FileStoreTable table = this.catalog.getTable(identifier);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        Runner runner = new Runner(createTestHarness);
        Thread thread = new Thread(runner);
        thread.start();
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "0");
        hashMap.put("k", "1");
        hashMap.put("v", "10");
        CdcMultiplexRecord fromCdcRecord = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap));
        runner.offer(fromCdcRecord);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("pt", "0");
        hashMap2.put("k", "2");
        CdcMultiplexRecord fromCdcRecord2 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap2));
        runner.offer(fromCdcRecord2);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("pt", "0");
        hashMap3.put("k", "3");
        hashMap3.put("v", "30");
        hashMap3.put("v2", "300");
        CdcMultiplexRecord fromCdcRecord3 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap3));
        runner.offer(fromCdcRecord3);
        Assertions.assertThat(runner.poll(1L)).isNull();
        new SchemaManager(table.fileIO(), table.location()).commitChanges(new SchemaChange[]{SchemaChange.addColumn("v2", DataTypes.INT())});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord3);
        runner.stop();
        thread.join();
        createTestHarness.close();
    }

    private Catalog.Loader createCatalogLoader() {
        Options createCatalogOptions = createCatalogOptions(this.warehouse);
        return () -> {
            return CatalogFactory.createCatalog(CatalogContext.create(createCatalogOptions));
        };
    }

    private Options createCatalogOptions(org.apache.paimon.fs.Path path) {
        Options options = new Options();
        options.set(CatalogOptions.WAREHOUSE, path.toString());
        options.set(CatalogOptions.URI, "");
        return options;
    }

    @Timeout(30)
    @Test
    public void testSingleTableUpdateColumnType() throws Exception {
        Identifier identifier = this.secondTable;
        FileStoreTable table = this.catalog.getTable(identifier);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        Runner runner = new Runner(createTestHarness);
        Thread thread = new Thread(runner);
        thread.start();
        HashMap hashMap = new HashMap();
        hashMap.put("k", "1");
        hashMap.put("v1", "10");
        hashMap.put("v2", "0.625");
        hashMap.put("v3", "one");
        hashMap.put("v4", "b_one");
        CdcMultiplexRecord fromCdcRecord = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap));
        runner.offer(fromCdcRecord);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("k", "2");
        hashMap2.put("v1", "12345678987654321");
        hashMap2.put("v2", "0.25");
        CdcMultiplexRecord fromCdcRecord2 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap2));
        runner.offer(fromCdcRecord2);
        Assertions.assertThat(runner.poll(1L)).isNull();
        SchemaManager schemaManager = new SchemaManager(table.fileIO(), table.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v1", DataTypes.BIGINT())});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("k", "3");
        hashMap3.put("v1", "100");
        hashMap3.put("v2", "1.0000000000009095");
        CdcMultiplexRecord fromCdcRecord3 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap3));
        runner.offer(fromCdcRecord3);
        Assertions.assertThat(runner.poll(1L)).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v2", DataTypes.DOUBLE())});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord3);
        HashMap hashMap4 = new HashMap();
        hashMap4.put("k", "4");
        hashMap4.put("v1", "40");
        hashMap4.put("v3", "long four");
        CdcMultiplexRecord fromCdcRecord4 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap4));
        runner.offer(fromCdcRecord4);
        Assertions.assertThat(runner.poll(1L)).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v3", DataTypes.VARCHAR(10))});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord4);
        HashMap hashMap5 = new HashMap();
        hashMap5.put("k", "5");
        hashMap5.put("v1", "50");
        hashMap5.put("v4", "long five~");
        CdcMultiplexRecord fromCdcRecord5 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap5));
        runner.offer(fromCdcRecord5);
        Assertions.assertThat(runner.poll(1L)).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v4", DataTypes.VARBINARY(10))});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord5);
        runner.stop();
        thread.join();
        createTestHarness.close();
    }

    @Timeout(30)
    @Test
    public void testMultiTableUpdateColumnType() throws Exception {
        FileStoreTable table = this.catalog.getTable(this.firstTable);
        FileStoreTable table2 = this.catalog.getTable(this.secondTable);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        Runner runner = new Runner(createTestHarness);
        Thread thread = new Thread(runner);
        thread.start();
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "0");
        hashMap.put("k", "1");
        hashMap.put("v", "10");
        CdcMultiplexRecord fromCdcRecord = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.firstTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap));
        runner.offer(fromCdcRecord);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord);
        HashMap hashMap2 = new HashMap();
        hashMap2.put("k", "1");
        hashMap2.put("v1", "10");
        hashMap2.put("v2", "0.625");
        hashMap2.put("v3", "one");
        hashMap2.put("v4", "b_one");
        CdcMultiplexRecord fromCdcRecord2 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap2));
        runner.offer(fromCdcRecord2);
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord2);
        HashMap hashMap3 = new HashMap();
        hashMap3.put("pt", "1");
        hashMap3.put("k", "123456789876543211");
        hashMap3.put("v", "varchar");
        CdcMultiplexRecord fromCdcRecord3 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.firstTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap3));
        runner.offer(fromCdcRecord3);
        Assertions.assertThat(runner.poll(1L)).isNull();
        new SchemaManager(table.fileIO(), table.location()).commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("k", DataTypes.BIGINT())});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord3);
        HashMap hashMap4 = new HashMap();
        hashMap4.put("k", "2");
        hashMap4.put("v1", "12345678987654321");
        hashMap4.put("v2", "0.25");
        CdcMultiplexRecord fromCdcRecord4 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap4));
        runner.offer(fromCdcRecord4);
        Assertions.assertThat(runner.poll(1L)).isNull();
        new SchemaManager(table2.fileIO(), table2.location()).commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v1", DataTypes.BIGINT())});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord4);
        HashMap hashMap5 = new HashMap();
        hashMap5.put("k", "3");
        hashMap5.put("v1", "100");
        hashMap5.put("v2", "1.0000000000009095");
        CdcMultiplexRecord fromCdcRecord5 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap5));
        runner.offer(fromCdcRecord5);
        Assertions.assertThat(runner.poll(1L)).isNull();
        new SchemaManager(table2.fileIO(), table2.location()).commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v2", DataTypes.DOUBLE())});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord5);
        HashMap hashMap6 = new HashMap();
        hashMap6.put("k", "4");
        hashMap6.put("v1", "40");
        hashMap6.put("v3", "long four");
        CdcMultiplexRecord fromCdcRecord6 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap6));
        runner.offer(fromCdcRecord6);
        Assertions.assertThat(runner.poll(1L)).isNull();
        SchemaManager schemaManager = new SchemaManager(table2.fileIO(), table2.location());
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v3", DataTypes.VARCHAR(10))});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord6);
        HashMap hashMap7 = new HashMap();
        hashMap7.put("k", "5");
        hashMap7.put("v1", "50");
        hashMap7.put("v4", "long five~");
        CdcMultiplexRecord fromCdcRecord7 = CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap7));
        runner.offer(fromCdcRecord7);
        Assertions.assertThat(runner.poll(1L)).isNull();
        schemaManager.commitChanges(new SchemaChange[]{SchemaChange.updateColumnType("v4", DataTypes.VARBINARY(10))});
        Assertions.assertThat(runner.take()).isEqualTo(fromCdcRecord7);
        runner.stop();
        thread.join();
        createTestHarness.close();
    }

    @Timeout(30)
    @Test
    public void testUsingTheSameCompactExecutor() throws Exception {
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        Runner runner = new Runner(createTestHarness);
        Thread thread = new Thread(runner);
        thread.start();
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "0");
        hashMap.put("k", "1");
        hashMap.put("v", "10");
        runner.offer(CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.firstTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap)));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("k", "1");
        hashMap2.put("v1", "10");
        hashMap2.put("v2", "0.625");
        hashMap2.put("v3", "one");
        hashMap2.put("v4", "b_one");
        runner.offer(CdcMultiplexRecord.fromCdcRecord(this.databaseName, this.secondTable.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap2)));
        CdcRecordStoreMultiWriteOperator operator = createTestHarness.getOperator();
        CommonTestUtils.waitUtil(() -> {
            return Boolean.valueOf(operator.writes().size() == 2);
        }, Duration.ofSeconds(5L), Duration.ofMillis(100L));
        ArrayList arrayList = new ArrayList(operator.writes().values());
        ArrayList arrayList2 = new ArrayList();
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            arrayList2.add(((StoreSinkWrite) it.next()).getWrite().getWrite().getCompactExecutor());
        }
        Assertions.assertThat(arrayList2.get(0) == arrayList2.get(1)).isTrue();
        ExecutorService executorService = (ExecutorService) arrayList2.get(0);
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((StoreSinkWrite) it2.next()).close();
            Assertions.assertThat(executorService.isShutdown()).isFalse();
        }
        operator.close();
        Assertions.assertThat(executorService.isShutdown()).isTrue();
        runner.stop();
        thread.join();
        createTestHarness.close();
    }

    @Test
    public void testSingleTableCompactionMetrics() throws Exception {
        Identifier identifier = this.firstTable;
        FileStoreTable table = this.catalog.getTable(identifier);
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness = createTestHarness(this.catalogLoader);
        createTestHarness.open();
        CdcRecordStoreMultiWriteOperator oneInputOperator = createTestHarness.getOneInputOperator();
        MetricGroup addGroup = oneInputOperator.getMetricGroup().addGroup("paimon").addGroup("table", table.name()).addGroup("partition", "pt=0").addGroup("bucket", "0").addGroup("compaction");
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "0");
        hashMap.put("k", "1");
        hashMap.put("v", "10");
        createTestHarness.processElement(CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap)), 0L);
        ((StoreSinkWrite) oneInputOperator.writes().get(identifier)).compact(DataFileTestUtils.row(0), 0, true);
        ((StoreSinkWrite) oneInputOperator.writes().get(identifier)).prepareCommit(true, 1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedBefore").getValue()).isEqualTo(1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedAfter").getValue()).isEqualTo(1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastChangelogFilesCompacted").getValue()).isEqualTo(0L);
        hashMap.put("pt", "0");
        hashMap.put("k", "2");
        hashMap.put("v", "12");
        createTestHarness.processElement(CdcMultiplexRecord.fromCdcRecord(this.databaseName, identifier.getObjectName(), new CdcRecord(RowKind.INSERT, hashMap)), 1L);
        ((StoreSinkWrite) oneInputOperator.writes().get(identifier)).compact(DataFileTestUtils.row(0), 0, true);
        ((StoreSinkWrite) oneInputOperator.writes().get(identifier)).prepareCommit(true, 1 + 1);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedBefore").getValue()).isEqualTo(2L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedAfter").getValue()).isEqualTo(1L);
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastChangelogFilesCompacted").getValue()).isEqualTo(0L);
        createTestHarness.close();
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedBefore")).isNull();
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastTableFilesCompactedAfter")).isNull();
        Assertions.assertThat(MetricUtils.getGauge(addGroup, "lastChangelogFilesCompacted")).isNull();
    }

    private OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> createTestHarness(Catalog.Loader loader) throws Exception {
        CdcRecordStoreMultiWriteOperator cdcRecordStoreMultiWriteOperator = new CdcRecordStoreMultiWriteOperator(loader, (fileStoreTable, str, storeSinkWriteState, iOManager, memoryPoolFactory, metricGroup) -> {
            return new StoreSinkWriteImpl(fileStoreTable, str, storeSinkWriteState, iOManager, false, false, true, memoryPoolFactory, metricGroup);
        }, this.commitUser, Options.fromMap(new HashMap()));
        JavaSerializer javaSerializer = new JavaSerializer();
        TypeSerializer createSerializer = new MultiTableCommittableTypeInfo().createSerializer(new ExecutionConfig());
        OneInputStreamOperatorTestHarness<CdcMultiplexRecord, MultiTableCommittable> oneInputStreamOperatorTestHarness = new OneInputStreamOperatorTestHarness<>(cdcRecordStoreMultiWriteOperator, javaSerializer);
        oneInputStreamOperatorTestHarness.setup(createSerializer);
        return oneInputStreamOperatorTestHarness;
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -280860613:
                if (implMethodName.equals("lambda$createCatalogLoader$71cd4600$1")) {
                    z = true;
                    break;
                }
                break;
            case -249245184:
                if (implMethodName.equals("lambda$createTestHarness$df0b6424$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/flink/sink/StoreSinkWrite$WithWriteBufferProvider") && serializedLambda.getFunctionalInterfaceMethodName().equals("provide") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemoryPoolFactory;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/table/FileStoreTable;Ljava/lang/String;Lorg/apache/paimon/flink/sink/StoreSinkWriteState;Lorg/apache/flink/runtime/io/disk/iomanager/IOManager;Lorg/apache/paimon/memory/MemoryPoolFactory;Lorg/apache/flink/metrics/MetricGroup;)Lorg/apache/paimon/flink/sink/StoreSinkWrite;")) {
                    return (fileStoreTable, str, storeSinkWriteState, iOManager, memoryPoolFactory, metricGroup) -> {
                        return new StoreSinkWriteImpl(fileStoreTable, str, storeSinkWriteState, iOManager, false, false, true, memoryPoolFactory, metricGroup);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/paimon/catalog/Catalog$Loader") && serializedLambda.getFunctionalInterfaceMethodName().equals("load") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("()Lorg/apache/paimon/catalog/Catalog;") && serializedLambda.getImplClass().equals("org/apache/paimon/flink/sink/cdc/CdcRecordStoreMultiWriteOperatorTest") && serializedLambda.getImplMethodSignature().equals("(Lorg/apache/paimon/options/Options;)Lorg/apache/paimon/catalog/Catalog;")) {
                    Options options = (Options) serializedLambda.getCapturedArg(0);
                    return () -> {
                        return CatalogFactory.createCatalog(CatalogContext.create(options));
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
