/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.flink.changelog.fs.RetryPolicy;
import org.apache.flink.changelog.fs.RetryingExecutor;
import org.apache.flink.changelog.fs.SchedulerFactory;
import org.apache.flink.changelog.fs.StateChangeUploader;
import org.apache.flink.changelog.fs.UploadResult;
import org.apache.flink.runtime.state.changelog.SequenceNumber;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ThreadSafe
class BatchingStateChangeUploader
implements StateChangeUploader {
    private static final Logger LOG = LoggerFactory.getLogger(BatchingStateChangeUploader.class);
    private final RetryingExecutor retryingExecutor;
    private final RetryPolicy retryPolicy;
    private final StateChangeUploader delegate;
    private final ScheduledExecutorService scheduler;
    private final long scheduleDelayMs;
    private final long sizeThresholdBytes;
    @GuardedBy(value="scheduled")
    private final Queue<StateChangeUploader.UploadTask> scheduled;
    @GuardedBy(value="scheduled")
    private long scheduledBytesCounter;
    @Nullable
    @GuardedBy(value="scheduled")
    private ScheduledFuture<?> scheduledFuture;
    @Nullable
    @GuardedBy(value="this")
    private Throwable errorUnsafe;
    private final long maxBytesInFlight;
    private final LongAdder inFlightBytesCounter = new LongAdder();

    BatchingStateChangeUploader(long persistDelayMs, long sizeThresholdBytes, RetryPolicy retryPolicy, StateChangeUploader delegate, int numUploadThreads, long maxBytesInFlight) {
        this(persistDelayMs, sizeThresholdBytes, retryPolicy, delegate, SchedulerFactory.create(1, "ChangelogUploadScheduler", LOG), new RetryingExecutor(numUploadThreads), maxBytesInFlight);
    }

    BatchingStateChangeUploader(long persistDelayMs, long sizeThresholdBytes, RetryPolicy retryPolicy, StateChangeUploader delegate, ScheduledExecutorService scheduler, RetryingExecutor retryingExecutor, long maxBytesInFlight) {
        this.scheduleDelayMs = persistDelayMs;
        this.scheduled = new LinkedList<StateChangeUploader.UploadTask>();
        this.scheduler = scheduler;
        this.retryPolicy = retryPolicy;
        this.retryingExecutor = retryingExecutor;
        this.sizeThresholdBytes = sizeThresholdBytes;
        this.maxBytesInFlight = maxBytesInFlight;
        this.delegate = delegate;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void upload(StateChangeUploader.UploadTask uploadTask) {
        Throwable error = this.getErrorSafe();
        if (error != null) {
            LOG.debug("don't persist {} changesets, already failed", (Object)uploadTask.changeSets.size());
            uploadTask.fail(error);
            return;
        }
        LOG.debug("persist {} changeSets", (Object)uploadTask.changeSets.size());
        try {
            Preconditions.checkState((this.inFlightBytesCounter.sum() <= this.maxBytesInFlight ? 1 : 0) != 0, (String)"In flight data size threshold exceeded %s > %s", (Object[])new Object[]{this.inFlightBytesCounter.sum(), this.maxBytesInFlight});
            Queue<StateChangeUploader.UploadTask> queue = this.scheduled;
            synchronized (queue) {
                long size = uploadTask.getSize();
                this.inFlightBytesCounter.add(size);
                this.scheduledBytesCounter += size;
                this.scheduled.add(BatchingStateChangeUploader.wrapWithSizeUpdate(uploadTask, size, this.inFlightBytesCounter));
                this.scheduleUploadIfNeeded();
            }
        }
        catch (Exception e) {
            uploadTask.fail(e);
            throw e;
        }
    }

    private void scheduleUploadIfNeeded() {
        Preconditions.checkState((boolean)Thread.holdsLock(this.scheduled));
        if (this.scheduleDelayMs == 0L || this.scheduledBytesCounter >= this.sizeThresholdBytes) {
            if (this.scheduledFuture != null) {
                this.scheduledFuture.cancel(false);
                this.scheduledFuture = null;
            }
            this.drainAndSave();
        } else if (this.scheduledFuture == null) {
            this.scheduledFuture = this.scheduler.schedule(this::drainAndSave, this.scheduleDelayMs, TimeUnit.MILLISECONDS);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void drainAndSave() {
        ArrayList<StateChangeUploader.UploadTask> tasks;
        Queue<StateChangeUploader.UploadTask> queue = this.scheduled;
        synchronized (queue) {
            tasks = new ArrayList<StateChangeUploader.UploadTask>(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
            this.scheduledFuture = null;
        }
        try {
            Throwable error = this.getErrorSafe();
            if (error != null) {
                tasks.forEach(task -> task.fail(error));
                return;
            }
            this.retryingExecutor.execute(this.retryPolicy, () -> this.delegate.upload(tasks));
        }
        catch (Throwable t) {
            tasks.forEach(task -> task.fail(t));
            if (ExceptionUtils.findThrowable((Throwable)t, IOException.class).isPresent()) {
                LOG.warn("Caught IO exception while uploading", t);
            }
            this.setErrorSafe(t);
            throw t;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() throws Exception {
        ArrayList<StateChangeUploader.UploadTask> drained;
        LOG.debug("close");
        this.scheduler.shutdownNow();
        if (!this.scheduler.awaitTermination(5L, TimeUnit.SECONDS)) {
            LOG.warn("Unable to cleanly shutdown scheduler in 5s");
        }
        Queue<StateChangeUploader.UploadTask> queue = this.scheduled;
        synchronized (queue) {
            drained = new ArrayList<StateChangeUploader.UploadTask>(this.scheduled);
            this.scheduled.clear();
            this.scheduledBytesCounter = 0L;
        }
        CancellationException ce = new CancellationException();
        drained.forEach(task -> task.fail(ce));
        this.retryingExecutor.close();
        this.delegate.close();
    }

    private synchronized Throwable getErrorSafe() {
        return this.errorUnsafe;
    }

    private synchronized void setErrorSafe(Throwable t) {
        this.errorUnsafe = t;
    }

    private static StateChangeUploader.UploadTask wrapWithSizeUpdate(StateChangeUploader.UploadTask uploadTask, long preComputedTaskSize, LongAdder inflightSize) {
        return new StateChangeUploader.UploadTask(uploadTask.changeSets, result -> {
            inflightSize.add(-preComputedTaskSize);
            uploadTask.successCallback.accept((List<UploadResult>)result);
        }, (result, error) -> {
            inflightSize.add(-preComputedTaskSize);
            uploadTask.failureCallback.accept((List<SequenceNumber>)result, (Throwable)error);
        });
    }
}

