package org.apache.helix.messaging;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.Criteria;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.messaging.handling.AsyncCallbackService;
import org.apache.helix.messaging.handling.HelixTaskExecutor;
import org.apache.helix.messaging.handling.MessageHandlerFactory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.builder.ConfigScopeBuilder;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.monitoring.mbeans.MessageQueueMonitor;
import org.apache.helix.monitoring.mbeans.ParticipantStatusMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/messaging/DefaultMessagingService.class */
/**
 * Default {@link ClusterMessagingService} implementation.
 *
 * <p>Outgoing messages are expanded from a {@link Criteria} into concrete {@link Message}
 * instances and written through a {@link HelixDataAccessor}. Incoming message handling is
 * delegated to a {@link HelixTaskExecutor}, with which handler factories are registered once the
 * owning {@link HelixManager} reports that it is connected.
 */
public class DefaultMessagingService implements ClusterMessagingService {
    private static final Logger _logger = LoggerFactory.getLogger(DefaultMessagingService.class);

    /** Handler thread-pool size used when no valid per-message-type configuration is found. */
    private static final int DEFAULT_HANDLER_THREADS = 40;

    private final HelixManager _manager;
    private final HelixTaskExecutor _taskExecutor;
    private final AsyncCallbackService _asyncCallbackService;
    private final CriteriaEvaluator _evaluator = new CriteriaEvaluator();

    /** Factories registered while the manager was not connected; drained by {@link #onConnected()}. */
    ConcurrentHashMap<String, MessageHandlerFactory> _messageHandlerFactoriestobeAdded = new ConcurrentHashMap<>();

    /**
     * Creates the service, its task executor (with participant-status and message-queue monitors)
     * and registers the async callback service as the handler for {@code TASK_REPLY} messages.
     *
     * @param helixManager manager supplying instance type, instance name and cluster name
     */
    public DefaultMessagingService(HelixManager helixManager) {
        this._manager = helixManager;
        boolean participantRole = isParticipantRole(helixManager.getInstanceType());
        ParticipantStatusMonitor statusMonitor =
                new ParticipantStatusMonitor(participantRole, helixManager.getInstanceName());
        MessageQueueMonitor queueMonitor =
                new MessageQueueMonitor(helixManager.getClusterName(), helixManager.getInstanceName());
        this._taskExecutor = new HelixTaskExecutor(statusMonitor, queueMonitor);
        this._asyncCallbackService = new AsyncCallbackService();
        this._taskExecutor.registerMessageHandlerFactory(
                Message.MessageType.TASK_REPLY.name(), this._asyncCallbackService);
    }

    /** Returns true for {@code PARTICIPANT} and {@code CONTROLLER_PARTICIPANT}. */
    private static boolean isParticipantRole(InstanceType type) {
        return type == InstanceType.PARTICIPANT || type == InstanceType.CONTROLLER_PARTICIPANT;
    }

    /** Returns true for {@code CONTROLLER} and {@code CONTROLLER_PARTICIPANT}. */
    private static boolean isControllerRole(InstanceType type) {
        return type == InstanceType.CONTROLLER || type == InstanceType.CONTROLLER_PARTICIPANT;
    }

    /**
     * Sends without a callback; equivalent to {@code send(criteria, message, null, -1)}.
     *
     * @return the number of messages generated for the criteria
     */
    @Override
    public int send(Criteria criteria, Message message) {
        return send(criteria, message, null, -1);
    }

    /**
     * Sends with a retry count of 0.
     *
     * @return the number of messages generated for the criteria
     */
    @Override
    public int send(Criteria criteria, Message message, AsyncCallback asyncCallback, int timeOut) {
        return send(criteria, message, asyncCallback, timeOut, 0);
    }

    /**
     * Expands {@code message} into one message per recipient matched by {@code criteria} and
     * writes each one through the recipient cluster's data accessor.
     *
     * <p>When a callback is supplied it is given the total timeout
     * ({@code timeOut * (retryCount + 1)}, or -1 when that is negative), registered under a fresh
     * correlation id that is stamped on every message, and its timer is started after all
     * messages have been written.
     *
     * @param criteria      recipient selection
     * @param message       template message
     * @param asyncCallback optional callback; may be {@code null}
     * @param timeOut       value passed to {@code Message.setExecutionTimeout}
     * @param retryCount    value passed to {@code Message.setRetryCount}
     * @return the number of messages generated; 0 means nothing was written
     */
    @Override
    public int send(Criteria criteria, Message message, AsyncCallback asyncCallback, int timeOut, int retryCount) {
        Map<InstanceType, List<Message>> generated = generateMessage(criteria, message);
        int totalMessages = 0;
        for (List<Message> messages : generated.values()) {
            totalMessages += messages.size();
        }
        _logger.info("Send {} messages with criteria {}", totalMessages, criteria);
        if (totalMessages == 0) {
            return 0;
        }

        String correlationId = null;
        if (asyncCallback != null) {
            asyncCallback.setTimeout(totalTimeout(timeOut, retryCount));
            correlationId = UUID.randomUUID().toString();
            for (List<Message> messages : generated.values()) {
                asyncCallback.setMessagesSent(messages);
            }
            this._asyncCallbackService.registerAsyncCallback(correlationId, asyncCallback);
        }

        HelixDataAccessor accessor = getRecipientDataAccessor(criteria);
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        for (Map.Entry<InstanceType, List<Message>> entry : generated.entrySet()) {
            InstanceType recipientType = entry.getKey();
            for (Message outgoing : entry.getValue()) {
                outgoing.setRetryCount(retryCount);
                outgoing.setExecutionTimeout(timeOut);
                outgoing.setSrcInstanceType(this._manager.getInstanceType());
                if (correlationId != null) {
                    outgoing.setCorrelationId(correlationId);
                }
                outgoing.setSrcClusterName(this._manager.getClusterName());
                // Only controller and participant recipients have a message location to write to.
                if (recipientType == InstanceType.CONTROLLER) {
                    accessor.setProperty(keyBuilder.controllerMessage(outgoing.getId()), outgoing);
                } else if (recipientType == InstanceType.PARTICIPANT) {
                    accessor.setProperty(keyBuilder.message(outgoing.getTgtName(), outgoing.getId()), outgoing);
                }
            }
        }

        if (asyncCallback != null) {
            asyncCallback.startTimer();
        }
        return totalMessages;
    }

    /**
     * Computes {@code timeOut * (retryCount + 1)} without int overflow.
     *
     * @return -1 when the product is negative, otherwise the product capped at
     *         {@link Integer#MAX_VALUE}
     */
    private static int totalTimeout(int timeOut, int retryCount) {
        long total = (long) timeOut * ((long) retryCount + 1L);
        if (total < 0) {
            return -1;
        }
        return (int) Math.min(total, (long) Integer.MAX_VALUE);
    }

    /**
     * Returns the manager's data accessor, or a new {@link ZKHelixDataAccessor} over the same base
     * accessor when the criteria names a cluster other than the manager's own.
     */
    private HelixDataAccessor getRecipientDataAccessor(Criteria criteria) {
        HelixDataAccessor localAccessor = this._manager.getHelixDataAccessor();
        String targetCluster = criteria.getClusterName();
        if (targetCluster == null || targetCluster.equals(this._manager.getClusterName())) {
            return localAccessor;
        }
        return new ZKHelixDataAccessor(targetCluster, localAccessor.getBaseDataAccessor());
    }

    /**
     * Builds the concrete messages for the given criteria.
     *
     * @return a single-entry map keyed by the criteria's recipient instance type; the value is an
     *         empty list for any type other than {@code CONTROLLER} or {@code PARTICIPANT}
     */
    @Override
    public Map<InstanceType, List<Message>> generateMessage(Criteria criteria, Message message) {
        Map<InstanceType, List<Message>> result = new HashMap<>();
        InstanceType recipientType = criteria.getRecipientInstanceType();
        HelixDataAccessor accessor = getRecipientDataAccessor(criteria);
        List<Message> messages = Collections.emptyList();
        if (recipientType == InstanceType.CONTROLLER) {
            messages = generateMessagesForController(message);
        } else if (recipientType == InstanceType.PARTICIPANT) {
            messages = generateMessagesForParticipant(criteria, message, accessor);
        }
        result.put(recipientType, messages);
        return result;
    }

    /**
     * Creates one message (with a fresh random id) per row returned by the criteria evaluator,
     * skipping this instance when the criteria excludes self. For session-specific criteria the
     * target session id is looked up from the current live instances.
     */
    private List<Message> generateMessagesForParticipant(Criteria criteria, Message message, HelixDataAccessor helixDataAccessor) {
        List<Message> messages = new ArrayList<>();
        List<Map<String, String>> matches = this._evaluator.evaluateCriteria(criteria, helixDataAccessor);
        if (matches.isEmpty()) {
            return messages;
        }

        Map<String, String> sessionIdByInstance = new HashMap<>();
        if (criteria.isSessionSpecific()) {
            List<LiveInstance> liveInstances =
                    helixDataAccessor.getChildValues(helixDataAccessor.keyBuilder().liveInstances());
            for (LiveInstance liveInstance : liveInstances) {
                sessionIdByInstance.put(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner());
            }
        }

        String selfName = this._manager.getInstanceName();
        for (Map<String, String> match : matches) {
            String targetName = match.get("instanceName");
            // Decide on self-exclusion before allocating a message and a UUID for it.
            if (criteria.isSelfExcluded() && selfName.equalsIgnoreCase(targetName)) {
                continue;
            }
            Message outgoing = new Message(message.getRecord(), UUID.randomUUID().toString());
            outgoing.setSrcName(selfName);
            outgoing.setTgtName(targetName);
            outgoing.setResourceName(match.get(ClusterStatusMonitor.RESOURCE_DN_KEY));
            outgoing.setPartitionName(match.get("partitionName"));
            if (criteria.isSessionSpecific()) {
                outgoing.setTgtSessionId(sessionIdByInstance.get(targetName));
            }
            messages.add(outgoing);
        }
        return messages;
    }

    /**
     * Creates the single controller-bound message, reusing the template's message id when it has
     * one and generating a random id otherwise.
     */
    private List<Message> generateMessagesForController(Message message) {
        List<Message> messages = new ArrayList<>();
        String existingId = message.getMsgId();
        String msgId = existingId == null ? UUID.randomUUID().toString() : existingId;
        Message controllerMessage = new Message(message.getRecord(), msgId);
        controllerMessage.setMsgId(msgId);
        controllerMessage.setSrcName(this._manager.getInstanceName());
        controllerMessage.setTgtName(InstanceType.CONTROLLER.name());
        messages.add(controllerMessage);
        return messages;
    }

    /** Registers {@code messageHandlerFactory} for a single message type. */
    @Override
    public synchronized void registerMessageHandlerFactory(String type, MessageHandlerFactory messageHandlerFactory) {
        registerMessageHandlerFactory(Collections.singletonList(type), messageHandlerFactory);
    }

    /**
     * Registers {@code messageHandlerFactory} for every type in {@code types}. When the manager is
     * not connected the registration is parked and applied later by {@link #onConnected()}.
     */
    @Override
    public synchronized void registerMessageHandlerFactory(List<String> types, MessageHandlerFactory messageHandlerFactory) {
        if (this._manager.isConnected()) {
            for (String type : types) {
                registerMessageHandlerFactoryInternal(type, messageHandlerFactory);
            }
        } else {
            for (String type : types) {
                this._messageHandlerFactoriestobeAdded.put(type, messageHandlerFactory);
            }
        }
    }

    /** Applies every registration parked while disconnected, then clears the pending map. */
    public synchronized void onConnected() {
        for (Map.Entry<String, MessageHandlerFactory> pending : this._messageHandlerFactoriestobeAdded.entrySet()) {
            registerMessageHandlerFactoryInternal(pending.getKey(), pending.getValue());
        }
        this._messageHandlerFactoriestobeAdded.clear();
    }

    /**
     * Registers the factory with the task executor using a thread-pool size read from
     * configuration key {@code <type>.<HelixTaskExecutor.MAX_THREADS>}: the participant scope is
     * consulted first (participant roles only), then the cluster scope. A missing or unparsable
     * value falls back to {@value #DEFAULT_HANDLER_THREADS}; a non-positive value becomes 1.
     * A NO_OP message is sent afterwards.
     */
    void registerMessageHandlerFactoryInternal(String type, MessageHandlerFactory messageHandlerFactory) {
        _logger.info("registering msg factory for type " + type);
        int threadpoolSize = DEFAULT_HANDLER_THREADS;
        String configuredSize = null;
        String configKey = type + "." + HelixTaskExecutor.MAX_THREADS;
        ConfigAccessor configAccessor = this._manager.getConfigAccessor();
        if (configAccessor != null) {
            if (isParticipantRole(this._manager.getInstanceType())) {
                configuredSize = configAccessor.get(
                        new ConfigScopeBuilder()
                                .forCluster(this._manager.getClusterName())
                                .forParticipant(this._manager.getInstanceName())
                                .build(),
                        configKey);
            }
            if (configuredSize == null) {
                configuredSize = configAccessor.get(
                        new ConfigScopeBuilder().forCluster(this._manager.getClusterName()).build(),
                        configKey);
            }
        }
        if (configuredSize != null) {
            try {
                threadpoolSize = Integer.parseInt(configuredSize);
                if (threadpoolSize <= 0) {
                    threadpoolSize = 1;
                }
            } catch (NumberFormatException e) {
                // Keep the default, but say which key and value were rejected.
                _logger.error("Invalid thread pool size '" + configuredSize + "' for config key " + configKey
                        + "; using default " + DEFAULT_HANDLER_THREADS, e);
            }
        }
        this._taskExecutor.registerMessageHandlerFactory(type, messageHandlerFactory, threadpoolSize);
        sendNopMessageInternal();
    }

    /** Sends a NO_OP message; see {@link #sendNopMessageInternal()}. */
    @Deprecated
    public void sendNopMessage() {
        sendNopMessageInternal();
    }

    /**
     * Writes a NO_OP message to the controller message location (controller roles) and/or to this
     * instance's own message location (participant roles). For {@code CONTROLLER_PARTICIPANT} the
     * same message object is written twice with its target name updated in between. Failures are
     * logged and not propagated.
     */
    private void sendNopMessageInternal() {
        try {
            Message nopMessage = new Message(Message.MessageType.NO_OP, UUID.randomUUID().toString());
            nopMessage.setSrcName(this._manager.getInstanceName());
            HelixDataAccessor accessor = this._manager.getHelixDataAccessor();
            PropertyKey.Builder keyBuilder = accessor.keyBuilder();
            InstanceType selfType = this._manager.getInstanceType();
            if (isControllerRole(selfType)) {
                nopMessage.setTgtName(InstanceType.CONTROLLER.name());
                accessor.setProperty(keyBuilder.controllerMessage(nopMessage.getId()), nopMessage);
            }
            if (isParticipantRole(selfType)) {
                nopMessage.setTgtName(this._manager.getInstanceName());
                accessor.setProperty(keyBuilder.message(nopMessage.getTgtName(), nopMessage.getId()), nopMessage);
            }
        } catch (Exception e) {
            // Best effort: log with the stack trace instead of only e.toString().
            _logger.error("Failed to send NO_OP message", e);
        }
    }

    /** Returns the task executor that handler factories are registered with. */
    public HelixTaskExecutor getExecutor() {
        return this._taskExecutor;
    }

    /**
     * Sends like {@link #send(Criteria, Message, AsyncCallback, int, int)} and then blocks on the
     * callback's monitor until it reports done or timed out.
     *
     * <p>If the waiting thread is interrupted, the callback is marked interrupted, the thread's
     * interrupt flag is restored and the method returns without waiting further. If no callback
     * was supplied there is nothing to wait on and the method returns right after sending.
     *
     * @return the number of messages sent
     */
    @Override
    public int sendAndWait(Criteria criteria, Message message, AsyncCallback asyncCallback, int timeOut, int retryCount) {
        int messagesSent = send(criteria, message, asyncCallback, timeOut, retryCount);
        if (messagesSent <= 0) {
            _logger.warn("No messages sent. For Criteria:{}", criteria);
            return messagesSent;
        }
        if (asyncCallback == null) {
            _logger.warn("sendAndWait called without a callback; not waiting. For Criteria:{}", criteria);
            return messagesSent;
        }
        synchronized (asyncCallback) {
            while (!asyncCallback.isDone() && !asyncCallback.isTimedOut()) {
                try {
                    asyncCallback.wait();
                } catch (InterruptedException e) {
                    _logger.error(e.toString());
                    asyncCallback.setInterrupted(true);
                    // Restore the flag for the caller and stop waiting; looping again would
                    // discard the interruption and keep the thread blocked.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return messagesSent;
    }

    /** Same as the five-argument {@code sendAndWait} with a retry count of 0. */
    @Override
    public int sendAndWait(Criteria criteria, Message message, AsyncCallback asyncCallback, int timeOut) {
        return sendAndWait(criteria, message, asyncCallback, timeOut, 0);
    }
}
