package org.apache.crunch.impl.spark;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.crunch.CombineFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Target;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.spark.fn.MapFunction;
import org.apache.crunch.impl.spark.fn.OutputConverterFunction;
import org.apache.crunch.impl.spark.fn.PairMapFunction;
import org.apache.crunch.io.CrunchOutputs;
import org.apache.crunch.io.MapReduceTarget;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.Converter;
import org.apache.crunch.types.PType;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.CounterGroup;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.Accumulator;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/crunch/impl/spark/SparkRuntime.class */
public class SparkRuntime extends AbstractFuture<PipelineResult> implements PipelineExecution {
    /** Owning pipeline; used here for its name and for creating temporary output paths. */
    private SparkPipeline pipeline;
    /** Spark context used to create the accumulator and broadcasts, register files, and to stop on {@link #kill()}. */
    private JavaSparkContext sparkContext;
    /** Base Hadoop configuration; a copy is made for every target that is written. */
    private Configuration conf;
    /** Combine function handed over via {@link #setCombineFn}; cleared again by {@link #getCombineFn()}. */
    private CombineFn combineFn;
    /** Runtime context built from the application name, the counter accumulator and a broadcast of the configuration. */
    private SparkRuntimeContext ctxt;
    /** Accumulator of counter values, keyed by group name and then by counter name. */
    private Accumulator<Map<String, Map<String, Long>>> counters;
    /** Collections with an associated materialization; consulted after a collection's outputs were written. */
    private Map<PCollectionImpl<?>, MaterializableIterable> toMaterialize;
    /** Storage levels looked up by {@link #getStorageLevel}. */
    private Map<PCollection<?>, StorageLevel> toCache;
    /** Every pipeline callable together with the targets it is checked against before it is run. */
    private Map<PipelineCallable<?>, Set<Target>> allPipelineCallables;
    /** Callables that have not been run yet. */
    private Set<PipelineCallable<?>> activePipelineCallables;
    /** Set to {@code true} by {@link #execute()} once the monitor thread has been started. */
    private boolean started;
    /** Thread that runs {@link #monitorLoop()}. */
    private Thread monitorThread;
    private static final Logger LOG = LoggerFactory.getLogger(SparkRuntime.class);
    /**
     * Orders collections by ascending {@code getDepth()}, breaking ties by {@code hashCode()}.
     *
     * <p>Both comparisons use {@link Integer#compare(int, int)} so that extreme values cannot
     * overflow the way a subtraction-based comparison can.
     *
     * <p>NOTE(review): two distinct collections with the same depth and the same hash code compare
     * as equal, so a sorted map keyed with this comparator would treat them as one key.
     */
    static final Comparator<PCollectionImpl<?>> DEPTH_COMPARATOR = new Comparator<PCollectionImpl<?>>() { // from class: org.apache.crunch.impl.spark.SparkRuntime.1
        @Override // java.util.Comparator
        public int compare(PCollectionImpl<?> pCollectionImpl, PCollectionImpl<?> pCollectionImpl2) {
            int byDepth = Integer.compare(pCollectionImpl.getDepth(), pCollectionImpl2.getDepth());
            if (byDepth != 0) {
                return byDepth;
            }
            return Integer.compare(pCollectionImpl.hashCode(), pCollectionImpl2.hashCode());
        }
    };
    /** Released when the monitor loop ends or a failure is recorded; {@link #waitFor} and {@link #waitUntilDone()} block on it. */
    private final CountDownLatch doneSignal = new CountDownLatch(1);
    /** Current execution status; starts out as {@code READY}. */
    private AtomicReference<PipelineExecution.Status> status = new AtomicReference<>(PipelineExecution.Status.READY);
    /** Targets to write for each output collection, iterated in {@link #DEPTH_COMPARATOR} order. */
    private Map<PCollectionImpl<?>, Set<Target>> outputTargets = Maps.newTreeMap(DEPTH_COMPARATOR);

    /**
     * Builds a runtime for one execution of the given pipeline. Nothing is started here; the
     * monitor thread is only created and is launched later by {@link #execute()}.
     *
     * @param sparkPipeline    pipeline being executed
     * @param javaSparkContext Spark context used for accumulators, broadcasts and file distribution
     * @param configuration    Hadoop configuration that is serialized and broadcast to the runtime context
     * @param map              targets to write, per output collection (copied into a depth-sorted map)
     * @param map2             materializations per collection
     * @param map3             storage levels per collection
     * @param map4             pipeline callables and the targets each one is checked against
     */
    public SparkRuntime(SparkPipeline sparkPipeline, JavaSparkContext javaSparkContext, Configuration configuration, Map<PCollectionImpl<?>, Set<Target>> map, Map<PCollectionImpl<?>, MaterializableIterable> map2, Map<PCollection<?>, StorageLevel> map3, Map<PipelineCallable<?>, Set<Target>> map4) {
        pipeline = sparkPipeline;
        sparkContext = javaSparkContext;
        conf = configuration;

        // Counter accumulator starts out empty and is named after the pipeline.
        Map<String, Map<String, Long>> emptyCounters = Maps.newHashMap();
        counters = javaSparkContext.accumulator(emptyCounters, sparkPipeline.getName(), new CounterAccumulatorParam());

        // The configuration travels to the runtime context as a broadcast byte array.
        String applicationName = javaSparkContext.appName();
        byte[] serializedConf = WritableUtils.toByteArray(new Writable[]{configuration});
        ctxt = new SparkRuntimeContext(applicationName, counters, javaSparkContext.broadcast(serializedConf));

        outputTargets.putAll(map);
        toMaterialize = map2;
        toCache = map3;
        allPipelineCallables = map4;
        activePipelineCallables = map4.keySet();
        status.set(PipelineExecution.Status.READY);

        Runnable monitorTask = new Runnable() { // from class: org.apache.crunch.impl.spark.SparkRuntime.2
            @Override // java.lang.Runnable
            public void run() {
                SparkRuntime.this.monitorLoop();
            }
        };
        monitorThread = new Thread(monitorTask);
    }

    /**
     * Stores a combine function for later retrieval through {@link #getCombineFn()}.
     *
     * @param combineFn the function to hold; replaces any previously stored one
     */
    public void setCombineFn(CombineFn combineFn) {
        this.combineFn = combineFn;
    }

    /**
     * Returns the stored combine function and clears it, so a second call returns {@code null}
     * unless {@link #setCombineFn} was called in between.
     *
     * @return the previously stored function, or {@code null} if none is held
     */
    public CombineFn getCombineFn() {
        CombineFn handedOver = combineFn;
        combineFn = null;
        return handedOver;
    }

    /**
     * Registers every distributed-cache entry of the configuration with the Spark context.
     *
     * <p>An entry that is a plain file is registered as is. Any other entry is first merged into a
     * sibling file named {@code sparkreadable-<name>} and that merged file is registered instead.
     * The configuration's cache-file list is then replaced with the resulting URIs. Does nothing
     * when the configuration has no cache files.
     *
     * @throws RuntimeException wrapping any {@link IOException} raised while inspecting or merging files
     */
    private void distributeFiles() {
        try {
            URI[] configured = DistributedCache.getCacheFiles(conf);
            if (configured == null) {
                return;
            }
            URI[] readable = new URI[configured.length];
            int slot = 0;
            for (URI cacheUri : configured) {
                Path source = new Path(cacheUri);
                FileSystem fs = source.getFileSystem(conf);
                URI resolved = cacheUri;
                if (!fs.isFile(source)) {
                    // Not a single file: merge its contents into one file next to it.
                    Path merged = new Path(source.getParent(), "sparkreadable-" + source.getName());
                    FileUtil.copyMerge(fs, source, fs, merged, false, conf, "");
                    resolved = merged.toUri();
                }
                readable[slot++] = resolved;
                sparkContext.addFile(resolved.toString());
            }
            DistributedCache.setCacheFiles(readable, conf);
        } catch (IOException e) {
            throw new RuntimeException("Error retrieving cache files", e);
        }
    }

    /**
     * Starts the monitor thread on the first call; later calls are no-ops.
     *
     * @return this runtime
     */
    public synchronized SparkRuntime execute() {
        if (started) {
            return this;
        }
        monitorThread.start();
        started = true;
        return this;
    }

    /** @return the Spark context this runtime was constructed with */
    public JavaSparkContext getSparkContext() {
        return sparkContext;
    }

    /** @return the runtime context created in the constructor */
    public SparkRuntimeContext getRuntimeContext() {
        return ctxt;
    }

    /** @return the Hadoop configuration this runtime was constructed with */
    public Configuration getConfiguration() {
        return conf;
    }

    /**
     * Reports whether the given RDD handle can be used.
     *
     * @param javaRDDLike the RDD to check
     * @return {@code true} if and only if the argument is not {@code null}
     */
    public boolean isValid(JavaRDDLike<?, ?> javaRDDLike) {
        return javaRDDLike != null;
    }

    /**
     * Looks up the storage level registered for a collection.
     *
     * @param pCollection the collection to look up
     * @return the registered storage level, or {@code null} if the map holds no entry for it
     */
    public StorageLevel getStorageLevel(PCollection<?> pCollection) {
        StorageLevel level = toCache.get(pCollection);
        return level;
    }

    /** @return always the empty string; this runtime produces no plan dot file */
    public String getPlanDotFile() {
        return "";
    }

    /** @return an immutable single-entry map that maps the empty string to the empty string */
    public Map<String, String> getNamedDotFiles() {
        return ImmutableMap.of("", "");
    }

    /**
     * Blocks until the done signal is released or the timeout elapses, whichever comes first.
     * The outcome of the wait is not reported, so callers cannot tell a timeout from completion
     * through this method alone.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitFor(long j, TimeUnit timeUnit) throws InterruptedException {
        doneSignal.await(j, timeUnit);
    }

    /**
     * Blocks until the done signal is released.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public void waitUntilDone() throws InterruptedException {
        doneSignal.await();
    }

    /**
     * Runs every pending {@link PipelineCallable} whose registered targets do not intersect
     * {@code set}; callables that still intersect it stay pending for a later call.
     *
     * <p>Callables that report {@code runSingleThreaded()} are invoked inline on the calling
     * thread; the rest are submitted together to a temporary thread pool. If any callable does not
     * return {@code SUCCESS} or throws, the failures are logged, the status becomes
     * {@code FAILED}, the future is completed with {@link PipelineResult#EMPTY} and the done
     * signal is released.
     *
     * @param set targets to check each callable's registered targets against
     */
    private void runCallables(Set<Target> set) {
        Set<PipelineCallable<?>> pending = this.activePipelineCallables;
        this.activePipelineCallables = Sets.newHashSet();
        List<PipelineCallable<?>> parallel = Lists.newArrayList();
        // Insertion-ordered set: a callable is reported once even if it is recorded twice.
        Set<PipelineCallable<?>> failures = Sets.newLinkedHashSet();

        for (PipelineCallable<?> callable : pending) {
            if (!Sets.intersection(this.allPipelineCallables.get(callable), set).isEmpty()) {
                this.activePipelineCallables.add(callable);
            } else if (callable.runSingleThreaded()) {
                try {
                    if (callable.call() != PipelineCallable.Status.SUCCESS) {
                        failures.add(callable);
                    }
                } catch (Throwable th) {
                    callable.setMessage(th.getLocalizedMessage());
                    failures.add(callable);
                }
            } else {
                parallel.add(callable);
            }
        }

        // Only pay for a thread pool when there is something to run on it.
        if (!parallel.isEmpty()) {
            ListeningExecutorService pool = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
            try {
                List<Future<PipelineCallable.Status>> outcomes = pool.invokeAll(parallel);
                for (int i = 0; i < outcomes.size(); i++) {
                    if (outcomes.get(i).get() != PipelineCallable.Status.SUCCESS) {
                        failures.add(parallel.get(i));
                    }
                }
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    // Keep the interruption visible to whoever runs this thread.
                    Thread.currentThread().interrupt();
                }
                LOG.error("Error while running parallel pipeline callables", th);
                failures.addAll(parallel);
            } finally {
                pool.shutdownNow();
            }
        }

        if (failures.isEmpty()) {
            return;
        }
        LOG.error("{} callable failure(s) occurred:", Integer.valueOf(failures.size()));
        for (PipelineCallable<?> failed : failures) {
            LOG.error("{} : {}", failed.getName(), failed.getMessage());
        }
        this.status.set(PipelineExecution.Status.FAILED);
        set(PipelineResult.EMPTY);
        this.doneSignal.countDown();
    }

    /**
     * Body of the monitor thread: writes every output collection to its targets and completes
     * this future with the outcome.
     *
     * <p>Each round selects the remaining collections whose target dependencies do not intersect
     * the targets still to be written, writes them, and then runs the pipeline callables against
     * the set of unfinished targets. The loop ends when no collection is left or when the done
     * signal has been released by a failure.
     *
     * <p>On exit the future is completed with {@link PipelineResult#EMPTY} if the status is
     * {@code FAILED} or {@code KILLED}; otherwise the status becomes {@code SUCCEEDED} and the
     * future receives a single "Spark" stage result carrying the counters. An unchecked exception
     * escaping the loop marks the run {@code FAILED} and completes the future exceptionally. The
     * done signal is released in every case so that waiters never block indefinitely.
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void monitorLoop() {
        try {
            this.status.set(PipelineExecution.Status.RUNNING);
            long startTime = System.currentTimeMillis();
            Map<PCollectionImpl<?>, Set<Target>> targetDeps = Maps.newTreeMap(DEPTH_COMPARATOR);
            Set<Target> unfinished = Sets.newHashSet();
            for (PCollectionImpl<?> output : this.outputTargets.keySet()) {
                targetDeps.put(output, output.getTargetDependencies());
                unfinished.addAll(this.outputTargets.get(output));
            }
            runCallables(unfinished);
            while (!targetDeps.isEmpty() && this.doneSignal.getCount() > 0) {
                // Targets that belong to collections which have not been written yet.
                Set<Target> pendingTargets = Sets.newHashSet();
                for (PCollectionImpl<?> output : targetDeps.keySet()) {
                    pendingTargets.addAll(this.outputTargets.get(output));
                }
                // A collection is ready once none of its dependencies is still pending.
                Map<PCollectionImpl<?>, JavaRDDLike<?, ?>> ready = Maps.newTreeMap(DEPTH_COMPARATOR);
                for (PCollectionImpl<?> output : targetDeps.keySet()) {
                    if (Sets.intersection(pendingTargets, targetDeps.get(output)).isEmpty()) {
                        ready.put(output, ((SparkCollection) output).getJavaRDDLike(this));
                    }
                }
                distributeFiles();
                for (Map.Entry<PCollectionImpl<?>, JavaRDDLike<?, ?>> entry : ready.entrySet()) {
                    JavaRDDLike<?, ?> rdd = entry.getValue();
                    PType pType = entry.getKey().getPType();
                    Set<Target> targets = this.outputTargets.get(entry.getKey());
                    if (targets.size() > 1) {
                        // Written more than once, so avoid recomputing it per target.
                        rdd.rdd().cache();
                    }
                    for (Target target : targets) {
                        writeTarget(rdd, pType, target);
                    }
                    unfinished.removeAll(targets);
                }
                if (this.status.get() == PipelineExecution.Status.RUNNING) {
                    for (PCollectionImpl output : ready.keySet()) {
                        if (this.toMaterialize.containsKey(output)) {
                            MaterializableIterable materializable = this.toMaterialize.get(output);
                            if (materializable.isSourceTarget()) {
                                output.materializeAt(materializable.getSource());
                            }
                        }
                        targetDeps.remove(output);
                    }
                }
                runCallables(unfinished);
            }
            PipelineExecution.Status finalStatus = this.status.get();
            // Either terminal failure state must win; a failed run is never reported as succeeded.
            if (finalStatus == PipelineExecution.Status.FAILED || finalStatus == PipelineExecution.Status.KILLED) {
                set(PipelineResult.EMPTY);
            } else {
                this.status.set(PipelineExecution.Status.SUCCEEDED);
                set(new PipelineResult(ImmutableList.of(new PipelineResult.StageResult("Spark", getCounters(), startTime, System.currentTimeMillis())), PipelineExecution.Status.SUCCEEDED));
            }
        } catch (RuntimeException e) {
            LOG.error("Pipeline failed due to exception", e);
            this.status.set(PipelineExecution.Status.FAILED);
            setException(e);
        } finally {
            this.doneSignal.countDown();
        }
    }

    /**
     * Broadcasts a fresh copy of the configuration and, when {@code target} is a
     * {@link MapReduceTarget}, converts {@code rdd} to output pairs and writes it to that target.
     * Any exception raised while configuring or saving is logged and recorded as a failure:
     * the status becomes {@code FAILED}, the future is completed with
     * {@link PipelineResult#EMPTY} and the done signal is released.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void writeTarget(JavaRDDLike<?, ?> rdd, PType pType, Target target) {
        Configuration targetConf = new Configuration(getConfiguration());
        getRuntimeContext().setConf(this.sparkContext.broadcast(WritableUtils.toByteArray(new Writable[]{targetConf})));
        if (!(target instanceof MapReduceTarget)) {
            return;
        }
        Converter converter = target.getConverter(pType);
        MapFn identityFn = IdentityFn.getInstance();
        MapFn outputFn = converter.applyPTypeTransforms() ? pType.getOutputMapFn() : identityFn;
        JavaPairRDD outRdd;
        if (rdd instanceof JavaRDD) {
            outRdd = ((JavaRDD) rdd).map(new MapFunction(outputFn, this.ctxt)).mapToPair(new OutputConverterFunction(converter));
        } else {
            outRdd = ((JavaPairRDD) rdd).map(new PairMapFunction(outputFn, this.ctxt)).mapToPair(new OutputConverterFunction(converter));
        }
        try {
            Job job = new Job(targetConf);
            if (target instanceof PathTarget) {
                PathTarget pathTarget = (PathTarget) target;
                pathTarget.configureForMapReduce(job, pType, pathTarget.getPath(), "out0");
                CrunchOutputs.OutputConfig outputConfig = (CrunchOutputs.OutputConfig) CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
                job.setOutputFormatClass(outputConfig.bundle.getFormatClass());
                job.setOutputKeyClass(outputConfig.keyClass);
                job.setOutputValueClass(outputConfig.valueClass);
                outputConfig.bundle.configure(job.getConfiguration());
                job.getConfiguration().set("crunch.namedoutput", "out0");
                // Save to a temporary path first, then let the target move the output into place.
                Path tempPath = this.pipeline.createTempPath();
                outRdd.saveAsNewAPIHadoopFile(tempPath.toString(), converter.getKeyClass(), converter.getValueClass(), job.getOutputFormatClass(), job.getConfiguration());
                pathTarget.handleOutputs(job.getConfiguration(), tempPath, -1);
            } else {
                ((MapReduceTarget) target).configureForMapReduce(job, pType, this.pipeline.createTempPath(), "out0");
                CrunchOutputs.OutputConfig outputConfig = (CrunchOutputs.OutputConfig) CrunchOutputs.getNamedOutputs(job.getConfiguration()).get("out0");
                job.setOutputFormatClass(outputConfig.bundle.getFormatClass());
                job.setOutputKeyClass(outputConfig.keyClass);
                job.setOutputValueClass(outputConfig.valueClass);
                outRdd.saveAsHadoopDataset(new JobConf(job.getConfiguration()));
            }
        } catch (Exception e) {
            LOG.error("Spark Exception", e);
            this.status.set(PipelineExecution.Status.FAILED);
            set(PipelineResult.EMPTY);
            this.doneSignal.countDown();
        }
    }

    /**
     * Copies the current value of the counter accumulator into a new Hadoop {@link Counters}
     * object, creating one group per outer key and one counter per inner key.
     *
     * @return a freshly populated {@link Counters} instance
     */
    private Counters getCounters() {
        Counters result = new Counters();
        Map<String, Map<String, Long>> snapshot = counters.value();
        for (Map.Entry<String, Map<String, Long>> groupEntry : snapshot.entrySet()) {
            CounterGroup group = result.getGroup(groupEntry.getKey());
            for (Map.Entry<String, Long> counterEntry : groupEntry.getValue().entrySet()) {
                long value = counterEntry.getValue().longValue();
                group.findCounter(counterEntry.getKey()).setValue(value);
            }
        }
        return result;
    }

    /**
     * Starts execution if the status is still {@code READY}, then blocks until the result is
     * available.
     *
     * @return the pipeline result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the future was completed exceptionally
     */
    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public PipelineResult m5get() throws InterruptedException, ExecutionException {
        boolean notStartedYet = getStatus() == PipelineExecution.Status.READY;
        if (notStartedYet) {
            execute();
        }
        PipelineResult result = super.get();
        return result;
    }

    /**
     * Starts execution if the status is still {@code READY}, then waits at most the given time
     * for the result.
     *
     * @param j        maximum time to wait
     * @param timeUnit unit of {@code j}
     * @return the pipeline result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws TimeoutException     if the wait timed out
     * @throws ExecutionException   if the future was completed exceptionally
     */
    /* renamed from: get, reason: merged with bridge method [inline-methods] */
    public PipelineResult m6get(long j, TimeUnit timeUnit) throws InterruptedException, TimeoutException, ExecutionException {
        boolean notStartedYet = getStatus() == PipelineExecution.Status.READY;
        if (notStartedYet) {
            execute();
        }
        PipelineResult result = super.get(j, timeUnit);
        return result;
    }

    /** @return the current execution status */
    public PipelineExecution.Status getStatus() {
        return status.get();
    }

    /**
     * Blocks for the pipeline result, starting execution first if necessary. Any exception raised
     * while waiting is logged and replaced by {@link PipelineResult#EMPTY}.
     *
     * @return the pipeline result, or {@link PipelineResult#EMPTY} if retrieving it failed
     */
    public PipelineResult getResult() {
        PipelineResult result;
        try {
            result = m5get();
        } catch (Exception e) {
            LOG.error("Exception retrieving PipelineResult, returning EMPTY", e);
            result = PipelineResult.EMPTY;
        }
        return result;
    }

    /**
     * Stops the Spark context and completes this future with {@link PipelineResult#EMPTY} if
     * execution has been started; does nothing otherwise.
     *
     * <p>NOTE(review): this method does not itself change the status field or release the done
     * signal; confirm whether callers expect a {@code KILLED} status afterwards.
     *
     * @throws InterruptedException declared by the signature; nothing in this body throws it
     */
    public void kill() throws InterruptedException {
        if (!started) {
            return;
        }
        sparkContext.stop();
        set(PipelineResult.EMPTY);
    }
}
