package gobblin.compaction.mapreduce;

import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import gobblin.compaction.Compactor;
import gobblin.compaction.Dataset;
import gobblin.compaction.event.CompactionSlaEventHelper;
import gobblin.compaction.mapreduce.MRCompactorJobPropCreator;
import gobblin.compaction.mapreduce.MRCompactorJobRunner;
import gobblin.compaction.verify.DataCompletenessVerifier;
import gobblin.configuration.State;
import gobblin.metrics.GobblinMetrics;
import gobblin.metrics.MetricContext;
import gobblin.metrics.Tag;
import gobblin.metrics.event.EventSubmitter;
import gobblin.metrics.event.sla.SlaEventSubmitter;
import gobblin.util.DatasetFilterUtils;
import gobblin.util.ExecutorsUtils;
import gobblin.util.HadoopUtils;
import gobblin.util.TagUtils;
import gobblin.util.recordcount.CompactionRecordCountProvider;
import gobblin.util.recordcount.IngestionRecordCountProvider;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gobblin/compaction/mapreduce/MRCompactor.class */
public class MRCompactor implements Compactor {
    // ---- Configuration keys and their default values -------------------------------------------
    // Every key shares the "compaction." prefix.
    public static final String COMPACTION_PREFIX = "compaction.";
    public static final String COMPACTION_THREAD_POOL_SIZE = "compaction.thread.pool.size";
    public static final int DEFAULT_COMPACTION_THREAD_POOL_SIZE = 30;

    // Input / destination / temporary directory layout.
    public static final String COMPACTION_INPUT_DIR = "compaction.input.dir";
    public static final String COMPACTION_INPUT_SUBDIR = "compaction.input.subdir";
    public static final String DEFAULT_COMPACTION_INPUT_SUBDIR = "hourly";
    public static final String COMPACTION_DEST_DIR = "compaction.dest.dir";
    public static final String COMPACTION_DEST_SUBDIR = "compaction.dest.subdir";
    public static final String DEFAULT_COMPACTION_DEST_SUBDIR = "daily";
    public static final String COMPACTION_TMP_DEST_DIR = "compaction.tmp.dest.dir";
    public static final String DEFAULT_COMPACTION_TMP_DEST_DIR = "/tmp/gobblin-compaction";
    // Suffix appended to the input/destination sub-directory names to form the "late" sub-directories.
    public static final String COMPACTION_LATE_DIR_SUFFIX = "_late";

    // Topic filtering and prioritisation.
    public static final String COMPACTION_BLACKLIST = "compaction.blacklist";
    public static final String COMPACTION_WHITELIST = "compaction.whitelist";
    public static final String COMPACTION_HIGH_PRIORITY_TOPICS = "compaction.high.priority.topics";
    public static final String COMPACTION_NORMAL_PRIORITY_TOPICS = "compaction.normal.priority.topics";

    // Classes instantiated reflectively by this compactor.
    public static final String COMPACTION_JOBPROPS_CREATOR_CLASS = "compaction.jobprops.creator.class";
    public static final String DEFAULT_COMPACTION_JOBPROPS_CREATOR_CLASS = "gobblin.compaction.mapreduce.MRCompactorTimeBasedJobPropCreator";
    public static final String COMPACTION_JOB_RUNNER_CLASS = "compaction.job.runner.class";
    public static final String DEFAULT_COMPACTION_JOB_RUNNER_CLASS = "gobblin.compaction.mapreduce.avro.MRCompactorAvroKeyDedupJobRunner";

    public static final String COMPACTION_TIMEZONE = "compaction.timezone";
    public static final String DEFAULT_COMPACTION_TIMEZONE = "America/Los_Angeles";
    public static final String COMPACTION_FILE_SYSTEM_URI = "compaction.file.system.uri";
    // Overall compaction timeout; the default of Long.MAX_VALUE minutes effectively disables it.
    public static final String COMPACTION_MR_JOB_TIMEOUT_MINUTES = "compaction.mr.job.timeout.minutes";
    public static final long DEFAULT_COMPACTION_MR_JOB_TIMEOUT_MINUTES = Long.MAX_VALUE;
    public static final String COMPACTION_INPUT_RECORD_COUNT_PROVIDER = "compaction.input.record.count.provider";
    public static final String COMPACTION_OUTPUT_RECORD_COUNT_PROVIDER = "compaction.output.record.count.provider";
    public static final String COMPACTION_RECOMPACT_FROM_INPUT_FOR_LATE_DATA = "compaction.recompact.from.input.for.late.data";
    public static final boolean DEFAULT_COMPACTION_RECOMPACT_FROM_INPUT_FOR_LATE_DATA = false;
    public static final String COMPACTION_INPUT_DEDUPLICATED = "compaction.input.deduplicated";
    public static final boolean DEFAULT_COMPACTION_INPUT_DEDUPLICATED = false;
    public static final String COMPACTION_OUTPUT_DEDUPLICATED = "compaction.output.deduplicated";
    public static final boolean DEFAULT_COMPACTION_OUTPUT_DEDUPLICATED = true;

    // Data completeness verification settings.
    public static final String COMPACTION_COMPLETENESS_VERIFICATION_PREFIX = "compaction.completeness.verification.";
    public static final String COMPACTION_RECOMPACT_FROM_DEST_PATHS = "compaction.recompact.from.dest.paths";
    public static final boolean DEFAULT_COMPACTION_RECOMPACT_FROM_DEST_PATHS = false;
    public static final String COMPACTION_COMPLETENESS_VERIFICATION_BLACKLIST = "compaction.completeness.verification.blacklist";
    public static final String COMPACTION_COMPLETENESS_VERIFICATION_WHITELIST = "compaction.completeness.verification.whitelist";
    public static final String COMPACTION_VERIFICATION_TIMEOUT_MINUTES = "compaction.completeness.verification.timeout.minutes";
    public static final long DEFAULT_COMPACTION_VERIFICATION_TIMEOUT_MINUTES = 30;
    public static final String COMPACTION_COMPLETENESS_VERIFICATION_ENABLED = "compaction.completeness.verification.enabled";
    public static final boolean DEFAULT_COMPACTION_COMPLETENESS_VERIFICATION_ENABLED = false;
    public static final String COMPACTION_COMPLETENESS_VERIFICATION_NUM_DATASETS_VERIFIED_TOGETHER = "compaction.completeness.verification.num.datasets.verified.together";
    public static final int DEFAULT_COMPACTION_COMPLETENESS_VERIFICATION_NUM_DATASETS_VERIFIED_TOGETHER = 10;
    public static final String COMPACTION_COMPLETENESS_VERIFICATION_PUBLISH_DATA_IF_CANNOT_VERIFY = "compaction.completeness.verification.publish.data.if.cannot.verify";
    public static final boolean DEFAULT_COMPACTION_COMPLETENESS_VERIFICATION_PUBLISH_DATA_IF_CANNOT_VERIFY = false;

    // Keys and names not read in this class; presumably consumed by job runners / prop creators.
    public static final String COMPACTION_SHOULD_DEDUPLICATE = "compaction.should.deduplicate";
    public static final String COMPACTION_JOB_DEST_PARTITION = "compaction.job.dest.partition";
    public static final String COMPACTION_ENABLE_SUCCESS_FILE = "compaction.fileoutputcommitter.marksuccessfuljobs";
    public static final String COMPACTION_JOB_LATE_DATA_MOVEMENT_TASK = "compaction.job.late.data.movement.task";
    public static final String COMPACTION_JOB_LATE_DATA_FILES = "compaction.job.late.data.files";
    // Marker file looked up under a dataset's output path to detect a finished compaction.
    public static final String COMPACTION_COMPLETE_FILE_NAME = "_COMPACTION_COMPLETE";
    public static final String COMPACTION_LATE_FILES_DIRECTORY = "late";
    // Set by copyDependencyJarsToHdfs() to the directory holding the uploaded dependency jars.
    public static final String COMPACTION_JARS = "compaction.jars";
    public static final String COMPACTION_TRACKING_EVENTS_NAMESPACE = "compaction.tracking.events";

    // Priorities assigned to topics when their datasets are created.
    private static final double HIGH_PRIORITY = 3.0d;
    private static final double NORMAL_PRIORITY = 2.0d;
    private static final double LOW_PRIORITY = 1.0d;
    // Pause between two polling rounds over the datasets while jobs are running.
    private static final long COMPACTION_JOB_WAIT_INTERVAL_SECONDS = 10;

    // ---- Instance state (all assigned once, in the constructor) --------------------------------
    private final State state = new State();
    private final Configuration conf;
    private final String inputDir;
    private final String inputSubDir;
    private final String inputLateSubDir;
    private final String destDir;
    private final String destSubDir;
    private final String destLateSubDir;
    private final String tmpOutputDir;
    private final FileSystem fs;
    private final JobRunnerExecutor jobExecutor;
    private final Set<Dataset> datasets;
    // Runner currently associated with each dataset; entries are removed in JobRunnerExecutor.afterExecute.
    private final Map<Dataset, MRCompactorJobRunner> jobRunnables;
    private final Closer closer;
    // Present only when completeness verification is enabled.
    private final Optional<DataCompletenessVerifier> verifier;
    // Started in the constructor; used for both the verification and the compaction timeout.
    private final Stopwatch stopwatch;
    private final GobblinMetrics gobblinMetrics;
    private final EventSubmitter eventSubmitter;
    private final long dataVerifTimeoutMinutes;
    private final long compactionTimeoutMinutes;
    private final boolean shouldVerifDataCompl;
    private final boolean shouldPublishDataIfCannotVerifyCompl;

    // ---- Static state --------------------------------------------------------------------------
    private static final Logger LOG = LoggerFactory.getLogger(MRCompactor.class);
    public static final String DEFAULT_COMPACTION_INPUT_RECORD_COUNT_PROVIDER = IngestionRecordCountProvider.class.getName();
    public static final String DEFAULT_COMPACTION_OUTPUT_RECORD_COUNT_PROVIDER = CompactionRecordCountProvider.class.getName();
    // Hadoop jobs registered through addRunningHadoopJob(); cancel() kills the ones not yet complete.
    private static final Map<Dataset, Job> RUNNING_MR_JOBS = Maps.newConcurrentMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: gobblin.compaction.mapreduce.MRCompactor$2, reason: invalid class name */
    /* loaded from: input_file:gobblin/compaction/mapreduce/MRCompactor$2.class */
    /**
     * Compiler-synthesised lookup table for a {@code switch} over
     * {@link DataCompletenessVerifier.Results.Result.Status}: it maps each constant's
     * {@code ordinal()} to a small case index (PASSED to 1, FAILED to 2). Slots for any other
     * constant keep the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$gobblin$compaction$verify$DataCompletenessVerifier$Results$Result$Status = new int[DataCompletenessVerifier.Results.Result.Status.values().length];

        static {
            try {
                $SwitchMap$gobblin$compaction$verify$DataCompletenessVerifier$Results$Result$Status[DataCompletenessVerifier.Results.Result.Status.PASSED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Constant missing at runtime: its slot is simply left unset.
            }
            try {
                $SwitchMap$gobblin$compaction$verify$DataCompletenessVerifier$Results$Result$Status[DataCompletenessVerifier.Results.Result.Status.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Constant missing at runtime: its slot is simply left unset.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:gobblin/compaction/mapreduce/MRCompactor$JobRunnerExecutor.class */
    /**
     * Thread pool that runs {@link MRCompactorJobRunner}s and, once a runner finishes, updates the
     * state of the dataset it was working on.
     */
    public class JobRunnerExecutor extends ThreadPoolExecutor {
        public JobRunnerExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
        }

        /**
         * Removes the finished runner from the enclosing compactor's runner map and then decides the
         * fate of its dataset: skipped on failure, completed on commit, completed when verification
         * was given up and unverifiable data must not be published, otherwise priority-reduced.
         *
         * @param runnable the task that ran; must be an {@link MRCompactorJobRunner}
         * @param th the throwable that terminated the task, or {@code null}
         * @throws IllegalArgumentException if {@code runnable} is not an {@link MRCompactorJobRunner}
         */
        @Override
        protected void afterExecute(Runnable runnable, Throwable th) {
            if (!(runnable instanceof MRCompactorJobRunner)) {
                throw new IllegalArgumentException(String.format("Runnable expected to be instance of %s, actual %s", MRCompactorJobRunner.class.getSimpleName(), runnable.getClass().getSimpleName()));
            }
            MRCompactorJobRunner jobRunner = (MRCompactorJobRunner) runnable;
            MRCompactor.this.jobRunnables.remove(jobRunner.getDataset());

            if (th != null) {
                afterExecuteWithThrowable(jobRunner, th);
                return;
            }

            if (jobRunner.status() == MRCompactorJobRunner.Status.COMMITTED) {
                jobRunner.getDataset().setState(Dataset.DatasetState.COMPACTION_COMPLETE);
                return;
            }

            boolean givenUpAndMustNotPublish = jobRunner.getDataset().state() == Dataset.DatasetState.GIVEN_UP
                    && !MRCompactor.this.shouldPublishDataIfCannotVerifyCompl;
            if (givenUpAndMustNotPublish) {
                MRCompactor.LOG.info(String.format("Dataset %s will not be compacted, since data completeness cannot be verified", jobRunner.getDataset()));
                jobRunner.getDataset().setState(Dataset.DatasetState.COMPACTION_COMPLETE);
            } else {
                // The job did not commit; lower the dataset's priority for a later attempt.
                jobRunner.getDataset().reducePriority();
            }
        }

        /** Marks the runner's dataset as skipped because the runner terminated with {@code th}. */
        private void afterExecuteWithThrowable(MRCompactorJobRunner jobRunner, Throwable th) {
            jobRunner.getDataset().skip(th);
        }
    }

    /**
     * Creates a compactor configured from the given properties.
     *
     * <p>The properties are copied into {@code state} and every other field is derived from it.
     * The assignment order is significant: {@code conf} is needed by {@code getFileSystem()},
     * {@code closer} and {@code shouldVerifDataCompl} are needed to build {@code verifier}, and
     * metric reporting is started (via {@code initializeMetrics()}) before the event submitter
     * looks up the metric context for the same job name.
     *
     * @param properties job configuration; must contain {@code compaction.input.dir} and
     *     {@code compaction.dest.dir}
     * @throws IOException if the file system cannot be obtained
     * @throws IllegalArgumentException if a required directory property is missing
     */
    public MRCompactor(Properties properties) throws IOException {
        this.state.addAll(properties);
        this.conf = HadoopUtils.getConfFromState(this.state);
        this.inputDir = getInputDir();
        this.inputSubDir = getInputSubDir();
        this.inputLateSubDir = getInputLateSubDir();
        this.destDir = getDestDir();
        this.destSubDir = getDestSubDir();
        this.destLateSubDir = getDestLateSubDir();
        this.tmpOutputDir = getTmpOutputDir();
        this.fs = getFileSystem();
        this.datasets = Sets.newHashSet();
        this.jobExecutor = createJobExecutor();
        this.jobRunnables = Maps.newConcurrentMap();
        this.closer = Closer.create();
        // The stopwatch starts here, so both timeouts are measured from construction time.
        this.stopwatch = Stopwatch.createStarted();
        this.gobblinMetrics = initializeMetrics();
        this.eventSubmitter = new EventSubmitter.Builder(GobblinMetrics.get(this.state.getProp("job.name")).getMetricContext(), COMPACTION_TRACKING_EVENTS_NAMESPACE).build();
        this.dataVerifTimeoutMinutes = getDataVerifTimeoutMinutes();
        this.compactionTimeoutMinutes = getCompactionTimeoutMinutes();
        this.shouldVerifDataCompl = shouldVerifyDataCompleteness();
        // The verifier is registered with the closer so that compact() closes it during cleanup.
        this.verifier = this.shouldVerifDataCompl ? Optional.of(this.closer.register(new DataCompletenessVerifier(this.state))) : Optional.absent();
        this.shouldPublishDataIfCannotVerifyCompl = shouldPublishDataIfCannotVerifyCompl();
    }

    /**
     * Returns the configured compaction input directory.
     *
     * @throws IllegalArgumentException if {@code compaction.input.dir} is not set
     */
    private String getInputDir() {
        boolean configured = this.state.contains(COMPACTION_INPUT_DIR);
        Preconditions.checkArgument(configured, "Missing required property " + COMPACTION_INPUT_DIR);
        return this.state.getProp(COMPACTION_INPUT_DIR);
    }

    /** Returns the input sub-directory name, defaulting to {@code "hourly"}. */
    private String getInputSubDir() {
        String subDir = this.state.getProp(COMPACTION_INPUT_SUBDIR, DEFAULT_COMPACTION_INPUT_SUBDIR);
        return subDir;
    }

    /** Returns the input sub-directory name with the {@code "_late"} suffix appended. */
    private String getInputLateSubDir() {
        String subDir = this.state.getProp(COMPACTION_INPUT_SUBDIR, DEFAULT_COMPACTION_INPUT_SUBDIR);
        return subDir + COMPACTION_LATE_DIR_SUFFIX;
    }

    /**
     * Returns the configured compaction destination directory.
     *
     * @throws IllegalArgumentException if {@code compaction.dest.dir} is not set
     */
    private String getDestDir() {
        boolean configured = this.state.contains(COMPACTION_DEST_DIR);
        Preconditions.checkArgument(configured, "Missing required property " + COMPACTION_DEST_DIR);
        return this.state.getProp(COMPACTION_DEST_DIR);
    }

    /** Returns the destination sub-directory name with the {@code "_late"} suffix appended. */
    private String getDestLateSubDir() {
        String subDir = this.state.getProp(COMPACTION_DEST_SUBDIR, DEFAULT_COMPACTION_DEST_SUBDIR);
        return subDir + COMPACTION_LATE_DIR_SUFFIX;
    }

    /** Returns the destination sub-directory name, defaulting to {@code "daily"}. */
    private String getDestSubDir() {
        String subDir = this.state.getProp(COMPACTION_DEST_SUBDIR, DEFAULT_COMPACTION_DEST_SUBDIR);
        return subDir;
    }

    /** Returns the temporary output directory, defaulting to {@code "/tmp/gobblin-compaction"}. */
    private String getTmpOutputDir() {
        String tmpDir = this.state.getProp(COMPACTION_TMP_DEST_DIR, DEFAULT_COMPACTION_TMP_DEST_DIR);
        return tmpDir;
    }

    /**
     * Obtains the file system to compact on: the one identified by
     * {@code compaction.file.system.uri} when that property is set, otherwise the default file
     * system of {@code conf}.
     *
     * @throws IOException if the file system cannot be obtained
     */
    private FileSystem getFileSystem() throws IOException {
        if (this.state.contains(COMPACTION_FILE_SYSTEM_URI)) {
            URI fsUri = URI.create(this.state.getProp(COMPACTION_FILE_SYSTEM_URI));
            return FileSystem.get(fsUri, this.conf);
        }
        return FileSystem.get(this.conf);
    }

    /**
     * Creates the fixed-size executor that runs compaction jobs. Core and maximum pool size are
     * both the configured thread pool size, and pending jobs wait in a priority queue.
     */
    private JobRunnerExecutor createJobExecutor() {
        final int poolSize = getThreadPoolSize();
        BlockingQueue<Runnable> pendingJobs = new PriorityBlockingQueue<Runnable>();
        // Keep-alive of Long.MAX_VALUE nanoseconds.
        return new JobRunnerExecutor(poolSize, poolSize, Long.MAX_VALUE, TimeUnit.NANOSECONDS, pendingJobs);
    }

    /** Returns the configured job thread pool size, defaulting to 30. */
    private int getThreadPoolSize() {
        int poolSize = this.state.getPropAsInt(COMPACTION_THREAD_POOL_SIZE, DEFAULT_COMPACTION_THREAD_POOL_SIZE);
        return poolSize;
    }

    /**
     * Obtains the {@link GobblinMetrics} instance for this job name, tagged with the runtime tags,
     * and starts metric reporting on it using the job properties.
     */
    private GobblinMetrics initializeMetrics() {
        List runtimeTags = new ArrayList();
        runtimeTags.addAll(Tag.fromMap(TagUtils.getRuntimeTags()));
        String jobName = this.state.getProp("job.name");
        GobblinMetrics metrics = GobblinMetrics.get(jobName, (MetricContext) null, runtimeTags);
        metrics.startMetricReporting(this.state.getProperties());
        return metrics;
    }

    /**
     * Runs the whole compaction: uploads dependency jars, discovers topics, creates and runs the
     * compaction jobs, and fails if any dataset recorded a throwable.
     *
     * <p>Cleanup (executor shutdown, closing registered resources, deleting the uploaded jars and
     * stopping metric reporting) runs exactly once whether or not compaction succeeded, and each
     * step is attempted even if an earlier one throws.
     *
     * @throws IOException if a file system operation fails
     * @throws RuntimeException if one or more datasets could not be processed
     */
    @Override // gobblin.compaction.Compactor
    public void compact() throws IOException {
        try {
            copyDependencyJarsToHdfs();
            processTopics(findAllTopics());
            throwExceptionsIfAnyDatasetCompactionFailed();
        } catch (Throwable t) {
            // Log before rethrowing: if cleanup below throws as well, this failure would otherwise
            // be replaced by the cleanup exception and lost.
            LOG.error("Caught throwable during compaction", t);
            throw t;
        } finally {
            try {
                shutdownExecutors();
            } finally {
                try {
                    this.closer.close();
                } finally {
                    try {
                        deleteDependencyJars();
                    } finally {
                        this.gobblinMetrics.stopMetricReporting();
                    }
                }
            }
        }
    }

    /**
     * Copies the local jars listed under {@code job.jars} into a
     * {@code _gobblin_compaction_jars} directory below the temporary output directory and records
     * that directory under {@code compaction.jars}. Does nothing when {@code job.jars} is not set.
     *
     * <p>Each {@code job.jars} entry is treated as a glob on the local file system. An entry that
     * matches nothing is skipped with a warning instead of failing with a
     * {@link NullPointerException}.
     *
     * @throws IOException if a file system operation fails
     */
    private void copyDependencyJarsToHdfs() throws IOException {
        if (!this.state.contains("job.jars")) {
            return;
        }
        LocalFileSystem localFs = FileSystem.getLocal(this.conf);
        Path jarsDir = new Path(this.tmpOutputDir, "_gobblin_compaction_jars");
        this.state.setProp(COMPACTION_JARS, jarsDir.toString());
        // Start from a clean directory so stale jars from an earlier run are not picked up.
        this.fs.delete(jarsDir, true);
        Path qualifiedJarsDir = this.fs.makeQualified(jarsDir);
        for (String jarGlob : this.state.getPropAsList("job.jars")) {
            FileStatus[] matches = localFs.globStatus(new Path(jarGlob));
            if (matches == null) {
                // globStatus returns null for a non-glob path that does not exist.
                LOG.warn(String.format("No local file found for job jar %s, skipping it", jarGlob));
                continue;
            }
            for (FileStatus localJar : matches) {
                Path target = new Path(qualifiedJarsDir, localJar.getPath().getName());
                this.fs.copyFromLocalFile(localJar.getPath(), target);
                LOG.info(String.format("%s will be added to classpath", target));
            }
        }
    }

    /**
     * Recursively deletes the directory recorded under {@code compaction.jars}, if that property
     * is set.
     *
     * @throws IOException if the delete fails
     */
    private void deleteDependencyJars() throws IllegalArgumentException, IOException {
        if (!this.state.contains(COMPACTION_JARS)) {
            return;
        }
        Path jarsDir = new Path(this.state.getProp(COMPACTION_JARS));
        this.fs.delete(jarsDir, true);
    }

    /**
     * Lists the directories directly under the input directory whose names pass the
     * blacklist/whitelist filter; each such name is a topic.
     *
     * @return the topic names, in listing order
     * @throws IOException if the input directory cannot be listed
     */
    private List<String> findAllTopics() throws IOException {
        List<String> topics = Lists.newArrayList();
        List<Pattern> blacklist = getBlacklist();
        List<Pattern> whitelist = getWhitelist();
        FileStatus[] children = this.fs.listStatus(new Path(this.inputDir));
        for (FileStatus child : children) {
            if (!child.isDir()) {
                continue;
            }
            String topic = child.getPath().getName();
            if (!DatasetFilterUtils.survived(topic, blacklist, whitelist)) {
                continue;
            }
            LOG.info("found topic: " + topic);
            topics.add(topic);
        }
        return topics;
    }

    /** Returns the patterns configured under {@code compaction.blacklist}. */
    private List<Pattern> getBlacklist() {
        List<Pattern> patterns = DatasetFilterUtils.getPatternsFromStrings(this.state.getPropAsList(COMPACTION_BLACKLIST, ""));
        return patterns;
    }

    /** Returns the patterns configured under {@code compaction.whitelist}. */
    private List<Pattern> getWhitelist() {
        List<Pattern> patterns = DatasetFilterUtils.getPatternsFromStrings(this.state.getPropAsList(COMPACTION_WHITELIST, ""));
        return patterns;
    }

    /** Returns the patterns configured under {@code compaction.completeness.verification.blacklist}. */
    private List<Pattern> getDataComplVerifBlacklist() {
        List<Pattern> patterns = DatasetFilterUtils.getPatternsFromStrings(this.state.getPropAsList(COMPACTION_COMPLETENESS_VERIFICATION_BLACKLIST, ""));
        return patterns;
    }

    /** Returns the patterns configured under {@code compaction.completeness.verification.whitelist}. */
    private List<Pattern> getDataComplVerifWhitelist() {
        List<Pattern> patterns = DatasetFilterUtils.getPatternsFromStrings(this.state.getPropAsList(COMPACTION_COMPLETENESS_VERIFICATION_WHITELIST, ""));
        return patterns;
    }

    /**
     * Creates the datasets for the given topics and then runs their compaction jobs.
     *
     * @param topics topic names to compact
     * @throws IOException propagated from job processing
     */
    private void processTopics(List<String> topics) throws IOException {
        this.createJobPropsForTopics(topics);
        this.processCompactionJobs();
    }

    /**
     * Creates datasets for every topic, assigning each topic a priority: high if it matches a
     * high-priority pattern, else normal if it matches a normal-priority pattern, else low.
     *
     * @param topics topic names to create datasets for
     */
    private void createJobPropsForTopics(List<String> topics) {
        List<Pattern> highPriorityPatterns = getHighPriorityTopicPatterns();
        List<Pattern> normalPriorityPatterns = getNormalPriorityTopicPatterns();
        for (String topic : topics) {
            double priority;
            if (DatasetFilterUtils.stringInPatterns(topic, highPriorityPatterns)) {
                priority = HIGH_PRIORITY;
            } else if (DatasetFilterUtils.stringInPatterns(topic, normalPriorityPatterns)) {
                priority = NORMAL_PRIORITY;
            } else {
                priority = LOW_PRIORITY;
            }
            createJobPropsForTopic(topic, priority);
        }
    }

    /** Returns the patterns configured under {@code compaction.high.priority.topics}. */
    private List<Pattern> getHighPriorityTopicPatterns() {
        List<Pattern> patterns = DatasetFilterUtils.getPatternsFromStrings(this.state.getPropAsList(COMPACTION_HIGH_PRIORITY_TOPICS, ""));
        return patterns;
    }

    /** Returns the patterns configured under {@code compaction.normal.priority.topics}. */
    private List<Pattern> getNormalPriorityTopicPatterns() {
        List<Pattern> patterns = DatasetFilterUtils.getPatternsFromStrings(this.state.getPropAsList(COMPACTION_NORMAL_PRIORITY_TOPICS, ""));
        return patterns;
    }

    /**
     * Creates the datasets for one topic and adds them to {@code datasets}. If the creator fails,
     * a single failed dataset carrying the throwable is added instead, so the failure is reported
     * later rather than aborting the other topics.
     *
     * @param topic topic name
     * @param priority priority given to the topic's datasets
     */
    private void createJobPropsForTopic(String topic, double priority) {
        LOG.info("Creating compaction jobs for topic " + topic + " with priority " + priority);
        MRCompactorJobPropCreator creator = getJobPropCreator(topic, priority);
        try {
            this.datasets.addAll(creator.createJobProps());
        } catch (Throwable failure) {
            this.datasets.add(creator.createFailedJobProps(failure));
        }
    }

    /**
     * Reflectively instantiates the nested {@code Builder} of the configured job-props creator
     * class and builds a creator for the given topic, wired with the topic's input, late-input,
     * output, late-output and temporary output directories.
     *
     * @param topic topic name
     * @param priority priority of the topic
     * @return the configured creator
     * @throws RuntimeException wrapping any exception raised while instantiating or building
     */
    MRCompactorJobPropCreator getJobPropCreator(String topic, double priority) {
        try {
            String builderClassName = this.state.getProp(COMPACTION_JOBPROPS_CREATOR_CLASS, DEFAULT_COMPACTION_JOBPROPS_CREATOR_CLASS) + "$Builder";
            MRCompactorJobPropCreator.Builder builder = (MRCompactorJobPropCreator.Builder) Class.forName(builderClassName).newInstance();

            Path topicInputDir = new Path(this.inputDir, new Path(topic, this.inputSubDir));
            Path topicInputLateDir = new Path(this.inputDir, new Path(topic, this.inputLateSubDir));
            Path topicOutputDir = new Path(this.destDir, new Path(topic, this.destSubDir));
            Path topicOutputLateDir = new Path(this.destDir, new Path(topic, this.destLateSubDir));
            Path topicTmpOutputDir = new Path(this.tmpOutputDir, new Path(topic, this.destSubDir));

            return builder
                    .withTopic(topic)
                    .withPriority(priority)
                    .withTopicInputDir(topicInputDir)
                    .withTopicInputLateDir(topicInputLateDir)
                    .withTopicOutputDir(topicOutputDir)
                    .withTopicOutputLateDir(topicOutputLateDir)
                    .withTopicTmpOutputDir(topicTmpOutputDir)
                    .withFileSystem(this.fs)
                    .withState(this.state)
                    .build();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Starts completeness verification when it is enabled, otherwise marks every unverified
     * dataset as verified, and then submits the compaction jobs and waits for them.
     *
     * @throws IOException propagated from job submission
     */
    private void processCompactionJobs() throws IOException {
        if (!this.shouldVerifDataCompl) {
            setAllDatasetStatesToVerified();
        } else {
            verifyDataCompleteness();
        }
        submitCompactionJobsAndWaitForCompletion();
    }

    /** Returns whether completeness verification is enabled; disabled by default. */
    private boolean shouldVerifyDataCompleteness() {
        boolean enabled = this.state.getPropAsBoolean(COMPACTION_COMPLETENESS_VERIFICATION_ENABLED, DEFAULT_COMPACTION_COMPLETENESS_VERIFICATION_ENABLED);
        return enabled;
    }

    /**
     * Submits all unverified datasets for completeness verification in batches. Datasets that do
     * not need verification are marked verified immediately; the rest are handed to the verifier
     * in groups of the configured batch size, with a callback attached to each group's future.
     */
    private void verifyDataCompleteness() {
        List<Pattern> blacklist = getDataComplVerifBlacklist();
        List<Pattern> whitelist = getDataComplVerifWhitelist();
        int batchSize = getNumDatasetsVerifiedTogether();
        List<Dataset> batch = Lists.newArrayList();
        for (Dataset dataset : this.datasets) {
            if (dataset.state() != Dataset.DatasetState.UNVERIFIED) {
                continue;
            }
            if (!shouldVerifyCompletenessForDataset(dataset, blacklist, whitelist)) {
                dataset.setState(Dataset.DatasetState.VERIFIED);
                continue;
            }
            batch.add(dataset);
            if (batch.size() >= batchSize) {
                addCallback(batch, this.verifier.get().verify(batch));
                // Start a fresh list; the submitted one is now owned by the callback.
                batch = Lists.newArrayList();
            }
        }
        // Submit the final, partially filled batch.
        if (!batch.isEmpty()) {
            addCallback(batch, this.verifier.get().verify(batch));
        }
    }

    /**
     * Returns whether the dataset needs completeness verification: it must not already be
     * compacted and its topic must pass the verification blacklist/whitelist filter.
     */
    private boolean shouldVerifyCompletenessForDataset(Dataset dataset, List<Pattern> blacklist, List<Pattern> whitelist) {
        if (datasetAlreadyCompacted(this.fs, dataset)) {
            return false;
        }
        return DatasetFilterUtils.survived(dataset.topic(), blacklist, whitelist);
    }

    /**
     * Checks whether the {@code _COMPACTION_COMPLETE} marker file exists under the dataset's
     * output path.
     *
     * @return {@code true} if the marker exists; {@code false} if it does not or if the check
     *     fails with an {@link IOException} (the failure is logged)
     */
    public static boolean datasetAlreadyCompacted(FileSystem fileSystem, Dataset dataset) {
        Path completionMarker = new Path(dataset.outputPath(), COMPACTION_COMPLETE_FILE_NAME);
        boolean compacted;
        try {
            compacted = fileSystem.exists(completionMarker);
        } catch (IOException e) {
            LOG.error("Failed to verify the existence of file " + completionMarker, e);
            compacted = false;
        }
        return compacted;
    }

    /**
     * Reads the {@code long} stored at the start of the {@code _COMPACTION_COMPLETE} file under
     * the given directory.
     *
     * <p>The stream is managed with Guava's {@link Closer} idiom, so a failure while closing does
     * not replace a failure that occurred while reading.
     *
     * @param fileSystem file system holding the marker file
     * @param path directory containing the marker file
     * @return the value read from the marker file
     * @throws IOException if the file cannot be opened, read or closed
     */
    public static long readCompactionTimestamp(FileSystem fileSystem, Path path) throws IOException {
        Closer closer = Closer.create();
        try {
            return closer.register(fileSystem.open(new Path(path, COMPACTION_COMPLETE_FILE_NAME))).readLong();
        } catch (Throwable t) {
            // rethrow() records t so that close() below does not mask it.
            throw closer.rethrow(t);
        } finally {
            closer.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Attaches a callback to a pending completeness-verification future.
     *
     * <p>On success each result is handled by status: a passed dataset is marked verified and its
     * job runner, if one is registered, is told to proceed; a failed dataset is either queued for
     * another verification round or, once the verification timeout has elapsed, marked as given
     * up. On failure of the whole future the same batch is re-submitted unless the timeout has
     * elapsed, in which case every dataset of the batch is marked as given up.
     *
     * @param list the datasets covered by {@code listenableFuture}
     * @param listenableFuture the verification result to observe
     */
    public void addCallback(final List<Dataset> list, ListenableFuture<DataCompletenessVerifier.Results> listenableFuture) {
        Futures.addCallback(listenableFuture, new FutureCallback<DataCompletenessVerifier.Results>() {
            @Override
            public void onSuccess(DataCompletenessVerifier.Results results) {
                List<Dataset> datasetsToVerifyAgain = Lists.newArrayList();
                Iterator<DataCompletenessVerifier.Results.Result> it = results.iterator();
                while (it.hasNext()) {
                    DataCompletenessVerifier.Results.Result result = it.next();
                    Optional<MRCompactorJobRunner> jobRunner = Optional.fromNullable(MRCompactor.this.jobRunnables.get(result.dataset()));
                    switch (result.status()) {
                        case PASSED:
                            MRCompactor.LOG.info("Completeness verification for dataset " + result.dataset() + " passed.");
                            MRCompactor.this.submitSlaEvent(result.dataset(), "CompletenessVerified");
                            result.dataset().setState(Dataset.DatasetState.VERIFIED);
                            if (jobRunner.isPresent()) {
                                jobRunner.get().proceed();
                            }
                            break;
                        case FAILED:
                            if (MRCompactor.this.shouldGiveUpVerification()) {
                                MRCompactor.LOG.info("Completeness verification for dataset " + result.dataset() + " has timed out.");
                                MRCompactor.this.submitSlaEvent(result.dataset(), "CompletenessCannotBeVerified");
                                result.dataset().setState(Dataset.DatasetState.GIVEN_UP);
                                result.dataset().addThrowable(new RuntimeException(String.format("Completeness verification for dataset %s failed or timed out.", result.dataset())));
                            } else {
                                MRCompactor.LOG.info("Completeness verification for dataset " + result.dataset() + " failed. Will verify again.");
                                datasetsToVerifyAgain.add(result.dataset());
                            }
                            break;
                        default:
                            // Any other status is ignored.
                            break;
                    }
                }
                if (!datasetsToVerifyAgain.isEmpty()) {
                    MRCompactor.this.addCallback(datasetsToVerifyAgain, MRCompactor.this.verifier.get().verify(datasetsToVerifyAgain));
                }
            }

            @Override
            public void onFailure(Throwable th) {
                MRCompactor.LOG.error("Failed to verify completeness for the following datasets: " + list, th);
                if (!MRCompactor.this.shouldGiveUpVerification()) {
                    // Still within the verification timeout: retry the whole batch.
                    MRCompactor.this.addCallback(list, MRCompactor.this.verifier.get().verify(list));
                    return;
                }
                for (Dataset dataset : list) {
                    MRCompactor.LOG.warn(String.format("Completeness verification for dataset %s has timed out.", dataset));
                    MRCompactor.this.submitSlaEvent(dataset, "CompletenessCannotBeVerified");
                    dataset.setState(Dataset.DatasetState.GIVEN_UP);
                    dataset.addThrowable(new RuntimeException(String.format("Completeness verification for dataset %s failed or timed out.", dataset)));
                }
            }
        });
    }

    /** Returns how many datasets are verified in one batch, defaulting to 10. */
    private int getNumDatasetsVerifiedTogether() {
        int batchSize = this.state.getPropAsInt(COMPACTION_COMPLETENESS_VERIFICATION_NUM_DATASETS_VERIFIED_TOGETHER, DEFAULT_COMPACTION_COMPLETENESS_VERIFICATION_NUM_DATASETS_VERIFIED_TOGETHER);
        return batchSize;
    }

    /** Moves every dataset that is still {@code UNVERIFIED} to {@code VERIFIED}. */
    private void setAllDatasetStatesToVerified() {
        for (Dataset dataset : this.datasets) {
            dataset.compareAndSetState(Dataset.DatasetState.UNVERIFIED, Dataset.DatasetState.VERIFIED);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Returns whether the verification timeout has elapsed, measured in whole minutes since this
     * compactor's stopwatch was started.
     */
    public boolean shouldGiveUpVerification() {
        long elapsedMinutes = this.stopwatch.elapsed(TimeUnit.MINUTES);
        return elapsedMinutes >= this.dataVerifTimeoutMinutes;
    }

    /**
     * Returns whether datasets whose completeness could not be verified should be compacted
     * anyway; {@code false} by default.
     */
    private boolean shouldPublishDataIfCannotVerifyCompl() {
        boolean publish = this.state.getPropAsBoolean(COMPACTION_COMPLETENESS_VERIFICATION_PUBLISH_DATA_IF_CANNOT_VERIFY, DEFAULT_COMPACTION_COMPLETENESS_VERIFICATION_PUBLISH_DATA_IF_CANNOT_VERIFY);
        return publish;
    }

    /**
     * Polls the datasets until none of them needs further work, submitting a job runner for each
     * dataset that needs one, or until the compaction timeout elapses.
     *
     * <p>Each round: verified/unverified datasets get a runner if they have none or their runner
     * was aborted; given-up datasets are either compacted anyway (when publishing unverifiable
     * data is allowed) or have their runner aborted. On timeout every registered runner is aborted
     * and the method returns. Otherwise the thread sleeps for the wait interval before the next
     * round.
     *
     * @throws RuntimeException if the thread is interrupted while sleeping (the interrupt flag is
     *     restored first)
     */
    private void submitCompactionJobsAndWaitForCompletion() throws IOException {
        LOG.info("Submitting compaction jobs. Number of datasets: " + this.datasets.size());
        boolean allDatasetsCompleted = false;
        while (!allDatasetsCompleted) {
            // Assume completion; any dataset that still needs work resets the flag below.
            allDatasetsCompleted = true;
            for (Dataset dataset : this.datasets) {
                MRCompactorJobRunner jobRunner = this.jobRunnables.get(dataset);
                if (dataset.state() == Dataset.DatasetState.VERIFIED || dataset.state() == Dataset.DatasetState.UNVERIFIED) {
                    allDatasetsCompleted = false;
                    if (jobRunner == null || jobRunner.status() == MRCompactorJobRunner.Status.ABORTED) {
                        // Only a verified dataset's runner is told to proceed right away.
                        runCompactionForDataset(dataset, dataset.state() == Dataset.DatasetState.VERIFIED);
                    }
                } else if (dataset.state() == Dataset.DatasetState.GIVEN_UP) {
                    if (this.shouldPublishDataIfCannotVerifyCompl) {
                        allDatasetsCompleted = false;
                        if (jobRunner == null || jobRunner.status() == MRCompactorJobRunner.Status.ABORTED) {
                            runCompactionForDataset(dataset, true);
                        } else {
                            jobRunner.proceed();
                        }
                    } else if (jobRunner != null) {
                        jobRunner.abort();
                    }
                }
            }

            if (this.stopwatch.elapsed(TimeUnit.MINUTES) >= this.compactionTimeoutMinutes) {
                LOG.error("Compaction timed-out. Killing all running jobs");
                for (MRCompactorJobRunner runner : this.jobRunnables.values()) {
                    runner.abort();
                }
                return;
            }

            // NOTE(review): the sleep also happens after the final round, when everything is done.
            try {
                Thread.sleep(TimeUnit.SECONDS.toMillis(COMPACTION_JOB_WAIT_INTERVAL_SECONDS));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while waiting", e);
            }
        }
    }

    /**
     * Creates a job runner for the dataset, registers it, optionally tells it to proceed, and
     * hands it to the job executor. Any throwable raised on the way marks the dataset as skipped.
     *
     * @param dataset dataset to compact
     * @param proceed whether the runner should be told to proceed before it is executed
     */
    private void runCompactionForDataset(Dataset dataset, boolean proceed) {
        LOG.info("Running compaction for dataset " + dataset);
        try {
            MRCompactorJobRunner jobRunner = getMRCompactorJobRunner(dataset, dataset.priority());
            this.jobRunnables.put(dataset, jobRunner);
            if (proceed) {
                jobRunner.proceed();
            }
            this.jobExecutor.execute(jobRunner);
        } catch (Throwable failure) {
            dataset.skip(failure);
        }
    }

    /**
     * Reflectively instantiates the configured job runner class through its
     * {@code (Dataset, FileSystem, Double)} constructor.
     *
     * @param dataset dataset the runner will compact
     * @param priority priority passed to the runner
     * @throws RuntimeException if the runner cannot be instantiated
     */
    private MRCompactorJobRunner getMRCompactorJobRunner(Dataset dataset, double priority) {
        try {
            String runnerClassName = this.state.getProp(COMPACTION_JOB_RUNNER_CLASS, DEFAULT_COMPACTION_JOB_RUNNER_CLASS);
            Class<?> runnerClass = Class.forName(runnerClassName);
            Object runner = runnerClass
                    .getDeclaredConstructor(Dataset.class, FileSystem.class, Double.class)
                    .newInstance(dataset, this.fs, Double.valueOf(priority));
            return (MRCompactorJobRunner) runner;
        } catch (Exception e) {
            throw new RuntimeException("Cannot instantiate MRCompactorJobRunner", e);
        }
    }

    /**
     * Records the Hadoop job that is running for a dataset so that {@code cancel()} can kill it.
     *
     * @param dataset dataset being compacted
     * @param job Hadoop job compacting it
     */
    public static void addRunningHadoopJob(Dataset dataset, Job job) {
        MRCompactor.RUNNING_MR_JOBS.put(dataset, job);
    }

    /** Returns the compaction timeout in minutes, defaulting to {@code Long.MAX_VALUE}. */
    private long getCompactionTimeoutMinutes() {
        long timeoutMinutes = this.state.getPropAsLong(COMPACTION_MR_JOB_TIMEOUT_MINUTES, DEFAULT_COMPACTION_MR_JOB_TIMEOUT_MINUTES);
        return timeoutMinutes;
    }

    /** Returns the completeness verification timeout in minutes, defaulting to 30. */
    private long getDataVerifTimeoutMinutes() {
        long timeoutMinutes = this.state.getPropAsLong(COMPACTION_VERIFICATION_TIMEOUT_MINUTES, DEFAULT_COMPACTION_VERIFICATION_TIMEOUT_MINUTES);
        return timeoutMinutes;
    }

    /**
     * Logs every throwable recorded on any dataset, submits a {@code CompactionFailed} event for
     * each, and then fails if at least one dataset had throwables.
     *
     * @throws RuntimeException naming the number of failed datasets
     */
    private void throwExceptionsIfAnyDatasetCompactionFailed() {
        Set<Dataset> failedDatasets = getDatasetsWithThrowables();
        for (Dataset dataset : failedDatasets) {
            // NOTE(review): one event is submitted per throwable, not per dataset; confirm intended.
            for (Throwable failure : dataset.throwables()) {
                LOG.error("Error processing dataset " + dataset, failure);
                submitSlaEvent(dataset, "CompactionFailed");
            }
        }
        int failedCount = failedDatasets.size();
        if (failedCount > 0) {
            throw new RuntimeException(String.format("Failed to process %d datasets.", Integer.valueOf(failedCount)));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Populates the dataset's SLA state and submits an SLA event with the given name, built from
     * the dataset's job properties.
     *
     * @param dataset dataset the event is about
     * @param eventName name of the event to submit
     */
    public void submitSlaEvent(Dataset dataset, String eventName) {
        CompactionSlaEventHelper.populateState(dataset, Optional.absent(), this.fs);
        SlaEventSubmitter submitter = new SlaEventSubmitter(this.eventSubmitter, eventName, dataset.jobProps().getProperties());
        submitter.submit();
    }

    /** Returns the datasets that have at least one recorded throwable. */
    private Set<Dataset> getDatasetsWithThrowables() {
        Set<Dataset> failed = Sets.newHashSet();
        for (Dataset candidate : this.datasets) {
            if (candidate.throwables().isEmpty()) {
                continue;
            }
            failed.add(candidate);
        }
        return failed;
    }

    /** Shuts down the job executor via {@code ExecutorsUtils}, passing this class's logger. */
    private void shutdownExecutors() throws IOException {
        LOG.info("Shutting down Executors");
        Optional<Logger> logger = Optional.of(LOG);
        ExecutorsUtils.shutdownExecutorService(this.jobExecutor, logger);
    }

    /**
     * Cancels the compaction: kills every registered Hadoop job that has not completed, then shuts
     * the job executor down with a zero timeout and, when completeness verification is enabled,
     * closes the verifier immediately.
     *
     * <p>The shutdown steps run exactly once, even if killing a job throws. The verifier is only
     * touched when present, so cancelling with verification disabled no longer fails with an
     * {@link IllegalStateException}.
     *
     * @throws IOException if querying or killing a Hadoop job fails
     */
    @Override // gobblin.compaction.Compactor
    public void cancel() throws IOException {
        try {
            for (Map.Entry<Dataset, Job> entry : RUNNING_MR_JOBS.entrySet()) {
                Job hadoopJob = entry.getValue();
                if (!hadoopJob.isComplete()) {
                    LOG.info(String.format("Killing hadoop job %s for dataset %s", hadoopJob.getJobID(), entry.getKey()));
                    hadoopJob.killJob();
                }
            }
        } finally {
            try {
                ExecutorsUtils.shutdownExecutorService(this.jobExecutor, Optional.of(LOG), 0L, TimeUnit.NANOSECONDS);
            } finally {
                // The verifier is absent when completeness verification is disabled.
                if (this.verifier.isPresent()) {
                    this.verifier.get().closeNow();
                }
            }
        }
    }
}
