package org.apache.crunch.impl.mr;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.mr.collect.InputCollection;
import org.apache.crunch.impl.mr.collect.InputTable;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.collect.PGroupedTableImpl;
import org.apache.crunch.impl.mr.collect.UnionCollection;
import org.apache.crunch.impl.mr.collect.UnionTable;
import org.apache.crunch.impl.mr.plan.MSCRPlanner;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.At;
import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.WritableTypeFamily;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Appender;
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

/* loaded from: input_file:lib/crunch-0.3.0-incubating.jar:org/apache/crunch/impl/mr/MRPipeline.class */
public class MRPipeline implements Pipeline {
    private static final Log LOG = LogFactory.getLog(MRPipeline.class);
    private static final Random RANDOM = new Random();

    /** Class handed to {@link MSCRPlanner#plan} together with the configuration when the pipeline runs. */
    private final Class<?> jarClass;
    /** Pipeline name returned by {@link #getName()}; defaults to {@code jarClass.getName()}. */
    private final String name;
    /** Pending outputs, keyed by the collection that produces them; cleared after a successful {@link #run()}. */
    private final Map<PCollectionImpl<?>, Set<Target>> outputTargets;
    /** Iterables returned by {@link #materialize} that are populated after the next {@link #run()}. */
    private final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize;
    /** Per-pipeline scratch directory under {@code RuntimeParameters.TMP_DIR} (default {@code /tmp}). */
    private Path tempDirectory;
    /** Counter used to name intermediate outputs {@code p1}, {@code p2}, ... inside {@link #tempDirectory}. */
    private int tempFileIndex;
    /** Next id handed out by {@link #getNextAnonymousStageId()}. */
    private int nextAnonymousStageId;
    private Configuration conf;

    /**
     * Creates a pipeline named after {@code cls} using a fresh Hadoop {@link Configuration}.
     *
     * @param cls class passed to the planner when the pipeline runs
     * @throws IOException declared for source compatibility; not thrown by this implementation
     */
    public MRPipeline(Class<?> cls) throws IOException {
        this(cls, new Configuration());
    }

    /**
     * Creates a pipeline with an explicit name and a fresh Hadoop {@link Configuration}.
     *
     * @param cls class passed to the planner when the pipeline runs
     * @param str pipeline name
     */
    public MRPipeline(Class<?> cls, String str) {
        this(cls, str, new Configuration());
    }

    /**
     * Creates a pipeline named after {@code cls} with the given configuration.
     *
     * @param cls class passed to the planner when the pipeline runs
     * @param configuration Hadoop configuration to use
     */
    public MRPipeline(Class<?> cls, Configuration configuration) {
        this(cls, cls.getName(), configuration);
    }

    /**
     * Creates a pipeline and eagerly creates its temporary directory.
     *
     * @param cls class passed to the planner when the pipeline runs
     * @param str pipeline name
     * @param configuration Hadoop configuration to use
     * @throws RuntimeException if the temporary directory cannot be created
     */
    public MRPipeline(Class<?> cls, String str, Configuration configuration) {
        this.jarClass = cls;
        this.name = str;
        this.outputTargets = Maps.newHashMap();
        this.outputTargetsToMaterialize = Maps.newHashMap();
        this.conf = configuration;
        this.tempDirectory = createTempDirectory(configuration);
        this.tempFileIndex = 0;
        this.nextAnonymousStageId = 0;
    }

    @Override // org.apache.crunch.Pipeline
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Replaces the configuration and creates a new temporary directory under it.
     *
     * <p>The directory created under the previous configuration would otherwise be orphaned. It is
     * deleted (best effort) as long as no temporary path has been handed out from it yet.
     */
    @Override // org.apache.crunch.Pipeline
    public void setConfiguration(Configuration configuration) {
        Path previousTempDirectory = this.tempDirectory;
        Configuration previousConf = this.conf;
        this.conf = configuration;
        this.tempDirectory = createTempDirectory(configuration);
        // Only safe when nothing references the old directory yet; also guard against the
        // (unlikely) case where the random name collides with the directory just created.
        if (this.tempFileIndex == 0 && previousTempDirectory != null
                && !previousTempDirectory.equals(this.tempDirectory)) {
            deleteTempDirectory(previousTempDirectory, previousConf);
        }
    }

    /**
     * Plans and executes all pending outputs, then records where each output collection was written.
     *
     * @return the planner's result, or {@link PipelineResult#EMPTY} if planning/execution threw an
     *     {@link IOException} (the exception is logged)
     */
    @Override // org.apache.crunch.Pipeline
    public PipelineResult run() {
        try {
            PipelineResult execute = new MSCRPlanner(this, this.outputTargets).plan(this.jarClass, this.conf).execute();
            for (PCollectionImpl<?> pCollectionImpl : this.outputTargets.keySet()) {
                MaterializableIterable<?> materializableIterable = this.outputTargetsToMaterialize.get(pCollectionImpl);
                if (materializableIterable != null) {
                    // A caller asked for this collection's contents: load them, then remember the location.
                    materializableIterable.materialize();
                    pCollectionImpl.materializeAt((SourceTarget) materializableIterable.getSourceTarget());
                    this.outputTargetsToMaterialize.remove(pCollectionImpl);
                } else {
                    // Remember the first output that can also be read back as a source.
                    for (Target target : this.outputTargets.get(pCollectionImpl)) {
                        if (target instanceof SourceTarget) {
                            pCollectionImpl.materializeAt((SourceTarget) target);
                            break;
                        }
                    }
                }
            }
            this.outputTargets.clear();
            return execute;
        } catch (IOException e) {
            // Pass the throwable separately so the stack trace is recorded, not just its toString().
            LOG.error("Exception running pipeline", e);
            return PipelineResult.EMPTY;
        }
    }

    /**
     * Runs any pending outputs and then deletes the temporary directory.
     *
     * @return the result of {@link #run()}, or {@code null} if there were no pending outputs
     */
    @Override // org.apache.crunch.Pipeline
    public PipelineResult done() {
        PipelineResult pipelineResult = null;
        if (!this.outputTargets.isEmpty()) {
            pipelineResult = run();
        }
        cleanup();
        return pipelineResult;
    }

    @Override // org.apache.crunch.Pipeline
    public <S> PCollection<S> read(Source<S> source) {
        return new InputCollection<>(source, this);
    }

    @Override // org.apache.crunch.Pipeline
    public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
        return new InputTable<>(tableSource, this);
    }

    @Override // org.apache.crunch.Pipeline
    public PCollection<String> readTextFile(String str) {
        return read(At.textFile(str));
    }

    /**
     * Registers {@code target} as an output of {@code pCollection}.
     *
     * <p>Grouped tables are ungrouped first. Union collections/tables are wrapped in an identity
     * {@code parallelDo}, so the registered key is a concrete {@link PCollectionImpl}.
     */
    @Override // org.apache.crunch.Pipeline
    public void write(PCollection<?> pCollection, Target target) {
        if (pCollection instanceof PGroupedTableImpl) {
            pCollection = ((PGroupedTableImpl) pCollection).ungroup();
        } else if ((pCollection instanceof UnionCollection) || (pCollection instanceof UnionTable)) {
            pCollection = pCollection.parallelDo("UnionCollectionWrapper", IdentityFn.getInstance(), pCollection.getPType());
        }
        addOutput((PCollectionImpl) pCollection, target);
    }

    /** Adds {@code target} to the output set of {@code pCollectionImpl}, creating the set on first use. */
    private void addOutput(PCollectionImpl<?> pCollectionImpl, Target target) {
        if (!this.outputTargets.containsKey(pCollectionImpl)) {
            this.outputTargets.put(pCollectionImpl, Sets.newHashSet());
        }
        this.outputTargets.get(pCollectionImpl).add(target);
    }

    /**
     * Returns an iterable over {@code pCollection}'s contents, backed by a readable output location.
     *
     * <p>The first iterable registered for a collection is the one populated by {@link #run()}.
     */
    @Override // org.apache.crunch.Pipeline
    public <T> Iterable<T> materialize(PCollection<T> pCollection) {
        PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pCollection);
        MaterializableIterable<T> materializableIterable = new MaterializableIterable<>(this, getMaterializeSourceTarget(pcollectionImpl));
        if (!this.outputTargetsToMaterialize.containsKey(pcollectionImpl)) {
            this.outputTargetsToMaterialize.put(pcollectionImpl, materializableIterable);
        }
        return materializableIterable;
    }

    /**
     * Finds or creates a readable location from which {@code pCollection}'s contents can be loaded.
     *
     * <p>Lookup order: an existing readable materialization location, then an already-registered
     * readable output, then a new intermediate output in the temporary directory (registered as an
     * output of the collection).
     *
     * @throws IllegalArgumentException if the collection's PType yields a non-readable intermediate output
     */
    public <T> ReadableSourceTarget<T> getMaterializeSourceTarget(PCollection<T> pCollection) {
        PCollectionImpl<T> pcollectionImpl = toPcollectionImpl(pCollection);
        SourceTarget<T> materializedAt = pcollectionImpl.getMaterializedAt();
        if (materializedAt instanceof ReadableSourceTarget) {
            return (ReadableSourceTarget) materializedAt;
        }
        // Look up by the same key used for registration, so a hit can never yield a null set.
        Set<Target> registeredTargets = this.outputTargets.get(pcollectionImpl);
        if (registeredTargets != null) {
            for (Target target : registeredTargets) {
                if (target instanceof ReadableSourceTarget) {
                    return (ReadableSourceTarget) target;
                }
            }
        }
        SourceTarget<T> createIntermediateOutput = createIntermediateOutput(pCollection.getPType());
        if (!(createIntermediateOutput instanceof ReadableSourceTarget)) {
            throw new IllegalArgumentException("The PType for the given PCollection is not readable and cannot be materialized");
        }
        ReadableSourceTarget<T> readableSourceTarget = (ReadableSourceTarget) createIntermediateOutput;
        addOutput(pcollectionImpl, readableSourceTarget);
        return readableSourceTarget;
    }

    /**
     * Casts {@code pCollection} to {@link PCollectionImpl}.
     *
     * <p>Union collections and union tables are wrapped in an identity {@code parallelDo} first,
     * matching the treatment in {@link #write}.
     */
    private <T> PCollectionImpl<T> toPcollectionImpl(PCollection<T> pCollection) {
        if ((pCollection instanceof UnionCollection) || (pCollection instanceof UnionTable)) {
            return (PCollectionImpl) pCollection.parallelDo("UnionCollectionWrapper", IdentityFn.getInstance(), pCollection.getPType());
        }
        return (PCollectionImpl) pCollection;
    }

    /** Returns the PType's default file source/target at a fresh path in the temporary directory. */
    public <T> SourceTarget<T> createIntermediateOutput(PType<T> pType) {
        return pType.getDefaultFileSource(createTempPath());
    }

    /** Returns the next unused path ({@code p1}, {@code p2}, ...) inside the temporary directory. */
    public Path createTempPath() {
        this.tempFileIndex++;
        return new Path(this.tempDirectory, "p" + this.tempFileIndex);
    }

    /**
     * Creates a randomly named temporary directory on the filesystem selected by {@code configuration}.
     *
     * @throws RuntimeException wrapping the {@link IOException} if the directory cannot be created
     */
    private static Path createTempDirectory(Configuration configuration) {
        Path createTemporaryPath = createTemporaryPath(configuration);
        try {
            createTemporaryPath.getFileSystem(configuration).mkdirs(createTemporaryPath);
            return createTemporaryPath;
        } catch (IOException e) {
            throw new RuntimeException("Cannot create job output directory " + createTemporaryPath, e);
        }
    }

    /** Builds {@code <TMP_DIR or /tmp>/crunch-<non-negative random int>}. */
    private static Path createTemporaryPath(Configuration configuration) {
        return new Path(configuration.get(RuntimeParameters.TMP_DIR, "/tmp"), "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE));
    }

    /** Converts {@code pCollection} to the Writable type family and writes it as text to {@code str}. */
    @Override // org.apache.crunch.Pipeline
    public <T> void writeTextFile(PCollection<T> pCollection, String str) {
        write(pCollection.parallelDo("asText", IdentityFn.getInstance(), WritableTypeFamily.getInstance().as(pCollection.getPType())), At.textFile(str));
    }

    /** Deletes the temporary directory, unless outputs are still pending (then only logs a warning). */
    private void cleanup() {
        if (!this.outputTargets.isEmpty()) {
            LOG.warn("Not running cleanup while output targets remain");
            return;
        }
        deleteTempDirectory(this.tempDirectory, this.conf);
    }

    /** Recursively deletes {@code directory} if it exists; an {@link IOException} is logged, not thrown. */
    private static void deleteTempDirectory(Path directory, Configuration configuration) {
        try {
            FileSystem fileSystem = directory.getFileSystem(configuration);
            if (fileSystem.exists(directory)) {
                fileSystem.delete(directory, true);
            }
        } catch (IOException e) {
            LOG.info("Exception during cleanup", e);
        }
    }

    /** Returns the current anonymous stage id and advances the counter. */
    public int getNextAnonymousStageId() {
        return this.nextAnonymousStageId++;
    }

    /**
     * Sets {@code RuntimeParameters.DEBUG} and, if the {@code org.apache.crunch} logger has an appender
     * named {@code "A"}, routes {@code org.apache.hadoop} WARN-level logging to that appender.
     */
    @Override // org.apache.crunch.Pipeline
    public void enableDebug() {
        getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
        Appender appender = LogManager.getLogger("org.apache.crunch").getAppender("A");
        if (appender == null) {
            LOG.warn("Could not find console appender named 'A' for writing Hadoop warning logs");
            return;
        }
        Logger logger = LogManager.getLogger("org.apache.hadoop");
        logger.setLevel(Level.WARN);
        logger.addAppender(appender);
    }

    @Override // org.apache.crunch.Pipeline
    public String getName() {
        return this.name;
    }
}
