/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.functions.worker.scheduler;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;

public class RoundRobinScheduler
implements IScheduler {
    @Override
    public List<Function.Assignment> schedule(List<Function.Instance> unassignedFunctionInstances, List<Function.Assignment> currentAssignments, List<String> workers) {
        HashMap<String, List<Function.Assignment>> workerIdToAssignment = new HashMap<String, List<Function.Assignment>>();
        for (String workerId : workers) {
            workerIdToAssignment.put(workerId, new LinkedList());
        }
        for (Function.Assignment existingAssignment : currentAssignments) {
            ((List)workerIdToAssignment.get(existingAssignment.getWorkerId())).add(existingAssignment);
        }
        for (Function.Instance unassignedFunctionInstance : unassignedFunctionInstances) {
            String workerId = this.findNextWorker(workerIdToAssignment);
            Function.Assignment newAssignment = Function.Assignment.newBuilder().setInstance(unassignedFunctionInstance).setWorkerId(workerId).build();
            ((List)workerIdToAssignment.get(workerId)).add(newAssignment);
        }
        List<Function.Assignment> assignments = workerIdToAssignment.entrySet().stream().flatMap(entry -> ((List)entry.getValue()).stream()).collect(Collectors.toList());
        return assignments;
    }

    private String findNextWorker(Map<String, List<Function.Assignment>> workerIdToAssignment) {
        String targetWorkerId = null;
        int least = Integer.MAX_VALUE;
        for (Map.Entry<String, List<Function.Assignment>> entry : workerIdToAssignment.entrySet()) {
            String workerId = entry.getKey();
            List<Function.Assignment> workerAssigments = entry.getValue();
            if (workerAssigments.size() >= least) continue;
            targetWorkerId = workerId;
            least = workerAssigments.size();
        }
        return targetWorkerId;
    }
}

