package dev.responsive.kafka.internal.db;

import dev.responsive.kafka.internal.db.partitioning.Segmenter;
import dev.responsive.kafka.internal.stores.RemoteWriteResult;
import dev.responsive.kafka.internal.utils.PendingFlushSegmentMetadata;
import dev.responsive.kafka.internal.utils.WindowedKey;
import java.util.Iterator;

/* loaded from: input_file:dev/responsive/kafka/internal/db/WindowFlushManager.class */
public abstract class WindowFlushManager implements FlushManager<WindowedKey, Segmenter.SegmentPartition> {
    private final int kafkaPartition;
    private final Segmenter segmenter;
    private final PendingFlushSegmentMetadata pendingFlushSegmentMetadata;

    public WindowFlushManager(String str, int i, Segmenter segmenter, long j) {
        this.kafkaPartition = i;
        this.segmenter = segmenter;
        this.pendingFlushSegmentMetadata = new PendingFlushSegmentMetadata(str, i, j);
    }

    public long streamTime() {
        return this.pendingFlushSegmentMetadata.batchStreamTime();
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public void writeAdded(WindowedKey windowedKey) {
        this.pendingFlushSegmentMetadata.updateStreamTime(windowedKey.windowStartMs);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> preFlush() {
        Iterator<Long> it = this.pendingFlushSegmentMetadata.prepareRoll(this.segmenter).segmentsToCreate().iterator();
        while (it.hasNext()) {
            RemoteWriteResult<Segmenter.SegmentPartition> createSegment = createSegment(new Segmenter.SegmentPartition(this.kafkaPartition, it.next().longValue()));
            if (!createSegment.wasApplied()) {
                return createSegment;
            }
        }
        return RemoteWriteResult.success(null);
    }

    @Override // dev.responsive.kafka.internal.db.FlushManager
    public RemoteWriteResult<Segmenter.SegmentPartition> postFlush(long j) {
        RemoteWriteResult<Segmenter.SegmentPartition> updateOffsetAndStreamTime = updateOffsetAndStreamTime(j, this.pendingFlushSegmentMetadata.batchStreamTime());
        if (!updateOffsetAndStreamTime.wasApplied()) {
            return updateOffsetAndStreamTime;
        }
        Iterator<Long> it = this.pendingFlushSegmentMetadata.segmentRoll().segmentsToExpire().iterator();
        while (it.hasNext()) {
            RemoteWriteResult<Segmenter.SegmentPartition> deleteSegment = deleteSegment(new Segmenter.SegmentPartition(this.kafkaPartition, it.next().longValue()));
            if (!deleteSegment.wasApplied()) {
                return deleteSegment;
            }
        }
        this.pendingFlushSegmentMetadata.finalizeRoll();
        return RemoteWriteResult.success(null);
    }

    protected abstract RemoteWriteResult<Segmenter.SegmentPartition> updateOffsetAndStreamTime(long j, long j2);

    protected abstract RemoteWriteResult<Segmenter.SegmentPartition> createSegment(Segmenter.SegmentPartition segmentPartition);

    protected abstract RemoteWriteResult<Segmenter.SegmentPartition> deleteSegment(Segmenter.SegmentPartition segmentPartition);
}
