/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.streaming.runtime.tasks.SynchronousSavepointLatch;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Tests for {@link SynchronousSavepointLatch}.
 *
 * <p>The cases exercise the latch through its visible methods: setting the checkpoint id,
 * blocking until the checkpoint is acknowledged, acknowledging with a callback, and cancelling.
 * Blocking calls that are expected to park are run on a background executor so the test thread
 * can drive the latch.
 */
public class SynchronousSavepointSyncLatchTest {
    /** Executor running the background tasks; created before and torn down after each test. */
    private ExecutorService executors;

    /** Creates a fresh cached thread pool for the test about to run. */
    @Before
    public void startExecutorService() {
        executors = Executors.newCachedThreadPool();
    }

    /**
     * Shuts the executor down, retrying until it reports termination.
     *
     * @throws InterruptedException if the test thread is interrupted while awaiting termination
     */
    @After
    public void terminateExecutors() throws InterruptedException {
        boolean terminated = executors.isTerminated();
        while (!terminated) {
            // shutdownNow() interrupts tasks that may still be parked on a latch.
            executors.shutdownNow();
            executors.awaitTermination(10L, TimeUnit.SECONDS);
            terminated = executors.isTerminated();
        }
    }

    /**
     * A blocked waiter is released only by an acknowledgement carrying the checkpoint id that was
     * set on the latch; an acknowledgement for another id changes nothing.
     */
    @Test
    public void triggerUnblocksWait() throws Exception {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();
        latch.setCheckpointId(1L);
        Assert.assertFalse(latch.isWaiting());

        final Future<Void> waiter = runThreadWaitingForCheckpointAck(latch);
        awaitLatchIsWaiting(latch);

        final AtomicBoolean callbackRan = new AtomicBoolean();
        final RunnableWithException recordCallback = () -> callbackRan.set(true);

        // Id 2 does not match the id set above (1): callback must not run, waiter stays blocked.
        latch.acknowledgeCheckpointAndTrigger(2L, recordCallback);
        Assert.assertFalse(callbackRan.get());
        Assert.assertFalse(latch.isCompleted());
        Assert.assertTrue(latch.isWaiting());

        // Matching id: callback runs and the latch completes.
        latch.acknowledgeCheckpointAndTrigger(1L, recordCallback);
        Assert.assertTrue(callbackRan.get());
        Assert.assertTrue(latch.isCompleted());

        // Join the waiter before checking that the latch no longer reports a waiting thread.
        waiter.get();
        Assert.assertFalse(latch.isWaiting());
    }

    /** Cancelling the latch releases a thread that is blocked on it. */
    @Test
    public void cancelUnblocksWait() throws Exception {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();
        latch.setCheckpointId(1L);
        Assert.assertFalse(latch.isWaiting());

        final Future<Void> waiter = runThreadWaitingForCheckpointAck(latch);
        awaitLatchIsWaiting(latch);

        latch.cancelCheckpointLatch();
        Assert.assertTrue(latch.isCanceled());

        waiter.get();
        Assert.assertFalse(latch.isWaiting());
    }

    /**
     * Waiting on a latch that was already acknowledged returns on the calling thread; the test
     * passes simply by the blocking call returning.
     */
    @Test
    public void waitAfterTriggerIsNotBlocking() throws Exception {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();
        latch.setCheckpointId(1L);

        final RunnableWithException noOp = () -> {};
        latch.acknowledgeCheckpointAndTrigger(1L, noOp);

        latch.blockUntilCheckpointIsAcknowledged();
    }

    /**
     * Waiting on a latch that was already cancelled returns on the calling thread; the test
     * passes simply by the blocking call returning.
     */
    @Test
    public void waitAfterCancelIsNotBlocking() throws Exception {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();
        latch.setCheckpointId(1L);

        latch.cancelCheckpointLatch();
        Assert.assertTrue(latch.isCanceled());

        latch.blockUntilCheckpointIsAcknowledged();
    }

    /**
     * Three tasks acknowledge the same checkpoint id concurrently; the shared counter incremented
     * by their callbacks must end up at exactly one.
     */
    @Test
    public void triggeringInvokesCallbackAtMostOnce() throws Exception {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();
        latch.setCheckpointId(1L);

        final AtomicInteger invocations = new AtomicInteger();
        final Future<Void> first =
                runThreadTriggeringCheckpoint(latch, 1L, () -> invocations.incrementAndGet());
        final Future<Void> second =
                runThreadTriggeringCheckpoint(latch, 1L, () -> invocations.incrementAndGet());
        final Future<Void> third =
                runThreadTriggeringCheckpoint(latch, 1L, () -> invocations.incrementAndGet());

        // Join all three tasks before reading the counter.
        first.get();
        second.get();
        third.get();

        Assert.assertEquals(1L, invocations.get());
    }

    /** An acknowledgement arriving after cancellation must not run its callback. */
    @Test
    public void triggeringAfterCancelDoesNotInvokeCallback() throws Exception {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();
        latch.setCheckpointId(1L);

        latch.cancelCheckpointLatch();
        Assert.assertTrue(latch.isCanceled());

        final AtomicBoolean callbackRan = new AtomicBoolean();
        latch.acknowledgeCheckpointAndTrigger(1L, () -> callbackRan.set(true));
        Assert.assertFalse(callbackRan.get());
    }

    /** A second {@code setCheckpointId} call leaves the first id in place. */
    @Test
    public void checkpointIdIsSetOnlyOnce() {
        final SynchronousSavepointLatch latch = new SynchronousSavepointLatch();

        latch.setCheckpointId(1L);
        Assert.assertTrue(latch.isSet());
        Assert.assertEquals(1L, latch.getCheckpointId());

        // Attempted overwrite: the latch must still report id 1.
        latch.setCheckpointId(2L);
        Assert.assertTrue(latch.isSet());
        Assert.assertEquals(1L, latch.getCheckpointId());
    }

    /**
     * Polls the latch every 5 ms until it reports a waiting thread.
     *
     * @param latch the latch a background task is expected to block on
     * @throws Exception if sleeping is interrupted or the latch query fails
     */
    private static void awaitLatchIsWaiting(SynchronousSavepointLatch latch) throws Exception {
        while (!latch.isWaiting()) {
            Thread.sleep(5L);
        }
    }

    /**
     * Submits a task that calls {@code blockUntilCheckpointIsAcknowledged()} on the given latch.
     *
     * @param latch the latch to block on
     * @return a future that completes once the blocking call has returned
     */
    private Future<Void> runThreadWaitingForCheckpointAck(SynchronousSavepointLatch latch) {
        return executors.submit(() -> {
            latch.blockUntilCheckpointIsAcknowledged();
            return null;
        });
    }

    /**
     * Submits a task that acknowledges the given checkpoint id on the latch, passing along the
     * supplied callback.
     *
     * @param latch the latch to acknowledge on
     * @param checkpointId the checkpoint id to acknowledge
     * @param runnable the callback handed to the latch
     * @return a future that completes once the acknowledge call has returned
     */
    private Future<Void> runThreadTriggeringCheckpoint(SynchronousSavepointLatch latch, long checkpointId, RunnableWithException runnable) {
        return executors.submit(() -> {
            latch.acknowledgeCheckpointAndTrigger(checkpointId, runnable);
            return null;
        });
    }
}

