package org.apache.flink.statefun.flink.core.feedback;

import java.io.OutputStream;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import java.util.function.Supplier;
import org.apache.flink.statefun.flink.core.logger.FeedbackLogger;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/Checkpoints.class */
final class Checkpoints<T> implements AutoCloseable {
    private final Supplier<? extends FeedbackLogger<T>> feedbackLoggerFactory;
    private final TreeMap<Long, FeedbackLogger<T>> uncompletedCheckpoints = new TreeMap<>();

    /* JADX INFO: Access modifiers changed from: package-private */
    public Checkpoints(Supplier<? extends FeedbackLogger<T>> supplier) {
        this.feedbackLoggerFactory = (Supplier) Objects.requireNonNull(supplier);
    }

    public void startLogging(long j, OutputStream outputStream) {
        FeedbackLogger<T> feedbackLogger = this.feedbackLoggerFactory.get();
        feedbackLogger.startLogging(outputStream);
        this.uncompletedCheckpoints.put(Long.valueOf(j), feedbackLogger);
    }

    public void append(T t) {
        Iterator<FeedbackLogger<T>> it2 = this.uncompletedCheckpoints.values().iterator();
        while (it2.hasNext()) {
            it2.next().append(t);
        }
    }

    public void commitCheckpointsUntil(long j) {
        NavigableMap<Long, FeedbackLogger<T>> headMap = this.uncompletedCheckpoints.headMap(Long.valueOf(j), true);
        headMap.values().forEach((v0) -> {
            v0.commit();
        });
        headMap.clear();
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        IOUtils.closeAllQuietly(this.uncompletedCheckpoints.values());
        this.uncompletedCheckpoints.clear();
    }
}
