package org.apache.helix.messaging.handling;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.messaging.AsyncCallback;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/helix/messaging/handling/AsyncCallbackService.class */
public class AsyncCallbackService implements MessageHandlerFactory {
    private final ConcurrentHashMap<String, AsyncCallback> _callbackMap = new ConcurrentHashMap<>();
    private static Logger _logger = Logger.getLogger(AsyncCallbackService.class);

    /* loaded from: input_file:org/apache/helix/messaging/handling/AsyncCallbackService$AsyncCallbackMessageHandler.class */
    public class AsyncCallbackMessageHandler extends MessageHandler {
        private final String _correlationId;
        static final /* synthetic */ boolean $assertionsDisabled;

        public AsyncCallbackMessageHandler(String str, Message message, NotificationContext notificationContext) {
            super(message, notificationContext);
            this._correlationId = str;
        }

        @Override // org.apache.helix.messaging.handling.MessageHandler
        public HelixTaskResult handleMessage() throws InterruptedException {
            AsyncCallbackService.this.verifyMessage(this._message);
            HelixTaskResult helixTaskResult = new HelixTaskResult();
            if (!$assertionsDisabled && !this._correlationId.equalsIgnoreCase(this._message.getCorrelationId())) {
                throw new AssertionError();
            }
            AsyncCallbackService._logger.info("invoking reply message " + this._message.getMsgId() + ", correlationid:" + this._correlationId);
            AsyncCallback asyncCallback = (AsyncCallback) AsyncCallbackService.this._callbackMap.get(this._correlationId);
            synchronized (asyncCallback) {
                asyncCallback.onReply(this._message);
                if (asyncCallback.isDone()) {
                    AsyncCallbackService._logger.info("Removing finished callback, correlationid:" + this._correlationId);
                    AsyncCallbackService.this._callbackMap.remove(this._correlationId);
                }
            }
            helixTaskResult.setSuccess(true);
            return helixTaskResult;
        }

        @Override // org.apache.helix.messaging.handling.MessageHandler
        public void onError(Exception exc, MessageHandler.ErrorCode errorCode, MessageHandler.ErrorType errorType) {
            AsyncCallbackService._logger.error("Message handling pipeline get an exception. MsgId:" + this._message.getMsgId(), exc);
        }

        static {
            $assertionsDisabled = !AsyncCallbackService.class.desiredAssertionStatus();
        }
    }

    public void registerAsyncCallback(String str, AsyncCallback asyncCallback) {
        if (this._callbackMap.containsKey(str)) {
            _logger.warn("correlation id " + str + " already registered");
        }
        _logger.info("registering correlation id " + str);
        this._callbackMap.put(str, asyncCallback);
    }

    void verifyMessage(Message message) {
        if (!message.getMsgType().toString().equalsIgnoreCase(Message.MessageType.TASK_REPLY.toString())) {
            String str = "Unexpected msg type for message " + message.getMsgId() + " type:" + message.getMsgType() + " Expected : " + Message.MessageType.TASK_REPLY;
            _logger.error(str);
            throw new HelixException(str);
        }
        String correlationId = message.getCorrelationId();
        if (correlationId == null) {
            String str2 = "Message " + message.getMsgId() + " does not have correlation id";
            _logger.error(str2);
            throw new HelixException(str2);
        }
        if (this._callbackMap.containsKey(correlationId)) {
            _logger.info("Verified reply message " + message.getMsgId() + " correlation:" + correlationId);
        } else {
            String str3 = "Message " + message.getMsgId() + " does not have correponding callback. Probably timed out already. Correlation id: " + correlationId;
            _logger.error(str3);
            throw new HelixException(str3);
        }
    }

    @Override // org.apache.helix.messaging.handling.MessageHandlerFactory
    public MessageHandler createHandler(Message message, NotificationContext notificationContext) {
        verifyMessage(message);
        return new AsyncCallbackMessageHandler(message.getCorrelationId(), message, notificationContext);
    }

    @Override // org.apache.helix.messaging.handling.MessageHandlerFactory
    public String getMessageType() {
        return Message.MessageType.TASK_REPLY.toString();
    }

    @Override // org.apache.helix.messaging.handling.MessageHandlerFactory
    public void reset() {
    }
}
