package org.apache.storm.daemon.supervisor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import org.apache.storm.cluster.IStormClusterState;
import org.apache.storm.daemon.supervisor.Slot;
import org.apache.storm.generated.Assignment;
import org.apache.storm.generated.ExecutorInfo;
import org.apache.storm.generated.LocalAssignment;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.generated.ProfileRequest;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.localizer.AsyncLocalizer;
import org.apache.storm.metricstore.MetricStoreConfig;
import org.apache.storm.metricstore.WorkerMetricsProcessor;
import org.apache.storm.scheduler.ISupervisor;
import org.apache.storm.utils.LocalState;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/daemon/supervisor/ReadClusterState.class */
/**
 * Reads the assignments and profiler requests published in the cluster state and hands them to the
 * per-port {@link Slot} instances owned by this supervisor.
 *
 * <p>{@link #run()} and {@link #shutdownAllWorkers(BiConsumer, UniFunc)} are {@code synchronized} on
 * this instance; {@link #close()} is not.
 */
public class ReadClusterState implements Runnable, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ReadClusterState.class);

    /** Elapsed shutdown time (ms) after which the error-timeout callback is invoked. */
    private static final long ERROR_MILLIS = 60000;
    /** Elapsed shutdown time (ms) after which the warn-timeout callback is invoked. */
    private static final long WARN_MILLIS = 1000;
    /** Delay (ms) between two checks of a slot's state while waiting for it to become EMPTY. */
    private static final long SHUTDOWN_POLL_MILLIS = 100L;

    /** Error-timeout callback that fails with an {@link IllegalStateException} naming the slot. */
    public static final UniFunc<Slot> DEFAULT_ON_ERROR_TIMEOUT = slot -> {
        throw new IllegalStateException("It took over " + ERROR_MILLIS + "ms to shut down slot " + slot);
    };
    /** Error-timeout callback that logs a thread dump and then delegates to {@link #DEFAULT_ON_ERROR_TIMEOUT}. */
    public static final UniFunc<Slot> THREAD_DUMP_ON_ERROR = slot -> {
        LOG.warn("Shutdown of slot {} appears to be stuck\n{}", slot, Utils.threadDump());
        DEFAULT_ON_ERROR_TIMEOUT.call(slot);
    };
    /** Warn-timeout callback that logs the elapsed time and the slot that is still not shut down. */
    public static final BiConsumer<Slot, Long> DEFAULT_ON_WARN_TIMEOUT = (slot, elapsedMillis) -> {
        LOG.warn("It has taken {}ms so far and {} is still not shut down.", elapsedMillis, slot);
    };

    private final Map<String, Object> superConf;
    private final IStormClusterState stormClusterState;
    /** Port -> slot. Populated in the constructor and extended by {@link #run()} for newly seen ports. */
    private final Map<Integer, Slot> slots = new HashMap<>();
    /** Number of consecutive failed {@link #readAssignments(Map)} attempts. */
    private final AtomicInteger readRetry = new AtomicInteger(0);
    private final String assignmentId;
    private final int supervisorPort;
    private final ISupervisor supervisor;
    private final AsyncLocalizer localizer;
    private final ContainerLauncher launcher;
    private final String host;
    private final LocalState localState;
    private final AtomicReference<Map<Long, LocalAssignment>> cachedAssignments;
    private final OnlyLatestExecutor<Integer> metricsExec;
    private final SlotMetrics slotMetrics;
    /** May be {@code null} when the metric processor could not be configured. */
    private final WorkerMetricsProcessor metricsProcessor;

    /**
     * Builds the state reader from the given supervisor, creates one slot per configured port,
     * kills workers that no slot claims, and starts every slot.
     *
     * <p>A failure to configure the metric processor or to clean up old workers is logged and does
     * not abort construction.
     *
     * @param supervisor the supervisor providing configuration and shared services
     * @throws Exception if the container launcher or a slot cannot be created
     */
    public ReadClusterState(Supervisor supervisor) throws Exception {
        this.superConf = supervisor.getConf();
        this.stormClusterState = supervisor.getStormClusterState();
        this.assignmentId = supervisor.getAssignmentId();
        this.supervisorPort = supervisor.getThriftServerPort();
        this.supervisor = supervisor.getiSupervisor();
        this.localizer = supervisor.getAsyncLocalizer();
        this.host = supervisor.getHostName();
        this.localState = supervisor.getLocalState();
        this.cachedAssignments = supervisor.getCurrAssignment();
        this.metricsExec = new OnlyLatestExecutor<>(supervisor.getHeartbeatExecutor());
        this.slotMetrics = supervisor.getSlotMetrics();
        this.launcher = ContainerLauncher.make(this.superConf, this.assignmentId, this.supervisorPort, supervisor.getSharedContext(), supervisor.getMetricsRegistry(), supervisor.getContainerMemoryTracker(), supervisor.getSupervisorThriftInterface());

        WorkerMetricsProcessor processor = null;
        try {
            processor = MetricStoreConfig.configureMetricProcessor(this.superConf);
        } catch (Exception e) {
            LOG.error("Failed to initialize metric processor", e);
        }
        this.metricsProcessor = processor;

        for (Integer port : SupervisorUtils.getSlotsPorts(this.superConf)) {
            this.slots.put(port, mkSlot(port));
        }

        try {
            // Whatever worker ids remain after removing those claimed by a slot are detached.
            Collection<String> detachedWorkers = SupervisorUtils.supervisorWorkerIds(this.superConf);
            for (Slot slot : this.slots.values()) {
                String workerId = slot.getWorkerId();
                if (workerId != null) {
                    detachedWorkers.remove(workerId);
                }
            }
            if (!detachedWorkers.isEmpty()) {
                LOG.info("Killing detached workers {}", detachedWorkers);
                supervisor.killWorkers(detachedWorkers, this.launcher);
            }
        } catch (Exception e) {
            LOG.warn("Error trying to clean up old workers", e);
        }

        for (Slot slot : this.slots.values()) {
            slot.start();
        }
    }

    /** Creates (but does not start) the slot for the given port using this instance's shared services. */
    private Slot mkSlot(int port) throws Exception {
        return new Slot(this.localizer, this.superConf, this.launcher, this.host, port, this.localState, this.stormClusterState, this.supervisor, this.cachedAssignments, this.metricsExec, this.metricsProcessor, this.slotMetrics);
    }

    /**
     * Performs one synchronization pass: reads the current assignments and profiler requests and
     * pushes them to every known slot, creating and starting slots for ports not seen before.
     *
     * <p>Returns without touching the slots when {@link #readAssignments(Map)} yields {@code null}.
     *
     * @throws RuntimeException wrapping any exception raised during the pass
     */
    @Override // java.lang.Runnable
    public synchronized void run() {
        try {
            List<String> topoIds = this.stormClusterState.assignments((Runnable) null);
            Map<Integer, LocalAssignment> allAssignments = readAssignments(getAssignmentsSnapshot(this.stormClusterState));
            if (allAssignments == null) {
                return;
            }
            Map<String, List<ProfileRequest>> topoIdToProfilerActions = getProfileActions(this.stormClusterState, topoIds);

            LOG.debug("Synchronizing supervisor");
            LOG.debug("All assignment: {}", allAssignments);
            LOG.debug("Topology Ids -> Profiler Actions {}", topoIdToProfilerActions);

            Set<Integer> assignedPorts = new HashSet<>();
            for (Integer port : allAssignments.keySet()) {
                if (this.supervisor.confirmAssigned(port)) {
                    assignedPorts.add(port);
                }
            }
            Set<Integer> allPorts = new HashSet<>(assignedPorts);
            this.supervisor.assigned(allPorts);
            // Existing slots without an assignment must still be visited so they receive null.
            allPorts.addAll(this.slots.keySet());

            Map<Integer, Set<Slot.TopoProfileAction>> actionsByPort = new HashMap<>();
            for (Map.Entry<String, List<ProfileRequest>> entry : topoIdToProfilerActions.entrySet()) {
                String topoId = entry.getKey();
                List<ProfileRequest> requests = entry.getValue();
                if (requests == null) {
                    continue;
                }
                for (ProfileRequest request : requests) {
                    NodeInfo nodeInfo = request.get_nodeInfo();
                    // Only requests addressed to this host are kept.
                    if (this.host.equals(nodeInfo.get_node())) {
                        // Only the first port of the request's node info is used.
                        Long port = nodeInfo.get_port().iterator().next();
                        actionsByPort.computeIfAbsent(port.intValue(), ignored -> new HashSet<>())
                            .add(new Slot.TopoProfileAction(topoId, request));
                    }
                }
            }

            for (Integer port : allPorts) {
                Slot slot = this.slots.get(port);
                if (slot == null) {
                    slot = mkSlot(port);
                    this.slots.put(port, slot);
                    slot.start();
                }
                slot.setNewAssignment(allAssignments.get(port));
                slot.addProfilerActions(actionsByPort.get(port));
            }
        } catch (Exception e) {
            LOG.error("Failed to Sync Supervisor", e);
            throw new RuntimeException(e);
        }
    }

    /**
     * Fetches the topology id -> assignment map from the cluster state.
     *
     * @param stormClusterState the cluster state to query
     * @return whatever {@code assignmentsInfo()} returns
     * @throws Exception if the lookup fails
     */
    protected Map<String, Assignment> getAssignmentsSnapshot(IStormClusterState stormClusterState) throws Exception {
        return stormClusterState.assignmentsInfo();
    }

    /**
     * Looks up the profile requests of every given topology.
     *
     * @param stormClusterState the cluster state to query
     * @param stormIds the topology ids to look up
     * @return topology id -> profile requests; a value is whatever the cluster state returned for it
     * @throws Exception if a lookup fails
     */
    protected Map<String, List<ProfileRequest>> getProfileActions(IStormClusterState stormClusterState, List<String> stormIds) throws Exception {
        Map<String, List<ProfileRequest>> actionsByTopoId = new HashMap<>();
        for (String stormId : stormIds) {
            actionsByTopoId.put(stormId, stormClusterState.getTopologyProfileRequests(stormId));
        }
        return actionsByTopoId;
    }

    /**
     * Merges the per-topology assignments of this supervisor into a single port -> assignment map.
     *
     * <p>A {@link RuntimeException} (including two topologies claiming the same port) is logged and
     * answered with {@code null} for the first three consecutive failures; the next one is rethrown.
     * A successful read resets the failure counter.
     *
     * @param assignmentsSnapshot topology id -> assignment
     * @return port -> local assignment, or {@code null} when the read failed and should be retried
     * @throws RuntimeException once the retry budget is exhausted
     */
    protected Map<Integer, LocalAssignment> readAssignments(Map<String, Assignment> assignmentsSnapshot) {
        try {
            Map<Integer, LocalAssignment> portToAssignment = new HashMap<>();
            for (Map.Entry<String, Assignment> topoEntry : assignmentsSnapshot.entrySet()) {
                Map<Integer, LocalAssignment> mine = readMyExecutors(topoEntry.getKey(), this.assignmentId, topoEntry.getValue());
                for (Map.Entry<Integer, LocalAssignment> portEntry : mine.entrySet()) {
                    Integer port = portEntry.getKey();
                    LocalAssignment localAssignment = portEntry.getValue();
                    if (portToAssignment.containsKey(port)) {
                        throw new RuntimeException("Should not have multiple topologies assigned to one port " + port + " " + localAssignment + " " + portToAssignment);
                    }
                    portToAssignment.put(port, localAssignment);
                }
            }
            this.readRetry.set(0);
            return portToAssignment;
        } catch (RuntimeException e) {
            if (this.readRetry.get() > 2) {
                throw e;
            }
            this.readRetry.addAndGet(1);
            LOG.warn("{} : retrying {} of 3", e.getMessage(), this.readRetry.get());
            return null;
        }
    }

    /**
     * Extracts from one topology's assignment the executors placed on nodes whose id starts with
     * {@code assignmentId}, grouped by port.
     *
     * <p>Each resulting {@link LocalAssignment} carries the worker resources recorded for its port,
     * the node's shared off-heap total and the owner, whenever the assignment provides them.
     *
     * @param topoId the topology the assignment belongs to
     * @param assignmentId the node id prefix identifying this supervisor
     * @param assignment the topology's assignment
     * @return port -> local assignment; empty when nothing is placed on this supervisor
     */
    protected Map<Integer, LocalAssignment> readMyExecutors(String topoId, String assignmentId, Assignment assignment) {
        Map<Integer, LocalAssignment> portToAssignment = new HashMap<>();

        Map<Long, WorkerResources> resourcesByPort = new HashMap<>();
        Map<NodeInfo, WorkerResources> workerResources = assignment.get_worker_resources();
        if (workerResources != null) {
            for (Map.Entry<NodeInfo, WorkerResources> entry : workerResources.entrySet()) {
                NodeInfo nodeInfo = entry.getKey();
                if (nodeInfo.get_node().startsWith(assignmentId)) {
                    for (Long port : nodeInfo.get_port()) {
                        resourcesByPort.put(port, entry.getValue());
                    }
                }
            }
        }

        // Stays null unless the field is set AND has an entry for exactly this assignment id.
        Double sharedOffHeap = null;
        if (assignment.is_set_total_shared_off_heap()) {
            sharedOffHeap = assignment.get_total_shared_off_heap().get(assignmentId);
        }

        Map<List<Long>, NodeInfo> executorNodePort = assignment.get_executor_node_port();
        if (executorNodePort == null) {
            return portToAssignment;
        }
        for (Map.Entry<List<Long>, NodeInfo> entry : executorNodePort.entrySet()) {
            NodeInfo nodeInfo = entry.getValue();
            if (!nodeInfo.get_node().startsWith(assignmentId)) {
                continue;
            }
            List<Long> executor = entry.getKey();
            for (Long port : nodeInfo.get_port()) {
                LocalAssignment localAssignment = portToAssignment.get(port.intValue());
                if (localAssignment == null) {
                    localAssignment = new LocalAssignment(topoId, new ArrayList<>());
                    if (resourcesByPort.containsKey(port)) {
                        localAssignment.set_resources(resourcesByPort.get(port));
                    }
                    if (sharedOffHeap != null) {
                        localAssignment.set_total_node_shared(sharedOffHeap);
                    }
                    if (assignment.is_set_owner()) {
                        localAssignment.set_owner(assignment.get_owner());
                    }
                    portToAssignment.put(port.intValue(), localAssignment);
                }
                // The executor is described by the first and last element of its id list.
                localAssignment.get_executors().add(
                    new ExecutorInfo(executor.get(0).intValue(), executor.get(executor.size() - 1).intValue()));
            }
        }
        return portToAssignment;
    }

    /**
     * Clears the assignment of every slot and then waits, slot by slot, until each reports
     * {@code EMPTY}.
     *
     * <p>The elapsed time is measured from a single start instant shared by all slots. While a slot
     * is not empty, {@code onErrorTimeout} is invoked once more than 60000 ms have elapsed and
     * {@code onWarnTimeout} once more than 1000 ms have elapsed, on every poll. An exception raised
     * while waiting (for example by {@code onErrorTimeout}) ends the wait for that slot; the last
     * such exception is rethrown after all slots were visited. If the thread is interrupted, its
     * interrupt status is restored and the remaining slots are not waited for.
     *
     * @param onWarnTimeout callback receiving the slot and elapsed ms; {@code null} selects
     *     {@link #DEFAULT_ON_WARN_TIMEOUT}
     * @param onErrorTimeout callback receiving the slot; {@code null} selects
     *     {@link #DEFAULT_ON_ERROR_TIMEOUT}
     * @throws RuntimeException the recorded failure, wrapped unless it already is a runtime exception
     */
    public synchronized void shutdownAllWorkers(BiConsumer<Slot, Long> onWarnTimeout, UniFunc<Slot> onErrorTimeout) {
        for (Slot slot : this.slots.values()) {
            LOG.info("Setting {} assignment to null", slot);
            slot.setNewAssignment(null);
        }

        BiConsumer<Slot, Long> warnCallback = onWarnTimeout != null ? onWarnTimeout : DEFAULT_ON_WARN_TIMEOUT;
        UniFunc<Slot> errorCallback = onErrorTimeout != null ? onErrorTimeout : DEFAULT_ON_ERROR_TIMEOUT;

        long startMillis = Time.currentTimeMillis();
        Exception failure = null;
        for (Slot slot : this.slots.values()) {
            LOG.info("Waiting for {} to be EMPTY, currently {}", slot, slot.getMachineState());
            // The try wraps the whole polling loop: a failure must end the wait for this slot,
            // otherwise a throwing error callback would be retried forever without sleeping.
            try {
                while (slot.getMachineState() != Slot.MachineState.EMPTY) {
                    long elapsedMillis = Time.currentTimeMillis() - startMillis;
                    if (elapsedMillis > ERROR_MILLIS) {
                        errorCallback.call(slot);
                    }
                    if (elapsedMillis > WARN_MILLIS) {
                        warnCallback.accept(slot, elapsedMillis);
                    }
                    if (Time.isSimulating()) {
                        Time.advanceTime(SHUTDOWN_POLL_MILLIS);
                    }
                    Thread.sleep(SHUTDOWN_POLL_MILLIS);
                }
            } catch (InterruptedException e) {
                // Preserve the interrupt for callers and stop waiting altogether.
                Thread.currentThread().interrupt();
                LOG.error("Error trying to shutdown workers in {}", slot, e);
                failure = e;
                break;
            } catch (Exception e) {
                LOG.error("Error trying to shutdown workers in {}", slot, e);
                failure = e;
            }
        }

        if (failure instanceof RuntimeException) {
            throw (RuntimeException) failure;
        }
        if (failure != null) {
            throw new RuntimeException(failure);
        }
    }

    /**
     * Closes every slot. A failure to close one slot is logged and does not prevent the remaining
     * slots from being closed.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        for (Slot slot : this.slots.values()) {
            try {
                slot.close();
            } catch (Exception e) {
                LOG.error("Error trying to shutdown {}", slot, e);
            }
        }
    }
}
