package org.apache.flink.runtime.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.scheduler.ExecutionVertexSchedulingRequirements;
import org.apache.flink.runtime.scheduler.TestingInputsLocationsRetriever;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest.class */
public class DefaultExecutionSlotAllocatorTest extends TestLogger {
    private AllocationToggableSlotProvider slotProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/scheduler/DefaultExecutionSlotAllocatorTest$AllocationToggableSlotProvider.class */
    public static class AllocationToggableSlotProvider implements SlotProvider {
        private final List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> slotAllocationRequests;
        private final List<SlotRequestId> cancelledSlotRequestIds;
        private boolean slotAllocationDisabled;

        private AllocationToggableSlotProvider() {
            this.slotAllocationRequests = new ArrayList();
            this.cancelledSlotRequestIds = new ArrayList();
        }

        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean z, Time time) {
            this.slotAllocationRequests.add(Tuple3.of(slotRequestId, scheduledUnit, slotProfile));
            return this.slotAllocationDisabled ? new CompletableFuture<>() : CompletableFuture.completedFuture(new TestingLogicalSlot());
        }

        public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
            this.cancelledSlotRequestIds.add(slotRequestId);
        }

        public List<Tuple3<SlotRequestId, ScheduledUnit, SlotProfile>> getSlotAllocationRequests() {
            return Collections.unmodifiableList(this.slotAllocationRequests);
        }

        public List<SlotRequestId> getReceivedSlotRequestIds() {
            return (List) this.slotAllocationRequests.stream().map(tuple3 -> {
                return (SlotRequestId) tuple3.f0;
            }).collect(Collectors.toList());
        }

        public List<SlotRequestId> getCancelledSlotRequestIds() {
            return Collections.unmodifiableList(this.cancelledSlotRequestIds);
        }

        public void disableSlotAllocation() {
            this.slotAllocationDisabled = true;
        }
    }

    @Before
    public void setUp() throws Exception {
        this.slotProvider = new AllocationToggableSlotProvider();
    }

    @Test
    public void testConsumersAssignedToSlotsAfterProducers() {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        ExecutionVertexID executionVertexID2 = new ExecutionVertexID(new JobVertexID(), 0);
        TestingInputsLocationsRetriever build = new TestingInputsLocationsRetriever.Builder().connectConsumerToProducer(executionVertexID2, executionVertexID).build();
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator(build);
        build.markScheduled(executionVertexID);
        build.markScheduled(executionVertexID2);
        Collection<SlotExecutionVertexAssignment> allocateSlotsFor = createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements(executionVertexID, executionVertexID2));
        Assert.assertThat(allocateSlotsFor, Matchers.hasSize(2));
        SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId = findSlotAssignmentByExecutionVertexId(executionVertexID, allocateSlotsFor);
        SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId2 = findSlotAssignmentByExecutionVertexId(executionVertexID2, allocateSlotsFor);
        Assert.assertTrue(findSlotAssignmentByExecutionVertexId.getLogicalSlotFuture().isDone());
        Assert.assertFalse(findSlotAssignmentByExecutionVertexId2.getLogicalSlotFuture().isDone());
        build.assignTaskManagerLocation(executionVertexID);
        Assert.assertTrue(findSlotAssignmentByExecutionVertexId2.getLogicalSlotFuture().isDone());
        Assert.assertEquals(0L, createExecutionSlotAllocator.getNumberOfPendingSlotAssignments());
    }

    @Test
    public void testAllocateSlotsParameters() {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        AllocationID allocationID = new AllocationID();
        SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();
        ResourceProfile resourceProfile = new ResourceProfile(0.5d, 250);
        CoLocationConstraint locationConstraint = new CoLocationGroup().getLocationConstraint(0);
        Set singleton = Collections.singleton(new LocalTaskManagerLocation());
        createExecutionSlotAllocator().allocateSlotsFor(Arrays.asList(new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(executionVertexID).withPreviousAllocationId(allocationID).withSlotSharingGroupId(slotSharingGroupId).withPreferredLocations(singleton).withResourceProfile(resourceProfile).withCoLocationConstraint(locationConstraint).build()));
        Assert.assertThat(this.slotProvider.getSlotAllocationRequests(), Matchers.hasSize(1));
        ScheduledUnit scheduledUnit = (ScheduledUnit) this.slotProvider.getSlotAllocationRequests().get(0).f1;
        SlotProfile slotProfile = (SlotProfile) this.slotProvider.getSlotAllocationRequests().get(0).f2;
        Assert.assertEquals(slotSharingGroupId, scheduledUnit.getSlotSharingGroupId());
        Assert.assertEquals(locationConstraint, scheduledUnit.getCoLocationConstraint());
        Assert.assertThat(slotProfile.getPreferredAllocations(), Matchers.contains(new AllocationID[]{allocationID}));
        Assert.assertThat(slotProfile.getPreviousExecutionGraphAllocations(), Matchers.contains(new AllocationID[]{allocationID}));
        Assert.assertEquals(resourceProfile, slotProfile.getResourceProfile());
        Assert.assertThat(slotProfile.getPreferredLocations(), Matchers.contains(singleton.toArray()));
    }

    @Test
    public void testCancelNonExistingExecutionVertex() {
        createExecutionSlotAllocator().cancel(new ExecutionVertexID(new JobVertexID(), 0));
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), CoreMatchers.is(Matchers.empty()));
    }

    @Test
    public void testCancelFulfilledSlotRequest() {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator();
        createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements(executionVertexID));
        createExecutionSlotAllocator.cancel(executionVertexID);
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), CoreMatchers.is(Matchers.empty()));
    }

    @Test
    public void testCancelUnFulfilledSlotRequest() throws Exception {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator();
        this.slotProvider.disableSlotAllocation();
        Collection allocateSlotsFor = createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements(executionVertexID));
        createExecutionSlotAllocator.cancel(executionVertexID);
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), Matchers.hasSize(1));
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), Matchers.contains(this.slotProvider.getReceivedSlotRequestIds().toArray()));
        try {
            ((SlotExecutionVertexAssignment) allocateSlotsFor.iterator().next()).getLogicalSlotFuture().get();
            Assert.fail("Expect a CancellationException but got nothing.");
        } catch (CancellationException e) {
        }
    }

    @Test
    public void testStop() throws Exception {
        ExecutionVertexID executionVertexID = new ExecutionVertexID(new JobVertexID(), 0);
        DefaultExecutionSlotAllocator createExecutionSlotAllocator = createExecutionSlotAllocator();
        this.slotProvider.disableSlotAllocation();
        createExecutionSlotAllocator.allocateSlotsFor(createSchedulingRequirements(executionVertexID));
        createExecutionSlotAllocator.stop().get();
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), Matchers.hasSize(1));
        Assert.assertThat(this.slotProvider.getCancelledSlotRequestIds(), Matchers.contains(this.slotProvider.getReceivedSlotRequestIds().toArray()));
        Assert.assertEquals(0L, createExecutionSlotAllocator.getNumberOfPendingSlotAssignments());
    }

    @Test
    public void testComputeAllPriorAllocationIds() {
        List asList = Arrays.asList(new AllocationID(), new AllocationID());
        Assert.assertThat(DefaultExecutionSlotAllocator.computeAllPriorAllocationIds(Arrays.asList(new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 0)).withPreviousAllocationId((AllocationID) asList.get(0)).build(), new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 1)).withPreviousAllocationId((AllocationID) asList.get(0)).build(), new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 2)).withPreviousAllocationId((AllocationID) asList.get(1)).build(), new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(new ExecutionVertexID(new JobVertexID(), 3)).build())), Matchers.containsInAnyOrder(asList.toArray()));
    }

    private DefaultExecutionSlotAllocator createExecutionSlotAllocator() {
        return createExecutionSlotAllocator(new TestingInputsLocationsRetriever.Builder().build());
    }

    private DefaultExecutionSlotAllocator createExecutionSlotAllocator(InputsLocationsRetriever inputsLocationsRetriever) {
        return new DefaultExecutionSlotAllocator(this.slotProvider, inputsLocationsRetriever, Time.seconds(10L));
    }

    private List<ExecutionVertexSchedulingRequirements> createSchedulingRequirements(ExecutionVertexID... executionVertexIDArr) {
        ArrayList arrayList = new ArrayList(executionVertexIDArr.length);
        for (ExecutionVertexID executionVertexID : executionVertexIDArr) {
            arrayList.add(new ExecutionVertexSchedulingRequirements.Builder().withExecutionVertexId(executionVertexID).build());
        }
        return arrayList;
    }

    private SlotExecutionVertexAssignment findSlotAssignmentByExecutionVertexId(ExecutionVertexID executionVertexID, Collection<SlotExecutionVertexAssignment> collection) {
        return collection.stream().filter(slotExecutionVertexAssignment -> {
            return slotExecutionVertexAssignment.getExecutionVertexId().equals(executionVertexID);
        }).findFirst().orElseThrow(() -> {
            return new IllegalArgumentException(String.format("SlotExecutionVertexAssignment with execution vertex id %s not found", executionVertexID));
        });
    }
}
