package org.apache.paimon.table.sink;

import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.stream.LongStream;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.lineage.LineageMetaFactory;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.metastore.MetastoreClient;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaUtils;
import org.apache.paimon.table.CatalogEnvironment;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.ExceptionUtils;
import org.apache.paimon.utils.FailingFileIO;
import org.apache.paimon.utils.FileStorePathFactoryTest;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/paimon/table/sink/TableCommitTest.class */
public class TableCommitTest {

    @TempDir
    Path tempDir;
    private static final Map<String, Set<Long>> commitCallbackResult = new ConcurrentHashMap();

    /* loaded from: input_file:org/apache/paimon/table/sink/TableCommitTest$TestCommitCallback.class */
    public static class TestCommitCallback implements CommitCallback {
        private final String testId;

        public TestCommitCallback(String str) {
            this.testId = str;
        }

        public void call(List<ManifestCommittable> list) {
            list.forEach(manifestCommittable -> {
                ((Set) TableCommitTest.commitCallbackResult.get(this.testId)).add(Long.valueOf(manifestCommittable.identifier()));
            });
        }

        public void close() throws Exception {
        }
    }

    @Test
    public void testCommitCallbackWithFailureFixedBucket() throws Exception {
        innerTestCommitCallbackWithFailure(1);
    }

    @Test
    public void testCommitCallbackWithFailureDynamicBucket() throws Exception {
        innerTestCommitCallbackWithFailure(-1);
    }

    private void innerTestCommitCallbackWithFailure(int i) throws Exception {
        String uuid = UUID.randomUUID().toString();
        commitCallbackResult.put(uuid, new HashSet());
        try {
            testCommitCallbackWithFailureImpl(i, 30, uuid);
            commitCallbackResult.remove(uuid);
        } catch (Throwable th) {
            commitCallbackResult.remove(uuid);
            throw th;
        }
    }

    private void testCommitCallbackWithFailureImpl(int i, int i2, String str) throws Exception {
        String uuid = UUID.randomUUID().toString();
        FailingFileIO.reset(uuid, 0, 1);
        String failingPath = FailingFileIO.getFailingPath(uuid, this.tempDir.toString());
        RowType of = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"k", "v"});
        Options options = new Options();
        options.set(CoreOptions.PATH, failingPath);
        options.set(CoreOptions.BUCKET, Integer.valueOf(i));
        options.set(CoreOptions.COMMIT_CALLBACKS, TestCommitCallback.class.getName());
        options.set(CoreOptions.COMMIT_CALLBACK_PARAM.key().replace("#", TestCommitCallback.class.getName()), str);
        FileStoreTable create = FileStoreTableFactory.create(new FailingFileIO(), new org.apache.paimon.fs.Path(failingPath), SchemaUtils.forceCommit(new SchemaManager(LocalFileIO.create(), new org.apache.paimon.fs.Path(failingPath)), new Schema(of.getFields(), Collections.emptyList(), Collections.singletonList("k"), options.toMap(), "")), new CatalogEnvironment(Lock.emptyFactory(), (MetastoreClient.Factory) null, (LineageMetaFactory) null));
        String uuid2 = UUID.randomUUID().toString();
        TableWriteImpl newWrite = create.newWrite(uuid2);
        HashMap hashMap = new HashMap();
        for (int i3 = 0; i3 < i2; i3++) {
            if (i == -1) {
                newWrite.write(GenericRow.of(new Object[]{Integer.valueOf(i3), Long.valueOf(i3 * 1000)}), 0);
            } else {
                newWrite.write(GenericRow.of(new Object[]{Integer.valueOf(i3), Long.valueOf(i3 * 1000)}));
            }
            hashMap.put(Long.valueOf(i3), newWrite.prepareCommit(true, i3));
        }
        newWrite.close();
        TableCommitImpl newCommit = create.newCommit(uuid2);
        FailingFileIO.reset(uuid, 3, 1000);
        while (true) {
            try {
                newCommit.filterAndCommit(hashMap);
                newCommit.close();
                Assertions.assertThat(commitCallbackResult.get(str)).isEqualTo(LongStream.range(0L, i2).boxed().collect(Collectors.toSet()));
                return;
            } catch (Throwable th) {
                Optional findThrowable = ExceptionUtils.findThrowable(th, FailingFileIO.ArtificialException.class);
                Optional findThrowableWithMessage = ExceptionUtils.findThrowableWithMessage(th, "Conflicts during commits are normal");
                if (!findThrowable.isPresent() && !findThrowableWithMessage.isPresent()) {
                    throw th;
                }
            }
        }
    }

    @Test
    public void testRecoverDeletedFiles() throws Exception {
        String path = this.tempDir.toString();
        RowType of = RowType.of(new DataType[]{DataTypes.INT(), DataTypes.BIGINT()}, new String[]{"k", "v"});
        Options options = new Options();
        options.set(CoreOptions.PATH, path);
        options.set(CoreOptions.BUCKET, 1);
        FileStoreTable create = FileStoreTableFactory.create(LocalFileIO.create(), new org.apache.paimon.fs.Path(path), SchemaUtils.forceCommit(new SchemaManager(LocalFileIO.create(), new org.apache.paimon.fs.Path(path)), new Schema(of.getFields(), Collections.emptyList(), Collections.singletonList("k"), options.toMap(), "")), new CatalogEnvironment(Lock.emptyFactory(), (MetastoreClient.Factory) null, (LineageMetaFactory) null));
        String uuid = UUID.randomUUID().toString();
        TableWriteImpl newWrite = create.newWrite(uuid);
        newWrite.write(GenericRow.of(new Object[]{0, 0L}));
        List prepareCommit = newWrite.prepareCommit(true, 0L);
        newWrite.write(GenericRow.of(new Object[]{1, 1L}));
        List prepareCommit2 = newWrite.prepareCommit(true, 1L);
        newWrite.close();
        TableCommitImpl newCommit = create.newCommit(uuid);
        newCommit.commit(0L, prepareCommit);
        for (CommitMessageImpl commitMessageImpl : Arrays.asList((CommitMessageImpl) prepareCommit.get(0), (CommitMessageImpl) prepareCommit2.get(0))) {
            LocalFileIO.create().delete((org.apache.paimon.fs.Path) ((DataFileMeta) commitMessageImpl.newFilesIncrement().newFiles().get(0)).collectFiles(FileStorePathFactoryTest.createNonPartFactory(new org.apache.paimon.fs.Path(path)).createDataFilePathFactory(commitMessageImpl.partition(), commitMessageImpl.bucket())).get(0), true);
        }
        newCommit.filterAndCommit(Collections.singletonMap(0L, prepareCommit));
        Assertions.assertThatThrownBy(() -> {
            newCommit.filterAndCommit(Collections.singletonMap(1L, prepareCommit2));
        }).hasMessageContaining("Cannot recover from this checkpoint because some files in the snapshot that need to be resubmitted have been deleted");
    }
}
