package org.apache.apex.engine.plugin;

import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.webapp.AppInfo;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.apex.api.plugin.Event;
import org.apache.apex.api.plugin.Plugin;
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.PluginLocator;
import org.apache.hadoop.conf.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/engine/plugin/DefaultApexPluginDispatcher.class */
/**
 * Plugin dispatcher that hands every {@link DAGExecutionEvent} to the registered
 * handlers on a single dedicated worker thread.
 *
 * <p>Events are buffered in a bounded queue. When the queue is full the oldest
 * pending event is discarded to make room for the newest one, so the thread that
 * dispatches events is never blocked by slow handlers.
 */
public class DefaultApexPluginDispatcher extends AbstractApexPluginDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultApexPluginDispatcher.class);

    /** Capacity of the bounded queue holding events that are waiting to be handled. */
    private static final int DEFAULT_QUEUE_SIZE = 4098;

    private final int qsize;
    private ArrayBlockingQueue<Runnable> blockingQueue;

    /**
     * Worker executor; {@code null} before {@link #serviceInit} and after {@link #serviceStop}.
     * Volatile because events may be dispatched from a thread other than the one that
     * initialises or stops the service.
     */
    private volatile ExecutorService executorService;

    /**
     * Delivers one event to every handler found in the {@code table} row for the event's type.
     *
     * @param <T> type of the event being delivered
     */
    private class ProcessEventTask<T extends DAGExecutionEvent.Type> implements Runnable {
        private final Event<T> event;

        public ProcessEventTask(Event<T> event) {
            this.event = event;
        }

        /**
         * Invokes each handler in turn while holding the monitor of the handler table.
         * A {@link RuntimeException} thrown by one handler is logged and does not stop
         * delivery to the remaining handlers.
         */
        @Override
        public void run() {
            synchronized (DefaultApexPluginDispatcher.this.table) {
                for (Plugin.EventHandler eventHandler : DefaultApexPluginDispatcher.this.table.row(this.event.getType()).values()) {
                    try {
                        eventHandler.handle(this.event);
                    } catch (RuntimeException e) {
                        // Explicit array keeps this compatible with the Object[] overload of warn().
                        LOG.warn("Event {} caused exception in handler {}", new Object[]{this.event, eventHandler, e});
                    }
                }
            }
        }
    }

    /**
     * Creates the dispatcher; all arguments are passed straight to the superclass.
     * The worker thread is not created until {@link #serviceInit} runs.
     */
    public DefaultApexPluginDispatcher(PluginLocator pluginLocator, StramAppContext stramAppContext, StreamingContainerManager streamingContainerManager, AppInfo.AppStats appStats) {
        super(pluginLocator, stramAppContext, streamingContainerManager, appStats);
        this.qsize = DEFAULT_QUEUE_SIZE;
    }

    /**
     * Queues the event for asynchronous delivery. The event is silently ignored when the
     * executor does not exist (service not initialised, or already stopped).
     *
     * @param dAGExecutionEvent event to deliver to the registered handlers
     */
    @Override
    protected void dispatchExecutionEvent(DAGExecutionEvent dAGExecutionEvent) {
        // Read the field once: serviceStop() may null it between a check and a later use.
        final ExecutorService executor = this.executorService;
        if (executor != null) {
            executor.submit(new ProcessEventTask<>(dAGExecutionEvent));
        }
    }

    /**
     * Initialises the superclass, then creates the bounded queue and the single-threaded
     * executor that drains it.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws Exception if the superclass initialisation fails
     */
    @Override
    public void serviceInit(Configuration configuration) throws Exception {
        super.serviceInit(configuration);
        LOG.debug("Creating plugin dispatch queue with size {}", this.qsize);
        this.blockingQueue = new ArrayBlockingQueue<>(this.qsize);
        this.executorService = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, this.blockingQueue, new NameableThreadFactory("PluginExecutorThread"), new RejectedExecutionHandler() {
            /**
             * Discard-oldest policy: drops the head of the queue and re-queues the rejected task.
             */
            @Override
            public void rejectedExecution(Runnable runnable, ThreadPoolExecutor threadPoolExecutor) {
                if (threadPoolExecutor.isShutdown()) {
                    // Nothing will run any more; retrying would only be rejected again.
                    return;
                }
                // poll() instead of remove(): the worker may have emptied the queue in the
                // meantime, and in that case the new task must still be queued, not dropped.
                if (threadPoolExecutor.getQueue().poll() != null) {
                    LOG.debug("Plugin dispatch queue is full, discarded the oldest pending event");
                }
                // The rejected runnable is already the task wrapper created by submit(),
                // so it is re-queued with execute() rather than being wrapped a second time.
                threadPoolExecutor.execute(runnable);
            }
        });
    }

    /**
     * Stops the worker thread, waiting up to ten seconds for it to terminate.
     * Safe to call when the executor was never created.
     *
     * <p>NOTE(review): this method does not call {@code super.serviceStop()}; confirm against
     * the superclass whether that is intentional.
     *
     * @throws Exception if the wait for termination is interrupted
     */
    @Override
    protected void serviceStop() throws Exception {
        final ExecutorService executor = this.executorService;
        // Clear the field first so no new events are submitted, and so it is reset even if
        // the wait below is interrupted.
        this.executorService = null;
        if (executor == null) {
            return;
        }
        executor.shutdownNow();
        if (!executor.awaitTermination(10L, TimeUnit.SECONDS)) {
            LOG.warn("Executor service still active for plugins");
        }
    }
}
