package org.apache.paimon.table.sink;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.temporal.TemporalAmount;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.consumer.ConsumerManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.fs.Path;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.manifest.ManifestCommittable;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.metrics.MetricRegistry;
import org.apache.paimon.operation.FileStoreCommit;
import org.apache.paimon.operation.FileStoreExpire;
import org.apache.paimon.operation.Lock;
import org.apache.paimon.operation.PartitionExpire;
import org.apache.paimon.operation.metrics.CommitMetrics;
import org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
import org.apache.paimon.shade.org.apache.commons.lang3.StringUtils;
import org.apache.paimon.tag.TagAutoCreation;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.FileUtils;
import org.apache.paimon.utils.IOUtils;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.PathFactory;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/table/sink/TableCommitImpl.class */
/**
 * {@link InnerTableCommit} implementation that delegates every commit to a {@link FileStoreCommit}
 * and, after each commit, schedules expiration work and notifies the registered {@link
 * CommitCallback}s.
 *
 * <p>Expiration work (consumer expiration plus the optional {@link FileStoreExpire}, {@link
 * PartitionExpire} and {@link TagAutoCreation}) is handed to {@code expireMainExecutor}. A failure
 * raised by that work is logged and remembered; the next attempt to schedule expiration rethrows
 * it wrapped in a {@link RuntimeException}.
 */
public class TableCommitImpl implements InnerTableCommit {
    private static final Logger LOG = LoggerFactory.getLogger(TableCommitImpl.class);

    /** Performs the actual commit, overwrite, abort and "already committed" filtering. */
    private final FileStoreCommit commit;

    /** Callbacks invoked with the committables after they have been handled. */
    private final List<CommitCallback> commitCallbacks;

    @Nullable
    private final FileStoreExpire expire;

    @Nullable
    private final PartitionExpire partitionExpire;

    @Nullable
    private final TagAutoCreation tagAutoCreation;

    /** Lock shared with the commit and the expire operations; closed in {@link #close()}. */
    private final Lock lock;

    /** When non-null, consumers are expired relative to "now minus this duration". */
    @Nullable
    private final Duration consumerExpireTime;

    private final ConsumerManager consumerManager;

    /** Executor on which the expiration work of {@link #expire(long)} runs. */
    private final ExecutorService expireMainExecutor;

    /** First failure raised by expiration work, or {@code null} if none happened so far. */
    private final AtomicReference<Throwable> expireError;

    /** Table name handed to {@link CommitMetrics}. */
    private final String tableName;

    /** Partition to overwrite; {@code null} means regular (non-overwrite) commits. */
    @Nullable
    private Map<String, String> overwritePartition = null;

    /** Set once {@link #commit(List)} has been called; guards against a second batch commit. */
    private boolean batchCommitted = false;

    /**
     * Wires the commit and the optional expire operations to the given lock and chooses the
     * executor used for expiration.
     *
     * @param fileStoreCommit commit operation every commit is delegated to
     * @param list callbacks to notify after committing
     * @param fileStoreExpire optional expire operation, run after each commit when non-null
     * @param partitionExpire optional partition expire operation, run after each commit when
     *     non-null
     * @param tagAutoCreation optional tag auto-creation, run after each commit when non-null
     * @param lock lock passed to the commit and expire operations
     * @param duration consumer expire time; consumer expiration is skipped when {@code null}
     * @param consumerManager manager used to expire consumers
     * @param expireExecutionMode {@code SYNC} runs expiration on a direct executor service; any
     *     other mode runs it on a dedicated single-thread executor
     * @param str table name used for commit metrics
     */
    public TableCommitImpl(
            FileStoreCommit fileStoreCommit,
            List<CommitCallback> list,
            @Nullable FileStoreExpire fileStoreExpire,
            @Nullable PartitionExpire partitionExpire,
            @Nullable TagAutoCreation tagAutoCreation,
            Lock lock,
            @Nullable Duration duration,
            ConsumerManager consumerManager,
            CoreOptions.ExpireExecutionMode expireExecutionMode,
            String str) {
        fileStoreCommit.withLock(lock);
        if (fileStoreExpire != null) {
            fileStoreExpire.withLock(lock);
        }
        if (partitionExpire != null) {
            partitionExpire.withLock(lock);
        }

        this.commit = fileStoreCommit;
        this.commitCallbacks = list;
        this.expire = fileStoreExpire;
        this.partitionExpire = partitionExpire;
        this.tagAutoCreation = tagAutoCreation;
        this.lock = lock;
        this.consumerExpireTime = duration;
        this.consumerManager = consumerManager;

        if (expireExecutionMode == CoreOptions.ExpireExecutionMode.SYNC) {
            this.expireMainExecutor = MoreExecutors.newDirectExecutorService();
        } else {
            this.expireMainExecutor =
                    Executors.newSingleThreadExecutor(
                            new ExecutorThreadFactory(
                                    Thread.currentThread().getName() + "expire-main-thread"));
        }

        this.expireError = new AtomicReference<>(null);
        this.tableName = str;
    }

    /**
     * Returns whether the configured tag auto-creation asks for a snapshot to be forced.
     *
     * @return {@code false} when no tag auto-creation is configured, otherwise its answer
     */
    public boolean forceCreatingSnapshot() {
        return tagAutoCreation != null && tagAutoCreation.forceCreatingSnapshot();
    }

    /**
     * Switches this committer to overwrite mode for the given partition, or back to regular mode
     * when {@code map} is {@code null}.
     *
     * <p>The return type is covariant; the compiler generates the bridge method required by
     * {@link InnerTableCommit}.
     *
     * @param map partition to overwrite, or {@code null}
     * @return this instance
     */
    @Override
    public TableCommitImpl withOverwrite(@Nullable Map<String, String> map) {
        this.overwritePartition = map;
        return this;
    }

    /**
     * Forwards the flag to the underlying {@link FileStoreCommit}.
     *
     * @param z value passed to {@code FileStoreCommit#ignoreEmptyCommit}
     * @return this instance
     */
    @Override
    public TableCommitImpl ignoreEmptyCommit(boolean z) {
        commit.ignoreEmptyCommit(z);
        return this;
    }

    /**
     * Installs {@link CommitMetrics} built from the given registry and this table's name on the
     * underlying commit.
     *
     * @param metricRegistry registry the metrics are created with
     * @return this instance
     */
    @Override
    public InnerTableCommit withMetricRegistry(MetricRegistry metricRegistry) {
        commit.withMetrics(new CommitMetrics(metricRegistry, tableName));
        return this;
    }

    /**
     * Delegates to {@code FileStoreCommit#filterCommitted}.
     *
     * @param set commit identifiers to filter
     * @return the identifiers returned by the underlying commit
     */
    @Override
    public Set<Long> filterCommitted(Set<Long> set) {
        return commit.filterCommitted(set);
    }

    /**
     * Batch commit under {@link BatchWriteBuilder#COMMIT_IDENTIFIER}. May be called only once per
     * instance; the guard flag is set before the commit is attempted.
     *
     * @param list commit messages to commit
     */
    @Override
    public void commit(List<CommitMessage> list) {
        Preconditions.checkState(
                !batchCommitted, "BatchTableCommit only support one-time committing.");
        batchCommitted = true;
        commit(BatchWriteBuilder.COMMIT_IDENTIFIER, list);
    }

    /**
     * Commits the given messages as one committable with the given identifier.
     *
     * @param j commit identifier
     * @param list commit messages belonging to that identifier
     */
    @Override
    public void commit(long j, List<CommitMessage> list) {
        commit(createManifestCommittable(j, list));
    }

    /**
     * Builds one committable per map entry and hands them to {@link
     * #filterAndCommitMultiple(List)}.
     *
     * @param map commit messages keyed by commit identifier
     * @return the number of committables that were actually (re)committed
     */
    @Override
    public int filterAndCommit(Map<Long, List<CommitMessage>> map) {
        List<ManifestCommittable> committables =
                map.entrySet().stream()
                        .map(entry -> createManifestCommittable(entry.getKey(), entry.getValue()))
                        .collect(Collectors.toList());
        return filterAndCommitMultiple(committables);
    }

    /** Wraps the messages into a new {@link ManifestCommittable} with the given identifier. */
    private ManifestCommittable createManifestCommittable(
            long identifier, List<CommitMessage> messages) {
        ManifestCommittable committable = new ManifestCommittable(identifier);
        for (CommitMessage message : messages) {
            committable.addFileCommittable(message);
        }
        return committable;
    }

    /**
     * Commits a single committable; shorthand for {@link #commitMultiple(List)} with a singleton
     * list.
     *
     * @param manifestCommittable committable to commit
     */
    public void commit(ManifestCommittable manifestCommittable) {
        commitMultiple(Collections.singletonList(manifestCommittable));
    }

    /**
     * Commits the given committables in list order, schedules expiration and finally notifies all
     * commit callbacks with the full list.
     *
     * <p>In regular mode each committable is committed on its own and expiration is scheduled for
     * the identifier of the last one (nothing is scheduled for an empty list). In overwrite mode at
     * most one committable is accepted; an empty list is replaced by an empty committable with
     * {@link BatchWriteBuilder#COMMIT_IDENTIFIER} so that the overwrite still takes place.
     *
     * @param list committables to commit
     * @throws RuntimeException if more than one committable is given in overwrite mode, or if a
     *     previous expiration run failed
     */
    public void commitMultiple(List<ManifestCommittable> list) {
        if (overwritePartition == null) {
            for (ManifestCommittable committable : list) {
                commit.commit(committable, new HashMap<>());
            }
            if (!list.isEmpty()) {
                expire(list.get(list.size() - 1).identifier(), expireMainExecutor);
            }
        } else {
            if (list.size() > 1) {
                throw new RuntimeException(
                        "Multiple committables appear in overwrite mode, this may be a bug, please report it: "
                                + list);
            }
            ManifestCommittable committable =
                    list.size() == 1
                            ? list.get(0)
                            : new ManifestCommittable(BatchWriteBuilder.COMMIT_IDENTIFIER);
            commit.overwrite(overwritePartition, committable, Collections.emptyMap());
            expire(committable.identifier(), expireMainExecutor);
        }

        commitCallbacks.forEach(callback -> callback.call(list));
    }

    /**
     * Splits the committables by the identifiers returned from {@code
     * FileStoreCommit#filterCommitted} and commits the ones that were returned.
     *
     * <p>Committables whose identifier was not returned are only passed to the commit callbacks.
     * The returned ones are sorted by ascending identifier, their files are checked for existence
     * and they are committed through {@link #commitMultiple(List)}.
     *
     * @param list candidate committables
     * @return the number of committables that were committed by this call
     */
    public int filterAndCommitMultiple(List<ManifestCommittable> list) {
        Set<Long> retryIdentifiers =
                commit.filterCommitted(
                        list.stream()
                                .map(ManifestCommittable::identifier)
                                .collect(Collectors.toSet()));

        // Callbacks still run for the committables that are not committed again.
        List<ManifestCommittable> skipped =
                list.stream()
                        .filter(c -> !retryIdentifiers.contains(c.identifier()))
                        .collect(Collectors.toList());
        commitCallbacks.forEach(callback -> callback.call(skipped));

        List<ManifestCommittable> toCommit =
                list.stream()
                        .filter(c -> retryIdentifiers.contains(c.identifier()))
                        .sorted(Comparator.comparingLong(ManifestCommittable::identifier))
                        .collect(Collectors.toList());
        if (!toCommit.isEmpty()) {
            checkFilesExistence(toCommit);
            commitMultiple(toCommit);
        }
        return toCommit.size();
    }

    /**
     * Verifies that every file referenced by the given committables still exists.
     *
     * <p>Collects the paths of all new, changelog, compact-before and compact-after data files
     * plus all new index files, then probes them in parallel on {@code
     * FileUtils.COMMON_IO_FORK_JOIN_POOL}.
     *
     * @throws RuntimeException listing the missing files if at least one path does not exist, or
     *     wrapping the failure if the existence check itself fails or is interrupted
     */
    private void checkFilesExistence(List<ManifestCommittable> committables) {
        List<Path> files = new ArrayList<>();
        // One data-file path factory per (partition, bucket), created on first use.
        Map<Pair<BinaryRow, Integer>, DataFilePathFactory> dataPathFactories = new HashMap<>();
        PathFactory indexFileFactory = commit.pathFactory().indexFileFactory();

        for (ManifestCommittable committable : committables) {
            for (CommitMessage message : committable.fileCommittables()) {
                CommitMessageImpl impl = (CommitMessageImpl) message;
                DataFilePathFactory dataPathFactory =
                        dataPathFactories.computeIfAbsent(
                                Pair.of(message.partition(), Integer.valueOf(message.bucket())),
                                key ->
                                        commit.pathFactory()
                                                .createDataFilePathFactory(
                                                        key.getKey(), key.getValue()));

                Consumer<DataFileMeta> collector =
                        meta -> files.addAll(meta.collectFiles(dataPathFactory));
                impl.newFilesIncrement().newFiles().forEach(collector);
                impl.newFilesIncrement().changelogFiles().forEach(collector);
                impl.compactIncrement().compactBefore().forEach(collector);
                impl.compactIncrement().compactAfter().forEach(collector);

                impl.indexIncrement().newIndexFiles().stream()
                        .map(indexFile -> indexFile.fileName())
                        .map(indexFileFactory::toPath)
                        .forEach(files::add);
            }
        }

        Predicate<Path> isMissing =
                path -> {
                    try {
                        return !commit.fileIO().exists(path);
                    } catch (IOException e) {
                        // Checked exceptions cannot cross the stream pipeline.
                        throw new UncheckedIOException(e);
                    }
                };

        List<Path> missingFiles;
        try {
            missingFiles =
                    FileUtils.COMMON_IO_FORK_JOIN_POOL
                            .submit(
                                    () -> {
                                        return files.parallelStream()
                                                .filter(isMissing)
                                                .collect(Collectors.toList());
                                    })
                            .get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            throw new RuntimeException(e.getCause());
        }

        if (!missingFiles.isEmpty()) {
            String missingList =
                    missingFiles.stream()
                            .map(Object::toString)
                            .collect(Collectors.joining(FieldListaggAgg.DELIMITER));
            throw new RuntimeException(
                    String.join(
                            StringUtils.LF,
                            "Cannot recover from this checkpoint because some files in the snapshot that need to be resubmitted have been deleted:",
                            "    " + missingList,
                            "    The most likely reason is because you are recovering from a very old savepoint that contains some uncommitted files that have already been deleted."));
        }
    }

    /**
     * Schedules {@link #expire(long)} on the given executor.
     *
     * <p>If an earlier expiration run failed, that failure is rethrown here instead of scheduling
     * new work. Failures of the scheduled run are logged and only the first one is kept.
     *
     * @throws RuntimeException wrapping the first recorded expiration failure
     */
    private void expire(long partitionExpireIdentifier, ExecutorService executor) {
        Throwable previousFailure = expireError.get();
        if (previousFailure != null) {
            throw new RuntimeException(previousFailure);
        }

        executor.execute(
                () -> {
                    try {
                        expire(partitionExpireIdentifier);
                    } catch (Throwable t) {
                        LOG.error("Executing expire encountered an error.", t);
                        expireError.compareAndSet(null, t);
                    }
                });
    }

    /**
     * Runs the configured maintenance steps in order: consumer expiration, {@link
     * FileStoreExpire}, {@link PartitionExpire} (with the given identifier) and tag auto-creation.
     * Steps that are not configured are skipped.
     */
    private void expire(long partitionExpireIdentifier) {
        if (consumerExpireTime != null) {
            consumerManager.expire(LocalDateTime.now().minus(consumerExpireTime));
        }
        if (expire != null) {
            expire.expire();
        }
        if (partitionExpire != null) {
            partitionExpire.expire(partitionExpireIdentifier);
        }
        if (tagAutoCreation != null) {
            tagAutoCreation.run();
        }
    }

    /**
     * Quietly closes all commit callbacks and the lock, then shuts the expire executor down with
     * {@code shutdownNow()}.
     */
    @Override
    public void close() throws Exception {
        for (CommitCallback callback : commitCallbacks) {
            IOUtils.closeQuietly(callback);
        }
        IOUtils.closeQuietly(lock);
        expireMainExecutor.shutdownNow();
    }

    /**
     * Delegates to {@code FileStoreCommit#abort}.
     *
     * @param list commit messages to abort
     */
    @Override
    public void abort(List<CommitMessage> list) {
        commit.abort(list);
    }

    /** Exposes the expire executor to tests. */
    @VisibleForTesting
    public ExecutorService getExpireMainExecutor() {
        return expireMainExecutor;
    }
}
