package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.resourcemanager.exceptions.UnfulfillableSlotRequestException;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.ManualClock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolBatchSlotRequestTest.class */
/**
 * Tests how pending batch slot requests issued against a {@link TestingSlotPoolImpl} behave when
 * they time out, when an allocation fails, and when the resource manager rejects the request.
 *
 * <p>Every test creates its own slot pool inside a try-with-resources statement so that the pool
 * is closed on both the successful and the failing path, and so that a failure while closing
 * never hides the original test failure.
 */
public class SlotPoolBatchSlotRequestTest extends TestLogger {
    private static final ResourceProfile resourceProfile = ResourceProfile.fromResources(1.0d, 1024);
    private static final ResourceProfile smallerResourceProfile = ResourceProfile.fromResources(0.5d, 512);

    /** Zero-length array used as the type token for {@code List.toArray}. */
    public static final CompletableFuture[] COMPLETABLE_FUTURES_EMPTY_ARRAY = new CompletableFuture[0];

    private static ScheduledExecutorService singleThreadScheduledExecutorService;
    private static ComponentMainThreadExecutor mainThreadExecutor;

    /** Creates the single-threaded executor and the main thread executor wrapping it. */
    @BeforeClass
    public static void setupClass() {
        singleThreadScheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
        mainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forSingleThreadExecutor(singleThreadScheduledExecutorService);
    }

    /** Shuts the shared executor down; the null check covers a failed {@link #setupClass()}. */
    @AfterClass
    public static void teardownClass() {
        if (singleThreadScheduledExecutorService != null) {
            singleThreadScheduledExecutorService.shutdownNow();
        }
    }

    /**
     * Requests a batch slot from a pool to which no slots were offered and expects the returned
     * future to fail with a {@link TimeoutException}.
     */
    @Test
    public void testPendingBatchSlotRequestTimeout() throws Exception {
        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(mainThreadExecutor).build()) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, mainThreadExecutor, ResourceProfile.UNKNOWN);

            try {
                slotFuture.get();
                Assert.fail("Expected that slot future times out.");
            } catch (ExecutionException e) {
                MatcherAssert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TimeoutException.class));
            }
        }
    }

    /**
     * Offers one slot, issues three batch requests (same profile, unknown profile, smaller
     * profile), lets the batch slot timeout elapse and expects none of the request futures to be
     * done.
     */
    @Test
    public void testPendingBatchSlotRequestDoesNotTimeoutIfFulfillingSlotExists() throws Exception {
        final Time batchSlotTimeout = Time.milliseconds(2L);
        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        // Must be declared as ManualClock: the time-advancing helper needs the manual API.
        final ManualClock clock = new ManualClock();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setClock(clock)
                .setBatchSlotTimeout(batchSlotTimeout)
                .build()) {
            SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));

            final List<CompletableFuture<PhysicalSlot>> slotFutures = Arrays.asList(
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile),
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN),
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, smallerResourceProfile));

            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

            for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
                MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));
            }
        }
    }

    /**
     * Fails the allocation of a pending batch request with a plain {@link FlinkException} and
     * expects the request future to stay incomplete.
     */
    @Test
    public void testPendingBatchSlotRequestDoesNotFailIfAllocationFails() throws Exception {
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        // Captures the allocation id of the slot request that reaches the resource manager.
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setBatchSlotTimeout(Time.milliseconds(1000L))
                .setResourceManagerGateway(resourceManagerGateway)
                .build()) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            SlotPoolUtils.failAllocation(slotPool, directMainThreadExecutor, allocationIdFuture.get(), new FlinkException("Failed request"));

            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));
        }
    }

    /**
     * Fails the allocation of a pending batch request with an
     * {@link UnfulfillableSlotRequestException} and expects the request future to complete
     * exceptionally.
     */
    @Test
    public void testPendingBatchSlotRequestFailsIfAllocationFailsUnfulfillably() throws Exception {
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        // Captures the allocation id of the slot request that reaches the resource manager.
        final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
        resourceManagerGateway.setRequestSlotConsumer(
                slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setResourceManagerGateway(resourceManagerGateway)
                .build()) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            SlotPoolUtils.failAllocation(
                    slotPool,
                    directMainThreadExecutor,
                    allocationIdFuture.get(),
                    new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN));

            MatcherAssert.assertThat(slotFuture.isCompletedExceptionally(), Matchers.is(true));
        }
    }

    /**
     * Lets the resource manager answer the slot request with a plain {@link FlinkException} and
     * expects the batch request future to stay incomplete.
     */
    @Test
    public void testPendingBatchSlotRequestDoesNotFailIfRMRequestFails() throws Exception {
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        resourceManagerGateway.setRequestSlotFuture(FutureUtils.completedExceptionally(new FlinkException("Failed request")));

        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setBatchSlotTimeout(Time.milliseconds(1000L))
                .setResourceManagerGateway(resourceManagerGateway)
                .build()) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            MatcherAssert.assertThat(slotFuture.isDone(), Matchers.is(false));
        }
    }

    /**
     * Lets the resource manager answer the slot request with an
     * {@link UnfulfillableSlotRequestException} and expects the batch request future to complete
     * exceptionally.
     */
    @Test
    public void testPendingBatchSlotRequestFailsIfRMRequestFailsUnfulfillably() throws Exception {
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
        resourceManagerGateway.setRequestSlotFuture(
                FutureUtils.completedExceptionally(new UnfulfillableSlotRequestException(new AllocationID(), ResourceProfile.UNKNOWN)));

        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setResourceManagerGateway(resourceManagerGateway)
                .build()) {
            final CompletableFuture<PhysicalSlot> slotFuture =
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile);

            MatcherAssert.assertThat(slotFuture.isCompletedExceptionally(), Matchers.is(true));
        }
    }

    /**
     * Offers one slot and issues three batch requests. While the slot is present the requests
     * must survive an elapsed batch slot timeout; after the task manager is released and the
     * timeout elapses again, every request must have failed with a {@link TimeoutException}.
     */
    @Test
    public void testPendingBatchSlotRequestTimeoutAfterSlotRelease() throws Exception {
        final ComponentMainThreadExecutor directMainThreadExecutor = ComponentMainThreadExecutorServiceAdapter.forMainThread();
        // Must be declared as ManualClock: the time-advancing helper needs the manual API.
        final ManualClock clock = new ManualClock();
        final Time batchSlotTimeout = Time.milliseconds(1000L);

        try (TestingSlotPoolImpl slotPool = new SlotPoolBuilder(directMainThreadExecutor)
                .setClock(clock)
                .setBatchSlotTimeout(batchSlotTimeout)
                .build()) {
            final ResourceID taskManagerResourceId =
                    SlotPoolUtils.offerSlots(slotPool, directMainThreadExecutor, Collections.singletonList(resourceProfile));

            final List<CompletableFuture<PhysicalSlot>> slotFutures = Arrays.asList(
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, resourceProfile),
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, ResourceProfile.UNKNOWN),
                    SlotPoolUtils.requestNewAllocatedBatchSlot(slotPool, directMainThreadExecutor, smallerResourceProfile));

            // Phase 1: the offered slot is still registered, so no request may have completed.
            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);
            final CompletableFuture<Object> anySlotFuture =
                    CompletableFuture.anyOf(slotFutures.toArray(COMPLETABLE_FUTURES_EMPTY_ARRAY));
            MatcherAssert.assertThat(anySlotFuture.isDone(), Matchers.is(false));

            // Phase 2: after releasing the task manager, every request must time out.
            SlotPoolUtils.releaseTaskManager(slotPool, directMainThreadExecutor, taskManagerResourceId);
            advanceTimeAndTriggerCheckBatchSlotTimeout(slotPool, clock, batchSlotTimeout);

            for (CompletableFuture<PhysicalSlot> slotFuture : slotFutures) {
                MatcherAssert.assertThat(slotFuture.isCompletedExceptionally(), Matchers.is(true));
                try {
                    slotFuture.get();
                    Assert.fail("Expected that the slot future times out.");
                } catch (ExecutionException e) {
                    MatcherAssert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TimeoutException.class));
                }
            }
        }
    }

    /**
     * Triggers the batch slot timeout check, advances the manual clock by one millisecond more
     * than {@code time}, and triggers the check a second time.
     *
     * <p>NOTE(review): the first trigger presumably lets the pool record the current time for
     * requests it cannot fulfil, so that the second trigger sees them as expired; confirm against
     * the slot pool implementation.
     *
     * @param testingSlotPoolImpl slot pool whose timeout check is triggered
     * @param manualClock clock that was handed to the slot pool builder
     * @param time batch slot timeout to step over
     */
    private void advanceTimeAndTriggerCheckBatchSlotTimeout(TestingSlotPoolImpl testingSlotPoolImpl, ManualClock manualClock, Time time) {
        testingSlotPoolImpl.triggerCheckBatchSlotTimeout();
        manualClock.advanceTime(time.toMilliseconds() + 1, TimeUnit.MILLISECONDS);
        testingSlotPoolImpl.triggerCheckBatchSlotTimeout();
    }
}
