package org.apache.crunch.impl.mr;

import com.google.common.base.Charsets;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URLEncoder;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.PCollection;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.impl.dist.DistributedPipeline;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.MRCollectionFactory;
import org.apache.crunch.impl.mr.exec.MRExecutor;
import org.apache.crunch.impl.mr.plan.MSCRPlanner;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:lib/crunch-core-0.12.0.jar:org/apache/crunch/impl/mr/MRPipeline.class */
public class MRPipeline extends DistributedPipeline {
    private static final Logger LOG = LoggerFactory.getLogger(MRPipeline.class);
    private final Class<?> jarClass;

    public MRPipeline(Class<?> cls) {
        this(cls, new Configuration());
    }

    public MRPipeline(Class<?> cls, String str) {
        this(cls, str, new Configuration());
    }

    public MRPipeline(Class<?> cls, Configuration configuration) {
        this(cls, cls.getName(), configuration);
    }

    public MRPipeline(Class<?> cls, String str, Configuration configuration) {
        super(str, configuration, new MRCollectionFactory());
        this.jarClass = cls;
    }

    public MRExecutor plan() {
        HashMap newHashMap = Maps.newHashMap();
        for (PCollectionImpl<?> pCollectionImpl : this.outputTargets.keySet()) {
            if (this.outputTargetsToMaterialize.containsKey(pCollectionImpl)) {
                newHashMap.put(pCollectionImpl, this.outputTargetsToMaterialize.get(pCollectionImpl));
                this.outputTargetsToMaterialize.remove(pCollectionImpl);
            }
        }
        try {
            return new MSCRPlanner(this, this.outputTargets, newHashMap, this.appendedTargets, this.allPipelineCallables).plan(this.jarClass, getConfiguration());
        } catch (IOException e) {
            throw new CrunchRuntimeException(e);
        }
    }

    @Override // org.apache.crunch.Pipeline
    public PipelineResult run() {
        try {
            MRPipelineExecution runAsync = runAsync();
            runAsync.waitUntilDone();
            return runAsync.getResult();
        } catch (InterruptedException e) {
            LOG.error("Exception running pipeline", (Throwable) e);
            return PipelineResult.EMPTY;
        }
    }

    @Override // org.apache.crunch.Pipeline
    public MRPipelineExecution runAsync() {
        MRExecutor plan = plan();
        for (Map.Entry<String, String> entry : plan.getNamedDotFiles().entrySet()) {
            writePlanDotFile(entry.getKey(), entry.getValue());
        }
        MRPipelineExecution execute = plan.execute();
        this.outputTargets.clear();
        return execute;
    }

    @Override // org.apache.crunch.Pipeline
    public <T> Iterable<T> materialize(PCollection<T> pCollection) {
        ((PCollectionImpl) pCollection).setBreakpoint();
        MaterializableIterable<?> materializableIterable = new MaterializableIterable<>(this, getMaterializeSourceTarget(pCollection));
        if (!this.outputTargetsToMaterialize.containsKey(pCollection)) {
            this.outputTargetsToMaterialize.put((PCollectionImpl) pCollection, materializableIterable);
        }
        return materializableIterable;
    }

    @Override // org.apache.crunch.Pipeline
    public <T> void cache(PCollection<T> pCollection, CachingOptions cachingOptions) {
        materialize(pCollection);
    }

    private void writePlanDotFile(String str, String str2) {
        String str3 = getConfiguration().get(PlanningParameters.PIPELINE_DOTFILE_OUTPUT_DIR);
        if (str3 != null) {
            FSDataOutputStream fSDataOutputStream = null;
            try {
                try {
                    URI uri = new URI(str3);
                    FileSystem fileSystem = FileSystem.get(uri, getConfiguration());
                    String format = String.format("_%s_%s.dot", new SimpleDateFormat("yyyy-MM-dd_HH.mm.ss.SSS").format(new Date()), str);
                    String encode = URLEncoder.encode(getName(), "UTF-8");
                    Path path = new Path(uri.getPath(), encode.substring(0, Math.min(150, encode.length())) + format);
                    LOG.info("Writing jobplan to {}", path);
                    fSDataOutputStream = fileSystem.create(path, true);
                    fSDataOutputStream.write(str2.getBytes(Charsets.UTF_8));
                    if (fSDataOutputStream != null) {
                        try {
                            fSDataOutputStream.close();
                        } catch (IOException e) {
                            if (0 == 0) {
                                throw new CrunchRuntimeException("Error closing dotfile", e);
                            }
                        }
                    }
                } catch (IOException e2) {
                    throw new CrunchRuntimeException("Error writing dotfile contents to " + str3, e2);
                } catch (RuntimeException e3) {
                    throw e3;
                } catch (URISyntaxException e4) {
                    throw new CrunchRuntimeException("Invalid dot file dir URI, job plan will not be written: " + str3, e4);
                }
            } catch (Throwable th) {
                if (fSDataOutputStream != null) {
                    try {
                        fSDataOutputStream.close();
                    } catch (IOException e5) {
                        if (0 == 0) {
                            throw new CrunchRuntimeException("Error closing dotfile", e5);
                        }
                    }
                }
                throw th;
            }
        }
    }
}
