package org.apache.gobblin.service.monitoring;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.AbstractIdleService;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.flow.MultiHopFlowCompiler;
import org.apache.gobblin.service.modules.flowgraph.BaseFlowGraphHelper;
import org.apache.gobblin.service.modules.flowgraph.FSPathAlterationFlowGraphListener;
import org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor;
import org.apache.gobblin.service.modules.template_catalog.UpdatableFSFlowTemplateCatalog;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.filesystem.PathAlterationObserver;
import org.apache.gobblin.util.filesystem.PathAlterationObserverScheduler;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/monitoring/FsFlowGraphMonitor.class */
public class FsFlowGraphMonitor extends AbstractIdleService implements FlowGraphMonitor {
    public static final String FS_FLOWGRAPH_MONITOR_PREFIX = "gobblin.service.fsFlowGraphMonitor";
    private volatile boolean isActive = false;
    private final long pollingInterval;
    private BaseFlowGraphHelper flowGraphHelper;
    private final PathAlterationObserverScheduler pathAlterationDetector;
    private final FSPathAlterationFlowGraphListener listener;
    private final PathAlterationObserver observer;
    private Path flowGraphPath;
    private Path observedPath;
    private final MultiHopFlowCompiler compiler;
    private final CountDownLatch initComplete;
    private static final Logger log = LoggerFactory.getLogger(FsFlowGraphMonitor.class);
    private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR = "/tmp/fsFlowgraph";
    private static final String DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR = "gobblin-flowgraph";
    private static final long DEFAULT_FLOWGRAPH_POLLING_INTERVAL = 60;
    public static final String MONITOR_TEMPLATE_CATALOG_CHANGES = "monitorTemplateChanges";
    private static final Config DEFAULT_FALLBACK = ConfigFactory.parseMap(ImmutableMap.builder().put("flowGraph.absoluteDirectory", DEFAULT_FS_FLOWGRAPH_MONITOR_ABSOLUTE_DIR).put("flowGraph.configBaseDirectory", DEFAULT_FS_FLOWGRAPH_MONITOR_FLOWGRAPH_DIR).put("flowGraph.pollingInterval", Long.valueOf(DEFAULT_FLOWGRAPH_POLLING_INTERVAL)).put("flowGraph.javaPropsExtensions", "properties").put(MONITOR_TEMPLATE_CATALOG_CHANGES, false).put("flowGraph.hoconFileExtensions", "conf").build());

    public FsFlowGraphMonitor(Config config, Optional<UpdatableFSFlowTemplateCatalog> optional, MultiHopFlowCompiler multiHopFlowCompiler, Map<URI, TopologySpec> map, CountDownLatch countDownLatch, boolean z) throws IOException {
        Config withFallback = config.getConfig(FS_FLOWGRAPH_MONITOR_PREFIX).withFallback(DEFAULT_FALLBACK);
        this.pollingInterval = TimeUnit.SECONDS.toMillis(withFallback.getLong("flowGraph.pollingInterval"));
        this.flowGraphPath = new Path(withFallback.getString("flowGraph.absoluteDirectory"));
        boolean z2 = withFallback.getBoolean(MONITOR_TEMPLATE_CATALOG_CHANGES);
        this.observedPath = z2 ? this.flowGraphPath.getParent() : this.flowGraphPath;
        this.observer = new PathAlterationObserver(this.observedPath);
        try {
            this.flowGraphHelper = (BaseFlowGraphHelper) ConstructorUtils.invokeConstructor(Class.forName(new ClassAliasResolver(BaseFlowGraphHelper.class).resolve(ConfigUtils.getString(config, "gobblin.service.flowGraphHelper.class", BaseFlowGraphHelper.class.getCanonicalName()))), new Object[]{optional, map, this.flowGraphPath.toString(), withFallback.getString("flowGraph.configBaseDirectory"), withFallback.getString("flowGraph.javaPropsExtensions"), withFallback.getString("flowGraph.hoconFileExtensions"), Boolean.valueOf(z), config});
            this.listener = new FSPathAlterationFlowGraphListener(optional, multiHopFlowCompiler, this.flowGraphPath.toString(), this.flowGraphHelper, z2);
            this.compiler = multiHopFlowCompiler;
            this.initComplete = countDownLatch;
            if (this.pollingInterval == -1) {
                this.pathAlterationDetector = null;
            } else {
                this.pathAlterationDetector = new PathAlterationObserverScheduler(this.pollingInterval);
                this.pathAlterationDetector.addPathAlterationObserver(this.listener, Optional.fromNullable(this.observer), this.observedPath);
            }
        } catch (ClassNotFoundException | IllegalAccessException | InstantiationException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException(e);
        }
    }

    protected void startUp() throws IOException {
    }

    @Override // org.apache.gobblin.service.modules.flowgraph.FlowGraphMonitor
    public synchronized void setActive(boolean z) {
        log.info("Setting the flow graph monitor to be " + z + " from " + this.isActive);
        if (this.isActive == z) {
            return;
        }
        if (z) {
            if (this.pathAlterationDetector != null) {
                log.info("Starting the " + getClass().getSimpleName());
                log.info("Polling folder {} with interval {} ", this.observedPath, Long.valueOf(this.pollingInterval));
                try {
                    this.pathAlterationDetector.start();
                    this.compiler.setFlowGraph(this.flowGraphHelper.generateFlowGraph());
                    this.initComplete.countDown();
                    log.info("Finished populating FSFlowgraph");
                } catch (IOException e) {
                    log.error("Could not initialize pathAlterationDetector due to error: ", e);
                }
            } else {
                log.warn("No path alteration detector found");
            }
        }
        this.isActive = z;
    }

    protected void shutDown() throws Exception {
        this.pathAlterationDetector.stop();
    }
}
