package org.apache.inlong.manager.workflow.core.event.task;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.inlong.manager.workflow.core.event.EventListenerManager;
import org.apache.inlong.manager.workflow.core.event.EventListenerNotifier;
import org.apache.inlong.manager.workflow.core.event.LogableEventListener;
import org.apache.inlong.manager.workflow.model.WorkflowContext;
import org.apache.inlong.manager.workflow.model.definition.Task;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/manager/workflow/core/event/task/TaskEventNotifier.class */
public class TaskEventNotifier implements EventListenerNotifier<TaskEvent> {
    private static final Logger log = LoggerFactory.getLogger(TaskEventNotifier.class);
    private final ExecutorService executorService = new ThreadPoolExecutor(20, 20, 0, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), new ThreadFactoryBuilder().setNameFormat("async-task-event-notifier-%s").build(), new ThreadPoolExecutor.CallerRunsPolicy());
    private EventListenerManager<TaskEvent, TaskEventListener> eventListenerManager;

    public TaskEventNotifier(TaskEventListenerManager taskEventListenerManager) {
        this.eventListenerManager = taskEventListenerManager;
    }

    @Override // org.apache.inlong.manager.workflow.core.event.EventListenerNotifier
    public void notify(TaskEvent taskEvent, WorkflowContext workflowContext) {
        WorkflowContext m24clone = workflowContext.m24clone();
        Task task = (Task) m24clone.getCurrentElement();
        this.eventListenerManager.syncListeners(taskEvent).forEach(syncLogableNotify(m24clone));
        task.syncListeners(taskEvent).forEach(syncLogableNotify(m24clone));
        this.eventListenerManager.asyncListeners(taskEvent).forEach(asyncLogableNotify(m24clone));
        task.asyncListeners(taskEvent).forEach(asyncLogableNotify(m24clone));
    }

    @Override // org.apache.inlong.manager.workflow.core.event.EventListenerNotifier
    public void notify(String str, boolean z, WorkflowContext workflowContext) {
        WorkflowContext m24clone = workflowContext.m24clone();
        Optional.ofNullable(this.eventListenerManager.listener(str)).ifPresent(logableNotify(z, m24clone));
        Optional.ofNullable(((Task) m24clone.getCurrentElement()).listener(str)).ifPresent(logableNotify(z, m24clone));
    }

    private Consumer<TaskEventListener> logableNotify(boolean z, WorkflowContext workflowContext) {
        return taskEventListener -> {
            if (z || !taskEventListener.async()) {
                syncLogableNotify(workflowContext).accept(taskEventListener);
            } else {
                asyncLogableNotify(workflowContext).accept(taskEventListener);
            }
        };
    }

    private Consumer<TaskEventListener> asyncLogableNotify(WorkflowContext workflowContext) {
        return taskEventListener -> {
            this.executorService.execute(() -> {
                try {
                    logableEventListener(taskEventListener).listen(workflowContext);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
        };
    }

    private Consumer<TaskEventListener> syncLogableNotify(WorkflowContext workflowContext) {
        return taskEventListener -> {
            logableEventListener(taskEventListener).listen(workflowContext);
        };
    }

    private LogableTaskEventListener logableEventListener(TaskEventListener taskEventListener) {
        return taskEventListener instanceof LogableEventListener ? (LogableTaskEventListener) taskEventListener : new LogableTaskEventListener(taskEventListener, this.eventListenerManager.eventLogStorage());
    }
}
