package org.apache.apex.engine.plugin;

import com.datatorrent.api.DAG;
import com.datatorrent.stram.StramAppContext;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.webapp.AppInfo;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.Lists;
import com.google.common.collect.Table;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.URI;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;
import org.apache.apex.api.plugin.Event;
import org.apache.apex.api.plugin.Plugin;
import org.apache.apex.engine.api.plugin.DAGExecutionEvent;
import org.apache.apex.engine.api.plugin.DAGExecutionPlugin;
import org.apache.apex.engine.api.plugin.PluginLocator;
import org.apache.apex.engine.plugin.ApexPluginDispatcher;
import org.apache.commons.lang3.SerializationUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/engine/plugin/AbstractApexPluginDispatcher.class */
public abstract class AbstractApexPluginDispatcher extends AbstractService implements ApexPluginDispatcher {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractApexPluginDispatcher.class);
    protected final Collection<DAGExecutionPlugin> plugins;
    protected final StramAppContext appContext;
    protected final StreamingContainerManager dmgr;
    private final PluginLocator<DAGExecutionPlugin> locator;
    private final AppInfo.AppStats stats;
    protected Configuration launchConfig;
    protected FileContext fileContext;
    protected final Table<DAGExecutionEvent.Type, DAGExecutionPlugin, Plugin.EventHandler<DAGExecutionEvent>> table;
    private volatile DAG clonedDAG;

    /* loaded from: input_file:org/apache/apex/engine/plugin/AbstractApexPluginDispatcher$PluginManagerImpl.class */
    private class PluginManagerImpl extends AbstractDAGExecutionPluginContext<DAGExecutionEvent> {
        private final DAGExecutionPlugin owner;

        PluginManagerImpl(DAGExecutionPlugin dAGExecutionPlugin) {
            super(AbstractApexPluginDispatcher.this.appContext, AbstractApexPluginDispatcher.this.dmgr, AbstractApexPluginDispatcher.this.stats, AbstractApexPluginDispatcher.this.launchConfig);
            this.owner = dAGExecutionPlugin;
        }

        public void register(DAGExecutionEvent.Type type, Plugin.EventHandler<DAGExecutionEvent> eventHandler) {
            AbstractApexPluginDispatcher.this.register(type, eventHandler, this.owner);
        }

        @Override // org.apache.apex.engine.plugin.AbstractDAGExecutionPluginContext, org.apache.apex.engine.api.plugin.DAGExecutionPlugin.Context
        public DAG getDAG() {
            return AbstractApexPluginDispatcher.this.clonedDAG;
        }

        public /* bridge */ /* synthetic */ void register(Event.Type type, Plugin.EventHandler eventHandler) {
            register((DAGExecutionEvent.Type) type, (Plugin.EventHandler<DAGExecutionEvent>) eventHandler);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractApexPluginDispatcher(String str, PluginLocator<DAGExecutionPlugin> pluginLocator, StramAppContext stramAppContext, StreamingContainerManager streamingContainerManager, AppInfo.AppStats appStats) {
        super(str);
        this.plugins = Lists.newArrayList();
        this.table = HashBasedTable.create();
        this.clonedDAG = null;
        this.locator = pluginLocator;
        this.appContext = stramAppContext;
        this.dmgr = streamingContainerManager;
        this.stats = appStats;
        LOG.debug("Creating Plugin Dispatcher service {}", str);
    }

    private Configuration readLaunchConfiguration() throws IOException {
        Path path = new Path(this.appContext.getApplicationPath());
        Path path2 = new Path(path, LogicalPlan.LAUNCH_CONFIG_FILE_NAME);
        try {
            LOG.debug("Reading launch configuration file ");
            URI uri = path.toUri();
            YarnConfiguration yarnConfiguration = new YarnConfiguration();
            this.fileContext = uri.getScheme() == null ? FileContext.getFileContext(yarnConfiguration) : FileContext.getFileContext(uri, yarnConfiguration);
            yarnConfiguration.addResource(this.fileContext.open(path2));
            LOG.debug("Read launch configuration");
            return yarnConfiguration;
        } catch (FileNotFoundException e) {
            LOG.warn("Configuration file not found {}", path2);
            return new Configuration();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void serviceInit(Configuration configuration) throws Exception {
        Set<DAGExecutionPlugin> discoverPlugins;
        super.serviceInit(configuration);
        this.launchConfig = readLaunchConfiguration();
        if (this.locator != null && (discoverPlugins = this.locator.discoverPlugins(this.launchConfig)) != null) {
            this.plugins.addAll(discoverPlugins);
            Iterator<DAGExecutionPlugin> it = discoverPlugins.iterator();
            while (it.hasNext()) {
                LOG.info("Detected plugin {}", it.next());
            }
        }
        for (DAGExecutionPlugin dAGExecutionPlugin : this.plugins) {
            dAGExecutionPlugin.setup(new PluginManagerImpl(dAGExecutionPlugin));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void serviceStop() throws Exception {
        for (DAGExecutionPlugin dAGExecutionPlugin : this.plugins) {
            try {
                dAGExecutionPlugin.teardown();
            } catch (Exception e) {
                LOG.warn("Exception during {} teardown", dAGExecutionPlugin, e);
            }
        }
        super.serviceStop();
    }

    public void register(DAGExecutionEvent.Type type, Plugin.EventHandler<DAGExecutionEvent> eventHandler, DAGExecutionPlugin dAGExecutionPlugin) {
        synchronized (this.table) {
            this.table.put(type, dAGExecutionPlugin, eventHandler);
        }
    }

    protected abstract void dispatchExecutionEvent(DAGExecutionEvent dAGExecutionEvent);

    @Override // org.apache.apex.engine.plugin.ApexPluginDispatcher
    public void dispatch(Event event) {
        if (event.getType() == ApexPluginDispatcher.DAG_CHANGE) {
            this.clonedDAG = SerializationUtils.clone(((ApexPluginDispatcher.DAGChangeEvent) event).dag);
        } else {
            if (this.plugins.isEmpty() || !(event instanceof DAGExecutionEvent)) {
                return;
            }
            dispatchExecutionEvent((DAGExecutionEvent) event);
        }
    }
}
