package org.apache.flink.runtime.jobmanager.scheduler;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotProfile;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.instance.Instance;
import org.apache.flink.runtime.instance.SlotSharingGroupId;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.SlotRequestId;
import org.apache.flink.runtime.jobmaster.slotpool.LocationPreferenceSchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SchedulingStrategy;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPool;
import org.apache.flink.runtime.jobmaster.slotpool.SlotPoolGateway;
import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
import org.apache.flink.runtime.jobmaster.slotpool.SlotSharingManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Before;
import org.junit.runners.Parameterized;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.class */
public class SchedulerTestBase extends TestLogger {
    protected TestingSlotProvider testingSlotProvider;
    protected SchedulerType schedulerType;
    private RpcService rpcService = null;

    /* renamed from: org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase$1.class */
    /**
     * Decompiler rendering of the compiler-generated holder for the lookup tables that back
     * {@code switch} statements over enums in {@link SchedulerTestBase}.
     *
     * <p>Each table is indexed by the enum constant's {@code ordinal()} and stores the number of
     * the matching {@code case} label. Entries that are never written stay {@code 0}.
     */
    static /* synthetic */ class AnonymousClass1 {
        /** Case-label table for {@code Locality}: LOCAL=1, UNCONSTRAINED=2, NON_LOCAL=3, HOST_LOCAL=4. */
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality = new int[Locality.values().length];

        /**
         * Case-label table for {@link SchedulerType}: SCHEDULER=1, SLOT_POOL=2.
         *
         * <p>The decompiled source assigned this table without ever declaring it, which does not
         * compile; the declaration is restored here.
         */
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$SchedulerTestBase$SchedulerType = new int[SchedulerType.values().length];

        static {
            // Every constant is registered in its own try block: a NoSuchFieldError for one
            // constant must not prevent the remaining constants from being registered.
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.LOCAL.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot keeps the value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.UNCONSTRAINED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot keeps the value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.NON_LOCAL.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot keeps the value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$Locality[Locality.HOST_LOCAL.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot keeps the value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$SchedulerTestBase$SchedulerType[SchedulerType.SCHEDULER.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot keeps the value 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$jobmanager$scheduler$SchedulerTestBase$SchedulerType[SchedulerType.SLOT_POOL.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant not present at runtime: its slot keeps the value 0.
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase$SchedulerType.class */
    /**
     * Selects which slot-providing implementation a test run exercises; {@code setup()} creates
     * the matching {@code TestingSlotProvider}.
     *
     * <p>The declaration order is significant: other code in this file indexes by
     * {@code ordinal()}.
     */
    public enum SchedulerType {
        /** Run against a {@code Scheduler} wrapped in {@code TestingSchedulerSlotProvider}. */
        SCHEDULER,
        /** Run against a {@code TestingSlotPool} wrapped in {@code TestingSlotPoolSlotProvider}. */
        SLOT_POOL
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase$TestingSchedulerSlotProvider.class */
    private static final class TestingSchedulerSlotProvider implements TestingSlotProvider {
        private final Scheduler scheduler;

        private TestingSchedulerSlotProvider(Scheduler scheduler) {
            this.scheduler = (Scheduler) Preconditions.checkNotNull(scheduler);
        }

        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, boolean z, SlotProfile slotProfile, Time time) {
            return this.scheduler.allocateSlot(scheduledUnit, z, slotProfile, time);
        }

        public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public TaskManagerLocation addTaskManager(int i) {
            Instance randomInstance = SchedulerTestUtils.getRandomInstance(i);
            this.scheduler.newInstanceAvailable(randomInstance);
            return randomInstance.getTaskManagerLocation();
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public void releaseTaskManager(ResourceID resourceID) {
            Instance scheduler = this.scheduler.getInstance(resourceID);
            if (scheduler != null) {
                this.scheduler.instanceDied(scheduler);
            }
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfAvailableSlots() {
            return this.scheduler.getNumberOfAvailableSlots();
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfLocalizedAssignments() {
            return this.scheduler.getNumberOfLocalizedAssignments();
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfNonLocalizedAssignments() {
            return this.scheduler.getNumberOfNonLocalizedAssignments();
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfUnconstrainedAssignments() {
            return this.scheduler.getNumberOfUnconstrainedAssignments();
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfHostLocalizedAssignments() {
            return 0;
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
            return slotSharingGroup.getTaskAssignment().getNumberOfSlots();
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexID) {
            return slotSharingGroup.getTaskAssignment().getNumberOfAvailableSlotsForGroup(jobVertexID);
        }

        @Override // org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestBase.TestingSlotProvider
        public void shutdown() {
            this.scheduler.shutdown();
        }

        /* synthetic */ TestingSchedulerSlotProvider(Scheduler scheduler, AnonymousClass1 anonymousClass1) {
            this(scheduler);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase$TestingSlotPool.class */
    /**
     * {@link SlotPool} subclass that adds query methods for tests. Every query is submitted
     * through {@code callAsync} with {@code TestingUtils.infiniteTime()} as the timeout and
     * yields its result as a future.
     */
    public static final class TestingSlotPool extends SlotPool {
        public TestingSlotPool(RpcService rpcService, JobID jobID, SchedulingStrategy schedulingStrategy) {
            super(rpcService, jobID, schedulingStrategy);
        }

        /**
         * @return future holding the size of {@code getAvailableSlots()}
         */
        CompletableFuture<Integer> getNumberOfAvailableSlots() {
            return callAsync(() -> getAvailableSlots().size(), TestingUtils.infiniteTime());
        }

        /**
         * @return future holding the number of resolved root slots of the given sharing group; the
         *     submitted call throws a {@link FlinkException} when no manager is registered for it
         */
        CompletableFuture<Integer> getNumberOfSharedSlots(SlotSharingGroupId slotSharingGroupId) {
            return callAsync(() -> {
                SlotSharingManager manager = slotSharingManagers.get(slotSharingGroupId);
                if (manager == null) {
                    throw new FlinkException("No MultiTaskSlotManager registered under " + slotSharingGroupId + '.');
                }
                return manager.getResolvedRootSlots().size();
            }, TestingUtils.infiniteTime());
        }

        /**
         * @return future holding the number of resolved root slots of the given sharing group that
         *     do not yet contain {@code jobVertexID}; the submitted call throws a
         *     {@link FlinkException} when no manager is registered for the group
         */
        CompletableFuture<Integer> getNumberOfAvailableSlotsForGroup(SlotSharingGroupId slotSharingGroupId, JobVertexID jobVertexID) {
            return callAsync(() -> {
                SlotSharingManager manager = slotSharingManagers.get(slotSharingGroupId);
                if (manager == null) {
                    throw new FlinkException("No MultiTaskSlotmanager registered under " + slotSharingGroupId + '.');
                }
                int freeRootSlots = 0;
                for (SlotSharingManager.MultiTaskSlot rootSlot : manager.getResolvedRootSlots()) {
                    if (!rootSlot.contains(jobVertexID)) {
                        freeRootSlots++;
                    }
                }
                return freeRootSlots;
            }, TestingUtils.infiniteTime());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase$TestingSlotPoolSlotProvider.class */
    /**
     * {@link TestingSlotProvider} backed by a {@link TestingSlotPool}.
     *
     * <p>Slot allocation goes through the pool's {@link SlotProvider}; the locality of every
     * successfully allocated slot is tallied so tests can assert on the counters.
     */
    private static final class TestingSlotPoolSlotProvider implements TestingSlotProvider {
        private final TestingSlotPool slotPool;
        private final SlotProvider slotProvider;
        private final AtomicInteger numberOfLocalizedAssignments;
        private final AtomicInteger numberOfNonLocalizedAssignments;
        private final AtomicInteger numberOfUnconstrainedAssignments;
        private final AtomicInteger numberOfHostLocalizedAssignments;

        /**
         * @param testingSlotPool slot pool to operate on, must not be {@code null}
         */
        private TestingSlotPoolSlotProvider(TestingSlotPool testingSlotPool) {
            this.slotPool = Preconditions.checkNotNull(testingSlotPool);
            this.slotProvider = testingSlotPool.getSlotProvider();
            this.numberOfLocalizedAssignments = new AtomicInteger();
            this.numberOfNonLocalizedAssignments = new AtomicInteger();
            this.numberOfUnconstrainedAssignments = new AtomicInteger();
            this.numberOfHostLocalizedAssignments = new AtomicInteger();
        }

        /**
         * Compiler-generated accessor constructor; the second argument only disambiguates the
         * overload and is ignored.
         */
        /* synthetic */ TestingSlotPoolSlotProvider(TestingSlotPool testingSlotPool, AnonymousClass1 anonymousClass1) {
            this(testingSlotPool);
        }

        /**
         * Registers a fresh local task manager with the slot pool and offers {@code i} slots on
         * its behalf.
         *
         * @param i number of slots to offer
         * @return location of the newly registered task manager
         * @throws RuntimeException if registering or offering fails; the original failure is the
         *     direct cause (it is wrapped exactly once)
         * @throws IllegalStateException if the pool did not accept exactly {@code i} offers
         */
        @Override
        public TaskManagerLocation addTaskManager(int i) {
            LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();
            ResourceID resourceID = taskManagerLocation.getResourceID();
            SlotPoolGateway slotPoolGateway = this.slotPool.getSelfGateway(SlotPoolGateway.class);

            // The two remote calls are guarded separately so that a failure of the second one is
            // not caught and wrapped a second time by the handler of the first.
            try {
                slotPoolGateway.registerTaskManager(resourceID).get();
            } catch (Exception e) {
                throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
            }

            SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway();
            Collection<SlotOffer> slotOffers = new ArrayList<>(i);
            for (int slotIndex = 0; slotIndex < i; slotIndex++) {
                slotOffers.add(new SlotOffer(new AllocationID(), slotIndex, ResourceProfile.UNKNOWN));
            }

            Collection<SlotOffer> acceptedSlotOffers;
            try {
                acceptedSlotOffers = slotPoolGateway.offerSlots(taskManagerLocation, taskManagerGateway, slotOffers).get();
            } catch (Exception e) {
                throw new RuntimeException("Unexpected exception occurred. This indicates a programming bug.", e);
            }

            // Checked outside the try blocks so a mismatch surfaces as an IllegalStateException
            // instead of being reported as an "unexpected exception".
            Preconditions.checkState(acceptedSlotOffers.size() == i);
            return taskManagerLocation;
        }

        /**
         * Releases the task manager from the slot pool (passing {@code null} as the cause) and
         * waits for the call to finish.
         *
         * @throws RuntimeException if the release fails
         */
        @Override
        public void releaseTaskManager(ResourceID resourceID) {
            try {
                this.slotPool.releaseTaskManager(resourceID, null).get();
            } catch (Exception e) {
                throw new RuntimeException("Should not have happened.", e);
            }
        }

        /**
         * @return number of available slots as reported by the slot pool
         * @throws RuntimeException if the query fails
         */
        @Override
        public int getNumberOfAvailableSlots() {
            try {
                return this.slotPool.getNumberOfAvailableSlots().get().intValue();
            } catch (Exception e) {
                throw new RuntimeException("Should not have happened.", e);
            }
        }

        /** @return number of allocated slots whose locality was {@code LOCAL} */
        @Override
        public int getNumberOfLocalizedAssignments() {
            return this.numberOfLocalizedAssignments.get();
        }

        /** @return number of allocated slots whose locality was {@code NON_LOCAL} */
        @Override
        public int getNumberOfNonLocalizedAssignments() {
            return this.numberOfNonLocalizedAssignments.get();
        }

        /** @return number of allocated slots whose locality was {@code UNCONSTRAINED} */
        @Override
        public int getNumberOfUnconstrainedAssignments() {
            return this.numberOfUnconstrainedAssignments.get();
        }

        /** @return number of allocated slots whose locality was {@code HOST_LOCAL} */
        @Override
        public int getNumberOfHostLocalizedAssignments() {
            return this.numberOfHostLocalizedAssignments.get();
        }

        /**
         * @return number of shared slots the pool reports for the group's sharing id
         * @throws RuntimeException if the query fails
         */
        @Override
        public int getNumberOfSlots(SlotSharingGroup slotSharingGroup) {
            try {
                return this.slotPool.getNumberOfSharedSlots(slotSharingGroup.getSlotSharingGroupId()).get().intValue();
            } catch (Exception e) {
                throw new RuntimeException("Should not have happened.", e);
            }
        }

        /**
         * @return number of the group's shared slots the pool reports as available for
         *     {@code jobVertexID}
         * @throws RuntimeException if the query fails
         */
        @Override
        public int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexID) {
            try {
                return this.slotPool.getNumberOfAvailableSlotsForGroup(slotSharingGroup.getSlotSharingGroupId(), jobVertexID).get().intValue();
            } catch (Exception e) {
                throw new RuntimeException("Should not have happened.", e);
            }
        }

        /** Terminates the slot pool RPC endpoint, using {@code TestingUtils.TIMEOUT()} as the timeout. */
        @Override
        public void shutdown() throws Exception {
            RpcUtils.terminateRpcEndpoint(this.slotPool, TestingUtils.TIMEOUT());
        }

        /**
         * Allocates a slot through the pool's slot provider and, once the slot is available,
         * increments the counter matching its locality. The {@code slotRequestId} is not
         * forwarded.
         */
        public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, boolean z, SlotProfile slotProfile, Time time) {
            return this.slotProvider.allocateSlot(scheduledUnit, z, slotProfile, time).thenApply(logicalSlot -> {
                switch (logicalSlot.getLocality()) {
                    case LOCAL:
                        this.numberOfLocalizedAssignments.incrementAndGet();
                        break;
                    case UNCONSTRAINED:
                        this.numberOfUnconstrainedAssignments.incrementAndGet();
                        break;
                    case NON_LOCAL:
                        this.numberOfNonLocalizedAssignments.incrementAndGet();
                        break;
                    case HOST_LOCAL:
                        this.numberOfHostLocalizedAssignments.incrementAndGet();
                        break;
                    default:
                        // Any other locality is not tracked.
                        break;
                }
                return logicalSlot;
            });
        }

        /**
         * Does nothing with the request and returns an already completed acknowledgement.
         */
        public CompletableFuture<Acknowledge> cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable th) {
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase$TestingSlotProvider.class */
    /**
     * {@link SlotProvider} extended with the hooks the scheduler tests need: adding and releasing
     * task managers, reading slot and assignment counters, and shutting the provider down.
     */
    protected interface TestingSlotProvider extends SlotProvider {
        /**
         * Makes a new task manager available to the provider.
         *
         * @param i both implementations in this file use it as the number of slots to provide
         * @return location of the added task manager
         */
        TaskManagerLocation addTaskManager(int i);

        /** Removes the task manager identified by {@code resourceID} from the provider. */
        void releaseTaskManager(ResourceID resourceID);

        /** @return number of slots currently available */
        int getNumberOfAvailableSlots();

        /** @return number of localized slot assignments counted so far */
        int getNumberOfLocalizedAssignments();

        /** @return number of non-localized slot assignments counted so far */
        int getNumberOfNonLocalizedAssignments();

        /** @return number of unconstrained slot assignments counted so far */
        int getNumberOfUnconstrainedAssignments();

        /** @return number of host-localized slot assignments counted so far */
        int getNumberOfHostLocalizedAssignments();

        /** @return number of slots associated with the given slot sharing group */
        int getNumberOfSlots(SlotSharingGroup slotSharingGroup);

        /** @return number of slots of the given slot sharing group available for {@code jobVertexID} */
        int getNumberOfAvailableSlotsForGroup(SlotSharingGroup slotSharingGroup, JobVertexID jobVertexID);

        /**
         * Shuts the provider down.
         *
         * @throws Exception if the shutdown fails
         */
        void shutdown() throws Exception;
    }

    /**
     * Supplies the parameter sets for parameterized runs: one single-element array per
     * {@link SchedulerType}, in the order {@code SCHEDULER}, {@code SLOT_POOL}.
     *
     * @return the two constructor-argument arrays
     */
    @Parameterized.Parameters(name = "Scheduler type = {0}")
    public static Collection<Object[]> schedulerTypes() {
        Object[] schedulerRun = {SchedulerType.SCHEDULER};
        Object[] slotPoolRun = {SchedulerType.SLOT_POOL};
        return Arrays.asList(schedulerRun, slotPoolRun);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Creates the test base for the given scheduler flavour.
     *
     * @param schedulerType implementation to test against, must not be {@code null}
     */
    public SchedulerTestBase(SchedulerType schedulerType) {
        this.schedulerType = Preconditions.checkNotNull(schedulerType);
    }

    /**
     * Creates the {@link TestingSlotProvider} matching {@link #schedulerType} before each test.
     *
     * <p>For {@code SLOT_POOL} a {@link TestingRpcService} is created as well and the slot pool
     * is started only after the provider has been built around it.
     */
    @Before
    public void setup() throws Exception {
        switch (this.schedulerType) {
            case SCHEDULER: {
                Scheduler scheduler = new Scheduler(TestingUtils.defaultExecutionContext());
                this.testingSlotProvider = new TestingSchedulerSlotProvider(scheduler);
                break;
            }
            case SLOT_POOL: {
                this.rpcService = new TestingRpcService();
                JobID jobId = new JobID();
                SchedulingStrategy strategy = LocationPreferenceSchedulingStrategy.getInstance();
                TestingSlotPool slotPool = new TestingSlotPool(this.rpcService, jobId, strategy);
                this.testingSlotProvider = new TestingSlotPoolSlotProvider(slotPool);
                JobMasterId jobMasterId = JobMasterId.generate();
                slotPool.start(jobMasterId, "localhost");
                break;
            }
            default:
                // No provider is created for any other type.
                break;
        }
    }

    /**
     * Shuts down the slot provider and, if one was created, the RPC service after each test.
     *
     * <p>Both shutdowns are always attempted: a failure while shutting down the slot provider no
     * longer prevents the RPC service from being stopped. Both fields are cleared regardless of
     * the outcome.
     *
     * @throws Exception the first shutdown failure, with any later failure attached as suppressed
     */
    @After
    public void teardown() throws Exception {
        Exception failure = null;

        if (this.testingSlotProvider != null) {
            try {
                this.testingSlotProvider.shutdown();
            } catch (Exception e) {
                failure = e;
            } finally {
                this.testingSlotProvider = null;
            }
        }

        if (this.rpcService != null) {
            try {
                this.rpcService.stopService().get();
            } catch (Exception e) {
                if (failure == null) {
                    failure = e;
                } else {
                    // Keep the first failure as the primary one, but do not lose this one.
                    failure.addSuppressed(e);
                }
            } finally {
                this.rpcService = null;
            }
        }

        if (failure != null) {
            throw failure;
        }
    }
}
