/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.time.Duration;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.Delayed;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A {@link ProcessingTimeService} that reports {@link System#currentTimeMillis()} as the current
 * processing time and fires timers from a {@link ScheduledThreadPoolExecutor} with a core pool
 * size of one.
 *
 * <p>Every timer callback is invoked while holding the checkpoint lock passed to the constructor,
 * and only while the service is in the {@code STATUS_ALIVE} state. Any {@link Throwable} raised by
 * a callback is wrapped in a {@link TimerException} and forwarded to the
 * {@link AsyncExceptionHandler}.
 *
 * <p>The service moves through its states only forwards: alive, then optionally quiesced, then
 * shut down.
 */
public class SystemProcessingTimeService extends ProcessingTimeService {
    private static final Logger LOG = LoggerFactory.getLogger(SystemProcessingTimeService.class);

    /** Timers can be registered and their callbacks are invoked. */
    private static final int STATUS_ALIVE = 0;

    /** The executor has been shut down gracefully; callbacks are no longer invoked. */
    private static final int STATUS_QUIESCED = 1;

    /** The executor has been shut down forcefully; registering timers is an error. */
    private static final int STATUS_SHUTDOWN = 2;

    /** Receives exceptions thrown by timer callbacks. */
    private final AsyncExceptionHandler task;

    /** Monitor held while a timer callback runs. */
    private final Object checkpointLock;

    /** Executor that schedules and runs the timer tasks. */
    private final ScheduledThreadPoolExecutor timerService;

    /** Current lifecycle state, one of the {@code STATUS_*} constants. */
    private final AtomicInteger status;

    /**
     * Creates a service whose executor uses the default thread factory.
     *
     * @param failureHandler handler notified when a timer callback throws; must not be null
     * @param checkpointLock monitor held while callbacks run; must not be null
     */
    public SystemProcessingTimeService(AsyncExceptionHandler failureHandler, Object checkpointLock) {
        this(failureHandler, checkpointLock, null);
    }

    /**
     * Creates a service with an optional custom thread factory for the timer thread.
     *
     * @param task handler notified when a timer callback throws; must not be null
     * @param checkpointLock monitor held while callbacks run; must not be null
     * @param threadFactory factory for the executor's threads, or {@code null} for the default
     */
    public SystemProcessingTimeService(AsyncExceptionHandler task, Object checkpointLock, ThreadFactory threadFactory) {
        this.task = Preconditions.checkNotNull(task);
        this.checkpointLock = Preconditions.checkNotNull(checkpointLock);
        this.status = new AtomicInteger(STATUS_ALIVE);
        this.timerService = threadFactory == null
                ? new ScheduledThreadPoolExecutor(1)
                : new ScheduledThreadPoolExecutor(1, threadFactory);

        // Cancelled timers are removed from the queue immediately instead of lingering until due.
        this.timerService.setRemoveOnCancelPolicy(true);
        // A graceful shutdown drops everything that has not started running yet.
        this.timerService.setContinueExistingPeriodicTasksAfterShutdownPolicy(false);
        this.timerService.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
    }

    /** Returns the wall-clock time from {@link System#currentTimeMillis()}. */
    @Override
    public long getCurrentProcessingTime() {
        return System.currentTimeMillis();
    }

    /**
     * Schedules {@code target} to be called once with {@code timestamp}.
     *
     * <p>The delay is the distance from now to {@code timestamp} (at least zero) plus one
     * millisecond. NOTE(review): the extra millisecond presumably guarantees the callback never
     * observes a clock earlier than {@code timestamp} - confirm against the upstream sources.
     *
     * @param timestamp processing time, in epoch milliseconds, handed to the callback
     * @param target callback to invoke
     * @return the future of the scheduled task, or a future that never completes if the service is
     *     quiesced
     * @throws IllegalStateException if the service has been shut down
     */
    @Override
    public ScheduledFuture<?> registerTimer(long timestamp, ProcessingTimeCallback target) {
        final long delay = Math.max(timestamp - getCurrentProcessingTime(), 0L) + 1L;
        try {
            return timerService.schedule(
                    new TriggerTask(status, task, checkpointLock, target, timestamp),
                    delay,
                    TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return handleRejectedTimer(e, delay);
        }
    }

    /**
     * Schedules {@code callback} to be called repeatedly at a fixed rate.
     *
     * @param callback callback to invoke
     * @param initialDelay delay in milliseconds before the first invocation
     * @param period interval in milliseconds between invocations
     * @return the future of the scheduled task, or a future that never completes if the service is
     *     quiesced
     * @throws IllegalStateException if the service has been shut down
     */
    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(ProcessingTimeCallback callback, long initialDelay, long period) {
        final long nextTimestamp = getCurrentProcessingTime() + initialDelay;
        try {
            return timerService.scheduleAtFixedRate(
                    new RepeatedTriggerTask(status, task, checkpointLock, callback, nextTimestamp, period),
                    initialDelay,
                    period,
                    TimeUnit.MILLISECONDS);
        } catch (RejectedExecutionException e) {
            return handleRejectedTimer(e, initialDelay);
        }
    }

    /**
     * Translates an executor rejection according to the current lifecycle state.
     *
     * @param rejection the exception thrown by the executor
     * @param delayMillis delay that was requested for the rejected task
     * @return a placeholder future when the service is quiesced
     * @throws IllegalStateException if the service has been shut down
     * @throws RejectedExecutionException the original rejection, if the service is still alive
     */
    private ScheduledFuture<?> handleRejectedTimer(RejectedExecutionException rejection, long delayMillis) {
        final int currentStatus = status.get();
        if (currentStatus == STATUS_QUIESCED) {
            // Quiesced: silently accept the registration, the timer will simply never fire.
            return new NeverCompleteFuture(delayMillis);
        }
        if (currentStatus == STATUS_SHUTDOWN) {
            throw new IllegalStateException("Timer service is shut down", rejection);
        }
        // Still alive, so the rejection has some other cause; let the caller see it.
        throw rejection;
    }

    /** Returns whether the service is still in the alive state. */
    @VisibleForTesting
    boolean isAlive() {
        return status.get() == STATUS_ALIVE;
    }

    /** Returns whether {@link #shutdownService()} has moved the service to the shut-down state. */
    @Override
    public boolean isTerminated() {
        return status.get() == STATUS_SHUTDOWN;
    }

    /**
     * Moves an alive service to the quiesced state and shuts the executor down gracefully. Has no
     * effect if the service is already quiesced or shut down.
     */
    @Override
    public void quiesce() throws InterruptedException {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_QUIESCED)) {
            timerService.shutdown();
        }
    }

    /**
     * Blocks until the executor has terminated, unless it already has.
     *
     * @throws IllegalStateException if the executor has not been shut down yet
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public void awaitPendingAfterQuiesce() throws InterruptedException {
        if (!timerService.isTerminated()) {
            Preconditions.checkState(timerService.isTerminating() || timerService.isShutdown());

            // Effectively "wait forever": awaitTermination has no untimed variant.
            timerService.awaitTermination(365L, TimeUnit.DAYS);
        }
    }

    /**
     * Moves the service to the shut-down state and forcefully stops the executor. Has no effect if
     * the service is already shut down.
     */
    @Override
    public void shutdownService() {
        if (status.compareAndSet(STATUS_ALIVE, STATUS_SHUTDOWN)
                || status.compareAndSet(STATUS_QUIESCED, STATUS_SHUTDOWN)) {
            timerService.shutdownNow();
        }
    }

    /**
     * Shuts the service down and waits for the executor to terminate.
     *
     * @param time maximum time to wait
     * @param timeUnit unit of {@code time}
     * @return {@code true} if the executor terminated within the timeout
     * @throws InterruptedException if the waiting thread is interrupted
     */
    @Override
    public boolean shutdownAndAwaitPending(long time, TimeUnit timeUnit) throws InterruptedException {
        shutdownService();
        return timerService.awaitTermination(time, timeUnit);
    }

    /**
     * Shuts the service down and waits for termination, retrying when interrupted until the
     * timeout has elapsed. If an interrupt was received, the calling thread's interrupt flag is
     * restored before returning.
     *
     * @param timeoutMs maximum total time to wait, in milliseconds
     * @return {@code true} if the executor terminated within the timeout
     */
    @Override
    public boolean shutdownServiceUninterruptible(long timeoutMs) {
        final Deadline deadline = Deadline.fromNow(Duration.ofMillis(timeoutMs));

        boolean shutdownComplete = false;
        boolean receivedInterrupt = false;

        do {
            try {
                // Wait for the remaining time only, so retries never extend the overall timeout.
                shutdownComplete = shutdownAndAwaitPending(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
            } catch (InterruptedException iex) {
                receivedInterrupt = true;
                LOG.trace("Intercepted attempt to interrupt timer service shutdown.", iex);
            }
        } while (deadline.hasTimeLeft() && !shutdownComplete);

        if (receivedInterrupt) {
            Thread.currentThread().interrupt();
        }

        return shutdownComplete;
    }

    /**
     * Safety net that stops the executor when the service is garbage collected without having been
     * shut down.
     */
    @Override
    protected void finalize() throws Throwable {
        try {
            super.finalize();
        } finally {
            // timerService is still null if a constructor precondition failed.
            if (timerService != null) {
                timerService.shutdownNow();
            }
        }
    }

    /** Returns the number of tasks currently waiting in the executor's queue. */
    @VisibleForTesting
    int getNumTasksScheduled() {
        final BlockingQueue<Runnable> queue = timerService.getQueue();
        return queue == null ? 0 : queue.size();
    }

    /**
     * Placeholder future handed out for timers registered while the service is quiesced. It never
     * produces a result; the only way it finishes is by being cancelled.
     */
    private static final class NeverCompleteFuture implements ScheduledFuture<Object> {
        /** Monitor used to park and wake threads blocked in {@code get}. */
        private final Object lock = new Object();

        /** The delay originally requested for the timer, in milliseconds. */
        private final long delayMillis;

        private volatile boolean canceled;

        private NeverCompleteFuture(long delayMillis) {
            this.delayMillis = delayMillis;
        }

        /** Returns the originally requested delay; the value does not count down over time. */
        @Override
        public long getDelay(@Nonnull TimeUnit unit) {
            return unit.convert(delayMillis, TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(@Nonnull Delayed o) {
            final long otherMillis = o.getDelay(TimeUnit.MILLISECONDS);
            return Long.compare(delayMillis, otherMillis);
        }

        /** Marks the future as cancelled and wakes every thread blocked in {@code get}. */
        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            synchronized (lock) {
                canceled = true;
                lock.notifyAll();
            }
            return true;
        }

        @Override
        public boolean isCancelled() {
            return canceled;
        }

        /**
         * Returns {@code true} once the future has been cancelled. The {@link
         * java.util.concurrent.Future} contract requires {@code isDone()} to report {@code true}
         * after a successful {@code cancel}.
         */
        @Override
        public boolean isDone() {
            return canceled;
        }

        /**
         * Blocks until the future is cancelled.
         *
         * @return never returns normally
         * @throws CancellationException once the future has been cancelled
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Override
        public Object get() throws InterruptedException {
            synchronized (lock) {
                while (!canceled) {
                    lock.wait();
                }
            }
            throw new CancellationException();
        }

        /**
         * Blocks until the future is cancelled or the timeout elapses.
         *
         * @param timeout maximum time to wait; zero or negative means do not wait
         * @param unit unit of {@code timeout}
         * @return never returns normally
         * @throws CancellationException if the future is cancelled before the timeout elapses
         * @throws TimeoutException if the timeout elapses first
         * @throws InterruptedException if the waiting thread is interrupted
         */
        @Override
        public Object get(long timeout, @Nonnull TimeUnit unit) throws InterruptedException, TimeoutException {
            // Fixed deadline so that spurious wake-ups do not restart the full timeout. The
            // subtraction below stays correct even if this sum overflows.
            final long deadlineNanos = System.nanoTime() + unit.toNanos(timeout);
            synchronized (lock) {
                while (!canceled) {
                    final long remainingNanos = deadlineNanos - System.nanoTime();
                    if (remainingNanos <= 0L) {
                        throw new TimeoutException();
                    }
                    TimeUnit.NANOSECONDS.timedWait(lock, remainingNanos);
                }
                throw new CancellationException();
            }
        }
    }

    /**
     * Periodic timer task. Each run invokes the callback with the timestamp of the current firing
     * and then advances that timestamp by one period.
     */
    private static final class RepeatedTriggerTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long period;
        private final AsyncExceptionHandler exceptionHandler;

        /** Timestamp passed to the callback on the next run. */
        private long nextTimestamp;

        private RepeatedTriggerTask(
                AtomicInteger serviceStatus,
                AsyncExceptionHandler exceptionHandler,
                Object lock,
                ProcessingTimeCallback target,
                long nextTimestamp,
                long period) {
            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.period = period;
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
            this.nextTimestamp = nextTimestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(nextTimestamp);
                    }

                    // Not reached when the callback throws, so the timestamp is not advanced then.
                    nextTimestamp += period;
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException(
                            "Caught exception while processing repeated timer task.", asyncException);
                }
            }
        }
    }

    /** One-shot timer task that invokes the callback with a fixed timestamp. */
    private static final class TriggerTask implements Runnable {
        private final AtomicInteger serviceStatus;
        private final Object lock;
        private final ProcessingTimeCallback target;
        private final long timestamp;
        private final AsyncExceptionHandler exceptionHandler;

        private TriggerTask(
                AtomicInteger serviceStatus,
                AsyncExceptionHandler exceptionHandler,
                Object lock,
                ProcessingTimeCallback target,
                long timestamp) {
            this.serviceStatus = Preconditions.checkNotNull(serviceStatus);
            this.exceptionHandler = Preconditions.checkNotNull(exceptionHandler);
            this.lock = Preconditions.checkNotNull(lock);
            this.target = Preconditions.checkNotNull(target);
            this.timestamp = timestamp;
        }

        @Override
        public void run() {
            synchronized (lock) {
                try {
                    // Skip the callback once the service has been quiesced or shut down.
                    if (serviceStatus.get() == STATUS_ALIVE) {
                        target.onProcessingTime(timestamp);
                    }
                } catch (Throwable t) {
                    TimerException asyncException = new TimerException(t);
                    exceptionHandler.handleAsyncException("Caught exception while processing timer.", asyncException);
                }
            }
        }
    }
}

