package org.apache.dolphinscheduler.server.worker.message;

import com.google.common.base.Objects;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import lombok.Generated;
import lombok.NonNull;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.dolphinscheduler.common.lifecycle.ServerLifeCycleManager;
import org.apache.dolphinscheduler.common.thread.BaseDaemonThread;
import org.apache.dolphinscheduler.extract.master.transportor.ITaskInstanceExecutionEvent;
import org.apache.dolphinscheduler.plugin.task.api.utils.LogUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component
/* loaded from: input_file:org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner.class */
public class MessageRetryRunner extends BaseDaemonThread {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(MessageRetryRunner.class);
    private static final long MESSAGE_RETRY_WINDOW = Duration.ofMinutes(5).toMillis();

    @Autowired
    @Lazy
    private List<TaskInstanceExecutionEventSender> messageSenders;
    private final Map<ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType, TaskInstanceExecutionEventSender<ITaskInstanceExecutionEvent>> messageSenderMap;
    private final Map<Integer, List<TaskInstanceMessage>> needToRetryMessages;

    /* loaded from: input_file:org/apache/dolphinscheduler/server/worker/message/MessageRetryRunner$TaskInstanceMessage.class */
    public static class TaskInstanceMessage {
        private long taskInstanceId;
        private ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType eventType;
        private ITaskInstanceExecutionEvent event;

        public static TaskInstanceMessage of(long j, ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType taskInstanceExecutionEventType, ITaskInstanceExecutionEvent iTaskInstanceExecutionEvent) {
            TaskInstanceMessage taskInstanceMessage = new TaskInstanceMessage();
            taskInstanceMessage.setTaskInstanceId(j);
            taskInstanceMessage.setEventType(taskInstanceExecutionEventType);
            taskInstanceMessage.setEvent(iTaskInstanceExecutionEvent);
            return taskInstanceMessage;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            TaskInstanceMessage taskInstanceMessage = (TaskInstanceMessage) obj;
            return this.taskInstanceId == taskInstanceMessage.taskInstanceId && this.eventType == taskInstanceMessage.eventType;
        }

        public int hashCode() {
            return Objects.hashCode(new Object[]{Long.valueOf(this.taskInstanceId), this.eventType});
        }

        @Generated
        public TaskInstanceMessage() {
        }

        @Generated
        public long getTaskInstanceId() {
            return this.taskInstanceId;
        }

        @Generated
        public ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType getEventType() {
            return this.eventType;
        }

        @Generated
        public ITaskInstanceExecutionEvent getEvent() {
            return this.event;
        }

        @Generated
        public void setTaskInstanceId(long j) {
            this.taskInstanceId = j;
        }

        @Generated
        public void setEventType(ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType taskInstanceExecutionEventType) {
            this.eventType = taskInstanceExecutionEventType;
        }

        @Generated
        public void setEvent(ITaskInstanceExecutionEvent iTaskInstanceExecutionEvent) {
            this.event = iTaskInstanceExecutionEvent;
        }

        @Generated
        public String toString() {
            return "MessageRetryRunner.TaskInstanceMessage(taskInstanceId=" + getTaskInstanceId() + ", eventType=" + getEventType() + ", event=" + getEvent() + ")";
        }
    }

    protected MessageRetryRunner() {
        super("WorkerMessageRetryRunnerThread");
        this.messageSenderMap = new HashMap();
        this.needToRetryMessages = new ConcurrentHashMap();
    }

    public synchronized void start() {
        log.info("Message retry runner staring");
        this.messageSenders.forEach(taskInstanceExecutionEventSender -> {
            this.messageSenderMap.put(taskInstanceExecutionEventSender.getMessageType(), taskInstanceExecutionEventSender);
            log.info("Injected message sender: {}", taskInstanceExecutionEventSender.getClass().getSimpleName());
        });
        super.start();
        log.info("Message retry runner started");
    }

    public void addRetryMessage(int i, @NonNull ITaskInstanceExecutionEvent iTaskInstanceExecutionEvent) {
        if (iTaskInstanceExecutionEvent == null) {
            throw new NullPointerException("iTaskInstanceExecutionEvent is marked non-null but is null");
        }
        this.needToRetryMessages.computeIfAbsent(Integer.valueOf(i), num -> {
            return Collections.synchronizedList(new ArrayList());
        }).add(TaskInstanceMessage.of(i, iTaskInstanceExecutionEvent.getEventType(), iTaskInstanceExecutionEvent));
    }

    public void removeRetryMessage(int i, @NonNull ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType taskInstanceExecutionEventType) {
        if (taskInstanceExecutionEventType == null) {
            throw new NullPointerException("eventType is marked non-null but is null");
        }
        List<TaskInstanceMessage> list = this.needToRetryMessages.get(Integer.valueOf(i));
        if (list != null) {
            list.remove(TaskInstanceMessage.of(i, taskInstanceExecutionEventType, null));
        }
    }

    public void removeRetryMessages(int i) {
        this.needToRetryMessages.remove(Integer.valueOf(i));
    }

    public boolean updateMessageHost(int i, String str) {
        List<TaskInstanceMessage> list = this.needToRetryMessages.get(Integer.valueOf(i));
        if (CollectionUtils.isEmpty(list)) {
            return false;
        }
        list.forEach(taskInstanceMessage -> {
            taskInstanceMessage.getEvent().setWorkflowInstanceHost(str);
        });
        return true;
    }

    public void run() {
        while (!ServerLifeCycleManager.isStopped()) {
            try {
                if (MapUtils.isEmpty(this.needToRetryMessages)) {
                    Thread.sleep(MESSAGE_RETRY_WINDOW);
                }
                long currentTimeMillis = System.currentTimeMillis();
                Iterator<Map.Entry<Integer, List<TaskInstanceMessage>>> it = this.needToRetryMessages.entrySet().iterator();
                while (it.hasNext()) {
                    Map.Entry<Integer, List<TaskInstanceMessage>> next = it.next();
                    Integer key = next.getKey();
                    List<TaskInstanceMessage> value = next.getValue();
                    if (!value.isEmpty()) {
                        LogUtils.setTaskInstanceIdMDC(key);
                        try {
                            try {
                                for (TaskInstanceMessage taskInstanceMessage : value) {
                                    ITaskInstanceExecutionEvent.TaskInstanceExecutionEventType eventType = taskInstanceMessage.getEventType();
                                    ITaskInstanceExecutionEvent event = taskInstanceMessage.getEvent();
                                    if (currentTimeMillis - event.getEventSendTime() > MESSAGE_RETRY_WINDOW) {
                                        log.info("Begin retry send message to master, event: {}", event);
                                        event.setEventSendTime(currentTimeMillis);
                                        this.messageSenderMap.get(eventType).sendEvent(event);
                                        log.info("Success send message to master, event: {}", event);
                                    }
                                }
                                LogUtils.removeTaskInstanceIdMDC();
                            } catch (Exception e) {
                                log.warn("Retry send message to master error", e);
                                LogUtils.removeTaskInstanceIdMDC();
                            }
                        } catch (Throwable th) {
                            LogUtils.removeTaskInstanceIdMDC();
                            throw th;
                            break;
                        }
                    } else {
                        it.remove();
                    }
                }
                Thread.sleep(1000L);
            } catch (InterruptedException e2) {
                log.warn("The message retry thread is interrupted, will break this loop", e2);
                Thread.currentThread().interrupt();
                return;
            } catch (Exception e3) {
                log.error("Retry send message failed, get an known exception.", e3);
            }
        }
    }

    public void clearMessage() {
        this.needToRetryMessages.clear();
    }
}
