package org.apache.helix.messaging.handling;

import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRollbackException;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.messaging.handling.MessageHandler;
import org.apache.helix.model.Message;
import org.apache.helix.monitoring.StateTransitionContext;
import org.apache.helix.monitoring.StateTransitionDataPoint;
import org.apache.helix.monitoring.mbeans.ParticipantMessageMonitor;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.StatusUpdateUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/messaging/handling/HelixTask.class */
public class HelixTask implements MessageTask {
    /** Class-wide logger; never reassigned, hence final. */
    private static final Logger logger = LoggerFactory.getLogger(HelixTask.class);
    /** The message this task handles. */
    private final Message _message;
    /** Handler whose {@code handleMessage()} is invoked by {@link #call()}. */
    private final MessageHandler _handler;
    /** Context supplied at construction; also the source of {@link #_manager}. */
    private final NotificationContext _notificationContext;
    /** Manager obtained from the notification context in the constructor. */
    private final HelixManager _manager;
    /** Executor used to cancel timeout tasks, schedule retries, report metrics and finish the task. */
    HelixTaskExecutor _executor;
    /** Set by {@link #onTimeout()}; read by {@link #call()} to tell a timeout from a cancellation. */
    volatile boolean _isTimeout = false;
    /** Set by {@code setStarted()}; once true, {@link #cancel()} refuses to cancel. */
    volatile boolean _isStarted = false;
    /** Set by {@link #cancel()}; once true, {@code setStarted()} throws instead of starting. */
    volatile boolean _isCancelled = false;
    /** Helper used to record status updates for the message. */
    StatusUpdateUtil _statusUpdateUtil = new StatusUpdateUtil();

    /**
     * Creates a task that will run {@code messageHandler} for {@code message}.
     *
     * @param message             the message to handle
     * @param notificationContext context of the notification; its manager is used by this task
     * @param messageHandler      handler that performs the actual work
     * @param helixTaskExecutor   executor that owns this task
     */
    public HelixTask(Message message, NotificationContext notificationContext, MessageHandler messageHandler, HelixTaskExecutor helixTaskExecutor) {
        _message = message;
        _handler = messageHandler;
        _executor = helixTaskExecutor;
        _notificationContext = notificationContext;
        _manager = notificationContext.getManager();
    }

    /**
     * Runs the message handler and then performs the post-processing for the finished task.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Record the start of execution on the message and in the status updates.</li>
     *   <li>Invoke the handler. An {@link InterruptedException} or any other exception is converted
     *       into a failed {@link HelixTaskResult}; a {@code null} handler result is treated as a
     *       failure as well.</li>
     *   <li>Report the outcome to the participant monitor, reschedule the task when it timed out
     *       and retries remain, forward relay messages on success, and run the final cleanup.</li>
     *   <li>In a single {@code finally} block, log the timing summary and notify the handler through
     *       {@code onError} when an INTERNAL or FRAMEWORK error was recorded.</li>
     * </ol>
     *
     * <p>The timing summary and error notification run exactly once per invocation, whichever way
     * the post-processing exits (normal completion, early return for a retry, exception or error).
     *
     * @return the result produced by the handler, or a failure result built from the exception
     */
    @Override // java.util.concurrent.Callable
    public HelixTaskResult call() {
        HelixTaskResult taskResult;
        MessageHandler.ErrorType errorType = null;
        MessageHandler.ErrorCode errorCode = null;
        // Exception raised by the post-processing itself; passed to onError for FRAMEWORK errors.
        Exception frameworkException = null;
        // Stay null unless the handler was actually entered / returned normally.
        Long handlerStart = null;
        Long handlerEnd = null;

        long start = System.currentTimeMillis();
        logger.info("handling task: " + getTaskId() + " begin, at: " + start);
        _statusUpdateUtil.logInfo(_message, HelixTask.class, "Message handling task begin execute", _manager);
        _message.setExecuteStartTimeStamp(new Date().getTime());

        // Batch messages get a concurrent map in the context under CURRENT_STATE_UPDATE.
        if (_message.getBatchMessageMode()) {
            _notificationContext.add(NotificationContext.MapKey.CURRENT_STATE_UPDATE.toString(), new ConcurrentHashMap<String, Object>());
        }

        // Handle the message.
        try {
            setStarted();
            handlerStart = System.currentTimeMillis();
            taskResult = _handler.handleMessage();
            handlerEnd = System.currentTimeMillis();
            _executor.cancelTimeoutTask(this);
            if (taskResult == null) {
                // Without this guard a null result causes NullPointerExceptions throughout the
                // post-processing; route it through the regular failure path instead.
                throw new IllegalStateException("Message handler returned a null result");
            }
        } catch (InterruptedException e) {
            taskResult = new HelixTaskResult();
            taskResult.setException(e);
            taskResult.setInterrupted(true);
            _statusUpdateUtil.logError(_message, HelixTask.class, e, "State transition interrupted, timeout:" + _isTimeout, _manager);
            logger.info("Message " + _message.getMsgId() + " is interrupted");
        } catch (Exception e) {
            taskResult = new HelixTaskResult();
            taskResult.setException(e);
            taskResult.setMessage(e.getMessage());
            String errorMessage = "Exception while executing a message. " + e + " msgId: " + _message.getMsgId() + " type: " + _message.getMsgType();
            logger.error(errorMessage, e);
            _statusUpdateUtil.logError(_message, HelixTask.class, e, errorMessage, _manager);
        }

        // Post-processing for the finished task.
        try {
            if (taskResult.isSuccess()) {
                _statusUpdateUtil.logInfo(_message, _handler.getClass(), "Message handling task completed successfully", _manager);
                logger.info("Message " + _message.getMsgId() + " completed.");
                _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.COMPLETED);
            } else {
                errorType = MessageHandler.ErrorType.INTERNAL;
                if (taskResult.isInterrupted()) {
                    logger.info("Message " + _message.getMsgId() + " is interrupted");
                    errorCode = _isTimeout ? MessageHandler.ErrorCode.TIMEOUT : MessageHandler.ErrorCode.CANCEL;
                    if (_isTimeout) {
                        int retryCount = _message.getRetryCount();
                        logger.info("Message timeout, retry count: " + retryCount + " msgId:" + _message.getMsgId());
                        _statusUpdateUtil.logInfo(_message, _handler.getClass(), "Message handling task timeout, retryCount:" + retryCount, _manager);
                        if (retryCount > 0) {
                            // Reschedule and leave without cleanup: the message must stay in place
                            // for the retry. The finally block still logs and notifies the handler.
                            _message.setRetryCount(retryCount - 1);
                            _executor.scheduleTask(new HelixTask(_message, _notificationContext, _handler, _executor));
                            return taskResult;
                        }
                    }
                    _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
                } else if (taskResult.isCancelled()) {
                    // A completed cancellation is not reported to the handler as an error.
                    errorType = null;
                    _statusUpdateUtil.logInfo(_message, _handler.getClass(), "Cancellation completed successfully", _manager);
                    _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.DISCARDED);
                } else {
                    errorCode = MessageHandler.ErrorCode.ERROR;
                    String errorMessage = "Message execution failed. msgId: " + getTaskId() + ", errorMsg: " + taskResult.getMessage();
                    logger.error(errorMessage);
                    _statusUpdateUtil.logError(_message, _handler.getClass(), errorMessage, _manager);
                    _executor.getParticipantMonitor().reportProcessedMessage(_message, ParticipantMessageMonitor.ProcessedMessageState.FAILED);
                }
            }

            HelixDataAccessor accessor = _manager.getHelixDataAccessor();
            if (taskResult.isSuccess()) {
                // Relay forwarding is best effort; a failure must not fail the task itself.
                try {
                    forwardRelayMessages(accessor, _message, taskResult.getCompleteTime());
                } catch (Exception e) {
                    logger.warn("Failed to send relay messages.", e);
                }
            }
            finalCleanup(taskResult);
        } catch (Exception e) {
            finalCleanup(taskResult);
            frameworkException = e;
            errorType = MessageHandler.ErrorType.FRAMEWORK;
            errorCode = MessageHandler.ErrorCode.ERROR;
            String errorMessage = "Exception after executing a message, msgId: " + _message.getMsgId() + e;
            logger.error(errorMessage, e);
            _statusUpdateUtil.logError(_message, HelixTask.class, errorMessage, _manager);
        } finally {
            long end = System.currentTimeMillis();
            long totalDuration = end - start;
            long handlerDuration = (handlerStart == null || handlerEnd == null) ? -1L : handlerEnd - handlerStart;
            logger.info("Message: {} (parent: {}) handling task for {}:{} completed at: {}, results: {}. FrameworkTime: {} ms; HandlerTime: {} ms.", new Object[]{_message.getMsgId(), _message.getAttribute(Message.Attributes.PARENT_MSG_ID), _message.getResourceName(), _message.getPartitionName(), end, taskResult.isSuccess(), totalDuration - handlerDuration, handlerDuration});

            // Tell the handler about any recorded error so it can do its own cleanup.
            if (errorType == MessageHandler.ErrorType.INTERNAL) {
                _handler.onError(taskResult.getException(), errorCode, errorType);
            } else if (errorType == MessageHandler.ErrorType.FRAMEWORK) {
                _handler.onError(frameworkException, errorCode, errorType);
            }
        }
        return taskResult;
    }

    /**
     * Removes {@code message} for this instance via {@code HelixUtil.removeMessageFromZK} and logs
     * whether the removal succeeded.
     *
     * @param helixDataAccessor accessor used for the removal
     * @param message           the message to delete
     */
    private void removeMessageFromZk(HelixDataAccessor helixDataAccessor, Message message) {
        String instanceName = _manager.getInstanceName();
        boolean removed = HelixUtil.removeMessageFromZK(helixDataAccessor, message, instanceName);
        if (!removed) {
            logger.warn("Failed to delete message " + message.getId() + " from zk!");
            return;
        }
        logger.info("Delete message " + message.getId() + " from zk!");
    }

    /**
     * Sends the relay messages attached to {@code message} to their target instances.
     *
     * <p>Nothing is sent when the message carries no relay messages or when the manager's session
     * id no longer matches the message's target session id. Entries whose sub-type is not
     * {@code RELAYED_MESSAGE} (including a missing sub-type) and entries that are expired after
     * their relay time has been set are skipped.
     *
     * @param helixDataAccessor accessor used to build message paths and create the relay messages
     * @param message           the handled message that may carry relay messages
     * @param j                 relay time stamped onto each relay message before it is sent
     */
    private void forwardRelayMessages(HelixDataAccessor helixDataAccessor, Message message, long j) {
        if (!message.hasRelayMessages()) {
            return;
        }
        if (!_manager.getSessionId().equals(message.getTgtSessionId())) {
            logger.info("Session id has been changed, ignore all relay messages attached with " + message.getId());
            return;
        }

        Map<String, Message> relayMessages = message.getRelayMessages();
        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        String relayedSubType = Message.MessageType.RELAYED_MESSAGE.name();

        // Iterate entries directly instead of looking every key up again.
        for (Map.Entry<String, Message> entry : relayMessages.entrySet()) {
            String instance = entry.getKey();
            Message relayMessage = entry.getValue();

            // Constant-first comparison: a relay message without a sub-type is skipped rather than
            // raising a NullPointerException that would abort all remaining relay messages.
            if (!relayedSubType.equals(relayMessage.getMsgSubType())) {
                continue;
            }

            relayMessage.setRelayTime(j);
            if (relayMessage.isExpired()) {
                logger.info("Relay message expired, ignore " + relayMessage.getId() + " to instance " + instance);
                continue;
            }

            String messagePath = keyBuilder.message(instance, relayMessage.getId()).getPath();
            boolean created = helixDataAccessor.getBaseDataAccessor().create(messagePath, relayMessage.getRecord(), AccessOption.PERSISTENT);
            if (created) {
                logger.info("Send relay message " + relayMessage.getId() + " to " + instance);
            } else {
                logger.warn("Failed to send relay message " + relayMessage.getId() + " to " + instance);
            }
        }
    }

    /**
     * Picks the data accessor for the cluster the message came from.
     *
     * @param message the message whose source cluster name is inspected
     * @return the manager's own accessor when the message has no source cluster name or it equals
     *         the manager's cluster name; otherwise a new {@code ZKHelixDataAccessor} for the source
     *         cluster that shares the manager's base data accessor
     */
    private HelixDataAccessor getSrcClusterDataAccessor(Message message) {
        HelixDataAccessor localAccessor = _manager.getHelixDataAccessor();
        String srcClusterName = message.getSrcClusterName();
        boolean sameCluster = srcClusterName == null || srcClusterName.equals(_manager.getClusterName());
        if (sameCluster) {
            return localAccessor;
        }
        return new ZKHelixDataAccessor(srcClusterName, localAccessor.getBaseDataAccessor());
    }

    /**
     * Sends a reply for {@code message} carrying the outcome of the task.
     *
     * <p>No reply is sent when the message has no correlation id or is itself a
     * {@code TASK_REPLY}. Otherwise the SUCCESS and INTERRUPTED flags (and ERRORINFO on failure)
     * are added to the task result map, a reply message is created from that map, and the reply is
     * written to the sender's message queue (participant) or to the controller message queue.
     *
     * @param helixDataAccessor accessor used to write the reply
     * @param message           the message being replied to
     * @param helixTaskResult   the task outcome; its result map is updated in place
     */
    private void sendReply(HelixDataAccessor helixDataAccessor, Message message, HelixTaskResult helixTaskResult) {
        if (message.getCorrelationId() == null || message.getMsgType().equals(Message.MessageType.TASK_REPLY.name())) {
            return;
        }
        logger.info("Sending reply for message " + message.getCorrelationId());
        _statusUpdateUtil.logInfo(message, HelixTask.class, "Sending reply", _manager);

        // The flags are stored in their textual form ("true"/"false"), matching the String stored
        // under ERRORINFO below.
        helixTaskResult.getTaskResultMap().put("SUCCESS", String.valueOf(helixTaskResult.isSuccess()));
        helixTaskResult.getTaskResultMap().put("INTERRUPTED", String.valueOf(helixTaskResult.isInterrupted()));
        if (!helixTaskResult.isSuccess()) {
            helixTaskResult.getTaskResultMap().put("ERRORINFO", helixTaskResult.getMessage());
        }

        Message replyMessage = Message.createReplyMessage(message, _manager.getInstanceName(), helixTaskResult.getTaskResultMap());
        replyMessage.setSrcInstanceType(_manager.getInstanceType());

        PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder();
        if (message.getSrcInstanceType() == InstanceType.PARTICIPANT) {
            helixDataAccessor.setProperty(keyBuilder.message(message.getMsgSrc(), replyMessage.getMsgId()), replyMessage);
        } else if (message.getSrcInstanceType() == InstanceType.CONTROLLER) {
            helixDataAccessor.setProperty(keyBuilder.controllerMessage(replyMessage.getMsgId()), replyMessage);
        }
        // NOTE(review): the status update below is written even when the source instance type
        // matched neither branch above and no reply was stored - confirm this is intended.

        String replyCluster = message.getSrcClusterName() == null ? _manager.getClusterName() : message.getSrcClusterName();
        _statusUpdateUtil.logInfo(message, HelixTask.class, String.format("1 msg replied to %s in cluster %s.", replyMessage.getTgtName(), replyCluster), _manager);
    }

    /**
     * Reports timing statistics for a finished STATE_TRANSITION message to the participant monitor.
     *
     * <p>Messages of any other type are ignored. Nothing is reported when the read or execution
     * start timestamp is zero (a warning is logged) or when either computed elapsed time is
     * negative.
     *
     * @param helixManager    supplies the cluster and instance names for the transition context
     * @param message         the handled message carrying the timestamps and transition states
     * @param helixTaskResult the task outcome; only its success flag is reported
     */
    private void reportMessageStat(HelixManager helixManager, Message message, HelixTaskResult helixTaskResult) {
        boolean isStateTransition = message.getMsgType().equals(Message.MessageType.STATE_TRANSITION.name());
        if (!isStateTransition) {
            return;
        }

        long now = System.currentTimeMillis();
        long readTime = message.getReadTimeStamp();
        long executeStartTime = message.getExecuteStartTimeStamp();
        if (readTime == 0 || executeStartTime == 0) {
            logger.warn("message read time and start execution time not recorded. State transition delay time is not available, message read time {}, Execute start time {}.", readTime, executeStartTime);
            return;
        }

        long sinceRead = now - readTime;
        long sinceExecuteStart = now - executeStartTime;
        long createToRead = readTime - message.getCreateTimeStamp();
        if (sinceRead >= 0 && sinceExecuteStart >= 0) {
            ParticipantMessageMonitor monitor = _executor.getParticipantMonitor();
            String transition = message.getFromState() + "--" + message.getToState();
            StateTransitionContext context = new StateTransitionContext(helixManager.getClusterName(), helixManager.getInstanceName(), message.getResourceName(), transition);
            StateTransitionDataPoint dataPoint = new StateTransitionDataPoint(sinceRead, sinceExecuteStart, createToRead, helixTaskResult.isSuccess());
            monitor.reportTransitionStat(context, dataPoint);
        }
    }

    /**
     * Returns the id of this task, which is the id of the underlying message.
     *
     * @return the value of {@code Message.getId()} for the handled message
     */
    @Override
    public String getTaskId() {
        return _message.getId();
    }

    /**
     * Returns the message this task handles.
     *
     * @return the message passed to the constructor
     */
    @Override
    public Message getMessage() {
        return _message;
    }

    /**
     * Returns the notification context this task was created with.
     *
     * @return the context passed to the constructor
     */
    @Override
    public NotificationContext getNotificationContext() {
        return _notificationContext;
    }

    /**
     * Marks this task as timed out and then notifies the handler.
     *
     * <p>The flag is read by {@link #call()} when it classifies an interrupted result as a timeout
     * or a cancellation.
     */
    @Override
    public void onTimeout() {
        _isTimeout = true;
        _handler.onTimeout();
    }

    /**
     * Cancels this task if it has not started yet.
     *
     * <p>Synchronized on this task, like {@code setStarted()}, so the started and cancelled flags
     * are checked and updated under the same lock.
     *
     * @return {@code true} when the task was marked cancelled and the handler's {@code cancel()}
     *         was invoked; {@code false} when the task had already started
     */
    @Override
    public synchronized boolean cancel() {
        if (!_isStarted) {
            _isCancelled = true;
            _handler.cancel();
            return true;
        }
        return false;
    }

    /**
     * Marks this task as started unless it has already been cancelled.
     *
     * @throws HelixRollbackException when {@link #cancel()} has already marked the task cancelled
     */
    private synchronized void setStarted() {
        if (!_isCancelled) {
            _isStarted = true;
            return;
        }
        throw new HelixRollbackException("Task has already been cancelled");
    }

    /**
     * Performs the end-of-task cleanup for a message that has no parent message.
     *
     * <p>For such a message this removes it from ZooKeeper, reports its statistics, sends a reply
     * and tells the executor the task is finished, in that order. Messages with a
     * {@code PARENT_MSG_ID} attribute are left untouched. Any exception raised by these steps is
     * logged together with its stack trace and not propagated.
     *
     * @param helixTaskResult the outcome of the task, used for statistics and the reply
     */
    private void finalCleanup(HelixTaskResult helixTaskResult) {
        try {
            if (_message.getAttribute(Message.Attributes.PARENT_MSG_ID) != null) {
                return;
            }
            removeMessageFromZk(_manager.getHelixDataAccessor(), _message);
            reportMessageStat(_manager, _message, helixTaskResult);
            sendReply(getSrcClusterDataAccessor(_message), _message, helixTaskResult);
            _executor.finishTask(this);
        } catch (Exception e) {
            // Pass the exception to the logger so the cause of a failed cleanup is not lost.
            logger.error(String.format("Error to final clean up for message : %s", _message.getId()), e);
        }
    }
}
