package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
/* loaded from: input_file:org/apache/flink/changelog/fs/BatchingStateChangeUploader.class */
class BatchingStateChangeUploader implements StateChangeUploader {
    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeUploader.class);
    private final RetryingExecutor retryingExecutor;
    private final RetryPolicy retryPolicy;
    private final StateChangeUploader delegate;
    private final ScheduledExecutorService scheduler;
    private final long scheduleDelayMs;
    private final long sizeThresholdBytes;

    @GuardedBy("scheduled")
    private final Queue<StateChangeUploader.UploadTask> scheduled;

    @GuardedBy("scheduled")
    private long scheduledBytesCounter;

    @GuardedBy("scheduled")
    @Nullable
    private ScheduledFuture<?> scheduledFuture;

    @GuardedBy("this")
    @Nullable
    private Throwable errorUnsafe;
    private final long maxBytesInFlight;
    private final LongAdder inFlightBytesCounter;

    /* JADX INFO: Access modifiers changed from: package-private */
    public BatchingStateChangeUploader(long j, long j2, RetryPolicy retryPolicy, StateChangeUploader stateChangeUploader, int i, long j3) {
        this(j, j2, retryPolicy, stateChangeUploader, SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG), new RetryingExecutor(i), j3);
    }

    BatchingStateChangeUploader(long j, long j2, RetryPolicy retryPolicy, StateChangeUploader stateChangeUploader, ScheduledExecutorService scheduledExecutorService, RetryingExecutor retryingExecutor, long j3) {
        this.inFlightBytesCounter = new LongAdder();
        this.scheduleDelayMs = j;
        this.scheduled = new LinkedList();
        this.scheduler = scheduledExecutorService;
        this.retryPolicy = retryPolicy;
        this.retryingExecutor = retryingExecutor;
        this.sizeThresholdBytes = j2;
        this.maxBytesInFlight = j3;
        this.delegate = stateChangeUploader;
    }

    @Override // org.apache.flink.changelog.fs.StateChangeUploader
    public void upload(StateChangeUploader.UploadTask uploadTask) {
        Throwable errorSafe = getErrorSafe();
        if (errorSafe != null) {
            LOG.debug("don't persist {} changesets, already failed", Integer.valueOf(uploadTask.changeSets.size()));
            uploadTask.fail(errorSafe);
            return;
        }
        LOG.debug("persist {} changeSets", Integer.valueOf(uploadTask.changeSets.size()));
        try {
            Preconditions.checkState(this.inFlightBytesCounter.sum() <= this.maxBytesInFlight, "In flight data size threshold exceeded %s > %s", new Object[]{Long.valueOf(this.inFlightBytesCounter.sum()), Long.valueOf(this.maxBytesInFlight)});
            synchronized (this.scheduled) {
                long size = uploadTask.getSize();
                this.inFlightBytesCounter.add(size);
                this.scheduledBytesCounter += size;
                this.scheduled.add(wrapWithSizeUpdate(uploadTask, size, this.inFlightBytesCounter));
                scheduleUploadIfNeeded();
            }
        } catch (Exception e) {
            uploadTask.fail(e);
            throw e;
        }
    }

    private void scheduleUploadIfNeeded() {
        Preconditions.checkState(Thread.holdsLock(this.scheduled));
        if (this.scheduleDelayMs != 0 && this.scheduledBytesCounter < this.sizeThresholdBytes) {
            if (this.scheduledFuture == null) {
                this.scheduledFuture = this.scheduler.schedule(this::drainAndSave, this.scheduleDelayMs, TimeUnit.MILLISECONDS);
            }
        } else {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduledFuture = null;
            }
            drainAndSave();
        }
    }

    private void drainAndSave() {
        ArrayList arrayList;
        synchronized (this.scheduled) {
            arrayList = new ArrayList(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
            this.scheduledFuture = null;
        }
        try {
            Throwable errorSafe = getErrorSafe();
            if (errorSafe != null) {
                arrayList.forEach(uploadTask -> {
                    uploadTask.fail(errorSafe);
                });
            } else {
                this.retryingExecutor.execute(this.retryPolicy, () -> {
                    this.delegate.upload((Collection<StateChangeUploader.UploadTask>) arrayList);
                });
            }
        } catch (Throwable th) {
            arrayList.forEach(uploadTask2 -> {
                uploadTask2.fail(th);
            });
            if (ExceptionUtils.findThrowable(th, IOException.class).isPresent()) {
                LOG.warn("Caught IO exception while uploading", th);
            } else {
                setErrorSafe(th);
                throw th;
            }
        }
    }

    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        ArrayList arrayList;
        LOG.debug("close");
        this.scheduler.shutdownNow();
        if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown scheduler in 5s");
        }
        synchronized (this.scheduled) {
            arrayList = new ArrayList(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
        }
        CancellationException cancellationException = new CancellationException();
        arrayList.forEach(uploadTask -> {
            uploadTask.fail(cancellationException);
        });
        this.retryingExecutor.close();
        this.delegate.close();
    }

    private synchronized Throwable getErrorSafe() {
        return this.errorUnsafe;
    }

    private synchronized void setErrorSafe(Throwable th) {
        this.errorUnsafe = th;
    }

    private static StateChangeUploader.UploadTask wrapWithSizeUpdate(StateChangeUploader.UploadTask uploadTask, long j, LongAdder longAdder) {
        return new StateChangeUploader.UploadTask(uploadTask.changeSets, list -> {
            longAdder.add(-j);
            uploadTask.successCallback.accept(list);
        }, (list2, th) -> {
            longAdder.add(-j);
            uploadTask.failureCallback.accept(list2, th);
        });
    }
}
