package org.apache.helix.controller.stages;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.config.StateTransitionTimeoutConfig;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.HealthStat;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/controller/stages/MessageGenerationPhase.class */
public class MessageGenerationPhase extends AbstractBaseStage {
    private static final String NO_DESIRED_STATE = "NoDesiredState";
    public static final long DEFAULT_OBSELETE_MSG_PURGE_DELAY = HelixUtil.getSystemPropertyAsLong(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, 60000);
    private static Logger logger = LoggerFactory.getLogger(MessageGenerationPhase.class);

    @Override // org.apache.helix.controller.pipeline.AbstractBaseStage, org.apache.helix.controller.pipeline.Stage
    public void process(ClusterEvent clusterEvent) throws Exception {
        this._eventId = clusterEvent.getEventId();
        HelixManager helixManager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        ClusterDataCache clusterDataCache = (ClusterDataCache) clusterEvent.getAttribute(AttributeName.ClusterDataCache.name());
        Map map = (Map) clusterEvent.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
        HashMap hashMap = new HashMap();
        CurrentStateOutput currentStateOutput = (CurrentStateOutput) clusterEvent.getAttribute(AttributeName.CURRENT_STATE.name());
        IntermediateStateOutput intermediateStateOutput = (IntermediateStateOutput) clusterEvent.getAttribute(AttributeName.INTERMEDIATE_STATE.name());
        if (helixManager == null || clusterDataCache == null || map == null || currentStateOutput == null || intermediateStateOutput == null) {
            throw new StageException("Missing attributes in event:" + clusterEvent + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|INTERMEDIATE_STATE");
        }
        Map<String, LiveInstance> liveInstances = clusterDataCache.getLiveInstances();
        HashMap hashMap2 = new HashMap();
        for (LiveInstance liveInstance : liveInstances.values()) {
            hashMap2.put(liveInstance.getInstanceName(), liveInstance.getSessionId());
        }
        MessageGenerationOutput messageGenerationOutput = new MessageGenerationOutput();
        for (String str : map.keySet()) {
            Resource resource = (Resource) map.get(str);
            StateModelDefinition stateModelDef = clusterDataCache.getStateModelDef(resource.getStateModelDefRef());
            if (stateModelDef == null) {
                LogUtil.logError(logger, this._eventId, "State Model Definition null, skip generating messages for resource: " + str);
            } else {
                for (Partition partition : resource.getPartitions()) {
                    HashMap hashMap3 = new HashMap(intermediateStateOutput.getInstanceStateMap(str, partition));
                    for (String str2 : currentStateOutput.getPendingStateMap(str, partition).keySet()) {
                        if (!hashMap3.containsKey(str2)) {
                            hashMap3.put(str2, NO_DESIRED_STATE);
                        }
                    }
                    HashMap hashMap4 = new HashMap();
                    for (String str3 : hashMap3.keySet()) {
                        String str4 = (String) hashMap3.get(str3);
                        String currentState = currentStateOutput.getCurrentState(str, partition, str3);
                        if (currentState == null) {
                            currentState = stateModelDef.getInitialState();
                        }
                        Message pendingState = currentStateOutput.getPendingState(str, partition, str3);
                        boolean isStateTransitionCancelEnabled = clusterDataCache.getClusterConfig().isStateTransitionCancelEnabled();
                        Message cancellationState = currentStateOutput.getCancellationState(str, partition, str3);
                        String nextStateForTransition = stateModelDef.getNextStateForTransition(currentState, str4);
                        Message message = null;
                        if (shouldCleanUpPendingMessage(pendingState, currentState, currentStateOutput.getEndTime(str, partition, str3))) {
                            LogUtil.logInfo(logger, this._eventId, String.format("Adding pending message %s on instance %s to clean up. Msg: %s->%s, current state of resource %s:%s is %s", pendingState.getMsgId(), str3, pendingState.getFromState(), pendingState.getToState(), str, partition, currentState));
                            if (!hashMap.containsKey(str3)) {
                                hashMap.put(str3, new HashMap());
                            }
                            hashMap.get(str3).put(pendingState.getMsgId(), pendingState);
                        }
                        if (str4.equals(NO_DESIRED_STATE) || str4.equalsIgnoreCase(currentState)) {
                            if (str4.equals(NO_DESIRED_STATE) || (pendingState != null && !currentState.equalsIgnoreCase(pendingState.getToState()))) {
                                message = createStateTransitionCancellationMessage(helixManager, resource, partition.getPartitionName(), str3, (String) hashMap2.get(str3), stateModelDef.getId(), pendingState.getFromState(), pendingState.getToState(), null, cancellationState, isStateTransitionCancelEnabled, currentState);
                            }
                        } else if (nextStateForTransition == null) {
                            LogUtil.logError(logger, this._eventId, "Unable to find a next state for resource: " + resource.getResourceName() + " partition: " + partition.getPartitionName() + " from stateModelDefinition" + stateModelDef.getClass() + " from:" + currentState + " to:" + str4);
                        } else if (pendingState != null) {
                            String toState = pendingState.getToState();
                            if (nextStateForTransition.equalsIgnoreCase(toState)) {
                                LogUtil.logDebug(logger, this._eventId, "Message already exists for " + str3 + " to transit " + resource.getResourceName() + HealthStat.statFieldDelim + partition.getPartitionName() + " from " + currentState + " to " + nextStateForTransition);
                            } else if (currentState.equalsIgnoreCase(toState)) {
                                LogUtil.logInfo(logger, this._eventId, "Message hasn't been removed for " + str3 + " to transit " + resource.getResourceName() + HealthStat.statFieldDelim + partition.getPartitionName() + " to " + toState + ", desiredState: " + str4);
                            } else {
                                LogUtil.logInfo(logger, this._eventId, "IdealState changed before state transition completes for " + resource.getResourceName() + HealthStat.statFieldDelim + partition.getPartitionName() + " on " + str3 + ", pendingState: " + toState + ", currentState: " + currentState + ", nextState: " + nextStateForTransition);
                                message = createStateTransitionCancellationMessage(helixManager, resource, partition.getPartitionName(), str3, (String) hashMap2.get(str3), stateModelDef.getId(), pendingState.getFromState(), toState, nextStateForTransition, cancellationState, isStateTransitionCancelEnabled, currentState);
                            }
                        } else {
                            message = createStateTransitionMessage(helixManager, resource, partition.getPartitionName(), str3, currentState, nextStateForTransition, (String) hashMap2.get(str3), stateModelDef.getId());
                            if (logger.isDebugEnabled()) {
                                LogUtil.logDebug(logger, this._eventId, String.format("Resource %s partition %s for instance %s with currentState %s and nextState %s", resource, partition.getPartitionName(), str3, currentState, nextStateForTransition));
                            }
                        }
                        if (message != null) {
                            IdealState idealState = clusterDataCache.getIdealState(str);
                            if (idealState != null && idealState.getStateModelDefRef().equalsIgnoreCase("SchedulerTaskQueue") && idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
                                message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(), idealState.getRecord().getMapField(partition.getPartitionName()));
                            }
                            int timeOut = getTimeOut(clusterDataCache.getClusterConfig(), clusterDataCache.getResourceConfig(str), currentState, nextStateForTransition, idealState, partition);
                            if (timeOut > 0) {
                                message.setExecutionTimeout(timeOut);
                            }
                            message.setAttribute(Message.Attributes.ClusterEventName, clusterEvent.getEventType().name());
                            if (!hashMap4.containsKey(str4)) {
                                hashMap4.put(str4, new ArrayList());
                            }
                            ((List) hashMap4.get(str4)).add(message);
                        }
                    }
                    for (String str5 : stateModelDef.getStatesPriorityList()) {
                        if (hashMap4.containsKey(str5)) {
                            Iterator it = ((List) hashMap4.get(str5)).iterator();
                            while (it.hasNext()) {
                                messageGenerationOutput.addMessage(str, partition, (Message) it.next());
                            }
                        }
                    }
                }
            }
        }
        if (!hashMap.isEmpty()) {
            schedulePendingMessageCleanUp(hashMap, clusterDataCache.getAsyncTasksThreadPool(), helixManager.getHelixDataAccessor());
        }
        clusterEvent.addAttribute(AttributeName.MESSAGES_ALL.name(), messageGenerationOutput);
    }

    private void schedulePendingMessageCleanUp(final Map<String, Map<String, Message>> map, ExecutorService executorService, final HelixDataAccessor helixDataAccessor) {
        executorService.submit(new Callable<Object>() { // from class: org.apache.helix.controller.stages.MessageGenerationPhase.1
            @Override // java.util.concurrent.Callable
            public Object call() {
                for (Map.Entry entry : map.entrySet()) {
                    String str = (String) entry.getKey();
                    for (Message message : ((Map) entry.getValue()).values()) {
                        if (helixDataAccessor.removeProperty(message.getKey(helixDataAccessor.keyBuilder(), str))) {
                            LogUtil.logInfo(MessageGenerationPhase.logger, MessageGenerationPhase.this._eventId, String.format("Deleted message %s from instance %s", message.getMsgId(), str));
                        }
                    }
                }
                return null;
            }
        });
    }

    private boolean shouldCleanUpPendingMessage(Message message, String str, Long l) {
        if (message == null) {
            return false;
        }
        return str.equalsIgnoreCase(message.getToState()) ? System.currentTimeMillis() - l.longValue() > DEFAULT_OBSELETE_MSG_PURGE_DELAY : !str.equalsIgnoreCase(message.getFromState());
    }

    private Message createStateTransitionMessage(HelixManager helixManager, Resource resource, String str, String str2, String str3, String str4, String str5, String str6) {
        Message message = new Message(Message.MessageType.STATE_TRANSITION, UUID.randomUUID().toString());
        message.setSrcName(helixManager.getInstanceName());
        message.setTgtName(str2);
        message.setMsgState(Message.MessageState.NEW);
        message.setPartitionName(str);
        message.setResourceName(resource.getResourceName());
        message.setFromState(str3);
        message.setToState(str4);
        message.setTgtSessionId(str5);
        message.setSrcSessionId(helixManager.getSessionId());
        message.setStateModelDef(str6);
        message.setStateModelFactoryName(resource.getStateModelFactoryname());
        message.setBucketSize(resource.getBucketSize());
        if (resource.getResourceGroupName() != null) {
            message.setResourceGroupName(resource.getResourceGroupName());
        }
        if (resource.getResourceTag() != null) {
            message.setResourceTag(resource.getResourceTag());
        }
        return message;
    }

    private Message createStateTransitionCancellationMessage(HelixManager helixManager, Resource resource, String str, String str2, String str3, String str4, String str5, String str6, String str7, Message message, boolean z, String str8) {
        if (!z || message != null) {
            return null;
        }
        LogUtil.logInfo(logger, this._eventId, "Send cancellation message of the state transition for " + resource.getResourceName() + HealthStat.statFieldDelim + str + " on " + str2 + ", currentState: " + str8 + ", nextState: " + (str7 == null ? "N/A" : str7));
        Message message2 = new Message(Message.MessageType.STATE_TRANSITION_CANCELLATION, UUID.randomUUID().toString());
        message2.setSrcName(helixManager.getInstanceName());
        message2.setTgtName(str2);
        message2.setMsgState(Message.MessageState.NEW);
        message2.setPartitionName(str);
        message2.setResourceName(resource.getResourceName());
        message2.setFromState(str5);
        message2.setToState(str6);
        message2.setTgtSessionId(str3);
        message2.setSrcSessionId(helixManager.getSessionId());
        message2.setStateModelDef(str4);
        message2.setStateModelFactoryName(resource.getStateModelFactoryname());
        message2.setBucketSize(resource.getBucketSize());
        return message2;
    }

    private int getTimeOut(ClusterConfig clusterConfig, ResourceConfig resourceConfig, String str, String str2, IdealState idealState, Partition partition) {
        StateTransitionTimeoutConfig stateTransitionTimeoutConfig = clusterConfig.getStateTransitionTimeoutConfig();
        int stateTransitionTimeout = stateTransitionTimeoutConfig != null ? stateTransitionTimeoutConfig.getStateTransitionTimeout(str, str2) : -1;
        String str3 = null;
        if (idealState != null) {
            str3 = idealState.getRecord().getSimpleField(str + "-" + str2 + "_" + Message.Attributes.TIMEOUT);
            if (str3 == null && idealState.getStateModelDefRef().equalsIgnoreCase("SchedulerTaskQueue") && idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
                str3 = idealState.getRecord().getMapField(partition.getPartitionName()).get(Message.Attributes.TIMEOUT.toString());
            }
        }
        if (str3 != null) {
            try {
                stateTransitionTimeout = Integer.parseInt(str3);
            } catch (Exception e) {
                LogUtil.logError(logger, this._eventId, "", e);
            }
        }
        if (resourceConfig != null) {
            StateTransitionTimeoutConfig stateTransitionTimeoutConfig2 = resourceConfig.getStateTransitionTimeoutConfig();
            stateTransitionTimeout = stateTransitionTimeoutConfig2 != null ? stateTransitionTimeoutConfig2.getStateTransitionTimeout(str, str2) : -1;
        }
        return stateTransitionTimeout;
    }
}
