package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.class */
public class SystemProcessingTimeServiceTest extends TestLogger {
    @Test(timeout = 10000)
    public void testScheduleAtFixedRate() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        SystemProcessingTimeService createSystemProcessingTimeService = createSystemProcessingTimeService((AtomicReference<Throwable>) atomicReference);
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        try {
            createSystemProcessingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.1
                public void onProcessingTime(long j) throws Exception {
                    countDownLatch.countDown();
                }
            }, 0L, 10L);
            countDownLatch.await();
            if (atomicReference.get() != null) {
                throw new Exception((Throwable) atomicReference.get());
            }
        } finally {
            createSystemProcessingTimeService.shutdownService();
        }
    }

    @Test
    public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        SystemProcessingTimeService createSystemProcessingTimeService = createSystemProcessingTimeService((AtomicReference<Throwable>) atomicReference);
        try {
            ScheduledFuture scheduleAtFixedRate = createSystemProcessingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.2
                public void onProcessingTime(long j) throws Exception {
                }
            }, 0L, 10L);
            Assert.assertFalse(scheduleAtFixedRate.isDone());
            createSystemProcessingTimeService.quiesce().get();
            try {
                scheduleAtFixedRate.get();
                Assert.fail("scheduled future is not cancelled");
            } catch (CancellationException e) {
            }
            Assert.assertNotNull(createSystemProcessingTimeService.scheduleAtFixedRate(new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.3
                public void onProcessingTime(long j) throws Exception {
                    throw new Exception("Test exception.");
                }
            }, 0L, 100L));
            Assert.assertEquals(0L, createSystemProcessingTimeService.getNumTasksScheduled());
            if (atomicReference.get() != null) {
                throw new Exception((Throwable) atomicReference.get());
            }
        } finally {
            createSystemProcessingTimeService.shutdownService();
        }
    }

    @Test
    public void testImmediateShutdown() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        SystemProcessingTimeService createSystemProcessingTimeService = createSystemProcessingTimeService((CompletableFuture<Throwable>) completableFuture);
        try {
            Assert.assertFalse(createSystemProcessingTimeService.isTerminated());
            OneShotLatch oneShotLatch = new OneShotLatch();
            createSystemProcessingTimeService.registerTimer(System.currentTimeMillis(), j -> {
                oneShotLatch.trigger();
                Thread.sleep(100000000L);
            });
            oneShotLatch.await();
            createSystemProcessingTimeService.shutdownService();
            Assert.assertTrue(createSystemProcessingTimeService.isTerminated());
            Assert.assertEquals(0L, createSystemProcessingTimeService.getNumTasksScheduled());
            try {
                createSystemProcessingTimeService.registerTimer(System.currentTimeMillis() + 1000, j2 -> {
                    Assert.fail("should not be called");
                });
                Assert.fail("should result in an exception");
            } catch (IllegalStateException e) {
            }
            try {
                createSystemProcessingTimeService.scheduleAtFixedRate(j3 -> {
                    Assert.fail("should not be called");
                }, 0L, 100L);
                Assert.fail("should result in an exception");
            } catch (IllegalStateException e2) {
            }
            Assert.assertThat(completableFuture.get(30L, TimeUnit.SECONDS), Matchers.instanceOf(InterruptedException.class));
            createSystemProcessingTimeService.shutdownService();
        } catch (Throwable th) {
            createSystemProcessingTimeService.shutdownService();
            throw th;
        }
    }

    @Test
    public void testQuiescing() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        SystemProcessingTimeService createSystemProcessingTimeService = createSystemProcessingTimeService((AtomicReference<Throwable>) atomicReference);
        try {
            final OneShotLatch oneShotLatch = new OneShotLatch();
            final ReentrantLock reentrantLock = new ReentrantLock();
            createSystemProcessingTimeService.registerTimer(createSystemProcessingTimeService.getCurrentProcessingTime() + 20, new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.4
                public void onProcessingTime(long j) throws Exception {
                    reentrantLock.lock();
                    try {
                        oneShotLatch.trigger();
                        Thread.sleep(5L);
                    } finally {
                        reentrantLock.unlock();
                    }
                }
            });
            oneShotLatch.await();
            createSystemProcessingTimeService.quiesce().get();
            Assert.assertTrue(reentrantLock.tryLock());
            Assert.assertNotNull(createSystemProcessingTimeService.registerTimer(createSystemProcessingTimeService.getCurrentProcessingTime() - 5, new ProcessingTimeCallback() { // from class: org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeServiceTest.5
                public void onProcessingTime(long j) throws Exception {
                    throw new Exception("test");
                }
            }));
            Assert.assertEquals(0L, createSystemProcessingTimeService.getNumTasksScheduled());
            if (atomicReference.get() != null) {
                throw new Exception((Throwable) atomicReference.get());
            }
        } finally {
            createSystemProcessingTimeService.shutdownService();
        }
    }

    @Test
    public void testFutureCancellation() throws Exception {
        AtomicReference atomicReference = new AtomicReference();
        SystemProcessingTimeService createSystemProcessingTimeService = createSystemProcessingTimeService((AtomicReference<Throwable>) atomicReference);
        try {
            Assert.assertEquals(0L, createSystemProcessingTimeService.getNumTasksScheduled());
            ScheduledFuture registerTimer = createSystemProcessingTimeService.registerTimer(System.currentTimeMillis() + 100000000, j -> {
            });
            Assert.assertEquals(1L, createSystemProcessingTimeService.getNumTasksScheduled());
            registerTimer.cancel(false);
            Assert.assertEquals(0L, createSystemProcessingTimeService.getNumTasksScheduled());
            ScheduledFuture scheduleAtFixedRate = createSystemProcessingTimeService.scheduleAtFixedRate(j2 -> {
            }, 10000000000L, 50L);
            Assert.assertEquals(1L, createSystemProcessingTimeService.getNumTasksScheduled());
            scheduleAtFixedRate.cancel(false);
            Assert.assertEquals(0L, createSystemProcessingTimeService.getNumTasksScheduled());
            if (atomicReference.get() != null) {
                throw new Exception((Throwable) atomicReference.get());
            }
        } finally {
            createSystemProcessingTimeService.shutdownService();
        }
    }

    @Test
    public void testShutdownAndWaitPending() {
        OneShotLatch oneShotLatch = new OneShotLatch();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        SystemProcessingTimeService createBlockingSystemProcessingTimeService = createBlockingSystemProcessingTimeService(oneShotLatch, atomicBoolean);
        Assert.assertFalse(createBlockingSystemProcessingTimeService.isTerminated());
        try {
            Assert.assertFalse(createBlockingSystemProcessingTimeService.shutdownAndAwaitPending(1L, TimeUnit.SECONDS));
        } catch (InterruptedException e) {
            Assert.fail("Unexpected interruption.");
        }
        oneShotLatch.trigger();
        try {
            Assert.assertTrue(createBlockingSystemProcessingTimeService.shutdownAndAwaitPending(60L, TimeUnit.SECONDS));
        } catch (InterruptedException e2) {
            Assert.fail("Unexpected interruption.");
        }
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(createBlockingSystemProcessingTimeService.isTerminated());
    }

    @Test
    public void testShutdownServiceUninterruptible() {
        OneShotLatch oneShotLatch = new OneShotLatch();
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        SystemProcessingTimeService createBlockingSystemProcessingTimeService = createBlockingSystemProcessingTimeService(oneShotLatch, atomicBoolean);
        Assert.assertFalse(createBlockingSystemProcessingTimeService.isTerminated());
        Thread currentThread = Thread.currentThread();
        AtomicBoolean atomicBoolean2 = new AtomicBoolean(true);
        Thread thread = new Thread(() -> {
            while (atomicBoolean2.get()) {
                currentThread.interrupt();
                try {
                    Thread.sleep(1L);
                } catch (InterruptedException e) {
                }
            }
        });
        thread.start();
        long nanoTime = System.nanoTime();
        Assert.assertFalse(createBlockingSystemProcessingTimeService.isTerminated());
        Assert.assertFalse(createBlockingSystemProcessingTimeService.shutdownServiceUninterruptible(50L));
        Assert.assertTrue(createBlockingSystemProcessingTimeService.isTerminated());
        Assert.assertFalse(atomicBoolean.get());
        Assert.assertTrue(System.nanoTime() - nanoTime >= 50000000);
        atomicBoolean2.set(false);
        do {
            try {
                thread.join();
            } catch (InterruptedException e) {
            }
        } while (thread.isAlive());
        Thread.interrupted();
        oneShotLatch.trigger();
        Assert.assertTrue(createBlockingSystemProcessingTimeService.shutdownServiceUninterruptible(50L));
        Assert.assertTrue(atomicBoolean.get());
    }

    private static SystemProcessingTimeService createSystemProcessingTimeService(CompletableFuture<Throwable> completableFuture) {
        Preconditions.checkArgument(!completableFuture.isDone());
        completableFuture.getClass();
        return new SystemProcessingTimeService((v1) -> {
            r2.complete(v1);
        });
    }

    private static SystemProcessingTimeService createSystemProcessingTimeService(AtomicReference<Throwable> atomicReference) {
        Preconditions.checkArgument(atomicReference.get() == null);
        return new SystemProcessingTimeService(exc -> {
            atomicReference.compareAndSet(null, exc);
        });
    }

    private static SystemProcessingTimeService createBlockingSystemProcessingTimeService(OneShotLatch oneShotLatch, AtomicBoolean atomicBoolean) {
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        Preconditions.checkState(!atomicBoolean.get());
        SystemProcessingTimeService systemProcessingTimeService = new SystemProcessingTimeService(exc -> {
        });
        systemProcessingTimeService.scheduleAtFixedRate(j -> {
            oneShotLatch2.trigger();
            boolean z = false;
            while (!z) {
                try {
                    oneShotLatch.await();
                    z = true;
                } catch (InterruptedException e) {
                }
            }
            atomicBoolean.set(true);
        }, 0L, 10L);
        try {
            oneShotLatch2.await();
        } catch (InterruptedException e) {
            Assert.fail("Problem while starting up service.");
        }
        return systemProcessingTimeService;
    }
}
