package org.apache.paimon.flink.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.append.AppendOnlyCompactionTask;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.table.TableTestBase;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.types.DataTypes;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/paimon/flink/sink/AppendOnlyTableCompactionWorkerOperatorTest.class */
public class AppendOnlyTableCompactionWorkerOperatorTest extends TableTestBase {
    @Test
    public void testAsyncCompactionWorks() throws Exception {
        createTableDefault();
        AppendOnlyTableCompactionWorkerOperator appendOnlyTableCompactionWorkerOperator = new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user");
        List<AppendOnlyCompactionTask> packTask = packTask(writeDataDefault(200, 20), 5);
        List list = (List) packTask.stream().map((v1) -> {
            return new StreamRecord(v1);
        }).collect(Collectors.toList());
        Assertions.assertThat(packTask.size()).isEqualTo(4);
        appendOnlyTableCompactionWorkerOperator.open();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            appendOnlyTableCompactionWorkerOperator.processElement((StreamRecord) it.next());
        }
        ArrayList arrayList = new ArrayList();
        Long valueOf = Long.valueOf(System.currentTimeMillis());
        long j = 60000;
        Assertions.assertThatCode(() -> {
            while (arrayList.size() != 4) {
                arrayList.addAll(appendOnlyTableCompactionWorkerOperator.prepareCommit(false, Long.MAX_VALUE));
                if (Long.valueOf(System.currentTimeMillis()).longValue() - valueOf.longValue() > j && arrayList.size() != 4) {
                    throw new RuntimeException("Timeout waiting for compaction, maybe some error happens in " + AppendOnlyTableCompactionWorkerOperator.class.getName());
                }
                Thread.sleep(1000L);
            }
        }).doesNotThrowAnyException();
        arrayList.forEach(committable -> {
            Assertions.assertThat(((CommitMessageImpl) committable.wrappedCommittable()).compactIncrement().compactAfter().size() == 1).isTrue();
        });
    }

    @Test
    public void testAsyncCompactionFileDeletedWhenShutdown() throws Exception {
        createTableDefault();
        AppendOnlyTableCompactionWorkerOperator appendOnlyTableCompactionWorkerOperator = new AppendOnlyTableCompactionWorkerOperator(getTableDefault(), "user");
        List<AppendOnlyCompactionTask> packTask = packTask(writeDataDefault(200, 40), 5);
        List list = (List) packTask.stream().map((v1) -> {
            return new StreamRecord(v1);
        }).collect(Collectors.toList());
        Assertions.assertThat(packTask.size()).isEqualTo(8);
        appendOnlyTableCompactionWorkerOperator.open();
        Iterator it = list.iterator();
        while (it.hasNext()) {
            appendOnlyTableCompactionWorkerOperator.processElement((StreamRecord) it.next());
        }
        Thread.sleep(500L);
        LocalFileIO create = LocalFileIO.create();
        DataFilePathFactory createDataFilePathFactory = getTableDefault().store().pathFactory().createDataFilePathFactory(BinaryRow.EMPTY_ROW, 0);
        int i = 0;
        for (Future future : appendOnlyTableCompactionWorkerOperator.result()) {
            if (!future.isDone()) {
                break;
            }
            Iterator it2 = ((CommitMessage) future.get()).compactIncrement().compactAfter().iterator();
            while (it2.hasNext()) {
                Assertions.assertThat(create.exists(createDataFilePathFactory.toPath(((DataFileMeta) it2.next()).fileName()))).isTrue();
            }
            int i2 = i;
            i++;
            if (i2 > 2) {
                break;
            }
        }
        appendOnlyTableCompactionWorkerOperator.shutdown();
        Thread.sleep(2000L);
        for (Future future2 : appendOnlyTableCompactionWorkerOperator.result()) {
            try {
                if (!future2.isDone()) {
                    try {
                        future2.get(5L, TimeUnit.SECONDS);
                    } catch (Exception e) {
                        return;
                    }
                }
                Iterator it3 = ((CommitMessage) future2.get()).compactIncrement().compactAfter().iterator();
                while (it3.hasNext()) {
                    Assertions.assertThat(create.exists(createDataFilePathFactory.toPath(((DataFileMeta) it3.next()).fileName()))).isFalse();
                }
            } catch (Exception e2) {
            }
        }
    }

    protected Schema schemaDefault() {
        Schema.Builder newBuilder = Schema.newBuilder();
        newBuilder.column("f0", DataTypes.INT());
        newBuilder.column("f1", DataTypes.BIGINT());
        newBuilder.column("f2", DataTypes.STRING());
        newBuilder.option(CoreOptions.BUCKET.key(), "-1");
        newBuilder.option(CoreOptions.COMPACTION_MAX_FILE_NUM.key(), "5");
        return newBuilder.build();
    }

    protected InternalRow dataDefault(int i, int i2) {
        return GenericRow.of(new Object[]{Integer.valueOf(RANDOM.nextInt()), Long.valueOf(RANDOM.nextLong()), randomString()});
    }

    private List<AppendOnlyCompactionTask> packTask(List<CommitMessage> list, int i) {
        ArrayList arrayList = new ArrayList();
        List list2 = (List) list.stream().flatMap(commitMessage -> {
            return ((CommitMessageImpl) commitMessage).newFilesIncrement().newFiles().stream();
        }).collect(Collectors.toList());
        int i2 = 0;
        while (true) {
            int i3 = i2;
            if (i3 >= list2.size()) {
                return arrayList;
            }
            if (i3 < list2.size() - i) {
                arrayList.add(new AppendOnlyCompactionTask(BinaryRow.EMPTY_ROW, list2.subList(i3, i3 + i)));
            } else {
                arrayList.add(new AppendOnlyCompactionTask(BinaryRow.EMPTY_ROW, list2.subList(i3, list2.size())));
            }
            i2 = i3 + i;
        }
    }
}
