package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CancellationException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.streaming.runtime.operators.TestProcessingTimeServiceTest;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SystemProcessingTimeServiceTest.class */
public class SystemProcessingTimeServiceTest extends TestLogger {
    /**
     * Registers a timer that is due immediately and checks, from inside the callback, that the
     * executing thread holds the lock object that was passed to the service's constructor.
     *
     * @throws Exception if waiting for the timer future fails, or if the error reference handed
     *     to the exception handler is non-null at the end of the test
     */
    @Test
    public void testTriggerHoldsLock() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        try {
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            final ScheduledFuture<?> timerFuture =
                    timeService.registerTimer(
                            System.currentTimeMillis(),
                            timestamp -> Assert.assertTrue(Thread.holdsLock(lock)));

            // Block until the timer task has completed.
            timerFuture.get();

            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            // Surface anything that was reported asynchronously.
            final Throwable reported = asyncError.get();
            if (reported != null) {
                throw new Exception(reported);
            }
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Schedules a periodic callback (initial delay 0, period 100) and checks that the thread
     * running it holds the lock object that was passed to the service's constructor.
     *
     * <p>The lock check is recorded in a flag and asserted on the test thread. Asserting inside
     * the callback before triggering the latch would, on failure, throw on every invocation
     * without ever releasing the latch, so the test would block forever instead of failing.
     *
     * @throws Exception if waiting on the latch is interrupted, or if the error reference handed
     *     to the exception handler is non-null at the end of the test
     */
    @Test
    public void testScheduleAtFixedRateHoldsLock() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        final OneShotLatch callbackRan = new OneShotLatch();
        // Set to true by any invocation that finds the lock not held.
        final AtomicBoolean ranWithoutLock = new AtomicBoolean(false);
        try {
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            final ScheduledFuture<?> periodicFuture =
                    timeService.scheduleAtFixedRate(
                            timestamp -> {
                                if (!Thread.holdsLock(lock)) {
                                    ranWithoutLock.set(true);
                                }
                                // Always release the test thread, even when the check failed.
                                callbackRan.trigger();
                            },
                            0L,
                            100L);

            // Wait for at least one invocation, then stop further ones.
            callbackRan.await();
            periodicFuture.cancel(true);

            Assert.assertFalse(
                    "periodic callback ran without holding the lock", ranWithoutLock.get());

            final Throwable reported = asyncError.get();
            if (reported != null) {
                throw new Exception(reported);
            }
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Schedules a periodic callback (initial delay 0, period 10) and waits until it has been
     * invoked three times. The JUnit timeout bounds the wait to ten seconds.
     *
     * @throws Exception if waiting on the latch is interrupted, or if the error reference handed
     *     to the exception handler is non-null at the end of the test
     */
    @Test(timeout = 10000)
    public void testScheduleAtFixedRate() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        // One count per expected invocation of the periodic callback.
        final CountDownLatch remainingInvocations = new CountDownLatch(3);
        try {
            timeService.scheduleAtFixedRate(
                    timestamp -> remainingInvocations.countDown(), 0L, 10L);

            remainingInvocations.await();

            final Throwable reported = asyncError.get();
            if (reported != null) {
                throw new Exception(reported);
            }
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Checks that a periodic future scheduled before {@code quiesce()} reports cancellation once
     * {@code quiesce()} and {@code awaitPendingAfterQuiesce()} have returned, and that scheduling
     * after quiescing still returns a non-null future while no task is counted as scheduled.
     *
     * <p>The callback scheduled after quiescing always throws; the final check on the error
     * reference therefore only passes if that exception was never reported.
     *
     * @throws Exception if the error reference handed to the exception handler is non-null at
     *     the end of the test, or if any service call fails
     */
    @Test
    public void testQuiesceAndAwaitingCancelsScheduledAtFixRateFuture() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        try {
            // A periodic no-op task that is still pending when the service is quiesced.
            final ScheduledFuture<?> periodicFuture =
                    timeService.scheduleAtFixedRate(timestamp -> { }, 0L, 10L);
            Assert.assertFalse(periodicFuture.isDone());

            timeService.quiesce();
            timeService.awaitPendingAfterQuiesce();

            try {
                periodicFuture.get();
                Assert.fail("scheduled future is not cancelled");
            } catch (CancellationException expected) {
                // This is the outcome the test is looking for.
            }

            final ScheduledFuture<?> scheduledAfterQuiesce =
                    timeService.scheduleAtFixedRate(
                            timestamp -> {
                                throw new Exception("Test exception.");
                            },
                            0L,
                            100L);
            Assert.assertNotNull(scheduledAfterQuiesce);
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            final Throwable reported = asyncError.get();
            if (reported != null) {
                throw new Exception(reported);
            }
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Shuts the service down while a timer callback is still sleeping and then checks that:
     *
     * <ul>
     *   <li>the service reports itself as terminated,
     *   <li>{@code registerTimer} and {@code scheduleAtFixedRate} both throw
     *       {@link IllegalStateException},
     *   <li>the error reference handed to the exception handler holds a throwable whose cause is
     *       an {@link InterruptedException},
     *   <li>no task is counted as scheduled.
     * </ul>
     *
     * <p>Cleanup lives in a single {@code finally} block so that {@code shutdownService()} runs
     * exactly once at the end, whether the test passes or fails.
     *
     * @throws Exception if waiting on the latch is interrupted or a service call fails
     */
    @Test
    public void testImmediateShutdown() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        try {
            Assert.assertFalse(timeService.isTerminated());

            final OneShotLatch callbackStarted = new OneShotLatch();
            timeService.registerTimer(
                    System.currentTimeMillis(),
                    timestamp -> {
                        callbackStarted.trigger();
                        // Far longer than the test runs; the assertions below expect this
                        // sleep to end with an InterruptedException.
                        Thread.sleep(100000000L);
                    });

            // Shut down only once the callback is known to be running.
            callbackStarted.await();
            timeService.shutdownService();

            // NOTE(review): presumably the callback runs while holding this lock, so acquiring
            // it here waits for the interrupted callback to finish -- confirm against the service.
            synchronized (lock) {
                Assert.assertTrue(timeService.isTerminated());
            }

            try {
                timeService.registerTimer(System.currentTimeMillis() + 1000, timestamp -> { });
                Assert.fail("should result in an exception");
            } catch (IllegalStateException expected) {
                // Registering on a shut-down service must be rejected.
            }

            try {
                timeService.scheduleAtFixedRate(timestamp -> { }, 0L, 100L);
                Assert.fail("should result in an exception");
            } catch (IllegalStateException expected) {
                // Scheduling on a shut-down service must be rejected.
            }

            // The interrupted callback must have been reported to the handler.
            Assert.assertNotNull(asyncError.get());
            Assert.assertTrue(asyncError.get().getCause() instanceof InterruptedException);
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Quiesces the service while a timer callback is running and checks that, once
     * {@code awaitPendingAfterQuiesce()} returns, the callback has released the lock it held.
     * A timer registered afterwards still yields a non-null future, but no task is counted as
     * scheduled.
     *
     * <p>The timer registered after quiescing always throws; the final check on the error
     * reference therefore only passes if that exception was never reported.
     *
     * @throws Exception if waiting on the latch is interrupted, or if the error reference handed
     *     to the exception handler is non-null at the end of the test
     */
    @Test
    public void testQuiescing() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        try {
            final OneShotLatch callbackStarted = new OneShotLatch();
            // Held by the callback for its whole duration; free again once it has exited.
            final ReentrantLock callbackScope = new ReentrantLock();

            final long firstDueTime = timeService.getCurrentProcessingTime() + 20;
            timeService.registerTimer(
                    firstDueTime,
                    timestamp -> {
                        callbackScope.lock();
                        try {
                            callbackStarted.trigger();
                            Thread.sleep(5L);
                        } finally {
                            callbackScope.unlock();
                        }
                    });

            // Quiesce only after the callback has started.
            callbackStarted.await();
            timeService.quiesce();
            timeService.awaitPendingAfterQuiesce();

            // The callback must be done by now, so the lock is immediately available.
            Assert.assertTrue(callbackScope.tryLock());

            final long pastDueTime = timeService.getCurrentProcessingTime() - 5;
            final ScheduledFuture<?> registeredAfterQuiesce =
                    timeService.registerTimer(
                            pastDueTime,
                            timestamp -> {
                                throw new Exception("test");
                            });
            Assert.assertNotNull(registeredAfterQuiesce);
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            final Throwable reported = asyncError.get();
            if (reported != null) {
                throw new Exception(reported);
            }
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Checks that {@code getNumTasksScheduled()} rises to one when a far-future task is
     * registered and falls back to zero once its future is cancelled, for both a one-shot timer
     * and a fixed-rate task.
     *
     * @throws Exception if the error reference handed to the exception handler is non-null at
     *     the end of the test, or if any service call fails
     */
    @Test
    public void testFutureCancellation() throws Exception {
        final Object lock = new Object();
        final AtomicReference<Throwable> asyncError = new AtomicReference<>();
        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        new TestProcessingTimeServiceTest.ReferenceSettingExceptionHandler(asyncError),
                        lock);
        try {
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            // One-shot timer, due far enough ahead that it cannot fire during the test.
            final long farFuture = System.currentTimeMillis() + 100000000;
            final ScheduledFuture<?> oneShot =
                    timeService.registerTimer(farFuture, timestamp -> { });
            Assert.assertEquals(1L, timeService.getNumTasksScheduled());

            oneShot.cancel(false);
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            // Fixed-rate task with a very large initial delay, for the same reason.
            final ScheduledFuture<?> periodic =
                    timeService.scheduleAtFixedRate(timestamp -> { }, 10000000000L, 50L);
            Assert.assertEquals(1L, timeService.getNumTasksScheduled());

            periodic.cancel(false);
            Assert.assertEquals(0L, timeService.getNumTasksScheduled());

            final Throwable reported = asyncError.get();
            if (reported != null) {
                throw new Exception(reported);
            }
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Checks that an exception thrown by a one-shot timer callback is passed to the service's
     * {@code AsyncExceptionHandler}.
     *
     * <p>The service is shut down in a {@code finally} block so that it is not left running
     * after the test.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *     handler to be invoked
     */
    @Test
    public void testExceptionReporting() throws InterruptedException {
        final AtomicBoolean handlerInvoked = new AtomicBoolean(false);
        final OneShotLatch handlerLatch = new OneShotLatch();

        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        (message, exception) -> {
                            handlerInvoked.set(true);
                            handlerLatch.trigger();
                        },
                        new Object());
        try {
            timeService.registerTimer(
                    System.currentTimeMillis(),
                    timestamp -> {
                        throw new Exception("Exception in Timer");
                    });

            // Wait for the handler to be called before checking the flag.
            handlerLatch.await();
            Assert.assertTrue(handlerInvoked.get());
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Checks that an exception thrown by a fixed-rate callback (initial delay 0, period 100) is
     * passed to the service's {@code AsyncExceptionHandler}.
     *
     * <p>The service is shut down in a {@code finally} block; without it the periodic task is
     * never cancelled and the service is left running after the test returns.
     *
     * @throws InterruptedException if the test thread is interrupted while waiting for the
     *     handler to be invoked
     */
    @Test
    public void testExceptionReportingScheduleAtFixedRate() throws InterruptedException {
        final AtomicBoolean handlerInvoked = new AtomicBoolean(false);
        final OneShotLatch handlerLatch = new OneShotLatch();

        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService(
                        (message, exception) -> {
                            handlerInvoked.set(true);
                            handlerLatch.trigger();
                        },
                        new Object());
        try {
            timeService.scheduleAtFixedRate(
                    timestamp -> {
                        throw new Exception("Exception in Timer");
                    },
                    0L,
                    100L);

            // Wait for the handler to be called before checking the flag.
            handlerLatch.await();
            Assert.assertTrue(handlerInvoked.get());
        } finally {
            timeService.shutdownService();
        }
    }

    /**
     * Exercises {@code shutdownAndAwaitPending} against a periodic callback that blocks on
     * latches.
     *
     * <p>Sequence: wait until the callback has started; call
     * {@code shutdownAndAwaitPending(1, SECONDS)} and expect {@code false} while the callback is
     * still blocked; release the callback; call {@code shutdownAndAwaitPending(60, SECONDS)} and
     * expect {@code true}. Finally the flag must still be {@code true}, meaning the callback's
     * second wait ended through its latch rather than through an interrupt, and the service must
     * report itself as terminated.
     */
    @Test
    public void testShutdownAndWaitPending() {
        final Object lock = new Object();
        final OneShotLatch timerStarted = new OneShotLatch();
        // Never triggered in this test: a wait on it can only end by interruption.
        final OneShotLatch neverTriggered = new OneShotLatch();
        final OneShotLatch releaseTimer = new OneShotLatch();
        // Cleared if the callback leaves either wait in a way the test does not expect.
        final AtomicBoolean waitsEndedAsExpected = new AtomicBoolean(true);

        final SystemProcessingTimeService timeService =
                new SystemProcessingTimeService((message, exception) -> { }, lock);

        timeService.scheduleAtFixedRate(
                timestamp -> {
                    timerStarted.trigger();
                    try {
                        neverTriggered.await();
                        // Reached only if the wait returned without an interrupt.
                        waitsEndedAsExpected.set(false);
                    } catch (InterruptedException ignored) {
                        // Expected way out of the first wait.
                    }
                    try {
                        releaseTimer.await();
                    } catch (InterruptedException ignored) {
                        // The second wait is supposed to end through its latch.
                        waitsEndedAsExpected.set(false);
                    }
                },
                0L,
                10L);

        try {
            timerStarted.await();
        } catch (InterruptedException e) {
            Assert.fail();
        }

        Assert.assertFalse(timeService.isTerminated());

        // The callback is still blocked, so this call is expected to report false.
        try {
            final boolean finishedInTime = timeService.shutdownAndAwaitPending(1L, TimeUnit.SECONDS);
            Assert.assertFalse(finishedInTime);
        } catch (InterruptedException e) {
            Assert.fail("Unexpected interruption.");
        }

        // Let the callback run to completion.
        releaseTimer.trigger();

        try {
            final boolean finishedInTime = timeService.shutdownAndAwaitPending(60L, TimeUnit.SECONDS);
            Assert.assertTrue(finishedInTime);
        } catch (InterruptedException e) {
            Assert.fail("Unexpected interruption.");
        }

        Assert.assertTrue(waitsEndedAsExpected.get());
        Assert.assertTrue(timeService.isTerminated());
    }
}
