package org.apache.flink.table.store.table.source.snapshot;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.io.DataFileMetaSerializer;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.system.BucketsTable;
import org.apache.flink.types.RowKind;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/table/store/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.class */
public class ContinuousCompactorFollowUpScannerTest extends SnapshotEnumeratorTestBase {
    private final DataFileMetaSerializer dataFileMetaSerializer = new DataFileMetaSerializer();

    @Test
    public void testGetPlan() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        SnapshotManager snapshotManager = createFileStoreTable.snapshotManager();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 10, 100L));
        newWrite.write(rowData(1, 20, 200L));
        newWrite.write(rowData(2, 40, 400L));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(2, 30, 300L));
        newWrite.write(rowDataWithKind(RowKind.DELETE, 2, 40, 400L));
        newWrite.compact(binaryRow(1), 0, true);
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.close();
        newCommit.close();
        HashMap hashMap = new HashMap();
        hashMap.put("pt", "1");
        TableWrite withOverwrite = createFileStoreTable.newWrite(this.commitUser).withOverwrite(true);
        TableCommit withOverwritePartition = createFileStoreTable.newCommit(this.commitUser).withOverwritePartition(hashMap);
        withOverwrite.write(rowData(1, 10, 101L));
        withOverwrite.write(rowData(1, 20, 201L));
        withOverwritePartition.commit(2L, withOverwrite.prepareCommit(true, 2L));
        withOverwrite.close();
        withOverwritePartition.close();
        Assertions.assertThat(snapshotManager.latestSnapshotId()).isEqualTo(4L);
        BucketsTable bucketsTable = new BucketsTable(createFileStoreTable, true);
        DataTableScan newScan = bucketsTable.newScan();
        TableRead newRead = bucketsTable.newRead();
        ContinuousCompactorFollowUpScanner continuousCompactorFollowUpScanner = new ContinuousCompactorFollowUpScanner();
        Snapshot snapshot = snapshotManager.snapshot(1L);
        Assertions.assertThat(snapshot.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
        Assertions.assertThat(continuousCompactorFollowUpScanner.shouldScanSnapshot(snapshot)).isTrue();
        DataTableScan.DataFilePlan plan = continuousCompactorFollowUpScanner.getPlan(1L, newScan);
        Assertions.assertThat(plan.snapshotId).isEqualTo(1L);
        Assertions.assertThat(getResult(newRead, plan.splits())).hasSameElementsAs(Arrays.asList("+I 1|1|0|1", "+I 1|2|0|1"));
        Snapshot snapshot2 = snapshotManager.snapshot(2L);
        Assertions.assertThat(snapshot2.commitKind()).isEqualTo(Snapshot.CommitKind.APPEND);
        Assertions.assertThat(continuousCompactorFollowUpScanner.shouldScanSnapshot(snapshot2)).isTrue();
        DataTableScan.DataFilePlan plan2 = continuousCompactorFollowUpScanner.getPlan(2L, newScan);
        Assertions.assertThat(plan2.snapshotId).isEqualTo(2L);
        Assertions.assertThat(getResult(newRead, plan2.splits())).hasSameElementsAs(Collections.singletonList("+I 2|2|0|1"));
        Snapshot snapshot3 = snapshotManager.snapshot(3L);
        Assertions.assertThat(snapshot3.commitKind()).isEqualTo(Snapshot.CommitKind.COMPACT);
        Assertions.assertThat(continuousCompactorFollowUpScanner.shouldScanSnapshot(snapshot3)).isFalse();
        Snapshot snapshot4 = snapshotManager.snapshot(4L);
        Assertions.assertThat(snapshot4.commitKind()).isEqualTo(Snapshot.CommitKind.OVERWRITE);
        Assertions.assertThat(continuousCompactorFollowUpScanner.shouldScanSnapshot(snapshot4)).isFalse();
    }

    @Override // org.apache.flink.table.store.table.source.snapshot.SnapshotEnumeratorTestBase
    protected String rowDataToString(RowData rowData) {
        try {
            return String.format("%s %d|%d|%d|%d", rowData.getRowKind().shortString(), Long.valueOf(rowData.getLong(0)), Integer.valueOf(rowData.getInt(1)), Integer.valueOf(rowData.getInt(2)), Integer.valueOf(this.dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
