package org.apache.flink.table.store.connector.action;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.RecordReaderIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.store.table.source.Split;
import org.apache.flink.table.store.table.source.TableRead;
import org.apache.flink.table.store.table.source.snapshot.ContinuousDataFileSnapshotEnumerator;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/action/CompactActionITCase.class */
public class CompactActionITCase extends AbstractTestBase {
    private static final RowType ROW_TYPE = RowType.of(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()}, new String[]{"k", "v", "hh", "dt"});
    private Path tablePath;
    private String commitUser;

    @Before
    public void before() throws IOException {
        this.tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
        this.commitUser = UUID.randomUUID().toString();
    }

    @Test(timeout = 60000)
    public void testBatchCompact() throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        FileStoreTable createFileStoreTable = createFileStoreTable(hashMap);
        SnapshotManager snapshotManager = createFileStoreTable.snapshotManager();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 100, 15, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 100, 16, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 100, 15, StringData.fromString("20221209")));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(2, 200, 15, StringData.fromString("20221208")));
        newWrite.write(rowData(2, 200, 16, StringData.fromString("20221208")));
        newWrite.write(rowData(2, 200, 15, StringData.fromString("20221209")));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assert.assertEquals(2L, snapshot.id());
        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
        newWrite.close();
        newCommit.close();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        new CompactAction(this.tablePath).withPartitions(getSpecifiedPartitions()).build(executionEnvironment);
        executionEnvironment.execute();
        Snapshot snapshot2 = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assert.assertEquals(3L, snapshot2.id());
        Assert.assertEquals(Snapshot.CommitKind.COMPACT, snapshot2.commitKind());
        DataTableScan.DataFilePlan plan = createFileStoreTable.newScan().plan();
        Assert.assertEquals(3L, plan.splits().size());
        Iterator it = plan.splits.iterator();
        while (it.hasNext()) {
            if (((DataSplit) it.next()).partition().getInt(1) == 15) {
                Assert.assertEquals(1L, r0.files().size());
            } else {
                Assert.assertEquals(2L, r0.files().size());
            }
        }
    }

    @Test(timeout = 60000)
    public void testStreamingCompact() throws Exception {
        DataTableScan.DataFilePlan enumerate;
        HashMap hashMap = new HashMap();
        hashMap.put(CoreOptions.CHANGELOG_PRODUCER.key(), "full-compaction");
        hashMap.put(CoreOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL.key(), "1s");
        hashMap.put(CoreOptions.WRITE_ONLY.key(), "true");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN.key(), "3");
        hashMap.put(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX.key(), "3");
        FileStoreTable createFileStoreTable = createFileStoreTable(hashMap);
        SnapshotManager snapshotManager = createFileStoreTable.snapshotManager();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 100, 15, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 100, 16, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 100, 15, StringData.fromString("20221209")));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assert.assertEquals(1L, snapshot.id());
        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
        ContinuousDataFileSnapshotEnumerator create = ContinuousDataFileSnapshotEnumerator.create(createFileStoreTable, createFileStoreTable.newScan(), (Long) null);
        DataTableScan.DataFilePlan enumerate2 = create.enumerate();
        Assert.assertEquals(1L, enumerate2.snapshotId.longValue());
        Assert.assertTrue(enumerate2.splits().isEmpty());
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.STREAMING);
        executionEnvironment.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        executionEnvironment.getCheckpointConfig().setCheckpointInterval(500L);
        new CompactAction(this.tablePath).withPartitions(getSpecifiedPartitions()).build(executionEnvironment);
        executionEnvironment.executeAsync();
        while (true) {
            enumerate = create.enumerate();
            if (enumerate != null) {
                break;
            } else {
                Thread.sleep(1000L);
            }
        }
        Assert.assertEquals(2L, enumerate.snapshotId.longValue());
        List<String> result = getResult(createFileStoreTable.newRead(), enumerate.splits());
        result.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        Assert.assertEquals(Arrays.asList("+I 1|100|15|20221208", "+I 1|100|15|20221209"), result);
        newWrite.write(rowData(1, 101, 15, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 101, 16, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 101, 15, StringData.fromString("20221209")));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        newWrite.close();
        newCommit.close();
        while (true) {
            DataTableScan.DataFilePlan enumerate3 = create.enumerate();
            if (enumerate3 != null) {
                Assert.assertEquals(4L, enumerate3.snapshotId.longValue());
                List<String> result2 = getResult(createFileStoreTable.newRead(), enumerate3.splits());
                result2.sort((v0, v1) -> {
                    return v0.compareTo(v1);
                });
                Assert.assertEquals(Arrays.asList("+U 1|101|15|20221208", "+U 1|101|15|20221209", "-U 1|100|15|20221208", "-U 1|100|15|20221209"), result2);
                Assert.assertEquals(2L, snapshotManager.earliestSnapshotId().longValue());
                return;
            }
            Thread.sleep(1000L);
        }
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap hashMap = new HashMap();
        hashMap.put("dt", "20221208");
        hashMap.put("hh", "15");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("dt", "20221209");
        hashMap2.put("hh", "15");
        return Arrays.asList(hashMap, hashMap2);
    }

    private GenericRowData rowData(Object... objArr) {
        return GenericRowData.of(objArr);
    }

    private List<String> getResult(TableRead tableRead, List<Split> list) throws Exception {
        ArrayList arrayList = new ArrayList();
        for (Split split : list) {
            arrayList.add(() -> {
                return tableRead.createReader(split);
            });
        }
        RecordReaderIterator recordReaderIterator = new RecordReaderIterator(ConcatRecordReader.create(arrayList));
        ArrayList arrayList2 = new ArrayList();
        while (recordReaderIterator.hasNext()) {
            arrayList2.add(rowDataToString((RowData) recordReaderIterator.next()));
        }
        recordReaderIterator.close();
        return arrayList2;
    }

    private String rowDataToString(RowData rowData) {
        return String.format("%s %d|%d|%d|%s", rowData.getRowKind().shortString(), Integer.valueOf(rowData.getInt(0)), Integer.valueOf(rowData.getInt(1)), Integer.valueOf(rowData.getInt(2)), rowData.getString(3).toString());
    }

    private FileStoreTable createFileStoreTable(Map<String, String> map) throws Exception {
        return FileStoreTableFactory.create(this.tablePath, new SchemaManager(this.tablePath).commitNewVersion(new UpdateSchema(ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), map, "")));
    }
}
