package org.apache.flink.streaming.api.functions.sink.filesystem;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Objects;
import java.util.TreeMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileWriter;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tracks the part files that a single subtask writes underneath one bucket path.
 *
 * <p>At any time a bucket holds at most one in-progress part file, a list of part files that
 * were closed since the last checkpoint, and a per-checkpoint map of closed part files that are
 * waiting to be committed once their checkpoint is reported as complete.
 *
 * <p>Instances are created through {@link #getNew} or {@link #restore}. The class performs no
 * synchronization of its own.
 *
 * @param <IN> type of the elements written to the part files
 * @param <BucketID> type of the bucket identifier
 */
@Internal
public class Bucket<IN, BucketID> {
    private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);

    private final BucketID bucketId;
    private final Path bucketPath;
    private final int subtaskIndex;
    private final PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory;
    private final RecoverableWriter fsWriter;
    private final RollingPolicy<IN, BucketID> rollingPolicy;

    /** Resumables handed out at checkpoints, kept only if the writer requires state cleanup. */
    private final NavigableMap<Long, RecoverableWriter.ResumeRecoverable> resumablesPerCheckpoint;

    /** Closed part files, keyed by the checkpoint id that has to complete before committing. */
    private final NavigableMap<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint;

    private final OutputFileConfig outputFileConfig;

    /** Counter used in the name of the next part file; incremented after each roll. */
    private long partCounter;

    @Nullable
    private PartFileWriter<IN, BucketID> inProgressPart;

    /** Part files closed since the last checkpoint; reassigned on every checkpoint. */
    private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;

    /**
     * Creates an empty bucket.
     *
     * <p>All reference arguments are null-checked; {@code initialPartCounter} seeds the part
     * file numbering.
     */
    private Bucket(
            RecoverableWriter fsWriter,
            int subtaskIndex,
            BucketID bucketId,
            Path bucketPath,
            long initialPartCounter,
            PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
            RollingPolicy<IN, BucketID> rollingPolicy,
            OutputFileConfig outputFileConfig) {
        this.fsWriter = Preconditions.checkNotNull(fsWriter);
        this.subtaskIndex = subtaskIndex;
        this.bucketId = Preconditions.checkNotNull(bucketId);
        this.bucketPath = Preconditions.checkNotNull(bucketPath);
        this.partCounter = initialPartCounter;
        this.partFileFactory = Preconditions.checkNotNull(partFileFactory);
        this.rollingPolicy = Preconditions.checkNotNull(rollingPolicy);
        this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
        this.pendingPartsPerCheckpoint = new TreeMap<>();
        this.resumablesPerCheckpoint = new TreeMap<>();
        this.outputFileConfig = Preconditions.checkNotNull(outputFileConfig);
    }

    /**
     * Creates a bucket from a previously snapshotted {@link BucketState}.
     *
     * <p>The in-progress file of the state (if any) is restored first, then every committable
     * file recorded in the state is committed.
     *
     * @throws IOException if restoring or committing any of the files fails
     */
    private Bucket(
            RecoverableWriter fsWriter,
            int subtaskIndex,
            long initialPartCounter,
            PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
            RollingPolicy<IN, BucketID> rollingPolicy,
            BucketState<BucketID> bucketState,
            OutputFileConfig outputFileConfig) throws IOException {
        this(
                fsWriter,
                subtaskIndex,
                bucketState.getBucketId(),
                bucketState.getBucketPath(),
                initialPartCounter,
                partFileFactory,
                rollingPolicy,
                outputFileConfig);
        restoreInProgressFile(bucketState);
        commitRecoveredPendingFiles(bucketState);
    }

    /**
     * Re-establishes the in-progress part file recorded in {@code state}, if there is one.
     *
     * <p>When the writer supports resuming, the file becomes the current in-progress part.
     * Otherwise it is recovered for commit and committed right away.
     */
    private void restoreInProgressFile(BucketState<BucketID> state) throws IOException {
        if (!state.hasInProgressResumableFile()) {
            return;
        }

        RecoverableWriter.ResumeRecoverable resumable = state.getInProgressResumableFile();
        if (this.fsWriter.supportsResume()) {
            this.inProgressPart = this.partFileFactory.resumeFrom(
                    this.bucketId,
                    this.fsWriter.recover(resumable),
                    resumable,
                    state.getInProgressFileCreationTime());
        } else {
            // The writer cannot continue the file, so it is finalized as-is.
            this.fsWriter.recoverForCommit(resumable).commitAfterRecovery();
        }
    }

    /** Commits every committable file that {@code state} recorded, across all checkpoints. */
    private void commitRecoveredPendingFiles(BucketState<BucketID> state) throws IOException {
        for (List<RecoverableWriter.CommitRecoverable> committables
                : state.getCommittableFilesPerCheckpoint().values()) {
            for (RecoverableWriter.CommitRecoverable committable : committables) {
                this.fsWriter.recoverForCommit(committable).commitAfterRecovery();
            }
        }
    }

    /** Returns the identifier of this bucket. */
    BucketID getBucketId() {
        return this.bucketId;
    }

    /** Returns the path under which this bucket creates its part files. */
    Path getBucketPath() {
        return this.bucketPath;
    }

    /** Returns the counter that will be used for the next part file name. */
    public long getPartCounter() {
        return this.partCounter;
    }

    /**
     * Tells whether this bucket still holds any file.
     *
     * @return {@code true} if there is an in-progress part, or any pending part for the current
     *     or an earlier checkpoint; {@code false} otherwise
     */
    public boolean isActive() {
        return this.inProgressPart != null
                || !this.pendingPartsForCurrentCheckpoint.isEmpty()
                || !this.pendingPartsPerCheckpoint.isEmpty();
    }

    /**
     * Takes over the in-progress part file of {@code bucket}.
     *
     * <p>The other bucket must have the same bucket path and must not hold any pending part
     * files. Its in-progress file (if any) is closed for commit and the resulting committable is
     * appended to this bucket's pending parts for the current checkpoint. Closing also records
     * the committable in the other bucket's own pending list.
     *
     * @param bucket the bucket to merge into this one
     * @throws IOException if closing the other bucket's in-progress file fails
     */
    public void merge(Bucket<IN, BucketID> bucket) throws IOException {
        Preconditions.checkNotNull(bucket);
        Preconditions.checkState(Objects.equals(bucket.bucketPath, this.bucketPath));
        Preconditions.checkState(bucket.pendingPartsForCurrentCheckpoint.isEmpty());
        Preconditions.checkState(bucket.pendingPartsPerCheckpoint.isEmpty());

        RecoverableWriter.CommitRecoverable committable = bucket.closePartFile();
        if (committable != null) {
            this.pendingPartsForCurrentCheckpoint.add(committable);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Subtask {} merging buckets for bucket id={}", this.subtaskIndex, this.bucketId);
        }
    }

    /**
     * Writes {@code element} to the in-progress part file.
     *
     * <p>A new part file is opened first when there is none, or when the rolling policy asks to
     * roll on this element.
     *
     * @param element the element to write
     * @param currentTime timestamp passed on to the part file writer (and used as the creation
     *     time when a new part file is opened)
     * @throws IOException if rolling or writing fails
     */
    public void write(IN element, long currentTime) throws IOException {
        if (this.inProgressPart == null
                || this.rollingPolicy.shouldRollOnEvent(this.inProgressPart, element)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Subtask {} closing in-progress part file for bucket id={} due to element {}.",
                        this.subtaskIndex, this.bucketId, element);
            }
            rollPartFile(currentTime);
        }
        this.inProgressPart.write(element, currentTime);
    }

    /** Closes the current part file (if any), opens a fresh one and advances the part counter. */
    private void rollPartFile(long currentTime) throws IOException {
        closePartFile();

        Path partFilePath = assembleNewPartPath();
        this.inProgressPart = this.partFileFactory.openNew(
                this.bucketId, this.fsWriter.open(partFilePath), partFilePath, currentTime);

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Subtask {} opening new part file \"{}\" for bucket id={}.",
                    this.subtaskIndex, partFilePath.getName(), this.bucketId);
        }

        // Incremented only after the file was opened successfully.
        this.partCounter++;
    }

    /** Builds {@code <prefix>-<subtaskIndex>-<partCounter><suffix>} under the bucket path. */
    private Path assembleNewPartPath() {
        return new Path(
                this.bucketPath,
                this.outputFileConfig.getPartPrefix()
                        + '-' + this.subtaskIndex
                        + '-' + this.partCounter
                        + this.outputFileConfig.getPartSuffix());
    }

    /**
     * Closes the in-progress part file for commit and queues it for the current checkpoint.
     *
     * @return the committable of the closed file, or {@code null} if no file was in progress
     */
    private RecoverableWriter.CommitRecoverable closePartFile() throws IOException {
        if (this.inProgressPart == null) {
            return null;
        }

        RecoverableWriter.CommitRecoverable committable = this.inProgressPart.closeForCommit();
        this.pendingPartsForCurrentCheckpoint.add(committable);
        this.inProgressPart = null;
        return committable;
    }

    /** Disposes the in-progress part file, if any. The reference to it is kept as is. */
    public void disposePartFile() {
        if (this.inProgressPart != null) {
            this.inProgressPart.dispose();
        }
    }

    /**
     * Snapshots this bucket for the given checkpoint.
     *
     * <p>Pending parts of the current checkpoint are moved under {@code checkpointId} and, if a
     * part file is still in progress, it is persisted. The returned state references this
     * bucket's live per-checkpoint map rather than a copy.
     *
     * @param checkpointId id of the checkpoint being taken
     * @return the state describing this bucket; its creation time is {@link Long#MAX_VALUE} and
     *     its resumable is {@code null} when no part file is in progress
     * @throws IOException if closing or persisting the in-progress file fails
     */
    public BucketState<BucketID> onReceptionOfCheckpoint(long checkpointId) throws IOException {
        prepareBucketForCheckpointing(checkpointId);

        RecoverableWriter.ResumeRecoverable inProgressResumable = null;
        long inProgressFileCreationTime = Long.MAX_VALUE;

        if (this.inProgressPart != null) {
            inProgressResumable = this.inProgressPart.persist();
            inProgressFileCreationTime = this.inProgressPart.getCreationTime();

            // Resumables are only tracked when the writer will need them for cleanup later
            // (see cleanupOutdatedResumables).
            if (this.fsWriter.requiresCleanupOfRecoverableState()) {
                this.resumablesPerCheckpoint.put(checkpointId, inProgressResumable);
            }
        }

        return new BucketState<>(
                this.bucketId,
                this.bucketPath,
                inProgressFileCreationTime,
                inProgressResumable,
                this.pendingPartsPerCheckpoint);
    }

    /**
     * Rolls the in-progress file if the policy requires it on checkpoint, then files the pending
     * parts of the current checkpoint under {@code checkpointId}. Nothing is recorded when there
     * are no pending parts.
     */
    private void prepareBucketForCheckpointing(long checkpointId) throws IOException {
        if (this.inProgressPart != null
                && this.rollingPolicy.shouldRollOnCheckpoint(this.inProgressPart)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug(
                        "Subtask {} closing in-progress part file for bucket id={} on checkpoint.",
                        this.subtaskIndex, this.bucketId);
            }
            closePartFile();
        }

        if (!this.pendingPartsForCurrentCheckpoint.isEmpty()) {
            this.pendingPartsPerCheckpoint.put(checkpointId, this.pendingPartsForCurrentCheckpoint);
            // A fresh list is assigned instead of clearing: the old one is now owned by the map.
            this.pendingPartsForCurrentCheckpoint = new ArrayList<>();
        }
    }

    /**
     * Commits all pending part files recorded for checkpoints up to and including
     * {@code checkpointId}, drops them from the pending map, and cleans up resumables of strictly
     * earlier checkpoints.
     *
     * @param checkpointId id of the checkpoint that completed
     * @throws IOException if a commit or a cleanup fails
     */
    public void onSuccessfulCompletionOfCheckpoint(long checkpointId) throws IOException {
        Preconditions.checkNotNull(this.fsWriter);

        Iterator<Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>>> pendingIterator =
                this.pendingPartsPerCheckpoint.headMap(checkpointId, true).entrySet().iterator();
        while (pendingIterator.hasNext()) {
            Map.Entry<Long, List<RecoverableWriter.CommitRecoverable>> entry = pendingIterator.next();
            for (RecoverableWriter.CommitRecoverable committable : entry.getValue()) {
                this.fsWriter.recoverForCommit(committable).commit();
            }
            // Removing through the head-map view also removes the entry from the backing map.
            pendingIterator.remove();
        }

        cleanupOutdatedResumables(checkpointId);
    }

    /**
     * Removes the resumables of checkpoints strictly before {@code checkpointId}, asking the
     * writer to clean up their state when it requires that.
     */
    private void cleanupOutdatedResumables(long checkpointId) throws IOException {
        Iterator<Map.Entry<Long, RecoverableWriter.ResumeRecoverable>> resumableIterator =
                this.resumablesPerCheckpoint.headMap(checkpointId, false).entrySet().iterator();
        while (resumableIterator.hasNext()) {
            RecoverableWriter.ResumeRecoverable outdated = resumableIterator.next().getValue();
            if (this.fsWriter.requiresCleanupOfRecoverableState()) {
                boolean cleanedUp = this.fsWriter.cleanupRecoverableState(outdated);
                if (LOG.isDebugEnabled() && cleanedUp) {
                    LOG.debug(
                            "Subtask {} successfully deleted incomplete part for bucket id={}.",
                            this.subtaskIndex, this.bucketId);
                }
            }
            resumableIterator.remove();
        }
    }

    /**
     * Closes the in-progress part file when the rolling policy asks to roll on processing time.
     *
     * @param timestamp the current processing time handed to the rolling policy
     * @throws IOException if closing the in-progress file fails
     */
    public void onProcessingTime(long timestamp) throws IOException {
        if (this.inProgressPart == null
                || !this.rollingPolicy.shouldRollOnProcessingTime(this.inProgressPart, timestamp)) {
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug(
                    "Subtask {} closing in-progress part file for bucket id={} due to processing time rolling policy (in-progress file created @ {}, last updated @ {} and current time is {}).",
                    this.subtaskIndex,
                    this.bucketId,
                    this.inProgressPart.getCreationTime(),
                    this.inProgressPart.getLastUpdateTime(),
                    timestamp);
        }
        closePartFile();
    }

    /** Returns the live (not copied) map of pending part files per checkpoint. */
    @VisibleForTesting
    Map<Long, List<RecoverableWriter.CommitRecoverable>> getPendingPartsPerCheckpoint() {
        return this.pendingPartsPerCheckpoint;
    }

    /** Returns the current in-progress part file writer, or {@code null} if there is none. */
    @VisibleForTesting
    @Nullable
    PartFileWriter<IN, BucketID> getInProgressPart() {
        return this.inProgressPart;
    }

    /** Returns the live (not copied) list of part files closed since the last checkpoint. */
    @VisibleForTesting
    List<RecoverableWriter.CommitRecoverable> getPendingPartsForCurrentCheckpoint() {
        return this.pendingPartsForCurrentCheckpoint;
    }

    /**
     * Creates a new, empty bucket.
     *
     * @param recoverableWriter writer used to open, recover and commit part files
     * @param subtaskIndex index of the subtask, used in part file names and log messages
     * @param bucketId identifier of the bucket
     * @param bucketPath path under which part files are created
     * @param initialPartCounter counter value used for the first part file name
     * @param partFileFactory factory for part file writers
     * @param rollingPolicy policy deciding when the in-progress file is rolled
     * @param outputFileConfig supplies the part file prefix and suffix
     * @param <IN> type of the elements written
     * @param <BucketID> type of the bucket identifier
     * @return the new bucket
     */
    public static <IN, BucketID> Bucket<IN, BucketID> getNew(
            RecoverableWriter recoverableWriter,
            int subtaskIndex,
            BucketID bucketId,
            Path bucketPath,
            long initialPartCounter,
            PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
            RollingPolicy<IN, BucketID> rollingPolicy,
            OutputFileConfig outputFileConfig) {
        return new Bucket<>(
                recoverableWriter,
                subtaskIndex,
                bucketId,
                bucketPath,
                initialPartCounter,
                partFileFactory,
                rollingPolicy,
                outputFileConfig);
    }

    /**
     * Recreates a bucket from a snapshotted state, restoring its in-progress file and committing
     * the committable files recorded in that state.
     *
     * @param recoverableWriter writer used to open, recover and commit part files
     * @param subtaskIndex index of the subtask, used in part file names and log messages
     * @param initialPartCounter counter value used for the next part file name
     * @param partFileFactory factory for part file writers
     * @param rollingPolicy policy deciding when the in-progress file is rolled
     * @param bucketState the state to restore from; provides bucket id and bucket path
     * @param outputFileConfig supplies the part file prefix and suffix
     * @param <IN> type of the elements written
     * @param <BucketID> type of the bucket identifier
     * @return the restored bucket
     * @throws IOException if restoring or committing files fails
     */
    public static <IN, BucketID> Bucket<IN, BucketID> restore(
            RecoverableWriter recoverableWriter,
            int subtaskIndex,
            long initialPartCounter,
            PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
            RollingPolicy<IN, BucketID> rollingPolicy,
            BucketState<BucketID> bucketState,
            OutputFileConfig outputFileConfig) throws IOException {
        return new Bucket<>(
                recoverableWriter,
                subtaskIndex,
                initialPartCounter,
                partFileFactory,
                rollingPolicy,
                bucketState,
                outputFileConfig);
    }
}
