package org.apache.paimon.operation;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.CommitIncrement;
import org.apache.paimon.utils.ExecutorThreadFactory;
import org.apache.paimon.utils.RecordWriter;
import org.apache.paimon.utils.Restorable;
import org.apache.paimon.utils.SnapshotManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/operation/AbstractFileStoreWrite.class */
public abstract class AbstractFileStoreWrite<T> implements FileStoreWrite<T>, Restorable<List<State>> {
    /** Logger shared by all instances of this class. */
    private static final Logger LOG = LoggerFactory.getLogger(AbstractFileStoreWrite.class);

    /** User name passed to the snapshot manager when looking up that user's latest snapshot. */
    private final String commitUser;

    /** Supplies the latest snapshot id and the latest snapshot of {@link #commitUser}. */
    private final SnapshotManager snapshotManager;

    /** Scan used to list the data files of a partition and bucket at a given snapshot. */
    private final FileStoreScan scan;

    /** Optional I/O manager; stays {@code null} until {@code withIOManager} is called. */
    @Nullable
    protected IOManager ioManager;

    /** Executor passed to {@code createWriter}; created on first use by {@code compactExecutor()}. */
    private ExecutorService lazyCompactExecutor;

    /** When {@code true}, new writers are created with an empty file list instead of a scan. */
    private boolean overwrite = false;

    /** Live writers, keyed first by partition and then by bucket. */
    protected final Map<BinaryRow, Map<Integer, WriterContainer<T>>> writers = new HashMap<>();

    /* loaded from: input_file:org/apache/paimon/operation/AbstractFileStoreWrite$State.class */
    /**
     * Snapshot of one writer, as produced by {@code checkpoint()} and consumed by
     * {@code restore(List)}.
     */
    public static class State {
        /** Partition the writer belongs to. */
        protected final BinaryRow partition;
        /** Bucket the writer belongs to. */
        protected final int bucket;
        /** Value of the writer container's {@code baseSnapshotId} at checkpoint time. */
        protected final long baseSnapshotId;
        /** Value of the writer container's {@code lastModifiedCommitIdentifier} at checkpoint time. */
        protected final long lastModifiedCommitIdentifier;
        /** Private copy of the data files reported by the writer. */
        protected final List<DataFileMeta> dataFiles;
        /** Commit increment obtained from the writer when the state was extracted. */
        protected final CommitIncrement commitIncrement;

        /**
         * @param binaryRow partition of the writer
         * @param i bucket of the writer
         * @param j base snapshot id of the writer
         * @param j2 last modified commit identifier of the writer
         * @param collection data files of the writer; copied, so later changes to the argument
         *     are not reflected in this state
         * @param commitIncrement commit increment of the writer
         */
        protected State(BinaryRow binaryRow, int i, long j, long j2, Collection<DataFileMeta> collection, CommitIncrement commitIncrement) {
            this.partition = binaryRow;
            this.bucket = i;
            this.baseSnapshotId = j;
            this.lastModifiedCommitIdentifier = j2;
            // Typed defensive copy (the decompiled code used a raw ArrayList here).
            this.dataFiles = new ArrayList<>(collection);
            this.commitIncrement = commitIncrement;
        }

        /** Renders all six fields in declaration order, for debug logging. */
        @Override
        public String toString() {
            return String.format(
                    "{%s, %d, %d, %d, %s, %s}",
                    partition,
                    bucket,
                    baseSnapshotId,
                    lastModifiedCommitIdentifier,
                    dataFiles,
                    commitIncrement);
        }
    }

    /**
     * Pairs a {@link RecordWriter} with the bookkeeping this class keeps for it.
     *
     * @param <T> record type accepted by the wrapped writer
     */
    @VisibleForTesting
    public static class WriterContainer<T> {
        /** The wrapped writer. */
        public final RecordWriter<T> writer;
        /** Snapshot id the writer was created from; {@code 0} when none was supplied. */
        protected final long baseSnapshotId;
        /** Starts at {@link Long#MIN_VALUE}; updated by the enclosing class. */
        protected long lastModifiedCommitIdentifier = Long.MIN_VALUE;

        /**
         * @param recordWriter writer to wrap
         * @param l base snapshot id, or {@code null} to use {@code 0}
         */
        protected WriterContainer(RecordWriter<T> recordWriter, Long l) {
            this.writer = recordWriter;
            if (l == null) {
                this.baseSnapshotId = 0L;
            } else {
                this.baseSnapshotId = l;
            }
        }
    }

    /**
     * Stores the collaborators used by this write operation.
     *
     * <p>NOTE(review): the decompiler reported that this constructor's access modifier was
     * changed from {@code protected}; it is left {@code public} here as found.
     *
     * @param str commit user name
     * @param snapshotManager snapshot manager to query for latest snapshots
     * @param fileStoreScan scan used to list existing data files
     */
    public AbstractFileStoreWrite(String str, SnapshotManager snapshotManager, FileStoreScan fileStoreScan) {
        this.scan = fileStoreScan;
        this.snapshotManager = snapshotManager;
        this.commitUser = str;
    }

    /**
     * Sets the I/O manager field and returns this instance so calls can be chained.
     *
     * @param iOManager I/O manager to store
     * @return this write operation
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public FileStoreWrite<T> withIOManager(IOManager iOManager) {
        ioManager = iOManager;
        return this;
    }

    /**
     * Sets the overwrite flag read when a writer is created on demand.
     *
     * @param z new value of the overwrite flag
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public void withOverwrite(boolean z) {
        overwrite = z;
    }

    /**
     * Writes one record through the writer of the given partition and bucket, creating that
     * writer first if none is registered yet.
     *
     * @param binaryRow partition of the record
     * @param i bucket of the record
     * @param t record to write
     * @throws Exception if the underlying writer fails
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public void write(BinaryRow binaryRow, int i, T t) throws Exception {
        RecordWriter<T> target = getWriterWrapper(binaryRow, i).writer;
        target.write(t);
    }

    /**
     * Asks the writer of the given partition and bucket to compact, creating that writer first
     * if none is registered yet.
     *
     * @param binaryRow partition to compact
     * @param i bucket to compact
     * @param z flag forwarded unchanged to {@code RecordWriter.compact}
     * @throws Exception if the underlying writer fails
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public void compact(BinaryRow binaryRow, int i, boolean z) throws Exception {
        RecordWriter<T> target = getWriterWrapper(binaryRow, i).writer;
        target.compact(z);
    }

    /**
     * Forwards files from another snapshot to the writer of the given partition and bucket,
     * but only when that snapshot is newer than the writer's base snapshot.
     *
     * @param j id of the snapshot the files come from
     * @param binaryRow partition the files belong to
     * @param i bucket the files belong to
     * @param list files to hand to the writer
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public void notifyNewFiles(long j, BinaryRow binaryRow, int i, List<DataFileMeta> list) {
        // Looked up first: this creates the writer when none is registered yet.
        WriterContainer<T> container = getWriterWrapper(binaryRow, i);
        long baseSnapshotId = container.baseSnapshotId;
        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Get extra compact files for partition {}, bucket {}. Extra snapshot {}, base snapshot {}.\nFiles: {}",
                    binaryRow,
                    i,
                    j,
                    baseSnapshotId,
                    list);
        }
        if (j <= baseSnapshotId) {
            // Not newer than the writer's base snapshot: nothing is forwarded.
            return;
        }
        container.writer.addNewFiles(list);
    }

    /**
     * Collects one commit message per live writer and closes writers that are no longer needed.
     *
     * <p>A writer whose message is non-empty is stamped with the given commit identifier. A
     * writer whose message is empty is closed and removed once its last modified identifier is
     * not greater than the latest identifier committed by this user.
     *
     * @param z flag forwarded unchanged to {@code RecordWriter.prepareCommit}
     * @param j commit identifier recorded on every writer that produced a non-empty message
     * @return one message per writer that was live when this method was called, including
     *     empty messages
     * @throws Exception if preparing or closing a writer fails
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public List<CommitMessage> prepareCommit(boolean z, long j) throws Exception {
        // Largest identifier recorded on any live writer; MIN_VALUE means no writer has ever
        // produced a non-empty message (or there are no writers at all).
        long newestModification = Long.MIN_VALUE;
        for (Map<Integer, WriterContainer<T>> buckets : this.writers.values()) {
            for (WriterContainer<T> container : buckets.values()) {
                newestModification = Math.max(newestModification, container.lastModifiedCommitIdentifier);
            }
        }

        long latestCommittedIdentifier;
        if (newestModification == Long.MIN_VALUE) {
            // The snapshot lookup is skipped when no writer has been modified yet.
            latestCommittedIdentifier = Long.MIN_VALUE;
        } else {
            latestCommittedIdentifier =
                    this.snapshotManager
                            .latestSnapshotOfUser(this.commitUser)
                            .map(snapshot -> snapshot.commitIdentifier())
                            .orElse(Long.MIN_VALUE);
        }

        List<CommitMessage> messages = new ArrayList<>();
        // Explicit iterators are required because entries are removed while iterating.
        Iterator<Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>>> partitionIt =
                this.writers.entrySet().iterator();
        while (partitionIt.hasNext()) {
            Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry = partitionIt.next();
            BinaryRow partition = partitionEntry.getKey();
            Iterator<Map.Entry<Integer, WriterContainer<T>>> bucketIt =
                    partitionEntry.getValue().entrySet().iterator();
            while (bucketIt.hasNext()) {
                Map.Entry<Integer, WriterContainer<T>> bucketEntry = bucketIt.next();
                int bucket = bucketEntry.getKey();
                WriterContainer<T> container = bucketEntry.getValue();

                CommitIncrement increment = container.writer.prepareCommit(z);
                CommitMessageImpl message =
                        new CommitMessageImpl(
                                partition,
                                bucket,
                                increment.newFilesIncrement(),
                                increment.compactIncrement());
                messages.add(message);

                if (!message.isEmpty()) {
                    container.lastModifiedCommitIdentifier = j;
                } else if (container.lastModifiedCommitIdentifier <= latestCommittedIdentifier) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug(
                                "Closing writer for partition {}, bucket {}. Writer's last modified identifier is {}, while latest committed identifier is {}",
                                partition,
                                bucket,
                                container.lastModifiedCommitIdentifier,
                                latestCommittedIdentifier);
                    }
                    container.writer.close();
                    bucketIt.remove();
                }
            }
            if (partitionEntry.getValue().isEmpty()) {
                partitionIt.remove();
            }
        }
        return messages;
    }

    /**
     * Closes every live writer, forgets them, and shuts down the compaction executor if one was
     * created.
     *
     * <p>A failure while closing one writer does not stop the remaining writers or the executor
     * from being released. The first failure is rethrown after cleanup, with any later failures
     * attached as suppressed exceptions.
     *
     * @throws Exception the first exception thrown by a writer's {@code close()}
     */
    @Override // org.apache.paimon.operation.FileStoreWrite
    public void close() throws Exception {
        Exception firstFailure = null;
        for (Map<Integer, WriterContainer<T>> buckets : this.writers.values()) {
            for (WriterContainer<T> container : buckets.values()) {
                try {
                    container.writer.close();
                } catch (Exception e) {
                    if (firstFailure == null) {
                        firstFailure = e;
                    } else if (e != firstFailure) {
                        // addSuppressed rejects self-suppression, hence the identity check.
                        firstFailure.addSuppressed(e);
                    }
                }
            }
        }
        this.writers.clear();
        if (this.lazyCompactExecutor != null) {
            this.lazyCompactExecutor.shutdownNow();
        }
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    /**
     * Extracts a {@link State} from every live writer.
     *
     * @return one state per live writer
     * @throws RuntimeException if a writer fails while its state is being extracted; the
     *     original exception is kept as the cause
     */
    @Override // org.apache.paimon.utils.Restorable
    public List<State> checkpoint() {
        List<State> states = new ArrayList<>();
        for (Map.Entry<BinaryRow, Map<Integer, WriterContainer<T>>> partitionEntry : this.writers.entrySet()) {
            BinaryRow partition = partitionEntry.getKey();
            for (Map.Entry<Integer, WriterContainer<T>> bucketEntry : partitionEntry.getValue().entrySet()) {
                int bucket = bucketEntry.getKey();
                WriterContainer<T> container = bucketEntry.getValue();
                try {
                    // Call order is kept exactly as found: dataFiles() first, then
                    // prepareCommit(false).
                    // NOTE(review): confirm this order is intended; if prepareCommit can change
                    // the writer's file set, the files may need to be read afterwards.
                    Collection<DataFileMeta> files = container.writer.dataFiles();
                    CommitIncrement increment = container.writer.prepareCommit(false);
                    states.add(
                            new State(
                                    partition,
                                    bucket,
                                    container.baseSnapshotId,
                                    container.lastModifiedCommitIdentifier,
                                    files,
                                    increment));
                } catch (Exception e) {
                    throw new RuntimeException("Failed to extract state from writer of partition " + partition + " bucket " + bucket, e);
                }
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("Extracted state " + states.toString());
        }
        return states;
    }

    /**
     * Recreates one writer per given state and registers it under the state's partition and
     * bucket, replacing any writer already registered there.
     *
     * @param list states previously produced by {@code checkpoint()}
     */
    @Override // org.apache.paimon.utils.Restorable
    public void restore(List<State> list) {
        for (State state : list) {
            RecordWriter<T> writer =
                    createWriter(
                            state.partition,
                            state.bucket,
                            state.dataFiles,
                            state.commitIncrement,
                            compactExecutor());
            notifyNewWriter(writer);

            WriterContainer<T> container = new WriterContainer<>(writer, state.baseSnapshotId);
            container.lastModifiedCommitIdentifier = state.lastModifiedCommitIdentifier;

            this.writers
                    .computeIfAbsent(state.partition, partition -> new HashMap<>())
                    .put(state.bucket, container);
        }
    }

    /**
     * Returns the writer container registered for the given partition and bucket, creating and
     * registering one (using the current overwrite flag) when none exists.
     *
     * @param binaryRow partition to look up
     * @param i bucket to look up
     * @return the existing or newly created container
     */
    private WriterContainer<T> getWriterWrapper(BinaryRow binaryRow, int i) {
        Map<Integer, WriterContainer<T>> buckets = this.writers.get(binaryRow);
        if (buckets == null) {
            buckets = new HashMap<>();
            // The map key is a copy of the row, presumably because callers may reuse the
            // instance they pass in -- TODO confirm.
            this.writers.put(binaryRow.copy(), buckets);
        }
        return buckets.computeIfAbsent(
                i, bucket -> createWriterContainer(binaryRow.copy(), bucket, this.overwrite));
    }

    /**
     * Builds a new writer for the given partition and bucket and wraps it together with the
     * latest snapshot id known at creation time.
     *
     * @param binaryRow partition of the new writer
     * @param i bucket of the new writer
     * @param z when {@code true} the writer starts from an empty file list; otherwise from the
     *     files found by scanning the latest snapshot
     * @return container holding the new writer
     */
    @VisibleForTesting
    public WriterContainer<T> createWriterContainer(BinaryRow binaryRow, int i, boolean z) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Creating writer for partition {}, bucket {}", binaryRow, i);
        }
        Long baseSnapshotId = this.snapshotManager.latestSnapshotId();
        RecordWriter<T> writer;
        if (z) {
            writer = createWriter(binaryRow.copy(), i, Collections.emptyList(), null, compactExecutor());
        } else {
            writer =
                    createWriter(
                            binaryRow.copy(),
                            i,
                            scanExistingFileMetas(baseSnapshotId, binaryRow, i),
                            null,
                            compactExecutor());
        }
        notifyNewWriter(writer);
        return new WriterContainer<>(writer, baseSnapshotId);
    }

    /**
     * Lists the data files that exist for a partition and bucket at the given snapshot.
     *
     * @param l snapshot id to scan, or {@code null} when there is no snapshot
     * @param binaryRow partition to scan
     * @param i bucket to scan
     * @return a new mutable list of the files found; empty when {@code l} is {@code null}
     */
    private List<DataFileMeta> scanExistingFileMetas(Long l, BinaryRow binaryRow, int i) {
        List<DataFileMeta> existing = new ArrayList<>();
        if (l != null) {
            this.scan
                    .withSnapshot(l.longValue())
                    .withPartitionBucket(binaryRow, i)
                    .plan()
                    .files()
                    .stream()
                    .map(entry -> entry.file())
                    .forEach(existing::add);
        }
        return existing;
    }

    /**
     * Returns the shared executor, creating it on the first call.
     *
     * <p>The executor is single-threaded; its thread factory is given the name of the thread
     * that triggers creation, suffixed with {@code -compaction}.
     *
     * @return the shared executor
     */
    private ExecutorService compactExecutor() {
        ExecutorService executor = this.lazyCompactExecutor;
        if (executor != null) {
            return executor;
        }
        String threadName = Thread.currentThread().getName() + "-compaction";
        executor = Executors.newSingleThreadScheduledExecutor(new ExecutorThreadFactory(threadName));
        this.lazyCompactExecutor = executor;
        return executor;
    }

    /**
     * Hook called with every writer this class creates, both on demand and during restore,
     * before the writer is registered. The default implementation does nothing.
     *
     * @param recordWriter the writer that was just created
     */
    protected void notifyNewWriter(RecordWriter<T> recordWriter) {
    }

    /**
     * Creates the record writer for one partition and bucket.
     *
     * @param binaryRow partition of the writer
     * @param i bucket of the writer
     * @param list data files the writer starts with: the files scanned from the latest snapshot,
     *     an empty list when overwriting, or the checkpointed files when restoring
     * @param commitIncrement {@code null} for writers created on demand; the checkpointed commit
     *     increment when restoring
     * @param executorService the shared executor returned by {@code compactExecutor()}
     * @return the new writer
     */
    protected abstract RecordWriter<T> createWriter(BinaryRow binaryRow, int i, List<DataFileMeta> list, @Nullable CommitIncrement commitIncrement, ExecutorService executorService);
}
