package org.apache.crunch.impl.mr.exec;

import com.google.common.base.Function;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.Target;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchControlledJob;
import org.apache.crunch.hadoop.mapreduce.lib.jobcontrol.CrunchJobControl;
import org.apache.crunch.impl.mr.MRJob;
import org.apache.crunch.impl.mr.MRPipelineExecution;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:org/apache/crunch/impl/mr/exec/MRExecutor.class */
/**
 * Drives the jobs registered on a {@link CrunchJobControl} from a dedicated monitor thread and
 * exposes the progress of that run through the {@link MRPipelineExecution} interface.
 *
 * <p>Lifecycle as implemented here: jobs are registered with {@link #addJob}, {@link #execute()}
 * starts the monitor thread, and the monitor thread polls the job control until every job has
 * finished or {@link #kill()} has been called. When the monitor thread stops, for any reason,
 * {@code doneSignal} is released so that {@link #waitFor} and {@link #waitUntilDone} return.
 */
public class MRExecutor implements MRPipelineExecution {
    private static final Log LOG = LogFactory.getLog(MRExecutor.class);

    private final CrunchJobControl control;
    private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
    private final Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
    private final CappedExponentialCounter pollInterval;

    /** Written by the monitor thread under {@code synchronized (this)}; null until a run completes normally. */
    private PipelineResult result;
    private String planDotFile;

    /** Released exactly when the monitor thread is done, successfully or not. */
    private final CountDownLatch doneSignal = new CountDownLatch(1);
    /** Released by {@link #kill()}; the monitor loop also uses it as an interruptible sleep. */
    private final CountDownLatch killSignal = new CountDownLatch(1);

    private final AtomicReference<PipelineExecution.Status> status =
            new AtomicReference<>(PipelineExecution.Status.READY);

    private final Thread monitorThread = new Thread(new Runnable() {
        @Override
        public void run() {
            MRExecutor.this.monitorLoop();
        }
    });

    /**
     * Creates an executor with an empty job control.
     *
     * @param cls class whose {@code toString()} is passed to the {@link CrunchJobControl} constructor
     * @param map output collections and the targets each one is written to
     * @param map2 collections that have an associated {@link MaterializableIterable}
     */
    public MRExecutor(Class<?> cls, Map<PCollectionImpl<?>, Set<Target>> map, Map<PCollectionImpl<?>, MaterializableIterable> map2) {
        this.control = new CrunchJobControl(cls.toString());
        this.outputTargets = map;
        this.toMaterialize = map2;
        // Local runs get the smaller counter arguments; presumably these are the initial and
        // maximum poll delays in milliseconds (the value is used as milliseconds in monitorLoop).
        this.pollInterval = isLocalMode()
                ? new CappedExponentialCounter(50L, 1000L)
                : new CappedExponentialCounter(500L, 10000L);
    }

    /**
     * Registers a job with the underlying job control.
     *
     * @param crunchControlledJob the job to add
     */
    public void addJob(CrunchControlledJob crunchControlledJob) {
        this.control.addJob(crunchControlledJob);
    }

    /**
     * Stores the plan description that {@link #getPlanDotFile()} returns.
     *
     * @param str the plan text
     */
    public void setPlanDotFile(String str) {
        this.planDotFile = str;
    }

    /**
     * Starts the monitor thread and returns immediately.
     *
     * @return this executor, as a handle on the running pipeline
     */
    public MRPipelineExecution execute() {
        this.monitorThread.start();
        return this;
    }

    /**
     * Body of the monitor thread: polls the job control until all jobs finish or a kill is
     * requested, then reports failures, gathers stage results, materializes outputs and records
     * the final status and result.
     *
     * <p>The whole body runs inside one {@code try}/{@code finally} so that {@code doneSignal} is
     * released on every exit path. Without that, an exception thrown after the polling loop
     * would leave callers of {@link #waitUntilDone()} blocked forever.
     *
     * <p>NOTE(review): the decompiler reported that this method was originally private; it is
     * kept public here so the visible interface is unchanged. It is only meant to be run by the
     * monitor thread.
     */
    public void monitorLoop() {
        try {
            while (this.killSignal.getCount() > 0 && !this.control.allFinished()) {
                this.control.pollJobStatusAndStartNewOnes();
                // Sleeps for the poll interval but wakes immediately when kill() is called.
                this.killSignal.await(this.pollInterval.get(), TimeUnit.MILLISECONDS);
            }
            this.control.killAllRunningJobs();

            List<CrunchControlledJob> failures = this.control.getFailedJobList();
            reportFailures(failures);
            List<PipelineResult.StageResult> stages = collectStageResults();
            materializeOutputs();
            recordOutcome(failures, stages);
        } catch (IOException e) {
            LOG.error("Pipeline failed due to exception", e);
            this.status.set(PipelineExecution.Status.FAILED);
        } catch (InterruptedException e) {
            // The monitor thread is private to this class and is never interrupted by it.
            throw new AssertionError(e);
        } finally {
            this.doneSignal.countDown();
        }
    }

    /** Prints a summary of the failed jobs to standard error; prints nothing when there are none. */
    private static void reportFailures(List<CrunchControlledJob> failures) {
        if (failures.isEmpty()) {
            return;
        }
        System.err.println(failures.size() + " job failure(s) occurred:");
        for (CrunchControlledJob job : failures) {
            System.err.println(job.getJobName() + "(" + job.getJobID() + "): " + job.getMessage());
        }
    }

    /** Builds one stage result (job name plus counters) for every successful job. */
    private List<PipelineResult.StageResult> collectStageResults() throws IOException {
        List<PipelineResult.StageResult> stages = Lists.newArrayList();
        for (CrunchControlledJob job : this.control.getSuccessfulJobList()) {
            stages.add(new PipelineResult.StageResult(job.getJobName(), job.getJob().getCounters()));
        }
        return stages;
    }

    /**
     * Points every output collection at a place it can be read back from: the source of its
     * materializable iterable when it has one, otherwise the first of its targets that is, or can
     * be converted to, a {@link SourceTarget}.
     */
    @SuppressWarnings({"rawtypes", "unchecked"}) // wildcard-typed collections force raw SourceTarget use
    private void materializeOutputs() {
        for (PCollectionImpl<?> collection : this.outputTargets.keySet()) {
            if (this.toMaterialize.containsKey(collection)) {
                MaterializableIterable iterable = this.toMaterialize.get(collection);
                if (iterable.isSourceTarget()) {
                    iterable.materialize();
                    collection.materializeAt((SourceTarget) iterable.getSource());
                }
                continue;
            }
            for (Target target : this.outputTargets.get(collection)) {
                SourceTarget readable = (target instanceof SourceTarget)
                        ? (SourceTarget) target
                        : target.asSourceTarget(collection.getPType());
                if (readable != null) {
                    collection.materializeAt(readable);
                    break; // only the first usable target is used
                }
            }
        }
    }

    /**
     * Sets the final status (KILLED if a kill was requested, otherwise SUCCEEDED or FAILED
     * depending on whether any job failed) and publishes the pipeline result.
     */
    private void recordOutcome(List<CrunchControlledJob> failures, List<PipelineResult.StageResult> stages) {
        synchronized (this) {
            PipelineExecution.Status finalStatus;
            if (this.killSignal.getCount() == 0) {
                finalStatus = PipelineExecution.Status.KILLED;
            } else if (failures.isEmpty()) {
                finalStatus = PipelineExecution.Status.SUCCEEDED;
            } else {
                finalStatus = PipelineExecution.Status.FAILED;
            }
            this.status.set(finalStatus);
            this.result = new PipelineResult(stages, this.status.get());
        }
    }

    /** Returns the plan text previously stored with {@link #setPlanDotFile}, or null if none was set. */
    @Override // org.apache.crunch.PipelineExecution
    public String getPlanDotFile() {
        return this.planDotFile;
    }

    /**
     * Blocks until the monitor thread has finished or the timeout elapses, whichever comes first.
     *
     * @param j maximum time to wait
     * @param timeUnit unit of {@code j}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.crunch.PipelineExecution
    public void waitFor(long j, TimeUnit timeUnit) throws InterruptedException {
        this.doneSignal.await(j, timeUnit);
    }

    /**
     * Blocks until the monitor thread has finished.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    @Override // org.apache.crunch.PipelineExecution
    public void waitUntilDone() throws InterruptedException {
        this.doneSignal.await();
    }

    /** Returns the current status; READY until the monitor thread records an outcome. */
    @Override // org.apache.crunch.PipelineExecution
    public synchronized PipelineExecution.Status getStatus() {
        return this.status.get();
    }

    /** Returns the pipeline result, or null if the run has not completed normally. */
    @Override // org.apache.crunch.PipelineExecution
    public synchronized PipelineResult getResult() {
        return this.result;
    }

    /**
     * Requests that the run stop. This only releases the kill latch; the monitor thread notices
     * it, kills the running jobs and records the KILLED status asynchronously.
     */
    @Override // org.apache.crunch.PipelineExecution
    public void kill() throws InterruptedException {
        this.killSignal.countDown();
    }

    /**
     * Reports whether a freshly created {@link Configuration} names "local" as the job tracker,
     * checking {@code mapreduce.jobtracker.address} first and falling back to
     * {@code mapred.job.tracker}, which itself defaults to "local".
     */
    private static boolean isLocalMode() {
        Configuration configuration = new Configuration();
        String legacyTracker = configuration.get("mapred.job.tracker", "local");
        String tracker = configuration.get("mapreduce.jobtracker.address", legacyTracker);
        return "local".equals(tracker);
    }

    /** Returns a view of all jobs known to the job control, typed as {@link MRJob}. */
    @Override // org.apache.crunch.impl.mr.MRPipelineExecution
    public List<MRJob> getJobs() {
        return Lists.transform(this.control.getAllJobs(), new Function<CrunchControlledJob, MRJob>() {
            @Override
            public MRJob apply(CrunchControlledJob crunchControlledJob) {
                return crunchControlledJob;
            }
        });
    }
}
