package org.apache.pulsar.functions.worker;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.client.api.CompressionType;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.Request;
import org.apache.pulsar.functions.utils.Reflections;
import org.apache.pulsar.functions.worker.scheduler.IScheduler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/SchedulerManager.class */
public class SchedulerManager implements AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(SchedulerManager.class);
    private final WorkerConfig workerConfig;
    private FunctionMetaDataManager functionMetaDataManager;
    private MembershipManager membershipManager;
    private FunctionRuntimeManager functionRuntimeManager;
    private final IScheduler scheduler;
    private final Producer<byte[]> producer;
    private final ExecutorService executorService;

    public SchedulerManager(WorkerConfig workerConfig, PulsarClient pulsarClient) {
        this.workerConfig = workerConfig;
        this.scheduler = (IScheduler) Reflections.createInstance(workerConfig.getSchedulerClassName(), IScheduler.class, Thread.currentThread().getContextClassLoader());
        try {
            this.producer = pulsarClient.newProducer().topic(this.workerConfig.getFunctionAssignmentTopic()).enableBatching(true).blockIfQueueFull(true).compressionType(CompressionType.LZ4).sendTimeout(0, TimeUnit.MILLISECONDS).create();
            this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
        } catch (PulsarClientException e) {
            log.error("Failed to create producer to function assignment topic " + this.workerConfig.getFunctionAssignmentTopic(), e);
            throw new RuntimeException(e);
        }
    }

    public Future<?> schedule() {
        return this.executorService.submit(() -> {
            synchronized (this) {
                if (this.membershipManager.isLeader()) {
                    invokeScheduler();
                }
            }
        });
    }

    private void invokeScheduler() {
        List<String> list = (List) this.membershipManager.getCurrentMembership().stream().map(workerInfo -> {
            return workerInfo.getWorkerId();
        }).collect(Collectors.toList());
        Map<String, Function.Instance> computeAllInstances = computeAllInstances(this.functionMetaDataManager.getAllFunctionMetaData());
        Map<String, Map<String, Function.Assignment>> currentAssignments = this.functionRuntimeManager.getCurrentAssignments();
        Iterator<Map.Entry<String, Map<String, Function.Assignment>>> it = currentAssignments.entrySet().iterator();
        while (it.hasNext()) {
            Map<String, Function.Assignment> value = it.next().getValue();
            value.entrySet().removeIf(entry -> {
                return !computeAllInstances.containsKey((String) entry.getKey());
            });
            for (Map.Entry<String, Function.Assignment> entry2 : value.entrySet()) {
                String key = entry2.getKey();
                Function.Assignment value2 = entry2.getValue();
                Function.Instance instance = computeAllInstances.get(key);
                if (!value2.getInstance().equals(instance)) {
                    value.put(key, value2.toBuilder().setInstance(instance).build());
                }
            }
            if (value.isEmpty()) {
                it.remove();
            }
        }
        List<Function.Assignment> schedule = this.scheduler.schedule(getUnassignedFunctionInstances(currentAssignments, computeAllInstances), (List) currentAssignments.entrySet().stream().flatMap(entry3 -> {
            return ((Map) entry3.getValue()).values().stream();
        }).collect(Collectors.toList()), list);
        log.debug("New assignments computed: {}", schedule);
        long currentAssignmentVersion = this.functionRuntimeManager.getCurrentAssignmentVersion() + 1;
        try {
            this.producer.sendAsync((Producer<byte[]>) Request.AssignmentsUpdate.newBuilder().setVersion(currentAssignmentVersion).addAllAssignments(schedule).build().toByteArray()).get();
            int i = 0;
            while (this.functionRuntimeManager.getCurrentAssignmentVersion() < currentAssignmentVersion) {
                if (i >= this.workerConfig.getAssignmentWriteMaxRetries()) {
                    log.warn("Max number of retries reached for waiting for assignment to propagate. Will continue now.");
                    return;
                }
                log.info("Waiting for assignments to propagate...");
                try {
                    Thread.sleep(500L);
                    i++;
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            }
        } catch (InterruptedException | ExecutionException e2) {
            log.error("Failed to send assignment update", e2);
            throw new RuntimeException(e2);
        }
    }

    public static Map<String, Function.Instance> computeAllInstances(List<Function.FunctionMetaData> list) {
        HashMap hashMap = new HashMap();
        Iterator<Function.FunctionMetaData> it = list.iterator();
        while (it.hasNext()) {
            for (Function.Instance instance : computeInstances(it.next())) {
                hashMap.put(Utils.getFullyQualifiedInstanceId(instance), instance);
            }
        }
        return hashMap;
    }

    public static List<Function.Instance> computeInstances(Function.FunctionMetaData functionMetaData) {
        LinkedList linkedList = new LinkedList();
        int parallelism = functionMetaData.getFunctionDetails().getParallelism();
        for (int i = 0; i < parallelism; i++) {
            linkedList.add(Function.Instance.newBuilder().setFunctionMetaData(functionMetaData).setInstanceId(i).build());
        }
        return linkedList;
    }

    private List<Function.Instance> getUnassignedFunctionInstances(Map<String, Map<String, Function.Assignment>> map, Map<String, Function.Instance> map2) {
        LinkedList linkedList = new LinkedList();
        HashMap hashMap = new HashMap();
        Iterator<Map<String, Function.Assignment>> it = map.values().iterator();
        while (it.hasNext()) {
            hashMap.putAll(it.next());
        }
        for (Map.Entry<String, Function.Instance> entry : map2.entrySet()) {
            String key = entry.getKey();
            Function.Instance value = entry.getValue();
            if (!hashMap.containsKey(key)) {
                linkedList.add(value);
            }
        }
        return linkedList;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            this.producer.close();
        } catch (PulsarClientException e) {
            log.warn("Failed to shutdown scheduler manager assignment producer", e);
        }
        this.executorService.shutdown();
    }

    public void setFunctionMetaDataManager(FunctionMetaDataManager functionMetaDataManager) {
        this.functionMetaDataManager = functionMetaDataManager;
    }

    public void setMembershipManager(MembershipManager membershipManager) {
        this.membershipManager = membershipManager;
    }

    public void setFunctionRuntimeManager(FunctionRuntimeManager functionRuntimeManager) {
        this.functionRuntimeManager = functionRuntimeManager;
    }
}
