package org.apache.flink.runtime.jobmaster.slotpool;

import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.util.TestLogger;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/slotpool/SlotPoolSlotSpreadOutTest.class */
/**
 * Tests slot allocation against a slot pool whose {@link SlotPoolResource} is configured with
 * {@link LocationPreferenceSlotSelectionStrategy#createEvenlySpreadOut()}.
 *
 * <p>Each test registers a number of task executors with a fixed number of slots, allocates
 * slots through the resource's slot provider, and asserts on the {@link TaskManagerLocation}s
 * of the resulting logical slots.
 */
public class SlotPoolSlotSpreadOutTest extends TestLogger {
    /** Timeout passed to every slot allocation request issued by this test. */
    public static final Time TIMEOUT = Time.seconds(10);

    @Rule
    public final SlotPoolResource slotPoolResource = new SlotPoolResource(LocationPreferenceSlotSelectionStrategy.createEvenlySpreadOut());

    /**
     * With two task executors of four slots each, two independent requests without any
     * requirements must be fulfilled from different task manager locations.
     */
    @Test
    public void allocateSingleSlot_withNoRequirements_selectsSlotSoThatWorkloadIsSpreadOut() {
        final int numberTaskExecutors = 2;
        final int numberSlotsPerTaskExecutor = 4;
        registerTaskExecutors(numberTaskExecutors, numberSlotsPerTaskExecutor);

        final ScheduledUnit firstRequest = createSimpleSlotRequest();
        final ScheduledUnit secondRequest = createSimpleSlotRequest();

        // Allocate (and wait for) the first slot before requesting the second one.
        final TaskManagerLocation firstLocation = getTaskManagerLocation(allocateSlot(firstRequest));
        final TaskManagerLocation secondLocation = getTaskManagerLocation(allocateSlot(secondRequest));

        MatcherAssert.assertThat(firstLocation, Matchers.is(Matchers.not(Matchers.equalTo(secondLocation))));
    }

    /**
     * With two task executors of two slots each, a second request that names the first slot's
     * location as its preferred location must be fulfilled from that same location.
     */
    @Test
    public void allocateSingleSlot_withInputPreference_inputPreferenceHasPrecedenceOverSpreadOut() {
        final int numberTaskExecutors = 2;
        final int numberSlotsPerTaskExecutor = 2;
        registerTaskExecutors(numberTaskExecutors, numberSlotsPerTaskExecutor);

        final ScheduledUnit sourceRequest = createSimpleSlotRequest();
        final ScheduledUnit sinkRequest = createSimpleSlotRequest();

        final TaskManagerLocation sourceLocation = getTaskManagerLocation(allocateSlot(sourceRequest));
        final TaskManagerLocation sinkLocation =
                getTaskManagerLocation(allocateSlotWithInputPreference(sinkRequest, Collections.singleton(sourceLocation)));

        MatcherAssert.assertThat(sinkLocation, Matchers.is(Matchers.equalTo(sourceLocation)));
    }

    /**
     * With two task executors of two slots each, four requests of one job vertex are allocated
     * first; two subsequent requests of a second job vertex in the same slot sharing group must
     * then end up on two distinct task manager locations.
     */
    @Test
    public void allocateSharedSlot_withNoRequirements_selectsSlotsSoThatWorkloadIsSpreadOut() {
        final int numberTaskExecutors = 2;
        final int numberSlotsPerTaskExecutor = 2;
        final int numberSlots = numberTaskExecutors * numberSlotsPerTaskExecutor;
        registerTaskExecutors(numberTaskExecutors, numberSlotsPerTaskExecutor);

        final JobVertexID firstVertexId = new JobVertexID();
        final JobVertexID secondVertexId = new JobVertexID();
        final SlotSharingGroupId slotSharingGroupId = new SlotSharingGroupId();

        // Typed lists are required here: with raw List/Stream types the method references
        // below have no ScheduledUnit-compatible target type.
        final List<ScheduledUnit> firstVertexRequests = IntStream.range(0, numberSlots)
                .mapToObj(ignored -> createSharedSlotRequest(firstVertexId, slotSharingGroupId))
                .collect(Collectors.toList());
        final List<ScheduledUnit> secondVertexRequests = IntStream.range(0, numberTaskExecutors)
                .mapToObj(ignored -> createSharedSlotRequest(secondVertexId, slotSharingGroupId))
                .collect(Collectors.toList());

        firstVertexRequests.forEach(this::allocateSlot);

        final Set<TaskManagerLocation> secondVertexLocations = secondVertexRequests.stream()
                .map(this::allocateSlot)
                .map(this::getTaskManagerLocation)
                .collect(Collectors.toSet());

        MatcherAssert.assertThat(secondVertexLocations, Matchers.hasSize(numberTaskExecutors));
    }

    /** Creates a request for the given job vertex and slot sharing group, without co-location constraint. */
    private ScheduledUnit createSharedSlotRequest(JobVertexID jobVertexID, SlotSharingGroupId slotSharingGroupId) {
        return new ScheduledUnit(jobVertexID, slotSharingGroupId, (CoLocationConstraint) null);
    }

    /** Creates a request with a fresh job vertex id and a fresh slot sharing group id. */
    private ScheduledUnit createSimpleSlotRequest() {
        return new ScheduledUnit(new JobVertexID(), new SlotSharingGroupId(), (CoLocationConstraint) null);
    }

    /** Requests a slot for the given unit using {@link SlotProfile#noRequirements()}. */
    private CompletableFuture<LogicalSlot> allocateSlot(ScheduledUnit scheduledUnit) {
        return internalAllocateSlot(scheduledUnit, SlotProfile.noRequirements());
    }

    /** Requests a slot from the resource's slot provider under a new {@link SlotRequestId}. */
    private CompletableFuture<LogicalSlot> internalAllocateSlot(ScheduledUnit scheduledUnit, SlotProfile slotProfile) {
        final SlotRequestId slotRequestId = new SlotRequestId();
        return slotPoolResource.getSlotProvider().allocateSlot(slotRequestId, scheduledUnit, slotProfile, TIMEOUT);
    }

    /** Requests a slot for the given unit, passing the given locations as preferred locations. */
    private CompletableFuture<LogicalSlot> allocateSlotWithInputPreference(ScheduledUnit scheduledUnit, Collection<TaskManagerLocation> preferredLocations) {
        final SlotProfile slotProfile = SlotProfile.preferredLocality(ResourceProfile.UNKNOWN, preferredLocations);
        return internalAllocateSlot(scheduledUnit, slotProfile);
    }

    /** Blocks until the given future completes and returns the location of the resulting slot. */
    private TaskManagerLocation getTaskManagerLocation(CompletableFuture<? extends LogicalSlot> logicalSlotFuture) {
        final LogicalSlot logicalSlot = logicalSlotFuture.join();
        return logicalSlot.getTaskManagerLocation();
    }

    /** Registers {@code numberTaskExecutors} task executors, each offering {@code numberSlotsPerTaskExecutor} slots. */
    private void registerTaskExecutors(int numberTaskExecutors, int numberSlotsPerTaskExecutor) {
        for (int registered = 0; registered < numberTaskExecutors; registered++) {
            registerTaskExecutor(numberSlotsPerTaskExecutor);
        }
    }

    /**
     * Registers one task executor at a new local location with the slot pool and offers
     * {@code numberSlots} slots (slot indices {@code 0..numberSlots-1}, {@link ResourceProfile#ANY}).
     */
    private void registerTaskExecutor(int numberSlots) {
        final SlotPoolImpl slotPool = slotPoolResource.getSlotPool();
        final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
        slotPool.registerTaskManager(taskManagerLocation.getResourceID());

        final List<SlotOffer> slotOffers = IntStream.range(0, numberSlots)
                .mapToObj(slotIndex -> new SlotOffer(new AllocationID(), slotIndex, ResourceProfile.ANY))
                .collect(Collectors.toList());

        slotPool.offerSlots(taskManagerLocation, new SimpleAckingTaskManagerGateway(), slotOffers);
    }
}
