package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SynchronousSavepointSyncLatchTest.class */
public class SynchronousSavepointSyncLatchTest {
    private ExecutorService executors;

    @Before
    public void startExecutorService() {
        this.executors = Executors.newCachedThreadPool();
    }

    @After
    public void terminateExecutors() throws InterruptedException {
        while (!this.executors.isTerminated()) {
            this.executors.shutdownNow();
            this.executors.awaitTermination(10L, TimeUnit.SECONDS);
        }
    }

    @Test
    public void triggerUnblocksWait() throws Exception {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        Assert.assertFalse(synchronousSavepointLatch.isWaiting());
        Future<Void> runThreadWaitingForCheckpointAck = runThreadWaitingForCheckpointAck(synchronousSavepointLatch);
        while (!synchronousSavepointLatch.isWaiting()) {
            Thread.sleep(5L);
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        synchronousSavepointLatch.acknowledgeCheckpointAndTrigger(2L, () -> {
            atomicBoolean.set(true);
        });
        Assert.assertFalse(atomicBoolean.get());
        Assert.assertFalse(synchronousSavepointLatch.isCompleted());
        Assert.assertTrue(synchronousSavepointLatch.isWaiting());
        synchronousSavepointLatch.acknowledgeCheckpointAndTrigger(1L, () -> {
            atomicBoolean.set(true);
        });
        Assert.assertTrue(atomicBoolean.get());
        Assert.assertTrue(synchronousSavepointLatch.isCompleted());
        runThreadWaitingForCheckpointAck.get();
        Assert.assertFalse(synchronousSavepointLatch.isWaiting());
    }

    @Test
    public void cancelUnblocksWait() throws Exception {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        Assert.assertFalse(synchronousSavepointLatch.isWaiting());
        Future<Void> runThreadWaitingForCheckpointAck = runThreadWaitingForCheckpointAck(synchronousSavepointLatch);
        while (!synchronousSavepointLatch.isWaiting()) {
            Thread.sleep(5L);
        }
        synchronousSavepointLatch.cancelCheckpointLatch();
        Assert.assertTrue(synchronousSavepointLatch.isCanceled());
        runThreadWaitingForCheckpointAck.get();
        Assert.assertFalse(synchronousSavepointLatch.isWaiting());
    }

    @Test
    public void waitAfterTriggerIsNotBlocking() throws Exception {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        synchronousSavepointLatch.acknowledgeCheckpointAndTrigger(1L, () -> {
        });
        synchronousSavepointLatch.blockUntilCheckpointIsAcknowledged();
    }

    @Test
    public void waitAfterCancelIsNotBlocking() throws Exception {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        synchronousSavepointLatch.cancelCheckpointLatch();
        Assert.assertTrue(synchronousSavepointLatch.isCanceled());
        synchronousSavepointLatch.blockUntilCheckpointIsAcknowledged();
    }

    @Test
    public void triggeringInvokesCallbackAtMostOnce() throws Exception {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        AtomicInteger atomicInteger = new AtomicInteger();
        atomicInteger.getClass();
        Future<Void> runThreadTriggeringCheckpoint = runThreadTriggeringCheckpoint(synchronousSavepointLatch, 1L, atomicInteger::incrementAndGet);
        atomicInteger.getClass();
        Future<Void> runThreadTriggeringCheckpoint2 = runThreadTriggeringCheckpoint(synchronousSavepointLatch, 1L, atomicInteger::incrementAndGet);
        atomicInteger.getClass();
        Future<Void> runThreadTriggeringCheckpoint3 = runThreadTriggeringCheckpoint(synchronousSavepointLatch, 1L, atomicInteger::incrementAndGet);
        runThreadTriggeringCheckpoint.get();
        runThreadTriggeringCheckpoint2.get();
        runThreadTriggeringCheckpoint3.get();
        Assert.assertEquals(1L, atomicInteger.get());
    }

    @Test
    public void triggeringAfterCancelDoesNotInvokeCallback() throws Exception {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        synchronousSavepointLatch.cancelCheckpointLatch();
        Assert.assertTrue(synchronousSavepointLatch.isCanceled());
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        synchronousSavepointLatch.acknowledgeCheckpointAndTrigger(1L, () -> {
            atomicBoolean.set(true);
        });
        Assert.assertFalse(atomicBoolean.get());
    }

    @Test
    public void checkpointIdIsSetOnlyOnce() {
        SynchronousSavepointLatch synchronousSavepointLatch = new SynchronousSavepointLatch();
        synchronousSavepointLatch.setCheckpointId(1L);
        Assert.assertTrue(synchronousSavepointLatch.isSet());
        Assert.assertEquals(1L, synchronousSavepointLatch.getCheckpointId());
        synchronousSavepointLatch.setCheckpointId(2L);
        Assert.assertTrue(synchronousSavepointLatch.isSet());
        Assert.assertEquals(1L, synchronousSavepointLatch.getCheckpointId());
    }

    private Future<Void> runThreadWaitingForCheckpointAck(SynchronousSavepointLatch synchronousSavepointLatch) {
        return this.executors.submit(() -> {
            synchronousSavepointLatch.blockUntilCheckpointIsAcknowledged();
            return null;
        });
    }

    private Future<Void> runThreadTriggeringCheckpoint(SynchronousSavepointLatch synchronousSavepointLatch, long j, RunnableWithException runnableWithException) {
        return this.executors.submit(() -> {
            synchronousSavepointLatch.acknowledgeCheckpointAndTrigger(j, runnableWithException);
            return null;
        });
    }
}
