package org.apache.flink.table.store.connector.sink;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.store.connector.source.CompactorSourceBuilder;
import org.apache.flink.table.store.file.Snapshot;
import org.apache.flink.table.store.file.schema.SchemaManager;
import org.apache.flink.table.store.file.schema.UpdateSchema;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.FileStoreTableFactory;
import org.apache.flink.table.store.table.sink.TableCommit;
import org.apache.flink.table.store.table.sink.TableWrite;
import org.apache.flink.table.store.table.source.DataSplit;
import org.apache.flink.table.store.table.source.DataTableScan;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.test.util.AbstractTestBase;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/store/connector/sink/CompactorSinkITCase.class */
public class CompactorSinkITCase extends AbstractTestBase {
    private static final RowType ROW_TYPE = RowType.of(new LogicalType[]{DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.INT().getLogicalType(), DataTypes.STRING().getLogicalType()}, new String[]{"k", "v", "hh", "dt"});
    private Path tablePath;
    private String commitUser;

    @Before
    public void before() throws IOException {
        this.tablePath = new Path(TEMPORARY_FOLDER.newFolder().toString());
        this.commitUser = UUID.randomUUID().toString();
    }

    @Test
    public void testCompact() throws Exception {
        FileStoreTable createFileStoreTable = createFileStoreTable();
        SnapshotManager snapshotManager = createFileStoreTable.snapshotManager();
        TableWrite newWrite = createFileStoreTable.newWrite(this.commitUser);
        TableCommit newCommit = createFileStoreTable.newCommit(this.commitUser);
        newWrite.write(rowData(1, 100, 15, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 100, 16, StringData.fromString("20221208")));
        newWrite.write(rowData(1, 100, 15, StringData.fromString("20221209")));
        newCommit.commit(0L, newWrite.prepareCommit(true, 0L));
        newWrite.write(rowData(2, 200, 15, StringData.fromString("20221208")));
        newWrite.write(rowData(2, 200, 16, StringData.fromString("20221208")));
        newWrite.write(rowData(2, 200, 15, StringData.fromString("20221209")));
        newCommit.commit(1L, newWrite.prepareCommit(true, 1L));
        Snapshot snapshot = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assert.assertEquals(2L, snapshot.id());
        Assert.assertEquals(Snapshot.CommitKind.APPEND, snapshot.commitKind());
        newWrite.close();
        newCommit.close();
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.setRuntimeMode(RuntimeExecutionMode.BATCH);
        new CompactorSinkBuilder(createFileStoreTable).withInput(new CompactorSourceBuilder(this.tablePath.toString(), createFileStoreTable).withEnv(executionEnvironment).withContinuousMode(false).withPartitions(getSpecifiedPartitions()).build()).build();
        executionEnvironment.execute();
        Snapshot snapshot2 = snapshotManager.snapshot(snapshotManager.latestSnapshotId().longValue());
        Assert.assertEquals(3L, snapshot2.id());
        Assert.assertEquals(Snapshot.CommitKind.COMPACT, snapshot2.commitKind());
        DataTableScan.DataFilePlan plan = createFileStoreTable.newScan().plan();
        Assert.assertEquals(3L, plan.splits().size());
        Iterator it = plan.splits.iterator();
        while (it.hasNext()) {
            if (((DataSplit) it.next()).partition().getInt(1) == 15) {
                Assert.assertEquals(1L, r0.files().size());
            } else {
                Assert.assertEquals(2L, r0.files().size());
            }
        }
    }

    private List<Map<String, String>> getSpecifiedPartitions() {
        HashMap hashMap = new HashMap();
        hashMap.put("dt", "20221208");
        hashMap.put("hh", "15");
        HashMap hashMap2 = new HashMap();
        hashMap2.put("dt", "20221209");
        hashMap2.put("hh", "15");
        return Arrays.asList(hashMap, hashMap2);
    }

    private GenericRowData rowData(Object... objArr) {
        return GenericRowData.of(objArr);
    }

    private FileStoreTable createFileStoreTable() throws Exception {
        return FileStoreTableFactory.create(this.tablePath, new SchemaManager(this.tablePath).commitNewVersion(new UpdateSchema(ROW_TYPE, Arrays.asList("dt", "hh"), Arrays.asList("dt", "hh", "k"), Collections.emptyMap(), "")));
    }
}
