package org.apache.gobblin.compaction.source;

import com.google.common.base.Function;
import com.google.common.base.Optional;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.gobblin.compaction.mapreduce.MRCompactionTaskFactory;
import org.apache.gobblin.compaction.mapreduce.MRCompactor;
import org.apache.gobblin.compaction.source.CompactionFailedTask;
import org.apache.gobblin.compaction.suite.CompactionSuite;
import org.apache.gobblin.compaction.suite.CompactionSuiteUtils;
import org.apache.gobblin.compaction.verify.CompactionVerifier;
import org.apache.gobblin.config.ConfigBuilder;
import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.data.management.dataset.DatasetUtils;
import org.apache.gobblin.data.management.dataset.DefaultFileSystemGlobFinder;
import org.apache.gobblin.data.management.dataset.SimpleDatasetRequest;
import org.apache.gobblin.data.management.dataset.SimpleDatasetRequestor;
import org.apache.gobblin.dataset.Dataset;
import org.apache.gobblin.runtime.JobState;
import org.apache.gobblin.runtime.task.FailedTask;
import org.apache.gobblin.runtime.task.TaskUtils;
import org.apache.gobblin.source.WorkUnitStreamSource;
import org.apache.gobblin.source.extractor.Extractor;
import org.apache.gobblin.source.workunit.BasicWorkUnitStream;
import org.apache.gobblin.source.workunit.WorkUnit;
import org.apache.gobblin.source.workunit.WorkUnitStream;
import org.apache.gobblin.util.ClassAliasResolver;
import org.apache.gobblin.util.Either;
import org.apache.gobblin.util.ExecutorsUtils;
import org.apache.gobblin.util.HadoopUtils;
import org.apache.gobblin.util.executors.IteratorExecutor;
import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
import org.apache.gobblin.util.request_allocation.GreedyAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalAllocator;
import org.apache.gobblin.util.request_allocation.HierarchicalPrioritizer;
import org.apache.gobblin.util.request_allocation.RequestAllocator;
import org.apache.gobblin.util.request_allocation.RequestAllocatorConfig;
import org.apache.gobblin.util.request_allocation.RequestAllocatorUtils;
import org.apache.gobblin.util.request_allocation.ResourceEstimator;
import org.apache.gobblin.util.request_allocation.ResourcePool;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTimeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/gobblin/compaction/source/CompactionSource.class */
/**
 * A {@link WorkUnitStreamSource} that discovers compaction datasets and verifies them on a background thread.
 * It streams one {@link WorkUnit} for every dataset that passes verification.
 *
 * <p>Datasets whose verification fails with a retriable failure are re-prioritized and verified again.
 * Retrying stops when the configured iteration limit or verification timeout is reached. Datasets still
 * unverified at that point are emitted as failed work units carrying the last recorded failure reason.
 */
public class CompactionSource implements WorkUnitStreamSource<String, String> {
    private static final Logger log = LoggerFactory.getLogger(CompactionSource.class);
    public static final String COMPACTION_INIT_TIME = "compaction.init.time";
    private CompactionSuite suite;
    private Path tmpJobDir;
    private FileSystem fs;
    private RequestAllocator<SimpleDatasetRequest> allocator;

    /**
     * Blocking iterator that connects the work unit producers (the verification threads) to the consumer of
     * the work unit stream.
     *
     * <p>{@link #hasNext()} polls the internal queue once per second. It returns as soon as a work unit is
     * available. It returns {@code false} only after {@link #done()} has been called and the queue is drained.
     * Producers must therefore always call {@link #done()}, or consumers block indefinitely.
     */
    public static class CompactionWorkUnitIterator implements Iterator<WorkUnit> {
        private final LinkedBlockingDeque<WorkUnit> workUnits = new LinkedBlockingDeque<>();
        private final AtomicBoolean isDone = new AtomicBoolean(false);
        /** Work unit already taken from the queue by {@link #hasNext()} but not yet returned by {@link #next()}. */
        private WorkUnit last = null;

        @Override
        public boolean hasNext() {
            while (this.last == null) {
                if (this.isDone.get() && this.workUnits.isEmpty()) {
                    return false;
                }
                try {
                    this.last = this.workUnits.poll(1L, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so code further up the stack can still observe it.
                    Thread.currentThread().interrupt();
                    log.error("Interrupted while waiting for compaction work units", e);
                    return false;
                }
            }
            return true;
        }

        /** Signals that no further work units will be added. */
        public void done() {
            this.isDone.set(true);
        }

        @Override
        public WorkUnit next() {
            if (!hasNext()) {
                throw new NoSuchElementException("work units queue has been exhausted");
            }
            if (this.last == null) {
                throw new IllegalStateException("last variable cannot be empty");
            }
            WorkUnit workUnit = this.last;
            this.last = null;
            return workUnit;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("No remove supported on " + getClass().getName());
        }

        /** Enqueues a work unit for the consumer; called from producer threads. */
        protected void addWorkUnit(WorkUnit workUnit) {
            this.workUnits.add(workUnit);
        }
    }

    /** Wraps any failure raised while verifying a single dataset, remembering which dataset failed. */
    public static class DatasetVerificationException extends Exception {
        private final Dataset dataset;
        private final Throwable cause;

        public DatasetVerificationException(Dataset dataset, Throwable th) {
            // Chain the cause so logged stack traces include the underlying failure.
            super("Dataset:" + dataset.datasetURN() + " Exception:" + th, th);
            this.dataset = dataset;
            this.cause = th;
        }
    }

    /**
     * Runs all dataset-finder verifiers against one dataset.
     * If every verifier passes, it enqueues a compaction work unit for that dataset.
     */
    public class DatasetVerifier implements Callable<VerifiedDataset> {
        private final Dataset dataset;
        private final CompactionWorkUnitIterator workUnitIterator;
        private final List<CompactionVerifier> verifiers;

        @ConstructorProperties({"dataset", "workUnitIterator", "verifiers"})
        public DatasetVerifier(Dataset dataset, CompactionWorkUnitIterator workUnitIterator,
                List<CompactionVerifier> verifiers) {
            this.dataset = dataset;
            this.workUnitIterator = workUnitIterator;
            this.verifiers = verifiers;
        }

        /**
         * Verifies the dataset and, on success, adds its work unit to the iterator.
         *
         * @return the dataset together with its verification outcome
         * @throws DatasetVerificationException wrapping any exception raised by a verifier or by work unit creation
         */
        @Override
        public VerifiedDataset call() throws DatasetVerificationException {
            try {
                VerifiedResult result = verify(this.dataset);
                if (result.allVerificationPassed) {
                    this.workUnitIterator.addWorkUnit(createWorkUnit(this.dataset));
                }
                return new VerifiedDataset(this.dataset, result);
            } catch (Exception e) {
                throw new DatasetVerificationException(this.dataset, e);
            }
        }

        /**
         * Runs the verifiers in order.
         * A failing retriable verifier records its failure reason and verification continues with the next verifier.
         * The first failing non-retriable verifier marks the result as not retriable and stops verification.
         * The reported failure reason is the one from the last failing verifier.
         */
        public VerifiedResult verify(Dataset dataset) throws Exception {
            boolean verificationPassed = true;
            boolean shouldRetry = true;
            String failedReason = "";
            if (this.verifiers != null) {
                for (CompactionVerifier verifier : this.verifiers) {
                    CompactionVerifier.Result result = verifier.verify(dataset);
                    if (!result.isSuccessful()) {
                        verificationPassed = false;
                        failedReason = result.getFailureReason();
                        if (!verifier.isRetriable()) {
                            shouldRetry = false;
                            break;
                        }
                    }
                }
            }
            return new VerifiedResult(verificationPassed, shouldRetry, failedReason);
        }
    }

    /**
     * Background task that verifies datasets in rounds and feeds the resulting work units into the iterator.
     * The iterator is always marked done when this task ends, whether it succeeds or fails.
     */
    public class SingleWorkUnitGeneratorService implements Runnable {
        private final SourceState state;
        private List<Dataset> datasets;
        private final CompactionWorkUnitIterator workUnitIterator;

        public SingleWorkUnitGeneratorService(SourceState sourceState, List<Dataset> list,
                CompactionWorkUnitIterator compactionWorkUnitIterator) {
            this.state = sourceState;
            this.datasets = list;
            this.workUnitIterator = compactionWorkUnitIterator;
        }

        @Override
        public void run() {
            try {
                generateWorkUnits();
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            } finally {
                // Always terminate the stream. Otherwise a failure in this thread leaves the consumer
                // polling hasNext() forever.
                this.workUnitIterator.done();
            }
        }

        /** Verification rounds: verify, collect retriable failures, re-prioritize, repeat until limits are hit. */
        private void generateWorkUnits() throws IOException, InterruptedException {
            Stopwatch stopwatch = Stopwatch.createStarted();
            int threads = this.state.getPropAsInt(CompactionVerifier.COMPACTION_VERIFICATION_THREADS, 5);
            long timeoutMinutes =
                    this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_TIMEOUT_MINUTES, 30L);
            long iterationLimit =
                    this.state.getPropAsLong(CompactionVerifier.COMPACTION_VERIFICATION_ITERATION_COUNT_LIMIT, 100L);

            Function<Dataset, Callable<VerifiedDataset>> toVerifier = candidate -> new DatasetVerifier(
                    candidate, this.workUnitIterator, CompactionSource.this.suite.getDatasetsFinderVerifiers());

            // Start empty rather than null. If the iteration limit is 0 the loop never runs, but the
            // failure reporting below must still be able to look up reasons.
            HashMap<String, String> failedReasons = new HashMap<>();
            long iteration = 0;
            while (!this.datasets.isEmpty() && iteration++ < iterationLimit) {
                Iterator<Callable<VerifiedDataset>> verifiers = Iterators.transform(this.datasets.iterator(), toVerifier);
                IteratorExecutor<VerifiedDataset> executor = new IteratorExecutor<VerifiedDataset>(verifiers, threads,
                        ExecutorsUtils.newThreadFactory(Optional.of(log), Optional.of("Verifier-compaction-dataset-pool-%d")));

                List<Dataset> retryDatasets = new ArrayList<>();
                failedReasons = new HashMap<>();
                for (Either<VerifiedDataset, ExecutionException> outcome : executor.executeAndGetResults()) {
                    if (outcome instanceof Either.Right) {
                        ExecutionException executionException =
                                ((Either.Right<VerifiedDataset, ExecutionException>) outcome).getRight();
                        DatasetVerificationException dve = (DatasetVerificationException) executionException.getCause();
                        retryDatasets.add(dve.dataset);
                        failedReasons.put(dve.dataset.getUrn(), ExceptionUtils.getFullStackTrace(dve.cause));
                    } else {
                        VerifiedDataset verified = ((Either.Left<VerifiedDataset, ExecutionException>) outcome).getLeft();
                        VerifiedResult result = verified.verifiedResult;
                        if (!result.allVerificationPassed) {
                            if (result.shouldRetry) {
                                log.debug("Dataset {} verification has failure but should retry", verified.dataset.datasetURN());
                                retryDatasets.add(verified.dataset);
                                failedReasons.put(verified.dataset.getUrn(), result.failedReason);
                            } else {
                                log.debug("Dataset {} verification has failure but no need to retry", verified.dataset.datasetURN());
                            }
                        }
                    }
                }
                this.datasets = prioritize(retryDatasets, this.state);
                if (stopwatch.elapsed(TimeUnit.MINUTES) > timeoutMinutes) {
                    break;
                }
            }

            // Whatever is left exhausted the iteration limit or the timeout.
            for (Dataset dataset : this.datasets) {
                log.info("{} is timed out and give up the verification, adding a failed task", dataset.datasetURN());
                this.workUnitIterator.addWorkUnit(createWorkUnitForFailure(dataset, failedReasons.get(dataset.getUrn())));
            }
        }
    }

    /** A dataset paired with the outcome of its verification. */
    public static class VerifiedDataset {
        private final Dataset dataset;
        private final VerifiedResult verifiedResult;

        @ConstructorProperties({"dataset", "verifiedResult"})
        public VerifiedDataset(Dataset dataset, VerifiedResult verifiedResult) {
            this.dataset = dataset;
            this.verifiedResult = verifiedResult;
        }
    }

    /** Aggregated verification outcome: overall pass flag, whether a retry is worthwhile, last failure reason. */
    public static class VerifiedResult {
        private final boolean allVerificationPassed;
        private final boolean shouldRetry;
        private final String failedReason;

        @ConstructorProperties({"allVerificationPassed", "shouldRetry", "failedReason"})
        public VerifiedResult(boolean allVerificationPassed, boolean shouldRetry, String failedReason) {
            this.allVerificationPassed = allVerificationPassed;
            this.shouldRetry = shouldRetry;
            this.failedReason = failedReason;
        }
    }

    /** Not supported; this source only produces a streaming result via {@link #getWorkunitStream}. */
    @Override
    public List<WorkUnit> getWorkunits(SourceState sourceState) {
        throw new UnsupportedOperationException("Please use getWorkunitStream");
    }

    /**
     * Finds datasets and starts the background verification thread.
     * Returns a stream that yields work units as the verification thread produces them.
     * If no datasets are found, an already-terminated, empty stream is returned.
     */
    @Override
    public WorkUnitStream getWorkunitStream(SourceState sourceState) {
        try {
            this.fs = HadoopUtils.getSourceFileSystem(sourceState);
            List<Dataset> datasets = DatasetUtils.instantiateDatasetFinder(sourceState.getProperties(), this.fs,
                    DefaultFileSystemGlobFinder.class.getName(), new Object[0]).findDatasets();
            CompactionWorkUnitIterator workUnitIterator = new CompactionWorkUnitIterator();
            if (datasets.isEmpty()) {
                // Nothing will ever be produced. Mark the iterator done so consumers do not block in hasNext().
                workUnitIterator.done();
                return new BasicWorkUnitStream.Builder(workUnitIterator).build();
            }
            initCompactionSource(sourceState);
            new Thread(new SingleWorkUnitGeneratorService(sourceState, prioritize(datasets, sourceState), workUnitIterator),
                    "SingleWorkUnitGeneratorService").start();
            return new BasicWorkUnitStream.Builder(workUnitIterator).build();
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /** Records the init time, builds the compaction suite and allocator, creates the job dir and stages jars. */
    private void initCompactionSource(SourceState sourceState) throws IOException {
        sourceState.setProp(COMPACTION_INIT_TIME, DateTimeUtils.currentTimeMillis());
        this.suite = CompactionSuiteUtils.getCompactionSuiteFactory(sourceState).createSuite(sourceState);
        initRequestAllocator(sourceState);
        initJobDir(sourceState);
        copyJarDependencies(sourceState);
        optionalInit(sourceState);
    }

    /** Extension hook for subclasses; called at the end of initialization. No-op by default. */
    protected void optionalInit(SourceState sourceState) {
    }

    /**
     * Builds the request allocator used by {@link #prioritize}.
     * If {@code compaction.prioritization.prioritizerAlias} is set, the allocator uses that prioritizer.
     * Otherwise a greedy allocator is used.
     */
    private void initRequestAllocator(State state) {
        try {
            ResourceEstimator estimator = (ResourceEstimator) GobblinConstructorUtils.invokeLongestConstructor(
                    new ClassAliasResolver(ResourceEstimator.class).resolveClass(state.getProp(
                            "compaction.prioritization.estimator",
                            SimpleDatasetRequest.SimpleDatasetCountEstimator.class.getName())),
                    new Object[0]);
            RequestAllocatorConfig.Builder configBuilder = RequestAllocatorConfig.builder(estimator)
                    .allowParallelization(1)
                    .withLimitedScopeConfig(
                            ConfigBuilder.create().loadProps(state.getProperties(), "compaction.prioritization.").build());
            if (!state.contains("compaction.prioritization.prioritizerAlias")) {
                this.allocator = new GreedyAllocator(configBuilder.build());
                return;
            }
            Comparator prioritizer = (Comparator) GobblinConstructorUtils.invokeLongestConstructor(
                    new ClassAliasResolver(Comparator.class).resolveClass(
                            state.getProp("compaction.prioritization.prioritizerAlias")),
                    new Object[]{state});
            configBuilder.withPrioritizer(prioritizer);
            if (prioritizer instanceof HierarchicalPrioritizer) {
                this.allocator = new HierarchicalAllocator.Factory().createRequestAllocator(configBuilder.build());
            } else {
                this.allocator = RequestAllocatorUtils.inferFromConfig(configBuilder.build());
            }
        } catch (RuntimeException e) {
            throw e;
        } catch (Exception e) {
            throw new RuntimeException("Cannot initialize allocator", e);
        }
    }

    /**
     * Orders datasets with the configured allocator.
     * The number of datasets kept is capped by {@link MRCompactor#COMPACTION_DATASETS_MAX_COUNT}, which defaults to 1e9.
     */
    public List<Dataset> prioritize(List<Dataset> list, State state) {
        double maxCount = state.getPropAsDouble(MRCompactor.COMPACTION_DATASETS_MAX_COUNT, 1.0E9d);
        ResourcePool pool = ResourcePool.builder().maxResource("count", maxCount).build();
        Iterator<SimpleDatasetRequest> allocated =
                this.allocator.allocateRequests(list.stream().map(SimpleDatasetRequestor::new).iterator(), pool);
        return Lists.newArrayList(Iterators.transform(allocated, request -> request.getDataset()));
    }

    /** Creates the compaction work unit for a verified dataset. */
    protected WorkUnit createWorkUnit(Dataset dataset) throws IOException {
        WorkUnit workUnit = new WorkUnit();
        TaskUtils.setTaskFactoryClass(workUnit, MRCompactionTaskFactory.class);
        this.suite.save(dataset, workUnit);
        workUnit.setProp("dataset.urn", dataset.getUrn());
        return workUnit;
    }

    /** Creates a failed work unit for a dataset, without a failure reason. */
    protected WorkUnit createWorkUnitForFailure(Dataset dataset) throws IOException {
        return newFailedWorkUnit(dataset, null);
    }

    /**
     * Creates a failed work unit for a dataset.
     *
     * @param str failure reason to record; it is omitted when {@code null}
     */
    protected WorkUnit createWorkUnitForFailure(Dataset dataset, String str) throws IOException {
        return newFailedWorkUnit(dataset, str);
    }

    /** Shared construction of failed work units; the reason property is only set when non-null. */
    private WorkUnit newFailedWorkUnit(Dataset dataset, String failureReason) throws IOException {
        WorkUnit failedWorkUnit = new FailedTask.FailedWorkUnit();
        if (failureReason != null) {
            failedWorkUnit.setProp(CompactionVerifier.COMPACTION_VERIFICATION_FAIL_REASON, failureReason);
        }
        TaskUtils.setTaskFactoryClass(failedWorkUnit, CompactionFailedTask.CompactionFailedTaskFactory.class);
        failedWorkUnit.setProp("dataset.urn", dataset.getUrn());
        this.suite.save(dataset, failedWorkUnit);
        return failedWorkUnit;
    }

    /** Not supported; work units are executed by task factories rather than extractors. */
    @Override
    public Extractor getExtractor(WorkUnitState workUnitState) throws IOException {
        throw new UnsupportedOperationException();
    }

    /** Deletes the temporary job directory, if one was created. */
    @Override
    public void shutdown(SourceState sourceState) {
        if (this.fs == null || this.tmpJobDir == null) {
            // No datasets were found (or the source was never initialized), so no job dir exists.
            return;
        }
        try {
            boolean deleted = this.fs.delete(this.tmpJobDir, true);
            log.info("Job dir is removed from {} with status {}", this.tmpJobDir, deleted);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Creates the temporary job directory.
     * The directory is named after the job id when available, and after a random UUID otherwise.
     */
    private void initJobDir(SourceState sourceState) throws IOException {
        String dirName = sourceState instanceof JobState
                ? ((JobState) sourceState).getJobId()
                : UUID.randomUUID().toString();
        this.tmpJobDir = new Path(
                sourceState.getProp(MRCompactor.COMPACTION_TMP_DEST_DIR, MRCompactor.DEFAULT_COMPACTION_TMP_DEST_DIR),
                dirName);
        this.fs.mkdirs(this.tmpJobDir);
        sourceState.setProp(MRCompactor.COMPACTION_JOB_DIR, this.tmpJobDir.toString());
        log.info("Job dir is created under {}", this.tmpJobDir);
    }

    /**
     * Copies the local jars listed in {@code job.jars} into the job directory, so they can be added to the
     * compaction job's classpath. Each entry may be a glob.
     *
     * @throws IOException if a non-glob entry does not exist, or if copying fails
     */
    private void copyJarDependencies(State state) throws IOException {
        if (this.tmpJobDir == null) {
            throw new RuntimeException("Job directory is not created");
        }
        if (!state.contains("job.jars")) {
            return;
        }
        LocalFileSystem localFs = FileSystem.getLocal(HadoopUtils.getConfFromState(state));
        Path jarDir = new Path(this.tmpJobDir, MRCompactor.COMPACTION_JAR_SUBDIR);
        this.fs.mkdirs(jarDir);
        state.setProp(MRCompactor.COMPACTION_JARS, jarDir.toString());
        Path qualifiedJarDir = this.fs.makeQualified(jarDir);
        for (String jarPattern : state.getPropAsList("job.jars")) {
            FileStatus[] matches = localFs.globStatus(new Path(jarPattern));
            // globStatus returns null (not an empty array) when a non-glob path does not exist.
            if (matches == null) {
                throw new IOException("Jar path configured in job.jars does not exist: " + jarPattern);
            }
            for (FileStatus jar : matches) {
                Path target = new Path(qualifiedJarDir, jar.getPath().getName());
                this.fs.copyFromLocalFile(jar.getPath(), target);
                log.info("{} will be added to classpath", target);
            }
        }
    }
}
