package org.apache.helix.controller.stages;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.common.ResourcesStateMap;
import org.apache.helix.controller.dataproviders.BaseControllerDataProvider;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.MessageUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/controller/stages/MessageGenerationPhase.class */
public class MessageGenerationPhase extends AbstractBaseStage {
    private static final String NO_DESIRED_STATE = "NoDesiredState";
    private static final String PENDING_MESSAGE = "pending message";
    private static final String STALE_MESSAGE = "stale message";
    public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60000);
    private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);

    @Override // org.apache.helix.controller.pipeline.AbstractBaseStage, org.apache.helix.controller.pipeline.Stage
    public void process(ClusterEvent clusterEvent) throws Exception {
        BestPossibleStateOutput bestPossibleStateOutput = (BestPossibleStateOutput) clusterEvent.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
        this._eventId = clusterEvent.getEventId();
        HelixManager helixManager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        BaseControllerDataProvider baseControllerDataProvider = (BaseControllerDataProvider) clusterEvent.getAttribute(AttributeName.ControllerDataProvider.name());
        Map map = (Map) clusterEvent.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
        CurrentStateOutput currentStateOutput = (CurrentStateOutput) clusterEvent.getAttribute(AttributeName.CURRENT_STATE.name());
        HashMap hashMap = new HashMap();
        if (helixManager == null || baseControllerDataProvider == null || map == null || currentStateOutput == null || bestPossibleStateOutput == null) {
            throw new StageException("Missing attributes in event:" + clusterEvent + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BESTPOSSIBLE_STATE");
        }
        Map<String, LiveInstance> liveInstances = baseControllerDataProvider.getLiveInstances();
        HashMap hashMap2 = new HashMap();
        for (LiveInstance liveInstance : liveInstances.values()) {
            hashMap2.put(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
        }
        MessageOutput messageOutput = new MessageOutput();
        for (Resource resource : map.values()) {
            try {
                generateMessage(resource, baseControllerDataProvider, bestPossibleStateOutput, currentStateOutput, helixManager, hashMap2, clusterEvent.getEventType(), messageOutput, hashMap);
            } catch (HelixException e) {
                LogUtil.logError(logger, this._eventId, "Failed to generate message for resource " + resource.getResourceName(), e);
            }
        }
        if (!hashMap.isEmpty()) {
            schedulePendingMessageCleanUp(hashMap, baseControllerDataProvider.getAsyncTasksThreadPool(), helixManager.getHelixDataAccessor());
        }
        clusterEvent.addAttribute(AttributeName.MESSAGES_ALL.name(), messageOutput);
    }

    private void generateMessage(Resource resource, BaseControllerDataProvider baseControllerDataProvider, ResourcesStateMap resourcesStateMap, CurrentStateOutput currentStateOutput, HelixManager helixManager, Map<String, String> map, ClusterEventType clusterEventType, MessageOutput messageOutput, Map<String, Map<String, Message>> map2) {
        String resourceName = resource.getResourceName();
        StateModelDefinition stateModelDef = baseControllerDataProvider.getStateModelDef(resource.getStateModelDefRef());
        if (stateModelDef == null) {
            LogUtil.logError(logger, this._eventId, "State Model Definition null, skip generating messages for resource: " + resourceName);
            return;
        }
        for (Partition partition : resource.getPartitions()) {
            HashMap hashMap = new HashMap(resourcesStateMap.getInstanceStateMap(resourceName, partition));
            Map<String, String> pendingStateMap = currentStateOutput.getPendingStateMap(resourceName, partition);
            Map<String, String> currentStateMap = currentStateOutput.getCurrentStateMap(resourceName, partition);
            for (String str : pendingStateMap.keySet()) {
                if (!hashMap.containsKey(str)) {
                    hashMap.put(str, NO_DESIRED_STATE);
                }
            }
            for (String str2 : currentStateMap.keySet()) {
                if (!hashMap.containsKey(str2)) {
                    hashMap.put(str2, HelixDefinedState.DROPPED.name());
                }
            }
            HashMap hashMap2 = new HashMap();
            for (String str3 : hashMap.keySet()) {
                Set<Message> staleMessagesByInstance = baseControllerDataProvider.getStaleMessagesByInstance(str3);
                String str4 = (String) hashMap.get(str3);
                String currentState = currentStateOutput.getCurrentState(resourceName, partition, str3);
                Message pendingMessage = currentStateOutput.getPendingMessage(resourceName, partition, str3);
                boolean isStateTransitionCancelEnabled = baseControllerDataProvider.getClusterConfig().isStateTransitionCancelEnabled();
                Message cancellationMessage = currentStateOutput.getCancellationMessage(resourceName, partition, str3);
                String nextStateForTransition = stateModelDef.getNextStateForTransition(currentState, str4);
                Message message = null;
                if (currentState == null) {
                    currentState = stateModelDef.getInitialState();
                    nextStateForTransition = stateModelDef.getNextStateForTransition(currentState, str4);
                    if (str4.equals(HelixDefinedState.DROPPED.name())) {
                        LogUtil.logDebug(logger, this._eventId, String.format("No current state for partition %s in resource %s, skip the drop message", partition.getPartitionName(), resourceName));
                        addGeneratedMessageToMap(generateCancellationMessageForPendingMessage(str4, currentState, nextStateForTransition, pendingMessage, helixManager, resource, partition, map, str3, stateModelDef, cancellationMessage, isStateTransitionCancelEnabled), hashMap2, clusterEventType, baseControllerDataProvider, str4, resourceName, partition, currentState, nextStateForTransition);
                        if (baseControllerDataProvider instanceof ResourceControllerDataProvider) {
                            ((ResourceControllerDataProvider) baseControllerDataProvider).invalidateCachedIdealStateMapping(resourceName);
                        }
                    }
                }
                if (shouldCleanUpPendingMessage(pendingMessage, map, str3, currentState, currentStateOutput.getEndTime(resourceName, partition, str3))) {
                    logAndAddToCleanUp(map2, pendingMessage, str3, resourceName, partition, currentState, PENDING_MESSAGE);
                }
                for (Message message2 : staleMessagesByInstance) {
                    if (System.currentTimeMillis() - currentStateOutput.getEndTime(resourceName, partition, str3).longValue() > DEFAULT_OBSELETE_MSG_PURGE_DELAY && message2.getResourceName().equals(resourceName) && map.containsKey(str3) && (message2.getPartitionName().equals(partition.getPartitionName()) || (message2.getBatchMessageMode() && message2.getPartitionNames().contains(partition.getPartitionName())))) {
                        logAndAddToCleanUp(map2, message2, str3, resourceName, partition, currentState, STALE_MESSAGE);
                    }
                }
                if (str4.equals(NO_DESIRED_STATE) || str4.equalsIgnoreCase(currentState)) {
                    if (shouldCreateSTCancellation(pendingMessage, str4, stateModelDef.getInitialState())) {
                        message = MessageUtil.createStateTransitionCancellationMessage(helixManager.getInstanceName(), helixManager.getSessionId(), resource, partition.getPartitionName(), str3, map.get(str3), stateModelDef.getId(), pendingMessage.getFromState(), pendingMessage.getToState(), null, cancellationMessage, isStateTransitionCancelEnabled, currentState);
                    }
                } else if (nextStateForTransition == null) {
                    LogUtil.logError(logger, this._eventId, "Unable to find a next state for resource: " + resource.getResourceName() + " partition: " + partition.getPartitionName() + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState + " to:" + str4);
                } else if (pendingMessage != null) {
                    message = generateCancellationMessageForPendingMessage(str4, currentState, nextStateForTransition, pendingMessage, helixManager, resource, partition, map, str3, stateModelDef, cancellationMessage, isStateTransitionCancelEnabled);
                } else {
                    message = MessageUtil.createStateTransitionMessage(helixManager.getInstanceName(), helixManager.getSessionId(), resource, partition.getPartitionName(), str3, currentState, nextStateForTransition, map.get(str3), stateModelDef.getId());
                    if (logger.isDebugEnabled()) {
                        LogUtil.logDebug(logger, this._eventId, String.format("Resource %s partition %s for instance %s with currentState %s and nextState %s", resource.getResourceName(), partition.getPartitionName(), str3, currentState, nextStateForTransition));
                    }
                }
                addGeneratedMessageToMap(message, hashMap2, clusterEventType, baseControllerDataProvider, str4, resourceName, partition, currentState, nextStateForTransition);
            }
            for (String str5 : stateModelDef.getStatesPriorityList()) {
                if (hashMap2.containsKey(str5)) {
                    for (Message message3 : hashMap2.get(str5)) {
                        if (message3.isValid()) {
                            messageOutput.addMessage(resourceName, partition, message3);
                        } else {
                            LogUtil.logError(logger, this._eventId, String.format("An invalid message was generated! Discarding this message. sessionIdMap: %s, CurrentStateMap: %s, InstanceStateMap: %s, AllInstances: %s, LiveInstances: %s, Message: %s", map, currentStateOutput.getCurrentStateMap(resourceName, partition), hashMap, baseControllerDataProvider.getAllInstances(), baseControllerDataProvider.getLiveInstances().keySet(), message3));
                        }
                    }
                }
            }
        }
    }

    private boolean shouldCreateSTCancellation(Message message, String str, String str2) {
        if (message == null) {
            return false;
        }
        if (NO_DESIRED_STATE.equals(str)) {
            return true;
        }
        return (str.equalsIgnoreCase(message.getToState()) || (HelixDefinedState.ERROR.name().equals(message.getFromState()) && str2.equals(message.getToState()))) ? false : true;
    }

    private void logAndAddToCleanUp(Map<String, Map<String, Message>> map, Message message, String str, String str2, Partition partition, String str3, String str4) {
        LogUtil.logInfo(logger, this._eventId, String.format("Adding %s %s on instance %s to clean up. Msg: %s->%s, current state of resource %s:%s is %s", str4, message.getMsgId(), str, message.getFromState(), message.getToState(), str2, partition, str3));
        if (!map.containsKey(str)) {
            map.put(str, new HashMap());
        }
        map.get(str).put(message.getMsgId(), message);
    }

    private Message generateCancellationMessageForPendingMessage(String str, String str2, String str3, Message message, HelixManager helixManager, Resource resource, Partition partition, Map<String, String> map, String str4, StateModelDefinition stateModelDefinition, Message message2, boolean z) {
        Message message3 = null;
        if (message != null) {
            String toState = message.getToState();
            if (str3.equalsIgnoreCase(toState)) {
                LogUtil.logInfo(logger, this._eventId, "Message already exists for " + str4 + " to transit " + resource.getResourceName() + "." + partition.getPartitionName() + " from " + str2 + " to " + str3 + ", isRelay: " + message.isRelayMessage());
            } else if (str2.equalsIgnoreCase(toState)) {
                LogUtil.logDebug(logger, this._eventId, "Message hasn't been removed for " + str4 + " to transit " + resource.getResourceName() + "." + partition.getPartitionName() + " to " + toState + ", desiredState: " + str + ", isRelay: " + message.isRelayMessage());
            } else {
                LogUtil.logDebug(logger, this._eventId, "IdealState changed before state transition completes for " + resource.getResourceName() + "." + partition.getPartitionName() + " on " + str4 + ", pendingState: " + toState + ", currentState: " + str2 + ", nextState: " + str3 + ", isRelay: " + message.isRelayMessage());
                message3 = MessageUtil.createStateTransitionCancellationMessage(helixManager.getInstanceName(), helixManager.getSessionId(), resource, partition.getPartitionName(), str4, map.get(str4), stateModelDefinition.getId(), message.getFromState(), toState, str3, message2, z, str2);
            }
        }
        return message3;
    }

    private void addGeneratedMessageToMap(Message message, Map<String, List<Message>> map, ClusterEventType clusterEventType, BaseControllerDataProvider baseControllerDataProvider, String str, String str2, Partition partition, String str3, String str4) {
        if (message != null) {
            IdealState idealState = baseControllerDataProvider.getIdealState(str2);
            if (idealState != null && idealState.getStateModelDefRef().equalsIgnoreCase("SchedulerTaskQueue") && idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
            }
            int timeOut = getTimeOut(baseControllerDataProvider.getClusterConfig(), baseControllerDataProvider.getResourceConfig(str2), str3, str4, idealState, partition);
            if (timeOut > 0) {
                message.setExecutionTimeout(timeOut);
            }
            message.setAttribute(Message.Attributes.ClusterEventName, clusterEventType.name());
            if (!map.containsKey(str)) {
                map.put(str, new ArrayList());
            }
            map.get(str).add(message);
        }
    }

    private void schedulePendingMessageCleanUp(final Map<String, Map<String, Message>> map, ExecutorService executorService, final HelixDataAccessor helixDataAccessor) {
        executorService.submit(new Callable<Object>() { // from class: org.apache.helix.controller.stages.MessageGenerationPhase.1
            @Override // java.util.concurrent.Callable
            public Object call() {
                for (Map.Entry entry : map.entrySet()) {
                    String str = (String) entry.getKey();
                    for (Message message : ((Map) entry.getValue()).values()) {
                        if (helixDataAccessor.removeProperty(message.getKey(helixDataAccessor.keyBuilder(), str))) {
                            LogUtil.logInfo(MessageGenerationPhase.logger, MessageGenerationPhase.this._eventId, String.format("Deleted message %s from instance %s", message.getMsgId(), str));
                        }
                    }
                }
                return null;
            }
        });
    }

    private boolean shouldCleanUpPendingMessage(Message message, Map<String, String> map, String str, String str2, Long l) {
        if (message == null || !map.containsKey(str)) {
            return false;
        }
        return str2.equalsIgnoreCase(message.getToState()) ? System.currentTimeMillis() - l.longValue() > DEFAULT_OBSELETE_MSG_PURGE_DELAY : !str2.equalsIgnoreCase(message.getFromState());
    }

    private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfig, String str, String str2, IdealState idealState, Partition partition) {
        StateTransitionTimeoutConfig stateTransitionTimeoutConfig = clusterConfig.getStateTransitionTimeoutConfig();
        int stateTransitionTimeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig.getStateTransitionTimeout(str, str2) : -1;
        String str3 = null;
        if (idealState != null) {
            str3 = idealState.getRecord().getSimpleField(str + "-" + str2 + "_" + Message.Attributes.TIMEOUT);
            if (str3 == null && idealState.getStateModelDefRef().equalsIgnoreCase("SchedulerTaskQueue") && idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
                str3 = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
            }
        }
        if (str3 != null) {
            try {
                stateTransitionTimeout = Integer.parseInt(str3);
            } catch (Exception e) {
                LogUtil.logError(logger, this._eventId, "", e);
            }
        }
        if (resourceConfig != null) {
            StateTransitionTimeoutConfig stateTransitionTimeoutConfig2 = resourceConfig.getStateTransitionTimeoutConfig();
            stateTransitionTimeout = stateTransitionTimeoutConfig2 != null ? stateTransitionTimeoutConfig2.getStateTransitionTimeout(str, str2) : -1;
        }
        return stateTransitionTimeout;
    }
}
