package org.apache.flink.runtime.heartbeat;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.util.DirectExecutorService;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatManagerTest.class */
public class HeartbeatManagerTest extends TestLogger {
    private static final Logger LOG = LoggerFactory.getLogger(HeartbeatManagerTest.class);

    /* loaded from: input_file:org/apache/flink/runtime/heartbeat/HeartbeatManagerTest$TestingHeartbeatListener.class */
    static class TestingHeartbeatListener implements HeartbeatListener<Object, Object> {
        private final CompletableFuture<ResourceID> future = new CompletableFuture<>();
        private final Object payload;
        private int numberHeartbeatReports;

        TestingHeartbeatListener(Object obj) {
            this.payload = obj;
        }

        CompletableFuture<ResourceID> getTimeoutFuture() {
            return this.future;
        }

        public int getNumberHeartbeatReports() {
            return this.numberHeartbeatReports;
        }

        public void notifyHeartbeatTimeout(ResourceID resourceID) {
            this.future.complete(resourceID);
        }

        public void reportPayload(ResourceID resourceID, Object obj) {
            this.numberHeartbeatReports++;
        }

        public CompletableFuture<Object> retrievePayload() {
            return CompletableFuture.completedFuture(this.payload);
        }
    }

    @Test
    public void testRegularHeartbeat() {
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID resourceID2 = new ResourceID("barfoo");
        HeartbeatListener heartbeatListener = (HeartbeatListener) Mockito.mock(HeartbeatListener.class);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        Object obj = new Object();
        Mockito.when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(obj));
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(1000L, resourceID, heartbeatListener, new DirectExecutorService(), scheduledExecutor, LOG);
        HeartbeatTarget heartbeatTarget = (HeartbeatTarget) Mockito.mock(HeartbeatTarget.class);
        heartbeatManagerImpl.monitorTarget(resourceID2, heartbeatTarget);
        heartbeatManagerImpl.requestHeartbeat(resourceID2, obj);
        ((HeartbeatListener) Mockito.verify(heartbeatListener, Mockito.times(1))).reportPayload(resourceID2, obj);
        ((HeartbeatListener) Mockito.verify(heartbeatListener, Mockito.times(1))).retrievePayload();
        ((HeartbeatTarget) Mockito.verify(heartbeatTarget, Mockito.times(1))).receiveHeartbeat(resourceID, obj);
        heartbeatManagerImpl.receiveHeartbeat(resourceID2, obj);
        ((HeartbeatListener) Mockito.verify(heartbeatListener, Mockito.times(2))).reportPayload(resourceID2, obj);
    }

    @Test
    public void testHeartbeatMonitorUpdate() {
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID resourceID2 = new ResourceID("barfoo");
        HeartbeatListener heartbeatListener = (HeartbeatListener) Mockito.mock(HeartbeatListener.class);
        ScheduledExecutor scheduledExecutor = (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class);
        ScheduledFuture scheduledFuture = (ScheduledFuture) Mockito.mock(ScheduledFuture.class);
        ((ScheduledExecutor) Mockito.doReturn(scheduledFuture).when(scheduledExecutor)).schedule((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        Object obj = new Object();
        Mockito.when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(obj));
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(1000L, resourceID, heartbeatListener, new DirectExecutorService(), scheduledExecutor, LOG);
        heartbeatManagerImpl.monitorTarget(resourceID2, (HeartbeatTarget) Mockito.mock(HeartbeatTarget.class));
        heartbeatManagerImpl.receiveHeartbeat(resourceID2, obj);
        ((ScheduledFuture) Mockito.verify(scheduledFuture, Mockito.times(1))).cancel(true);
        ((ScheduledExecutor) Mockito.verify(scheduledExecutor, Mockito.times(2))).schedule((Runnable) Matchers.any(Runnable.class), Matchers.eq(1000L), (TimeUnit) Matchers.eq(TimeUnit.MILLISECONDS));
    }

    @Test
    public void testHeartbeatTimeout() throws Exception {
        Object obj = new Object();
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID resourceID2 = new ResourceID("barfoo");
        TestingHeartbeatListener testingHeartbeatListener = new TestingHeartbeatListener(obj);
        ((ScheduledExecutorService) Mockito.doReturn((ScheduledFuture) Mockito.mock(ScheduledFuture.class)).when((ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class))).schedule((Runnable) Matchers.any(Runnable.class), Matchers.anyLong(), (TimeUnit) Matchers.any(TimeUnit.class));
        Object obj2 = new Object();
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(100L, resourceID, testingHeartbeatListener, new DirectExecutorService(), new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)), LOG);
        HeartbeatTarget heartbeatTarget = (HeartbeatTarget) Mockito.mock(HeartbeatTarget.class);
        CompletableFuture<ResourceID> timeoutFuture = testingHeartbeatListener.getTimeoutFuture();
        heartbeatManagerImpl.monitorTarget(resourceID2, heartbeatTarget);
        for (int i = 0; i < 10; i++) {
            heartbeatManagerImpl.receiveHeartbeat(resourceID2, obj2);
            Thread.sleep(20L);
        }
        Assert.assertFalse(timeoutFuture.isDone());
        Assert.assertEquals(resourceID2, timeoutFuture.get(2 * 100, TimeUnit.MILLISECONDS));
    }

    @Test
    public void testHeartbeatCluster() throws Exception {
        Object obj = new Object();
        Object obj2 = new Object();
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID resourceID2 = new ResourceID("barfoo");
        HeartbeatListener heartbeatListener = (HeartbeatListener) Mockito.mock(HeartbeatListener.class);
        Mockito.when(heartbeatListener.retrievePayload()).thenReturn(CompletableFuture.completedFuture(obj));
        TestingHeartbeatListener testingHeartbeatListener = new TestingHeartbeatListener(obj2);
        CompletableFuture<ResourceID> timeoutFuture = testingHeartbeatListener.getTimeoutFuture();
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(100L, resourceID, heartbeatListener, new DirectExecutorService(), new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)), LOG);
        HeartbeatManagerSenderImpl heartbeatManagerSenderImpl = new HeartbeatManagerSenderImpl(20L, 100L, resourceID2, testingHeartbeatListener, new DirectExecutorService(), new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)), LOG);
        heartbeatManagerImpl.monitorTarget(resourceID2, heartbeatManagerSenderImpl);
        heartbeatManagerSenderImpl.monitorTarget(resourceID, heartbeatManagerImpl);
        Thread.sleep(2 * 100);
        Assert.assertFalse(timeoutFuture.isDone());
        heartbeatManagerImpl.stop();
        Assert.assertEquals(resourceID, timeoutFuture.get(2 * 100, TimeUnit.MILLISECONDS));
        int i = (int) ((2 * 100) / 20);
        ((HeartbeatListener) Mockito.verify(heartbeatListener, Mockito.atLeast(i / 2))).reportPayload(resourceID2, obj2);
        Assert.assertTrue(testingHeartbeatListener.getNumberHeartbeatReports() >= i / 2);
    }

    @Test
    public void testTargetUnmonitoring() throws InterruptedException, ExecutionException {
        ResourceID resourceID = new ResourceID("foobar");
        ResourceID resourceID2 = new ResourceID("target");
        TestingHeartbeatListener testingHeartbeatListener = new TestingHeartbeatListener(new Object());
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(100L, resourceID, testingHeartbeatListener, new DirectExecutorService(), new ScheduledExecutorServiceAdapter(new ScheduledThreadPoolExecutor(1)), LOG);
        heartbeatManagerImpl.monitorTarget(resourceID2, (HeartbeatTarget) Mockito.mock(HeartbeatTarget.class));
        heartbeatManagerImpl.unmonitorTarget(resourceID2);
        try {
            testingHeartbeatListener.getTimeoutFuture().get(2 * 100, TimeUnit.MILLISECONDS);
            Assert.fail("Timeout should time out.");
        } catch (TimeoutException e) {
        }
    }

    @Test
    public void testLastHeartbeatFromUnregisteredTarget() {
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(100L, ResourceID.generate(), (HeartbeatListener) Mockito.mock(HeartbeatListener.class), Executors.directExecutor(), (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class), LOG);
        try {
            Assert.assertEquals(-1L, heartbeatManagerImpl.getLastHeartbeatFrom(ResourceID.generate()));
            heartbeatManagerImpl.stop();
        } catch (Throwable th) {
            heartbeatManagerImpl.stop();
            throw th;
        }
    }

    @Test
    public void testLastHeartbeatFrom() {
        ResourceID generate = ResourceID.generate();
        HeartbeatListener heartbeatListener = (HeartbeatListener) Mockito.mock(HeartbeatListener.class);
        HeartbeatTarget heartbeatTarget = (HeartbeatTarget) Mockito.mock(HeartbeatTarget.class);
        ResourceID generate2 = ResourceID.generate();
        HeartbeatManagerImpl heartbeatManagerImpl = new HeartbeatManagerImpl(100L, generate, heartbeatListener, Executors.directExecutor(), (ScheduledExecutor) Mockito.mock(ScheduledExecutor.class), LOG);
        try {
            heartbeatManagerImpl.monitorTarget(generate2, heartbeatTarget);
            Assert.assertEquals(0L, heartbeatManagerImpl.getLastHeartbeatFrom(generate2));
            long currentTimeMillis = System.currentTimeMillis();
            heartbeatManagerImpl.receiveHeartbeat(generate2, (Object) null);
            Assert.assertTrue(heartbeatManagerImpl.getLastHeartbeatFrom(generate2) >= currentTimeMillis);
            heartbeatManagerImpl.stop();
        } catch (Throwable th) {
            heartbeatManagerImpl.stop();
            throw th;
        }
    }
}
