package org.apache.gobblin.service.modules.flowgraph;

import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.collect.Sets;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigValueFactory;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.ContextAwareMeter;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.TopologySpec;
import org.apache.gobblin.service.modules.template_catalog.FSFlowTemplateCatalog;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.util.PullFileLoader;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/service/modules/flowgraph/BaseFlowGraphHelper.class */
/**
 * Populates a {@link FlowGraph} from data-node and flow-edge definition files stored under
 * {@code <baseDirectory>/<flowGraphFolderName>} on the local file system.
 *
 * <p>The position of a file relative to the flowgraph folder decides how it is interpreted:
 * <ul>
 *   <li>{@code <flowGraphFolderName>/<nodeId>/<file>} (depth {@link #getNodeFileDepth()}) is a data node;</li>
 *   <li>{@code <flowGraphFolderName>/<source>/<destination>/<file>} (depth {@link #getEdgeFileDepth()})
 *       is a flow edge.</li>
 * </ul>
 * Only files whose extension is one of the configured java-properties or HOCON extensions are loaded.
 */
public class BaseFlowGraphHelper {
    private static final Logger log = LoggerFactory.getLogger(BaseFlowGraphHelper.class);

    /** Number of path components, counted from the flowgraph folder (inclusive), of a node file. */
    private static final int NODE_FILE_DEPTH = 3;

    /** Number of path components, counted from the flowgraph folder (inclusive), of an edge file. */
    private static final int EDGE_FILE_DEPTH = 4;

    /** Separator placed between source, destination and edge name when building an edge id. */
    private static final String FLOW_EDGE_LABEL_JOINER_CHAR = "_";

    final String baseDirectory;
    private final Config emptyConfig = ConfigFactory.empty();
    private final Optional<? extends FSFlowTemplateCatalog> flowTemplateCatalog;
    private final Map<URI, TopologySpec> topologySpecMap;
    protected MetricContext metricContext;
    final String flowGraphFolderName;
    final PullFileLoader pullFileLoader;
    protected final Set<String> javaPropsExtensions;
    protected final Set<String> hoconFileExtensions;
    protected final Optional<ContextAwareMeter> flowGraphUpdateFailedMeter;

    /**
     * Creates a helper rooted at {@code str}/{@code str2}.
     *
     * @param optional catalog used to create flow edges; when absent, edges are skipped with a warning
     * @param map topology specs keyed by URI, used to resolve the spec executors named by an edge
     * @param str base directory that contains the flowgraph folder
     * @param str2 name of the flowgraph folder inside the base directory
     * @param str3 comma-separated file extensions treated as java properties files
     * @param str4 comma-separated file extensions treated as HOCON files
     * @param z whether to create a metric context and the "flowgraph update failed" meter
     * @param config configuration used to build the metric context when {@code z} is true
     * @throws RuntimeException if the local file system needed by the pull file loader cannot be obtained
     */
    public BaseFlowGraphHelper(Optional<? extends FSFlowTemplateCatalog> optional, Map<URI, TopologySpec> map, String str, String str2, String str3, String str4, boolean z, Config config) {
        this.flowTemplateCatalog = optional;
        this.topologySpecMap = map;
        this.baseDirectory = str;
        this.flowGraphFolderName = str2;
        Path flowGraphRoot = new Path(str, this.flowGraphFolderName);
        this.javaPropsExtensions = Sets.newHashSet(str3.split(","));
        this.hoconFileExtensions = Sets.newHashSet(str4.split(","));
        if (z) {
            this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), BaseFlowGraphHelper.class);
            this.flowGraphUpdateFailedMeter = Optional.of(this.metricContext.contextAwareMeter("GobblinService.FlowgraphUpdateFailed"));
        } else {
            this.flowGraphUpdateFailedMeter = Optional.absent();
        }
        try {
            this.pullFileLoader = new PullFileLoader(flowGraphRoot, FileSystem.get(URI.create("file:///"), new Configuration()), this.javaPropsExtensions, this.hoconFileExtensions);
        } catch (IOException e) {
            throw new RuntimeException("Could not create pull file loader", e);
        }
    }

    /**
     * Loads the node definition at {@code path} and adds the resulting {@link DataNode} to the graph.
     * Directories and files that fail {@link #checkFilePath(String, int)} are ignored. Any exception
     * raised while loading or instantiating the node is logged and counted on the failure meter; it is
     * never propagated.
     *
     * @param flowGraph graph to add the node to
     * @param path node definition file
     */
    protected void addDataNode(FlowGraph flowGraph, java.nio.file.Path path) {
        if (Files.isDirectory(path, new LinkOption[0]) || !checkFilePath(path.toString(), getNodeFileDepth())) {
            return;
        }
        try {
            Config nodeConfig = loadNodeFileWithOverrides(new Path(this.baseDirectory, path.toString()));
            Class<?> dataNodeClass = Class.forName(ConfigUtils.getString(nodeConfig, FlowGraphConfigurationKeys.DATA_NODE_CLASS, FlowGraphConfigurationKeys.DEFAULT_DATA_NODE_CLASS));
            DataNode dataNode = (DataNode) GobblinConstructorUtils.invokeLongestConstructor(dataNodeClass, new Object[]{nodeConfig});
            if (flowGraph.addDataNode(dataNode)) {
                log.info("Added Datanode {} to FlowGraph", dataNode.getId());
            } else {
                log.warn("Could not add DataNode {} to FlowGraph; skipping", dataNode.getId());
            }
        } catch (Exception e) {
            markUpdateFailure();
            log.warn(String.format("Could not add DataNode defined in %s due to exception: ", path), e);
        }
    }

    /**
     * Loads the edge definition at {@code path} and adds the resulting {@link FlowEdge} to the graph.
     * Directories and files that fail {@link #checkFilePath(String, int)} are ignored. When no flow
     * template catalog is available the edge is skipped with a warning. Any exception raised while
     * loading or creating the edge is logged and counted on the failure meter; it is never propagated.
     *
     * @param flowGraph graph to add the edge to
     * @param path edge definition file
     */
    protected void addFlowEdge(FlowGraph flowGraph, java.nio.file.Path path) {
        if (Files.isDirectory(path, new LinkOption[0]) || !checkFilePath(path.toString(), getEdgeFileDepth())) {
            return;
        }
        try {
            Config edgeConfig = loadEdgeFileWithOverrides(new Path(this.baseDirectory, path.toString()));
            List<SpecExecutor> specExecutors = getSpecExecutors(edgeConfig);
            Class<?> edgeFactoryClass = Class.forName(ConfigUtils.getString(edgeConfig, FlowGraphConfigurationKeys.FLOW_EDGE_FACTORY_CLASS, FlowGraphConfigurationKeys.DEFAULT_FLOW_EDGE_FACTORY_CLASS));
            FlowEdgeFactory flowEdgeFactory = (FlowEdgeFactory) GobblinConstructorUtils.invokeLongestConstructor(edgeFactoryClass, new Object[]{edgeConfig});
            if (!this.flowTemplateCatalog.isPresent()) {
                log.warn("Could not add edge defined in {} to FlowGraph as FlowTemplateCatalog is absent", path);
                return;
            }
            FlowEdge flowEdge = flowEdgeFactory.createFlowEdge(edgeConfig, this.flowTemplateCatalog.get(), specExecutors);
            if (flowGraph.addFlowEdge(flowEdge)) {
                log.info("Added edge {} to FlowGraph", flowEdge.getId());
            } else {
                log.warn("Could not add edge {} to FlowGraph; skipping", flowEdge.getId());
            }
        } catch (Exception e) {
            log.warn("Could not add edge defined in {} due to exception", path, e);
            markUpdateFailure();
        }
    }

    /**
     * Checks that a file sits at the expected depth below the flowgraph folder and carries one of the
     * configured java-properties or HOCON extensions. A warning is logged when the check fails.
     *
     * @param str path of the file to check
     * @param i expected depth, see {@link #checkFileLevelRelativeToRoot(Path, int)}
     * @return {@code true} if the file should be loaded, {@code false} otherwise
     */
    protected boolean checkFilePath(String str, int i) {
        Path filePath = new Path(str);
        String extension = com.google.common.io.Files.getFileExtension(filePath.getName());
        boolean knownExtension = this.javaPropsExtensions.contains(extension) || this.hoconFileExtensions.contains(extension);
        if (checkFileLevelRelativeToRoot(filePath, i) && knownExtension) {
            return true;
        }
        log.warn("Changed file does not conform to directory structure and file name format, skipping: {}", filePath);
        return false;
    }

    /**
     * Tests whether the ancestor found {@code i - 1} levels above {@code path} is named like the
     * flowgraph folder.
     *
     * @param path path to test; {@code null} yields {@code false}
     * @param i expected depth of {@code path}, counting the flowgraph folder itself as level 1
     * @return {@code true} if that ancestor exists and its name equals the flowgraph folder name;
     *     {@code false} if the path is {@code null} or has fewer than {@code i - 1} ancestors
     */
    public boolean checkFileLevelRelativeToRoot(Path path, int i) {
        Path ancestor = path;
        // Stop as soon as we run out of ancestors: Path.getParent() yields null at the root, and
        // walking past it would otherwise dereference null for paths shallower than the depth.
        for (int level = 0; level < i - 1 && ancestor != null; level++) {
            ancestor = ancestor.getParent();
        }
        return ancestor != null && ancestor.getName().equals(this.flowGraphFolderName);
    }

    /**
     * Returns {@code config} with the data node id set to the name of the file's parent directory.
     *
     * @param config configuration loaded from the node file
     * @param path location of the node file
     * @return a copy of {@code config} carrying the node id override
     */
    protected Config getNodeConfigWithOverrides(Config config, Path path) {
        String nodeId = path.getParent().getName();
        return config.withValue(FlowGraphConfigurationKeys.DATA_NODE_ID_KEY, ConfigValueFactory.fromAnyRef(nodeId));
    }

    /**
     * Returns {@code config} with the edge source, destination and id derived from the file location:
     * the grandparent directory name is the source, the parent directory name is the destination, and
     * the id is built by {@link #getEdgeId(String, String, String)} from both plus the file name
     * without its extension.
     *
     * @param config configuration loaded from the edge file
     * @param path location of the edge file
     * @return a copy of {@code config} carrying the source, destination and id overrides
     */
    protected Config getEdgeConfigWithOverrides(Config config, Path path) {
        String source = path.getParent().getParent().getName();
        String destination = path.getParent().getName();
        String edgeName = com.google.common.io.Files.getNameWithoutExtension(path.getName());
        return config
            .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_SOURCE_KEY, ConfigValueFactory.fromAnyRef(source))
            .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_DESTINATION_KEY, ConfigValueFactory.fromAnyRef(destination))
            .withValue(FlowGraphConfigurationKeys.FLOW_EDGE_ID_KEY, ConfigValueFactory.fromAnyRef(getEdgeId(source, destination, edgeName)));
    }

    /**
     * Resolves the spec executors listed in an edge configuration against the topology spec map.
     *
     * @param config edge configuration holding the list of spec executor URIs
     * @return the spec executors, in the order they are listed
     * @throws URISyntaxException if a listed value is not a valid URI
     * @throws IOException if a listed URI has no topology spec in the map
     */
    private List<SpecExecutor> getSpecExecutors(Config config) throws URISyntaxException, IOException {
        List<String> executorUris = ConfigUtils.getStringList(config, FlowGraphConfigurationKeys.FLOW_EDGE_SPEC_EXECUTORS_KEY);
        List<SpecExecutor> specExecutors = new ArrayList<>(executorUris.size());
        for (String executorUri : executorUris) {
            URI uri = new URI(executorUri);
            TopologySpec topologySpec = this.topologySpecMap.get(uri);
            if (topologySpec == null) {
                throw new IOException(String.format("Spec executor %s does not exist in the topologySpecStore.", uri));
            }
            specExecutors.add(topologySpec.getSpecExecutor());
        }
        return specExecutors;
    }

    /**
     * Loads a node file through the pull file loader and applies
     * {@link #getNodeConfigWithOverrides(Config, Path)}.
     *
     * @param path location of the node file
     * @return the node configuration including overrides
     * @throws IOException if the file cannot be loaded
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public Config loadNodeFileWithOverrides(Path path) throws IOException {
        Config loaded = this.pullFileLoader.loadPullFile(path, this.emptyConfig, false, false);
        return getNodeConfigWithOverrides(loaded, path);
    }

    /**
     * Loads an edge file through the pull file loader and applies
     * {@link #getEdgeConfigWithOverrides(Config, Path)}.
     *
     * @param path location of the edge file
     * @return the edge configuration including overrides
     * @throws IOException if the file cannot be loaded
     */
    protected Config loadEdgeFileWithOverrides(Path path) throws IOException {
        Config loaded = this.pullFileLoader.loadPullFile(path, this.emptyConfig, false, false);
        return getEdgeConfigWithOverrides(loaded, path);
    }

    /**
     * Walks the base directory and builds a new flow graph from the files found there. Data nodes are
     * added while walking; edge files are collected and added only after the walk completes, so every
     * node is offered to the graph before any edge.
     *
     * @return the populated graph, or {@code null} if the directory tree could not be traversed
     */
    public FlowGraph generateFlowGraph() {
        FlowGraph flowGraph = new BaseFlowGraph();
        java.nio.file.Path graphRoot = new File(this.baseDirectory).toPath();
        List<java.nio.file.Path> edgeFiles = new ArrayList<>();
        // Files.walk holds open directory handles until the stream is closed, hence try-with-resources.
        try (java.util.stream.Stream<java.nio.file.Path> walk = Files.walk(graphRoot, new FileVisitOption[0])) {
            walk.forEach(file -> {
                Path hadoopPath = new Path(file.toString());
                if (checkFileLevelRelativeToRoot(hadoopPath, getNodeFileDepth())) {
                    addDataNode(flowGraph, file);
                } else if (checkFileLevelRelativeToRoot(hadoopPath, getEdgeFileDepth())) {
                    edgeFiles.add(file);
                }
            });
        } catch (IOException | java.io.UncheckedIOException e) {
            // Errors hit while iterating the walk surface as UncheckedIOException rather than IOException.
            markUpdateFailure();
            log.error(String.format("Error while populating file based flowgraph at path %s", graphRoot), e);
            return null;
        }
        for (java.nio.file.Path edgeFile : edgeFiles) {
            addFlowEdge(flowGraph, edgeFile);
        }
        return flowGraph;
    }

    /**
     * Builds an edge id by joining the three parts with {@code "_"}.
     *
     * @param str edge source
     * @param str2 edge destination
     * @param str3 edge name
     * @return {@code str + "_" + str2 + "_" + str3}
     */
    public String getEdgeId(String str, String str2, String str3) {
        return Joiner.on(FLOW_EDGE_LABEL_JOINER_CHAR).join(str, str2, str3);
    }

    /** @return the depth at which node files are expected; see {@link #checkFileLevelRelativeToRoot(Path, int)} */
    protected int getNodeFileDepth() {
        return NODE_FILE_DEPTH;
    }

    /** @return the depth at which edge files are expected; see {@link #checkFileLevelRelativeToRoot(Path, int)} */
    protected int getEdgeFileDepth() {
        return EDGE_FILE_DEPTH;
    }

    /** Marks the "flowgraph update failed" meter when metrics are enabled; otherwise does nothing. */
    private void markUpdateFailure() {
        if (this.flowGraphUpdateFailedMeter.isPresent()) {
            this.flowGraphUpdateFailedMeter.get().mark();
        }
    }
}
