package org.apache.flink.runtime.concurrent.akka;

import akka.actor.ActorSystem;
import akka.actor.Cancellable;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableScheduledFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nonnull;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.ScheduledExecutor;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter.class */
public final class ActorSystemScheduledExecutorAdapter implements ScheduledExecutor {
    private final ActorSystem actorSystem;
    private final ClassLoader flinkClassLoader;

    /* loaded from: input_file:org/apache/flink/runtime/concurrent/akka/ActorSystemScheduledExecutorAdapter$ScheduledFutureTask.class */
    private final class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
        private long time;
        private final long period;
        private volatile Cancellable cancellable;

        ScheduledFutureTask(Callable<V> callable, long j, long j2) {
            super(callable);
            this.time = j;
            this.period = j2;
        }

        ScheduledFutureTask(Runnable runnable, long j, long j2) {
            super(runnable, null);
            this.time = j;
            this.period = j2;
        }

        public void setCancellable(Cancellable cancellable) {
            this.cancellable = cancellable;
        }

        @Override // java.util.concurrent.FutureTask, java.util.concurrent.RunnableFuture, java.lang.Runnable
        public void run() {
            if (!isPeriodic()) {
                super.run();
                return;
            }
            if (runAndReset()) {
                if (this.period > 0) {
                    this.time += this.period;
                    return;
                }
                this.cancellable = ActorSystemScheduledExecutorAdapter.this.internalSchedule(this, -this.period, TimeUnit.NANOSECONDS);
                if (isCancelled()) {
                    this.cancellable.cancel();
                } else {
                    this.time = ActorSystemScheduledExecutorAdapter.this.triggerTime(-this.period);
                }
            }
        }

        @Override // java.util.concurrent.FutureTask, java.util.concurrent.Future
        public boolean cancel(boolean z) {
            return super.cancel(z) && this.cancellable.cancel();
        }

        @Override // java.util.concurrent.Delayed
        public long getDelay(@Nonnull TimeUnit timeUnit) {
            return timeUnit.convert(this.time - ActorSystemScheduledExecutorAdapter.this.now(), TimeUnit.NANOSECONDS);
        }

        @Override // java.lang.Comparable
        public int compareTo(@Nonnull Delayed delayed) {
            if (delayed == this) {
                return 0;
            }
            long delay = getDelay(TimeUnit.NANOSECONDS) - delayed.getDelay(TimeUnit.NANOSECONDS);
            if (delay < 0) {
                return -1;
            }
            return delay > 0 ? 1 : 0;
        }

        @Override // java.util.concurrent.RunnableScheduledFuture
        public boolean isPeriodic() {
            return this.period != 0;
        }
    }

    public ActorSystemScheduledExecutorAdapter(ActorSystem actorSystem, ClassLoader classLoader) {
        this.actorSystem = (ActorSystem) Preconditions.checkNotNull(actorSystem, "rpcService");
        this.flinkClassLoader = (ClassLoader) Preconditions.checkNotNull(classLoader, "flinkClassLoader");
    }

    @Nonnull
    public ScheduledFuture<?> schedule(@Nonnull Runnable runnable, long j, @Nonnull TimeUnit timeUnit) {
        ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask(runnable, timeUnit.toNanos(j), 0L);
        scheduledFutureTask.setCancellable(internalSchedule(scheduledFutureTask, j, timeUnit));
        return scheduledFutureTask;
    }

    @Nonnull
    public <V> ScheduledFuture<V> schedule(@Nonnull Callable<V> callable, long j, @Nonnull TimeUnit timeUnit) {
        ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask(callable, timeUnit.toNanos(j), 0L);
        scheduledFutureTask.setCancellable(internalSchedule(scheduledFutureTask, j, timeUnit));
        return scheduledFutureTask;
    }

    @Nonnull
    public ScheduledFuture<?> scheduleAtFixedRate(@Nonnull Runnable runnable, long j, long j2, @Nonnull TimeUnit timeUnit) {
        ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask(runnable, triggerTime(timeUnit.toNanos(j)), timeUnit.toNanos(j2));
        scheduledFutureTask.setCancellable(this.actorSystem.scheduler().schedule(new FiniteDuration(j, timeUnit), new FiniteDuration(j2, timeUnit), ClassLoadingUtils.withContextClassLoader(scheduledFutureTask, this.flinkClassLoader), this.actorSystem.dispatcher()));
        return scheduledFutureTask;
    }

    @Nonnull
    public ScheduledFuture<?> scheduleWithFixedDelay(@Nonnull Runnable runnable, long j, long j2, @Nonnull TimeUnit timeUnit) {
        ScheduledFutureTask scheduledFutureTask = new ScheduledFutureTask(runnable, triggerTime(timeUnit.toNanos(j)), timeUnit.toNanos(-j2));
        scheduledFutureTask.setCancellable(internalSchedule(scheduledFutureTask, j, timeUnit));
        return scheduledFutureTask;
    }

    public void execute(@Nonnull Runnable runnable) {
        this.actorSystem.dispatcher().execute(ClassLoadingUtils.withContextClassLoader(runnable, this.flinkClassLoader));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public Cancellable internalSchedule(Runnable runnable, long j, TimeUnit timeUnit) {
        return this.actorSystem.scheduler().scheduleOnce(new FiniteDuration(j, timeUnit), ClassLoadingUtils.withContextClassLoader(runnable, this.flinkClassLoader), this.actorSystem.dispatcher());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long now() {
        return System.nanoTime();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public long triggerTime(long j) {
        return now() + j;
    }
}
