package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeService.class */
/**
 * A {@link ProcessingTimeService} that uses {@link System#currentTimeMillis()} as its clock and
 * runs timer callbacks on a {@link ScheduledThreadPoolExecutor} created with a single core thread.
 *
 * <p>Every callback is invoked while holding the lock object passed to the constructor. Failures
 * thrown by callbacks are wrapped in a {@link TimerException} and reported to the
 * {@link AsyncExceptionHandler} passed to the constructor.
 *
 * <p>The service moves through three states: alive, quiesced and shut down. Once it is no longer
 * alive, callbacks are skipped even if their task still gets to run.
 */
public class SystemProcessingTimeService extends ProcessingTimeService {
    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    /** Timers are accepted and their callbacks are invoked. */
    private static final int STATUS_ALIVE = 0;

    /** {@link #quiesce()} was called: new timers are silently ignored, callbacks are skipped. */
    private static final int STATUS_QUIESCED = 1;

    /** {@link #shutdownService()} was called: registering a timer fails. */
    private static final int STATUS_SHUTDOWN = 2;

    /** Receives exceptions thrown by timer callbacks. */
    private final AsyncExceptionHandler task;

    /** Lock held while a timer callback runs. */
    private final Object checkpointLock;

    /** Executor that schedules and runs the timer tasks. */
    private final ScheduledThreadPoolExecutor timerService;

    /** Current life-cycle state; one of the {@code STATUS_*} constants. */
    private final AtomicInteger status;

    /**
     * Placeholder future handed out for timers that are registered while the service is quiesced.
     * It never completes normally; waiting on it only ends through cancellation, a timeout or an
     * interrupt.
     */
    private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
        /** Monitor used to wake up threads blocked in {@code get} when the future is cancelled. */
        private final Object lock;

        /** The delay the timer was registered with; reported as-is, it does not count down. */
        private final long delayMillis;

        private volatile boolean canceled;

        private NeverCompleteFuture(long delayMillis) {
            this.lock = new Object();
            this.delayMillis = delayMillis;
        }

        /** Returns the fixed delay this future was created with, converted to {@code timeUnit}. */
        @Override
        public long getDelay(@Nonnull TimeUnit timeUnit) {
            return timeUnit.convert(delayMillis, TimeUnit.MILLISECONDS);
        }

        /** Orders this future against {@code delayed} by delay in milliseconds. */
        @Override
        public int compareTo(@Nonnull Delayed delayed) {
            return Long.compare(delayMillis, delayed.getDelay(TimeUnit.MILLISECONDS));
        }

        /**
         * Marks the future as cancelled and wakes up all threads blocked in {@code get}.
         *
         * @param mayInterruptIfRunning ignored, there is no running task behind this future
         * @return always {@code true}
         */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            synchronized (lock) {
                canceled = true;
                lock.notifyAll();
            }
            return true;
        }

        @Override
        public boolean isCancelled() {
            return canceled;
        }

        /**
         * Always returns {@code false}.
         *
         * <p>NOTE(review): the {@link java.util.concurrent.Future} contract expects {@code true}
         * after a successful {@link #cancel(boolean)}; kept unchanged to preserve existing
         * behaviour - confirm whether any caller relies on it.
         */
        @Override
        public boolean isDone() {
            return false;
        }

        /**
         * Blocks until the future is cancelled.
         *
         * @return never returns normally
         * @throws CancellationException once the future has been cancelled
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Override
        public Object get() throws InterruptedException {
            synchronized (lock) {
                // Loop guards against spurious wakeups.
                while (!canceled) {
                    lock.wait();
                }
            }
            throw new CancellationException();
        }

        /**
         * Blocks until the future is cancelled or the timeout elapses, whichever comes first.
         *
         * @param timeout maximum time to wait; a non-positive value does not wait at all
         * @param timeUnit unit of {@code timeout}
         * @return never returns normally
         * @throws CancellationException if the future was cancelled before the timeout elapsed
         * @throws TimeoutException if the timeout elapsed without the future being cancelled
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Override
        public Object get(long timeout, @Nonnull TimeUnit timeUnit) throws InterruptedException, TimeoutException {
            synchronized (lock) {
                long remainingNanos = timeUnit.toNanos(timeout);
                // Track an absolute deadline so that spurious wakeups do not restart the full
                // timeout and the wait actually ends when the timeout has elapsed.
                final long deadlineNanos = System.nanoTime() + remainingNanos;
                while (!canceled && remainingNanos > 0L) {
                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                    remainingNanos = deadlineNanos - System.nanoTime();
                }
                if (canceled) {
                    throw new CancellationException();
                }
                throw new TimeoutException();
            }
        }
    }

    /**
     * Task for a periodic timer. Each run invokes the callback with the timestamp of the current
     * firing and then advances that timestamp by one period.
     */
    private static final class RepeatedTriggerTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        /** Timestamp passed to the next callback invocation; only touched while holding the lock. */
        private long nextTimestamp;

        private RepeatedTriggerTask(
                AtomicInteger serviceStatus,
                AsyncExceptionHandler exceptionHandler,
                Object lock,
                ProcessingTimeCallback target,
                long nextTimestamp,
                long period) {
            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }
                } catch (Throwable t) {
                    exceptionHandler.handleAsyncException(
                            "Caught exception while processing repeated timer task.", new TimerException(t));
                }
                // Advance even when the callback failed; otherwise every later firing would
                // report a timestamp that lags one period behind.
                nextTimestamp += period;
            }
        }
    }

    /** Task for a one-shot timer; invokes the callback with the timestamp it was registered for. */
    private static final class TriggerTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long timestamp;
        private final AsyncExceptionHandler exceptionHandler;

        private TriggerTask(
                AtomicInteger serviceStatus,
                AsyncExceptionHandler exceptionHandler,
                Object lock,
                ProcessingTimeCallback target,
                long timestamp) {
            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.timestamp = timestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(timestamp);
                    }
                } catch (Throwable t) {
                    exceptionHandler.handleAsyncException(
                            "Caught exception while processing timer.", new TimerException(t));
                }
            }
        }
    }

    /**
     * Creates a service whose executor uses the default thread factory.
     *
     * @param failureHandler receives exceptions thrown by timer callbacks; must not be null
     * @param checkpointLock lock held while callbacks run; must not be null
     */
    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);
    }

    /**
     * Creates a service.
     *
     * @param failureHandler receives exceptions thrown by timer callbacks; must not be null
     * @param checkpointLock lock held while callbacks run; must not be null
     * @param threadFactory factory for the timer thread, or {@code null} for the executor default
     */
    public SystemProcessingTimeService(
            AsyncExceptionHandler failureHandler, Object checkpointLock, ThreadFactory threadFactory) {
        this.task = Preconditions.checkNotNull(failureHandler);
        this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
        this.status = new AtomicInteger(STATUS_ALIVE);
        if (threadFactory == null) {
            this.timerService = new ScheduledThreadPoolExecutor(1);
        } else {
            this.timerService = new ScheduledThreadPoolExecutor(1, threadFactory);
        }
        // Drop cancelled timers from the work queue right away instead of at their due time.
        this.timerService.setRemoveOnCancelPolicy(true);
        // Do not keep running periodic or pending delayed tasks once the executor is shut down.
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Returns the current wall-clock time in milliseconds. */
    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /**
     * Schedules {@code target} to be invoked once with {@code timestamp}.
     *
     * @param timestamp the time (in milliseconds) the callback is registered for and invoked with
     * @param target the callback to invoke
     * @return a future for the scheduled task; a never-completing placeholder if the service is
     *     quiesced
     * @throws IllegalStateException if the service has been shut down
     */
    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        // One extra millisecond so the callback runs only after the clock has moved past
        // `timestamp`; timestamps in the past are scheduled with the minimal delay.
        final long delayMillis = Math.max(timestamp - getCurrentProcessingTime(), 0L) + 1;
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp),
                    delayMillis,
                    TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return futureForRejectedTimer(e, delayMillis);
        }
    }

    /**
     * Schedules {@code callback} to be invoked periodically.
     *
     * @param callback the callback to invoke
     * @param initialDelay delay in milliseconds before the first invocation
     * @param period time in milliseconds between invocations
     * @return a future for the scheduled task; a never-completing placeholder if the service is
     *     quiesced
     * @throws IllegalStateException if the service has been shut down
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        final long firstTimestamp = getCurrentProcessingTime() + initialDelay;
        try {
            return timerService.scheduleAtFixedRate(
                    new RepeatedTriggerTask(status, task, checkpointLock, callback, firstTimestamp, period),
                    initialDelay,
                    period,
                    TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return futureForRejectedTimer(e, initialDelay);
        }
    }

    /**
     * Decides what to hand back when the executor refused a timer task.
     *
     * @param rejection the exception raised by the executor
     * @param delayMillis the delay the rejected timer was supposed to have
     * @return a never-completing future if the service is quiesced
     * @throws IllegalStateException if the service has been shut down
     * @throws RejectedExecutionException the original rejection, if the service is still alive
     */
    private ScheduledFuture<?> futureForRejectedTimer(RejectedExecutionException rejection, long delayMillis) {
        final int currentStatus = status.get();
        if (currentStatus == STATUS_QUIESCED) {
            return new NeverCompleteFuture(delayMillis);
        }
        if (currentStatus == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down", rejection);
        }
        // Still alive: the rejection has some other reason, so surface it unchanged.
        throw rejection;
    }

    /** Returns whether the service is in the alive state. */
    @VisibleForTesting
    boolean isAlive() {
        return status.get() == STATUS_ALIVE;
    }

    /** Returns whether {@link #shutdownService()} has taken effect. */
    @Override
    public boolean isTerminated() {
        return status.get() == STATUS_SHUTDOWN;
    }

    /**
     * Moves an alive service to the quiesced state and starts an orderly shutdown of the
     * executor. Does nothing if the service is not alive.
     */
    @Override
    public void quiesce() throws InterruptedException {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
            timerService.shutdown();
        }
    }

    /**
     * Blocks until the executor has terminated, waiting at most 365 days. Must only be called
     * after the executor has been shut down; otherwise the state check fails.
     *
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public void awaitPendingAfterQuiesce() throws InterruptedException {
        if (timerService.isTerminated()) {
            return;
        }
        Preconditions.checkState(timerService.isTerminating() || timerService.isShutdown());
        timerService.awaitTermination(365L, TimeUnit.DAYS);
    }

    /**
     * Moves the service to the shut-down state (from either alive or quiesced) and stops the
     * executor immediately. Does nothing if the service is already shut down.
     */
    @Override
    public void shutdownService() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN)
                || status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
            timerService.shutdownNow();
        }
    }

    /**
     * Shuts the service down and waits for the executor to terminate.
     *
     * @param time maximum time to wait
     * @param timeUnit unit of {@code time}
     * @return {@code true} if the executor terminated within the given time
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
        shutdownService();
        return timerService.awaitTermination(time, timeUnit);
    }

    /**
     * Shuts the service down and waits for termination without giving up on interrupts. An
     * interrupt received while waiting is remembered and the thread's interrupt flag is restored
     * before returning.
     *
     * @param timeoutMs maximum total time to wait, in milliseconds
     * @return {@code true} if the executor terminated before the timeout elapsed
     */
    @Override
    public boolean shutdownServiceUninterruptible(long timeoutMs) {
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));
        boolean terminated = false;
        boolean interrupted = false;
        do {
            try {
                terminated = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                interrupted = true;
                LOG.trace("Intercepted attempt to interrupt timer service shutdown.", e);
            }
        } while (deadline.hasTimeLeft() && !terminated);

        if (interrupted) {
            // Hand the interrupt back to the caller now that the wait is over.
            Thread.currentThread().interrupt();
        }
        return terminated;
    }

    /** Last-resort cleanup: stops the executor when this object is garbage collected. */
    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            // Stop the executor even if the superclass finalizer fails.
            timerService.shutdownNow();
        }
    }

    /** Returns the number of tasks currently waiting in the executor's queue. */
    @VisibleForTesting
    int getNumTasksScheduled() {
        final BlockingQueue<Runnable> pending = timerService.getQueue();
        return pending == null ? 0 : pending.size();
    }
}
