/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.changelog.fs;

import java.util.Optional;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.changelog.fs.RetryPolicy;
import org.apache.flink.changelog.fs.SchedulerFactory;
import org.apache.flink.util.function.RunnableWithException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs {@link RetriableAction}s on a {@link ScheduledExecutorService} and re-submits failed or
 * timed-out attempts according to a {@link RetryPolicy}.
 *
 * <p>The outcome of an action is not reported back to the caller of {@link #execute}; failures
 * are only logged.
 */
class RetryingExecutor implements AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(RetryingExecutor.class);

    /** Executor used for attempts, delayed retries and attempt timeouts alike. */
    private final ScheduledExecutorService scheduler;

    /**
     * Creates an executor backed by a scheduler obtained from {@link SchedulerFactory}.
     *
     * @param nThreads forwarded to {@code SchedulerFactory.create}; presumably the number of
     *     scheduler threads (confirm against the factory)
     */
    RetryingExecutor(int nThreads) {
        this(SchedulerFactory.create(nThreads, "ChangelogRetryScheduler", LOG));
    }

    /**
     * Creates an executor on top of the given scheduler. The scheduler is shut down by {@link
     * #close()}.
     *
     * @param scheduler executor that runs attempts, retries and timeouts
     */
    RetryingExecutor(ScheduledExecutorService scheduler) {
        this.scheduler = scheduler;
    }

    /**
     * Hands the first attempt of {@code action} to the scheduler. Further attempts are scheduled
     * by the task itself as directed by {@code retryPolicy}.
     *
     * @param retryPolicy decides per-attempt timeouts and the delay before the next attempt
     * @param action the work to run
     */
    void execute(RetryPolicy retryPolicy, RetriableAction action) {
        LOG.debug("execute with retryPolicy: {}", retryPolicy);
        scheduler.submit(new RetriableTask(action, retryPolicy, scheduler));
    }

    /**
     * Shuts the scheduler down immediately and waits up to one second for it to terminate,
     * logging a warning if it does not.
     *
     * @throws Exception in particular {@link InterruptedException} if interrupted while waiting
     */
    @Override
    public void close() throws Exception {
        LOG.debug("close");
        scheduler.shutdownNow();
        boolean terminated = scheduler.awaitTermination(1L, TimeUnit.SECONDS);
        if (!terminated) {
            LOG.warn("Unable to cleanly shutdown executorService in 1s");
        }
    }

    /**
     * A single attempt at running an action. Every attempt of the same action shares one
     * {@code actionCompleted} flag, while {@code attemptCompleted} belongs to this attempt only.
     */
    private static final class RetriableTask implements Runnable {
        private final RetriableAction runnable;
        private final ScheduledExecutorService executorService;

        /** Number of this attempt; the first attempt is 1. */
        private final int current;

        private final RetryPolicy retryPolicy;

        /** Shared by all attempts: set once the action succeeded or retries were given up. */
        private final AtomicBoolean actionCompleted;

        /**
         * Per attempt: guarantees that only one of "attempt threw" and "attempt timed out"
         * schedules the follow-up attempt.
         */
        private final AtomicBoolean attemptCompleted = new AtomicBoolean(false);

        /** Creates the first attempt (number 1) of an action. */
        RetriableTask(
                RetriableAction runnable,
                RetryPolicy retryPolicy,
                ScheduledExecutorService executorService) {
            this(1, new AtomicBoolean(false), runnable, retryPolicy, executorService);
        }

        private RetriableTask(
                int current,
                AtomicBoolean actionCompleted,
                RetriableAction runnable,
                RetryPolicy retryPolicy,
                ScheduledExecutorService executorService) {
            this.current = current;
            this.runnable = runnable;
            this.retryPolicy = retryPolicy;
            this.executorService = executorService;
            this.actionCompleted = actionCompleted;
        }

        /**
         * Runs the action unless some attempt already completed it. A timeout (if the policy
         * asks for one) is armed before the action starts and cancelled when it returns.
         */
        @Override
        public void run() {
            if (actionCompleted.get()) {
                return;
            }
            Optional<ScheduledFuture<?>> timeoutFuture = scheduleTimeout();
            try {
                runnable.run();
                actionCompleted.set(true);
                attemptCompleted.set(true);
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    // Catching the exception cleared the flag; restore it so code further up
                    // the stack can still observe the interruption.
                    Thread.currentThread().interrupt();
                }
                handleError(e);
            } catch (Error err) {
                // This task is started through submit(), so anything escaping run() ends up in
                // a Future that is never inspected. Log before rethrowing so it is not lost.
                LOG.error("execution attempt {} failed with an unrecoverable error", current, err);
                throw err;
            } finally {
                timeoutFuture.ifPresent(pending -> pending.cancel(true));
            }
        }

        /**
         * Records a failed (or timed-out) attempt and, at most once per attempt, asks the retry
         * policy what to do next: a delay of zero retries immediately, a positive delay
         * retries after that many milliseconds, a negative delay gives up on the action.
         *
         * @param e the failure of this attempt
         */
        private void handleError(Exception e) {
            LOG.info("execution attempt {} failed: {}", current, e.getMessage());
            // The INFO line carries the message only; keep type and stack trace available too.
            LOG.debug("execution attempt {} failure details", current, e);

            boolean firstFailureOfAttempt = attemptCompleted.compareAndSet(false, true);
            if (!firstFailureOfAttempt || actionCompleted.get()) {
                return;
            }

            long nextAttemptDelay = retryPolicy.retryAfter(current, e);
            if (nextAttemptDelay == 0L) {
                executorService.submit(next());
            } else if (nextAttemptDelay > 0L) {
                executorService.schedule(next(), nextAttemptDelay, TimeUnit.MILLISECONDS);
            } else {
                // No further attempts: mark the action as done so that late attempts bail out.
                actionCompleted.set(true);
            }
        }

        /** Builds the follow-up attempt, sharing the action-wide completion flag. */
        private RetriableTask next() {
            return new RetriableTask(
                    current + 1, actionCompleted, runnable, retryPolicy, executorService);
        }

        /**
         * Arms the timeout for this attempt. When it fires, the attempt is reported as failed
         * with a {@link TimeoutException}; the running action itself is not interrupted here.
         *
         * @return the scheduled timeout, or empty if the policy's timeout is not positive
         */
        private Optional<ScheduledFuture<?>> scheduleTimeout() {
            long timeout = retryPolicy.timeoutFor(current);
            if (timeout <= 0L) {
                return Optional.empty();
            }
            ScheduledFuture<?> timeoutTask =
                    executorService.schedule(
                            () -> handleError(fmtError(timeout)), timeout, TimeUnit.MILLISECONDS);
            return Optional.of(timeoutTask);
        }

        /** Creates the exception reported when this attempt exceeds its timeout. */
        private TimeoutException fmtError(long timeout) {
            return new TimeoutException(
                    String.format("Attempt %d timed out after %dms", current, timeout));
        }
    }

    /** An action that may throw and can be attempted more than once. */
    interface RetriableAction extends RunnableWithException {
    }
}

