package org.apache.gobblin.data.management.copy.replication;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterators;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.gobblin.config.client.ConfigClient;
import org.apache.gobblin.config.client.api.ConfigStoreFactoryDoesNotExistsException;
import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
import org.apache.gobblin.config.store.api.ConfigStoreCreationException;
import org.apache.gobblin.config.store.api.VersionDoesNotExistException;
import org.apache.gobblin.data.management.copy.CopySource;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.dataset.DatasetsFinder;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.PathUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/data/management/copy/replication/ConfigBasedDatasetsFinder.class */
/**
 * Base {@link DatasetsFinder} that discovers datasets from a Gobblin config store.
 *
 * <p>A config-store node is selected as a dataset when all of the following hold:
 * <ul>
 *   <li>it (recursively) imports the whitelist tag;</li>
 *   <li>its path lies under the configured common root;</li>
 *   <li>it is a leaf, i.e. no other selected node lies beneath it;</li>
 *   <li>it does not (recursively) import any of the blacklist tags.</li>
 * </ul>
 * Subclasses turn each selected URI into concrete {@link Dataset}s via {@link #findDatasetsCallable}.
 * These callables run concurrently on a bounded daemon thread pool.
 */
public abstract class ConfigBasedDatasetsFinder implements DatasetsFinder {
    private static final Logger log = LoggerFactory.getLogger(ConfigBasedDatasetsFinder.class);

    /** Relative path (under the store root) of the tag that datasets must import. Required. */
    public static final String GOBBLIN_CONFIG_STORE_WHITELIST_TAG = "gobblin.configBased.whitelist.tag";
    /** Comma-separated relative paths of tags whose importers are excluded. Optional. */
    public static final String GOBBLIN_CONFIG_STORE_BLACKLIST_TAGS = "gobblin.configBased.blacklist.tags";
    /** Relative path (under the store root) that every dataset must live under. Required. */
    public static final String GOBBLIN_CONFIG_STORE_DATASET_COMMON_ROOT = "gobblin.configBased.dataset.common.root";
    /** Comma-separated job-level blacklist patterns handed to {@link #findDatasetsCallable}. Optional. */
    public static final String JOB_LEVEL_BLACKLIST = "gobblin.copy.configBased.blacklist";
    /** Not read in this class. NOTE(review): presumably consumed by subclasses; confirm. */
    public static final String WATERMARK_ENABLE = "gobblin.copy.configBased.watermark.enabled";

    /** Property holding the config store root URI. Required. */
    private static final String CONFIG_STORE_URI_KEY = "gobblin.config.management.store.uri";
    /** Thread pool size used when {@link CopySource#MAX_CONCURRENT_LISTING_SERVICES} is not set. */
    private static final int DEFAULT_THREAD_POOL_SIZE = 20;
    /** Maximum number of individual task failures written to the log. */
    private static final int MAX_LOGGED_FAILURES = 10;
    /** Splits comma-separated lists, trimming whitespace so "a, b" yields "a" and "b". */
    private static final Splitter LIST_SPLITTER = Splitter.on(",").omitEmptyStrings().trimResults();

    protected final String storeRoot;
    protected final Path commonRoot;
    protected final Path whitelistTag;
    protected final Optional<List<Path>> blacklistTags;
    protected final ConfigClient configClient;
    protected final Properties props;
    private final int threadPoolSize;
    private final Optional<List<String>> blacklistPatterns;

    /**
     * Reads and validates the finder configuration.
     *
     * @param fileSystem not used by this base class
     * @param properties job properties; must contain the store URI, whitelist tag and common root keys
     * @throws IllegalArgumentException if a required key is missing
     * @throws IOException declared for subclasses / API compatibility
     */
    public ConfigBasedDatasetsFinder(FileSystem fileSystem, Properties properties) throws IOException {
        Preconditions.checkArgument(properties.containsKey(CONFIG_STORE_URI_KEY),
                "missing required config entery " + CONFIG_STORE_URI_KEY);
        Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_STORE_WHITELIST_TAG),
                "missing required config entery " + GOBBLIN_CONFIG_STORE_WHITELIST_TAG);
        Preconditions.checkArgument(properties.containsKey(GOBBLIN_CONFIG_STORE_DATASET_COMMON_ROOT),
                "missing required config entery " + GOBBLIN_CONFIG_STORE_DATASET_COMMON_ROOT);

        this.storeRoot = properties.getProperty(CONFIG_STORE_URI_KEY);
        Path storeRootPath = new Path(this.storeRoot);
        this.commonRoot = PathUtils.mergePaths(storeRootPath,
                new Path(properties.getProperty(GOBBLIN_CONFIG_STORE_DATASET_COMMON_ROOT)));
        this.whitelistTag = PathUtils.mergePaths(storeRootPath,
                new Path(properties.getProperty(GOBBLIN_CONFIG_STORE_WHITELIST_TAG)));
        this.threadPoolSize = properties.containsKey(CopySource.MAX_CONCURRENT_LISTING_SERVICES)
                ? Integer.parseInt(properties.getProperty(CopySource.MAX_CONCURRENT_LISTING_SERVICES))
                : DEFAULT_THREAD_POOL_SIZE;

        if (properties.containsKey(GOBBLIN_CONFIG_STORE_BLACKLIST_TAGS)) {
            List<Path> tags = new ArrayList<>();
            for (String tag : LIST_SPLITTER.splitToList(properties.getProperty(GOBBLIN_CONFIG_STORE_BLACKLIST_TAGS))) {
                tags.add(PathUtils.mergePaths(storeRootPath, new Path(tag)));
            }
            this.blacklistTags = Optional.of(tags);
        } else {
            this.blacklistTags = Optional.absent();
        }

        this.configClient = ConfigClient.createConfigClient(VersionStabilityPolicy.WEAK_LOCAL_STABILITY);
        this.props = properties;

        if (this.props.containsKey(JOB_LEVEL_BLACKLIST)) {
            this.blacklistPatterns = Optional.of(LIST_SPLITTER.splitToList(this.props.getProperty(JOB_LEVEL_BLACKLIST)));
        } else {
            this.blacklistPatterns = Optional.absent();
        }
    }

    /**
     * Returns the URIs of all enabled leaf datasets under {@code datasetCommonRoot}.
     *
     * <p>The candidates are the nodes importing the whitelist tag. The disabled set is every node importing a
     * blacklist tag.
     *
     * @throws RuntimeException wrapping any config-store or URI syntax failure
     */
    protected Set<URI> getValidDatasetURIs(Path datasetCommonRoot) {
        Set<URI> disabledURIs = new HashSet<>();
        Collection<URI> allDatasetURIs;
        try {
            allDatasetURIs = this.configClient.getImportedBy(new URI(this.whitelistTag.toString()), true);
            enhanceDisabledURIsWithBlackListTag(disabledURIs);
        } catch (ConfigStoreFactoryDoesNotExistsException | ConfigStoreCreationException | URISyntaxException e) {
            // Pass the throwable so the stack trace is kept, not just the message.
            log.error("Caught error while getting all the datasets URIs " + e.getMessage(), e);
            throw new RuntimeException(e);
        }
        return getValidDatasetURIsHelper(allDatasetURIs, disabledURIs, datasetCommonRoot);
    }

    /**
     * Selects the leaf URIs under a common root and removes the disabled ones.
     *
     * <p>A URI is kept when:
     * <ul>
     *   <li>its path is under {@code datasetCommonRoot} according to {@link PathUtils#isAncestor};</li>
     *   <li>no other such URI's path lies strictly beneath it;</li>
     *   <li>it is not contained in {@code disabledURISet}.</li>
     * </ul>
     * Ancestry is decided on Hadoop-normalized paths, so trailing slashes do not matter.
     *
     * <p>The ancestor walk is exact. A sorted-set "nearest lower neighbour" check is not: {@code '-'} and
     * {@code '.'} sort before {@code '/'}, so {@code /a-x} falls between {@code /a} and {@code /a/b}. That
     * would hide the {@code /a} → {@code /a/b} relationship.
     *
     * @param allDatasetURIs candidate dataset URIs; may be {@code null} or empty
     * @param disabledURISet URIs to exclude from the result
     * @param datasetCommonRoot root that every returned URI must lie under
     * @return an empty immutable set when there are no candidates, otherwise a new mutable set
     */
    protected static Set<URI> getValidDatasetURIsHelper(Collection<URI> allDatasetURIs, Set<URI> disabledURISet,
            Path datasetCommonRoot) {
        if (allDatasetURIs == null || allDatasetURIs.isEmpty()) {
            return ImmutableSet.of();
        }

        // Distinct URIs under the common root, plus the normalized path string of each.
        Set<URI> candidateURIs = new HashSet<>();
        Set<String> candidatePaths = new HashSet<>();
        for (URI uri : allDatasetURIs) {
            Path uriPath = new Path(uri.getPath());
            if (PathUtils.isAncestor(datasetCommonRoot, uriPath)) {
                candidateURIs.add(uri);
                candidatePaths.add(uriPath.toUri().getPath());
            }
        }

        // Any candidate that is a proper ancestor of another candidate is not a leaf.
        Set<String> nonLeafPaths = new HashSet<>();
        for (String candidatePath : candidatePaths) {
            collectCandidateAncestors(candidatePath, candidatePaths, nonLeafPaths);
        }

        Set<URI> validURISet = new HashSet<>();
        for (URI uri : candidateURIs) {
            if (!nonLeafPaths.contains(new Path(uri.getPath()).toUri().getPath())) {
                validURISet.add(uri);
            }
        }

        for (URI disabledURI : disabledURISet) {
            if (validURISet.remove(disabledURI)) {
                log.info("skip disabled dataset " + disabledURI);
            } else {
                log.info("There's no URI " + disabledURI + " available in validURISet.");
            }
        }
        return validURISet;
    }

    /**
     * Adds to {@code nonLeafPaths} every proper ancestor of {@code normalizedPath} that is a candidate path.
     */
    private static void collectCandidateAncestors(String normalizedPath, Set<String> candidatePaths,
            Set<String> nonLeafPaths) {
        // Walk the '/' separators right-to-left; the prefix before each one is an ancestor ("/" at index 0).
        // lastIndexOf(ch, -1) returns -1, so the loop always terminates.
        for (int slash = normalizedPath.lastIndexOf('/'); slash >= 0;
                slash = normalizedPath.lastIndexOf('/', slash - 1)) {
            String ancestor = slash == 0 ? "/" : normalizedPath.substring(0, slash);
            // The length check skips the path itself (only possible for "/").
            if (ancestor.length() < normalizedPath.length() && candidatePaths.contains(ancestor)) {
                nonLeafPaths.add(ancestor);
            }
        }
    }

    /**
     * Adds to {@code disabledURIs} every node that (recursively) imports one of the configured blacklist tags.
     * Does nothing when no blacklist tags are configured.
     */
    private void enhanceDisabledURIsWithBlackListTag(Set<URI> disabledURIs) throws URISyntaxException,
            ConfigStoreFactoryDoesNotExistsException, ConfigStoreCreationException, VersionDoesNotExistException {
        if (this.blacklistTags.isPresent()) {
            for (Path blacklistTag : this.blacklistTags.get()) {
                disabledURIs.addAll(this.configClient.getImportedBy(new URI(blacklistTag.toString()), true));
            }
        }
    }

    /** Returns the store root merged with the configured common dataset root. */
    @Override
    public Path commonDatasetRoot() {
        return this.commonRoot;
    }

    /**
     * Finds all datasets by running one {@link #findDatasetsCallable} per valid dataset URI.
     *
     * @return datasets collected by the callables; empty when no dataset URI is valid
     * @throws IOException if the listing is interrupted
     */
    @Override
    public List<Dataset> findDatasets() throws IOException {
        Set<URI> validDatasetURIs = getValidDatasetURIs(this.commonRoot);
        if (validDatasetURIs.isEmpty()) {
            return ImmutableList.of();
        }

        // Callables run on several threads and all append here, so a concurrent list is used.
        final List<Dataset> datasets = new CopyOnWriteArrayList<>();
        Iterator<Callable<Void>> callables = Iterators.transform(validDatasetURIs.iterator(),
                new Function<URI, Callable<Void>>() {
                    @Override
                    public Callable<Void> apply(URI datasetURI) {
                        return findDatasetsCallable(configClient, datasetURI, props, blacklistPatterns, datasets);
                    }
                });
        executeItertorExecutor(callables);
        log.info("found {} datasets in ConfigBasedDatasetsFinder", datasets.size());
        return datasets;
    }

    /**
     * Runs every callable on a daemon thread pool of {@code threadPoolSize} threads.
     * Logs up to {@value #MAX_LOGGED_FAILURES} failures.
     *
     * @throws IOException if interrupted; the thread's interrupt status is restored first
     */
    protected void executeItertorExecutor(Iterator<Callable<Void>> callableIterator) throws IOException {
        try {
            IteratorExecutor.logFailures(new IteratorExecutor<Void>(callableIterator, this.threadPoolSize,
                    ExecutorsUtils.newDaemonThreadFactory(Optional.of(log), Optional.of(getClass().getSimpleName())))
                    .executeAndGetResults(), log, MAX_LOGGED_FAILURES);
        } catch (InterruptedException e) {
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new IOException("Dataset finder is interrupted.", e);
        }
    }

    /**
     * Resolves a dataset URN relative to the store root.
     * Not referenced elsewhere in this class.
     *
     * @return the resolved URI, or {@code null} if it is not a valid URI
     */
    private URI datasetURNtoURI(String datasetURN) {
        try {
            return new URI(PathUtils.mergePaths(new Path(this.storeRoot), new Path(datasetURN)).toString());
        } catch (URISyntaxException e) {
            log.error("Dataset with URN:" + datasetURN + " cannot be converted into URI. Skip the dataset", e);
            return null;
        }
    }

    /**
     * Creates the task that discovers the datasets for one dataset URI.
     *
     * @param configClient client used to read the dataset's configuration
     * @param datasetURI URI of a valid (whitelisted, enabled, leaf) dataset node
     * @param properties job properties
     * @param blacklistPatterns job-level blacklist patterns, if configured
     * @param datasets shared, thread-safe sink to which discovered datasets should be added
     * @return a callable to be executed on the finder's thread pool
     */
    protected abstract Callable<Void> findDatasetsCallable(ConfigClient configClient, URI datasetURI,
            Properties properties, Optional<List<String>> blacklistPatterns, Collection<Dataset> datasets);
}
