package org.apache.crunch.impl.dist;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.apache.crunch.CreateOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.MapFn;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineCallable;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
import org.apache.crunch.fn.IdentityFn;
import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
import org.apache.crunch.impl.dist.collect.BaseInputCollection;
import org.apache.crunch.impl.dist.collect.BaseInputTable;
import org.apache.crunch.impl.dist.collect.BaseUnionCollection;
import org.apache.crunch.impl.dist.collect.BaseUnionTable;
import org.apache.crunch.impl.dist.collect.EmptyPCollection;
import org.apache.crunch.impl.dist.collect.EmptyPTable;
import org.apache.crunch.impl.dist.collect.PCollectionFactory;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.dist.collect.PTableBase;
import org.apache.crunch.impl.mr.run.RuntimeParameters;
import org.apache.crunch.io.From;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.To;
import org.apache.crunch.materialize.MaterializableIterable;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.writable.Writables;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Abstract {@link Pipeline} base class that records requested outputs and leaves their
 * execution to the subclass-provided {@code run()}.
 *
 * <p>Calls such as {@link #write(PCollection, Target, Target.WriteMode)} and
 * {@link #getMaterializeSourceTarget(PCollection)} do not execute anything themselves. They
 * register collection/target pairs in {@link #outputTargets}, and {@link #done()} invokes
 * {@code run()} when that map is non-empty.
 *
 * <p>Intermediate files are placed under a per-pipeline temporary directory named
 * {@code crunch-<random int>}. It lives inside the directory configured by
 * {@code RuntimeParameters.TMP_DIR}, or {@code /tmp} if that is unset.
 *
 * <p>This class performs no synchronization on its mutable state.
 */
public abstract class DistributedPipeline implements Pipeline {
    private static final Logger LOG = LoggerFactory.getLogger(DistributedPipeline.class);
    private static final Random RANDOM = new Random();

    /** Operation name used when wrapping a union collection in an identity {@code parallelDo}. */
    private static final String UNION_WRAPPER_NAME = "UnionCollectionWrapper";

    private final String name;
    protected final PCollectionFactory factory;
    private Path tempDirectory;
    private Configuration conf;

    /**
     * The callable currently executing inside {@link #sequentialDo}, or {@code null}.
     * Its recorded dependencies are attached to every collection read while it runs.
     */
    private PipelineCallable<?> currentPipelineCallable;

    /** Targets that each collection should be written to on the next run. */
    protected final Map<PCollectionImpl<?>, Set<Target>> outputTargets = Maps.newHashMap();

    /** Not referenced in this class; presumably populated by subclasses. TODO confirm. */
    protected final Map<PCollectionImpl<?>, MaterializableIterable<?>> outputTargetsToMaterialize = Maps.newHashMap();

    /** Every callable registered through {@link #sequentialDo}, mapped to the targets it depends on. */
    protected final Map<PipelineCallable<?>, Set<Target>> allPipelineCallables = Maps.newHashMap();

    /**
     * Targets written with {@link Target.WriteMode#APPEND}. These are never reused as a
     * materialization source, presumably because they may hold data beyond one collection's output.
     */
    protected final Set<Target> appendedTargets = Sets.newHashSet();

    private int tempFileIndex = 0;
    private int nextAnonymousStageId = 0;

    /** Maps each element to its {@link Object#toString()} form; used by {@link #writeTextFile}. */
    private static class StringifyFn<T> extends MapFn<T, String> {
        private StringifyFn() {
        }

        @Override
        public String map(T input) {
            return input.toString();
        }
    }

    /**
     * Creates a pipeline and eagerly creates its temporary directory.
     *
     * @param str the pipeline name returned by {@link #getName()}
     * @param configuration the Hadoop configuration; also decides where the temp directory lives
     * @param pCollectionFactory factory used to build input and union collections
     * @throws RuntimeException if the temporary directory cannot be created
     */
    public DistributedPipeline(String str, Configuration configuration, PCollectionFactory pCollectionFactory) {
        this.name = str;
        this.factory = pCollectionFactory;
        this.conf = configuration;
        this.tempDirectory = createTempDirectory(configuration);
    }

    /** Returns the factory used to construct collections for this pipeline. */
    public PCollectionFactory getFactory() {
        return factory;
    }

    @Override
    public Configuration getConfiguration() {
        return conf;
    }

    /**
     * Replaces the configuration and creates a fresh temporary directory from it.
     * The previously created temporary directory is not deleted here.
     */
    @Override
    public void setConfiguration(Configuration configuration) {
        this.conf = configuration;
        this.tempDirectory = createTempDirectory(configuration);
    }

    /**
     * Runs any pending outputs, then attempts a non-forced cleanup of the temporary directory.
     *
     * @return {@link PipelineResult#DONE} if nothing was pending, otherwise the result of {@code run()}
     */
    @Override
    public PipelineResult done() {
        PipelineResult result = PipelineResult.DONE;
        if (!outputTargets.isEmpty()) {
            result = run();
        }
        cleanup();
        return result;
    }

    @Override
    public <S> PCollection<S> union(List<PCollection<S>> list) {
        return factory.createUnionCollection(Lists.transform(list, new Function<PCollection<S>, PCollectionImpl<S>>() {
            @Override
            public PCollectionImpl<S> apply(PCollection<S> pCollection) {
                return (PCollectionImpl<S>) pCollection;
            }
        }));
    }

    @Override
    public <K, V> PTable<K, V> unionTables(List<PTable<K, V>> list) {
        return factory.createUnionTable(Lists.transform(list, new Function<PTable<K, V>, PTableBase<K, V>>() {
            @Override
            public PTableBase<K, V> apply(PTable<K, V> pTable) {
                return (PTableBase<K, V>) pTable;
            }
        }));
    }

    /**
     * Records the callable's dependencies and invokes it with this pipeline.
     * While the callable runs, collections read from this pipeline carry those dependencies.
     * The previous callable is restored even if {@code generateOutput} throws.
     */
    @Override
    public <Output> Output sequentialDo(PipelineCallable<Output> pipelineCallable) {
        allPipelineCallables.put(pipelineCallable, getDependencies(pipelineCallable));
        PipelineCallable<?> previous = currentPipelineCallable;
        currentPipelineCallable = pipelineCallable;
        try {
            return pipelineCallable.generateOutput(this);
        } finally {
            currentPipelineCallable = previous;
        }
    }

    @Override
    public <S> PCollection<S> read(Source<S> source) {
        return read(source, (String) null);
    }

    @Override
    public <S> PCollection<S> read(Source<S> source, String str) {
        return factory.createInputCollection(source, str, this, getCurrentPDoOptions());
    }

    @Override
    public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
        return read(tableSource, (String) null);
    }

    @Override
    public <K, V> PTable<K, V> read(TableSource<K, V> tableSource, String str) {
        return factory.createInputTable(tableSource, str, this, getCurrentPDoOptions());
    }

    /** Builds options that carry the target dependencies of the active sequentialDo callable, if any. */
    private ParallelDoOptions getCurrentPDoOptions() {
        ParallelDoOptions.Builder builder = ParallelDoOptions.builder();
        if (currentPipelineCallable != null) {
            builder.targets(allPipelineCallables.get(currentPipelineCallable));
        }
        return builder.build();
    }

    /**
     * Collects the targets a callable depends on: its declared targets, the target dependencies
     * of its collections, and the materialization source of each collection when that source is
     * itself a {@link Target}. Calling {@code materialize()} here schedules those collections.
     */
    private Set<Target> getDependencies(PipelineCallable<?> pipelineCallable) {
        Set<Target> dependencies = Sets.newHashSet(pipelineCallable.getAllTargets().values());
        for (PCollection<?> pCollection : pipelineCallable.getAllPCollections().values()) {
            dependencies.addAll(((PCollectionImpl<?>) pCollection).getTargetDependencies());
            ReadableSource<?> source = ((MaterializableIterable<?>) pCollection.materialize()).getSource();
            if (source instanceof Target) {
                dependencies.add((Target) source);
            }
        }
        return dependencies;
    }

    @Override
    public PCollection<String> readTextFile(String str) {
        return read(From.textFile(str));
    }

    @Override
    public void write(PCollection<?> pCollection, Target target) {
        write(pCollection, target, Target.WriteMode.DEFAULT);
    }

    /**
     * Schedules {@code pCollection} to be written to {@code target} on the next run.
     *
     * <p>Grouped tables are written in ungrouped form. Unions are first wrapped in an identity
     * {@code parallelDo}. For a {@code CHECKPOINT} write, if {@code target.handleExisting}
     * returns true, the collection is marked as materialized at the target instead of being
     * scheduled.
     *
     * @throws CrunchRuntimeException if checkpointing is unsupported by the target, or if the
     *     target is already written in this run and the mode is not {@code APPEND}
     */
    @Override
    public void write(PCollection<?> pCollection, Target target, Target.WriteMode writeMode) {
        PCollection<?> toWrite = pCollection;
        if (toWrite instanceof BaseGroupedTable) {
            toWrite = ((BaseGroupedTable<?, ?>) toWrite).ungroup();
        } else if (isUnion(toWrite)) {
            toWrite = wrapUnion(toWrite);
        }
        PCollectionImpl<?> impl = (PCollectionImpl<?>) toWrite;

        // handleExisting is evaluated first, for every write mode, exactly as before.
        if (target.handleExisting(writeMode, impl.getLastModifiedAt(), getConfiguration())
                && writeMode == Target.WriteMode.CHECKPOINT) {
            materializeAtCheckpoint(impl, target);
            return;
        }
        if (writeMode != Target.WriteMode.APPEND && targetInCurrentRun(target)) {
            throw new CrunchRuntimeException("Target " + target + " is already written in current run. Use WriteMode.APPEND in order to write additional data to it.");
        }
        if (writeMode == Target.WriteMode.APPEND) {
            appendedTargets.add(target);
        }
        addOutput(impl, target);
    }

    /** Marks {@code impl} as materialized at the source-target view of {@code target}. */
    private static <S> void materializeAtCheckpoint(PCollectionImpl<S> impl, Target target) {
        SourceTarget<S> sourceTarget = target.asSourceTarget(impl.getPType());
        if (sourceTarget == null) {
            throw new CrunchRuntimeException("Target " + target + " does not support checkpointing");
        }
        impl.materializeAt(sourceTarget);
    }

    /** Returns true if any collection is already scheduled to write to {@code target}. */
    private boolean targetInCurrentRun(Target target) {
        for (Set<Target> targets : outputTargets.values()) {
            if (targets.contains(target)) {
                return true;
            }
        }
        return false;
    }

    /** Adds {@code target} to the set of outputs scheduled for {@code pCollectionImpl}. */
    private void addOutput(PCollectionImpl<?> pCollectionImpl, Target target) {
        Set<Target> targets = outputTargets.get(pCollectionImpl);
        if (targets == null) {
            targets = Sets.newHashSet();
            outputTargets.put(pCollectionImpl, targets);
        }
        targets.add(target);
    }

    @Override
    public <S> PCollection<S> emptyPCollection(PType<S> pType) {
        return new EmptyPCollection<S>(this, pType);
    }

    @Override
    public <K, V> PTable<K, V> emptyPTable(PTableType<K, V> pTableType) {
        return new EmptyPTable<K, V>(this, pTableType);
    }

    @Override
    public <S> PCollection<S> create(Iterable<S> iterable, PType<S> pType) {
        return create(iterable, pType, CreateOptions.none());
    }

    /**
     * Writes {@code iterable} to a new temp path and reads it back as a collection.
     * Returns an empty collection, without touching the file system, if {@code iterable} is empty.
     */
    @Override
    public <S> PCollection<S> create(Iterable<S> iterable, PType<S> pType, CreateOptions createOptions) {
        if (Iterables.isEmpty(iterable)) {
            return emptyPCollection(pType);
        }
        try {
            return read(pType.createSourceTarget(getConfiguration(), createTempPath(), iterable, createOptions.getParallelism()));
        } catch (IOException e) {
            throw new CrunchRuntimeException("Error creating PCollection: " + iterable, e);
        }
    }

    @Override
    public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> iterable, PTableType<K, V> pTableType) {
        return create(iterable, pTableType, CreateOptions.none());
    }

    /**
     * Table variant of create. The pairs are written to a temp path, read back, and passed
     * through an identity {@code parallelDo} to produce a {@link PTable}.
     */
    @Override
    public <K, V> PTable<K, V> create(Iterable<Pair<K, V>> iterable, PTableType<K, V> pTableType, CreateOptions createOptions) {
        if (Iterables.isEmpty(iterable)) {
            return emptyPTable(pTableType);
        }
        try {
            return read(pTableType.createSourceTarget(getConfiguration(), createTempPath(), iterable, createOptions.getParallelism()))
                    .parallelDo(IdentityFn.<Pair<K, V>>getInstance(), pTableType);
        } catch (IOException e) {
            throw new CrunchRuntimeException("Error creating PTable: " + iterable, e);
        }
    }

    /**
     * Returns a readable source from which {@code pCollection}'s contents can be read after a run.
     *
     * <p>The candidates are checked in order:
     * <ol>
     *   <li>the input's own source;</li>
     *   <li>an existing materialization point;</li>
     *   <li>an already-scheduled, non-appended readable target;</li>
     *   <li>otherwise, a new intermediate output, which is scheduled here.</li>
     * </ol>
     *
     * @throws IllegalArgumentException if no readable source can be provided
     */
    @SuppressWarnings("unchecked") // casts are guarded by instanceof checks on the source/target
    public <T> ReadableSource<T> getMaterializeSourceTarget(PCollection<T> pCollection) {
        PCollectionImpl<T> impl = toPCollectionImpl(pCollection);

        if (impl instanceof BaseInputCollection) {
            BaseInputCollection<?> input = (BaseInputCollection<?>) impl;
            Source<?> source = input.getSource();
            if (source instanceof ReadableSource) {
                return (ReadableSource<T>) source;
            }
            throw new IllegalArgumentException("Cannot materialize non-readable input collection: " + input);
        }
        if (impl instanceof BaseInputTable) {
            BaseInputTable<?, ?> input = (BaseInputTable<?, ?>) impl;
            Source<?> source = input.getSource();
            if (source instanceof ReadableSource) {
                return (ReadableSource<T>) source;
            }
            throw new IllegalArgumentException("Cannot materialize non-readable input table: " + input);
        }

        SourceTarget<T> materializedAt = impl.getMaterializedAt();
        if (materializedAt instanceof ReadableSourceTarget) {
            return (ReadableSourceTarget<T>) materializedAt;
        }

        // Reuse a readable target already scheduled for this exact collection (same key as addOutput).
        Set<Target> planned = outputTargets.get(impl);
        if (planned != null) {
            for (Target target : planned) {
                if (target instanceof ReadableSourceTarget && !appendedTargets.contains(target)) {
                    return (ReadableSourceTarget<T>) target;
                }
            }
        }

        SourceTarget<T> intermediate = createIntermediateOutput(pCollection.getPType());
        if (!(intermediate instanceof ReadableSourceTarget)) {
            throw new IllegalArgumentException("The PType for the given PCollection is not readable and cannot be materialized");
        }
        ReadableSourceTarget<T> readable = (ReadableSourceTarget<T>) intermediate;
        addOutput(impl, readable);
        return readable;
    }

    /** Returns {@code pCollection} as a {@link PCollectionImpl}; unions are first wrapped via {@link #wrapUnion}. */
    private <T> PCollectionImpl<T> toPCollectionImpl(PCollection<T> pCollection) {
        PCollection<T> resolved = isUnion(pCollection) ? wrapUnion(pCollection) : pCollection;
        return (PCollectionImpl<T>) resolved;
    }

    /** Returns true for union collections and union tables. */
    private static boolean isUnion(PCollection<?> pCollection) {
        return pCollection instanceof BaseUnionCollection || pCollection instanceof BaseUnionTable;
    }

    /** Wraps a union in an identity {@code parallelDo} so it can be written or materialized as one collection. */
    private static <S> PCollection<S> wrapUnion(PCollection<S> union) {
        return union.parallelDo(UNION_WRAPPER_NAME, IdentityFn.<S>getInstance(), union.getPType());
    }

    /** Returns the PType's default file source at a new temp path. */
    public <T> SourceTarget<T> createIntermediateOutput(PType<T> pType) {
        return pType.getDefaultFileSource(createTempPath());
    }

    /** Returns the next unused path ({@code p1}, {@code p2}, ...) under this pipeline's temp directory. */
    public Path createTempPath() {
        tempFileIndex++;
        return new Path(tempDirectory, "p" + tempFileIndex);
    }

    /** Picks a random temp directory path and creates it on its file system. */
    private static Path createTempDirectory(Configuration configuration) {
        Path dir = createTemporaryPath(configuration);
        try {
            dir.getFileSystem(configuration).mkdirs(dir);
            return dir;
        } catch (IOException e) {
            throw new RuntimeException("Cannot create job output directory " + dir, e);
        }
    }

    /** Returns {@code <TMP_DIR or /tmp>/crunch-<non-negative random int>}. */
    private static Path createTemporaryPath(Configuration configuration) {
        return new Path(configuration.get(RuntimeParameters.TMP_DIR, "/tmp"), "crunch-" + (RANDOM.nextInt() & Integer.MAX_VALUE));
    }

    /** Schedules {@code pCollection}, converted with {@code toString()}, to be written as text to {@code str}. */
    @Override
    public <T> void writeTextFile(PCollection<T> pCollection, String str) {
        pCollection.parallelDo("asText", new StringifyFn<T>(), Writables.strings()).write(To.textFile(str));
    }

    /**
     * Recursively deletes the temporary directory.
     *
     * <p>If {@code force} is false and outputs are still scheduled, this only logs a warning.
     * I/O failures are logged rather than thrown, so cleanup is best-effort.
     */
    @Override
    public void cleanup(boolean force) {
        if (!force && !outputTargets.isEmpty()) {
            LOG.warn("Not running cleanup while output targets remain.");
            return;
        }
        try {
            FileSystem fileSystem = tempDirectory.getFileSystem(conf);
            if (fileSystem.exists(tempDirectory)) {
                fileSystem.delete(tempDirectory, true);
            }
        } catch (IOException e) {
            LOG.info("Exception during cleanup", e);
        }
    }

    private void cleanup() {
        cleanup(false);
    }

    /** Returns the current anonymous stage id, then increments it (0, 1, 2, ...). */
    public int getNextAnonymousStageId() {
        return nextAnonymousStageId++;
    }

    /** Sets {@code RuntimeParameters.DEBUG} to true in the current configuration. */
    @Override
    public void enableDebug() {
        getConfiguration().setBoolean(RuntimeParameters.DEBUG, true);
    }

    @Override
    public String getName() {
        return name;
    }
}
