package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.TestingComponentMainThreadExecutor;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobmanager.scheduler.DummyScheduledUnit;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolInteractionsTest.class */
public class SlotPoolInteractionsTest extends TestLogger {
    // Deliberately tiny (1 ms) allocation timeout handed to every allocateSlot call in this
    // class, so that each request is expected to fail with a TimeoutException.
    private static final Time fastTimeout = Time.milliseconds(1);

    // Class-level JUnit rule that owns the executor shared by all tests of this class.
    // NOTE(review): the argument 10 is presumably a shutdown timeout in milliseconds - confirm
    // against TestingComponentMainThreadExecutor.Resource.
    @ClassRule
    public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = new TestingComponentMainThreadExecutor.Resource(10);
    // Per-instance handle obtained from the rule; used both to start pools/schedulers and to run
    // scheduler and pool calls through its execute(...) method.
    private final TestingComponentMainThreadExecutor testMainThreadExecutor = EXECUTOR_RESOURCE.getComponentMainThreadTestExecutor();

    /**
     * {@link SlotPoolImpl} subclass used by the tests of this class.
     *
     * <p>It lets a test register callbacks that are notified when a slot is released or when a
     * pending slot request times out, and exposes a few read-only views on the pool's internal
     * bookkeeping (allocated slots, available slots, pending requests).
     */
    public static final class TestingSlotPool extends SlotPoolImpl {
        /** Callback notified after a slot release was forwarded to the superclass; {@code null} until set. */
        private volatile Consumer<SlotRequestId> releaseSlotConsumer;

        /** Callback notified before a pending-request timeout is forwarded to the superclass; {@code null} until set. */
        private volatile Consumer<SlotRequestId> timeoutPendingSlotRequestConsumer;

        /**
         * Creates the pool; all arguments are passed unchanged, in the same order, to the
         * {@link SlotPoolImpl} constructor. Both callbacks start out unset.
         *
         * @param jobID job the pool belongs to
         * @param clock clock handed to the superclass
         * @param time first time setting handed to the superclass
         * @param time2 second time setting handed to the superclass
         * @param time3 third time setting handed to the superclass
         */
        public TestingSlotPool(JobID jobID, Clock clock, Time time, Time time2, Time time3) {
            super(jobID, clock, time, time2, time3);
        }

        /**
         * Registers the callback that {@link #releaseSlot} invokes with the released request id.
         *
         * @param consumer callback to notify; must not be {@code null}
         */
        public void setReleaseSlotConsumer(Consumer<SlotRequestId> consumer) {
            this.releaseSlotConsumer = Preconditions.checkNotNull(consumer);
        }

        /**
         * Registers the callback that {@link #timeoutPendingSlotRequest} invokes with the timed
         * out request id.
         *
         * @param consumer callback to notify; must not be {@code null}
         */
        public void setTimeoutPendingSlotRequestConsumer(Consumer<SlotRequestId> consumer) {
            this.timeoutPendingSlotRequestConsumer = Preconditions.checkNotNull(consumer);
        }

        /**
         * Delegates to the superclass implementation and afterwards notifies the release
         * callback, if one is registered.
         *
         * @param slotRequestId id of the request whose slot is released
         * @param th optional cause of the release, forwarded to the superclass
         */
        public void releaseSlot(@Nonnull SlotRequestId slotRequestId, @Nullable Throwable th) {
            // Read the volatile field once, before delegating, so the null check and the call
            // below operate on the same callback instance.
            final Consumer<SlotRequestId> callback = this.releaseSlotConsumer;
            super.releaseSlot(slotRequestId, th);
            if (callback != null) {
                callback.accept(slotRequestId);
            }
        }

        /**
         * Notifies the timeout callback, if one is registered, and then delegates to the
         * superclass implementation.
         *
         * @param slotRequestId id of the pending request that timed out
         */
        protected void timeoutPendingSlotRequest(SlotRequestId slotRequestId) {
            // Single volatile read; see releaseSlot.
            final Consumer<SlotRequestId> callback = this.timeoutPendingSlotRequestConsumer;
            if (callback != null) {
                callback.accept(slotRequestId);
            }
            super.timeoutPendingSlotRequest(slotRequestId);
        }

        /** Returns whether the pool's allocated slots contain the given allocation id. */
        boolean containsAllocatedSlot(AllocationID allocationID) {
            return getAllocatedSlots().contains(allocationID);
        }

        /** Returns whether the pool's available slots contain the given allocation id. */
        boolean containsAvailableSlot(AllocationID allocationID) {
            return getAvailableSlots().contains(allocationID);
        }

        /** Returns the size of the pool's pending-request collection. */
        int getNumberOfPendingRequests() {
            return getPendingRequests().size();
        }

        /** Returns the size of the pool's waiting-for-resource-manager collection. */
        int getNumberOfWaitingForResourceRequests() {
            return getWaitingForResourceManager().size();
        }
    }

    /**
     * Requests a slot from a plain {@link SlotPoolImpl} to which no resource manager has been
     * connected and expects the returned future to fail with a {@link TimeoutException} once
     * the short {@code fastTimeout} has elapsed.
     *
     * @throws Exception if the test fails unexpectedly
     */
    @Test
    public void testSlotAllocationNoResourceManager() throws Exception {
        // try-with-resources closes the pool exactly once and records a close() failure as a
        // suppressed exception instead of replacing the primary failure.
        try (SlotPoolImpl pool = new SlotPoolImpl(
                new JobID(),
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime())) {
            pool.start(JobMasterId.generate(), "foobar", this.testMainThreadExecutor.getMainThreadExecutor());

            final SchedulerImpl scheduler =
                    new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
            scheduler.start(this.testMainThreadExecutor.getMainThreadExecutor());

            try {
                // The allocation itself is issued through the test executor; only the wait for
                // its result happens on the test thread.
                final CompletableFuture<?> slotFuture = this.testMainThreadExecutor.execute(
                        () -> scheduler.allocateSlot(
                                new SlotRequestId(),
                                new ScheduledUnit(ExecutionGraphTestUtils.getExecution()),
                                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                                fastTimeout));
                slotFuture.get();
                Assert.fail("We expected an ExecutionException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }
        }
    }

    /**
     * Requests a slot while no resource manager is connected, expects the request to fail with
     * a {@link TimeoutException}, waits until the pool has reported the timeout of the pending
     * request, and then verifies that no request is left waiting for a resource manager.
     *
     * @throws Exception if the test fails unexpectedly
     */
    @Test
    public void testCancelSlotAllocationWithoutResourceManager() throws Exception {
        // try-with-resources closes the pool exactly once and keeps a close() failure as a
        // suppressed exception of the primary failure.
        try (TestingSlotPool pool = createTestingSlotPool(new JobID())) {
            // Completed by the pool with the id of the request that timed out.
            final CompletableFuture<SlotRequestId> timeoutFuture = new CompletableFuture<>();
            pool.setTimeoutPendingSlotRequestConsumer(timeoutFuture::complete);

            pool.start(JobMasterId.generate(), "foobar", this.testMainThreadExecutor.getMainThreadExecutor());

            final SchedulerImpl scheduler =
                    new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
            scheduler.start(this.testMainThreadExecutor.getMainThreadExecutor());

            final SlotRequestId requestId = new SlotRequestId();
            try {
                final CompletableFuture<?> slotFuture = this.testMainThreadExecutor.execute(
                        () -> scheduler.allocateSlot(
                                requestId,
                                new ScheduledUnit(ExecutionGraphTestUtils.getExecution()),
                                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                                fastTimeout));
                slotFuture.get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }

            // Block until the timeout callback has fired before inspecting the pool state.
            timeoutFuture.get();
            Assert.assertEquals(0L, pool.getNumberOfWaitingForResourceRequests());
        }
    }

    /**
     * Builds a {@link TestingSlotPool} for the given job that uses the system clock and
     * {@link TestingUtils#infiniteTime()} for each of its three time settings.
     *
     * @param jobID job the pool is created for
     * @return a new, not yet started pool
     */
    @Nonnull
    private TestingSlotPool createTestingSlotPool(JobID jobID) {
        return new TestingSlotPool(
                jobID,
                SystemClock.getInstance(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime(),
                TestingUtils.infiniteTime());
    }

    /**
     * Connects the pool to a {@link TestingResourceManagerGateway}, requests a slot with the
     * short {@code fastTimeout} and expects the request to fail with a
     * {@link TimeoutException}. After the pool has reported the timeout, no pending request
     * may remain.
     *
     * @throws Exception if the test fails unexpectedly
     */
    @Test
    public void testSlotAllocationTimeout() throws Exception {
        // try-with-resources closes the pool exactly once and keeps a close() failure as a
        // suppressed exception of the primary failure.
        try (TestingSlotPool pool = createTestingSlotPool(new JobID())) {
            pool.start(JobMasterId.generate(), "foobar", this.testMainThreadExecutor.getMainThreadExecutor());

            // Completed by the pool with the id of the request that timed out.
            final CompletableFuture<SlotRequestId> timeoutFuture = new CompletableFuture<>();
            pool.setTimeoutPendingSlotRequestConsumer(timeoutFuture::complete);

            pool.connectToResourceManager(new TestingResourceManagerGateway());

            final SchedulerImpl scheduler =
                    new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
            scheduler.start(this.testMainThreadExecutor.getMainThreadExecutor());

            final SlotRequestId requestId = new SlotRequestId();
            try {
                final CompletableFuture<?> slotFuture = this.testMainThreadExecutor.execute(
                        () -> scheduler.allocateSlot(
                                requestId,
                                new DummyScheduledUnit(),
                                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                                fastTimeout));
                slotFuture.get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }

            // Block until the timeout callback has fired before inspecting the pool state.
            timeoutFuture.get();
            Assert.assertEquals(0L, pool.getNumberOfPendingRequests());
        }
    }

    /**
     * Lets a slot request time out and afterwards offers the pool a slot carrying the
     * allocation id that the resource manager gateway received for that request. The offer
     * must be accepted and the slot must show up among the pool's available slots.
     *
     * @throws Exception if the test fails unexpectedly
     */
    @Test
    public void testExtraSlotsAreKept() throws Exception {
        // try-with-resources closes the pool exactly once and keeps a close() failure as a
        // suppressed exception of the primary failure.
        try (TestingSlotPool pool = createTestingSlotPool(new JobID())) {
            pool.start(JobMasterId.generate(), "foobar", this.testMainThreadExecutor.getMainThreadExecutor());

            final SchedulerImpl scheduler =
                    new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
            scheduler.start(this.testMainThreadExecutor.getMainThreadExecutor());

            // Captures the allocation id of the slot request sent to the resource manager.
            final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>();
            final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
            resourceManagerGateway.setRequestSlotConsumer(
                    slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId()));

            // Completed by the pool with the id of the request that timed out.
            final CompletableFuture<SlotRequestId> timeoutFuture = new CompletableFuture<>();
            pool.setTimeoutPendingSlotRequestConsumer(timeoutFuture::complete);

            pool.connectToResourceManager(resourceManagerGateway);

            final SlotRequestId requestId = new SlotRequestId();
            try {
                final CompletableFuture<?> slotFuture = this.testMainThreadExecutor.execute(
                        () -> scheduler.allocateSlot(
                                requestId,
                                new ScheduledUnit(ExecutionGraphTestUtils.getExecution()),
                                SlotProfile.noLocality(AvailableSlotsTest.DEFAULT_TESTING_PROFILE),
                                fastTimeout));
                slotFuture.get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }

            // Block until the timeout callback has fired before inspecting the pool state.
            timeoutFuture.get();
            Assert.assertEquals(0L, pool.getNumberOfPendingRequests());

            // Offer a slot for the allocation that was requested before the timeout.
            final AllocationID allocationId = allocationIdFuture.get();
            final SlotOffer slotOffer = new SlotOffer(allocationId, 0, AvailableSlotsTest.DEFAULT_TESTING_PROFILE);
            final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();

            // The task manager is registered with the pool before its slot is offered.
            this.testMainThreadExecutor.execute(
                    () -> pool.registerTaskManager(taskManagerLocation.getResourceID()));

            final boolean offerAccepted = this.testMainThreadExecutor.execute(
                    () -> pool.offerSlot(taskManagerLocation, taskManagerGateway, slotOffer));
            Assert.assertTrue(offerAccepted);

            Assert.assertTrue(pool.containsAvailableSlot(allocationId));
        }
    }

    /**
     * Requests a slot through the scheduler's three-argument {@code allocateSlot} variant while
     * a {@link TestingResourceManagerGateway} is connected and expects the request to fail with
     * a {@link TimeoutException}. The test then waits for the pool's release callback and
     * verifies that no pending request remains.
     *
     * @throws Exception if the test fails unexpectedly
     */
    @Test
    public void testProviderAndOwnerSlotAllocationTimeout() throws Exception {
        // try-with-resources closes the pool exactly once and keeps a close() failure as a
        // suppressed exception of the primary failure.
        try (TestingSlotPool pool = createTestingSlotPool(new JobID())) {
            // Completed by the pool with the id of the request whose slot was released.
            final CompletableFuture<SlotRequestId> releaseFuture = new CompletableFuture<>();
            pool.setReleaseSlotConsumer(releaseFuture::complete);

            pool.start(JobMasterId.generate(), "foobar", this.testMainThreadExecutor.getMainThreadExecutor());
            pool.connectToResourceManager(new TestingResourceManagerGateway());

            final SchedulerImpl scheduler =
                    new SchedulerImpl(LocationPreferenceSlotSelectionStrategy.createDefault(), pool);
            scheduler.start(this.testMainThreadExecutor.getMainThreadExecutor());

            try {
                final CompletableFuture<?> slotFuture = this.testMainThreadExecutor.execute(
                        () -> scheduler.allocateSlot(
                                new DummyScheduledUnit(),
                                SlotProfile.noRequirements(),
                                fastTimeout));
                slotFuture.get();
                Assert.fail("We expected a TimeoutException.");
            } catch (ExecutionException e) {
                Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof TimeoutException);
            }

            // Block until the release callback has fired before inspecting the pool state.
            releaseFuture.get();
            Assert.assertEquals(0L, pool.getNumberOfPendingRequests());
        }
    }
}
