package org.apache.crunch.impl.mem;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.util.concurrent.AbstractFuture;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.mapred.AvroOutputFormat;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CachingOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.PipelineExecution;
import org.apache.crunch.PipelineResult;
import org.apache.crunch.Source;
import org.apache.crunch.TableSource;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mem.collect.MemCollection;
import org.apache.crunch.impl.mem.collect.MemTable;
import org.apache.crunch.impl.mr.plan.PlanningParameters;
import org.apache.crunch.io.At;
import org.apache.crunch.io.PathTarget;
import org.apache.crunch.io.ReadableSource;
import org.apache.crunch.io.avro.AvroFileTarget;
import org.apache.crunch.io.seq.SeqFileTarget;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.types.avro.Avros;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapreduce.Counters;

/**
 * An in-memory {@link Pipeline} implementation.
 *
 * <p>Collections created through the static factories (or {@link #read}) are backed by Java
 * {@link Iterable}s. {@link #write} materializes a collection and writes it to the target's file
 * system immediately. As a result, {@link #run()} and {@link #done()} have no deferred work and
 * return a {@code SUCCEEDED} result synchronously.
 *
 * <p>The pipeline is a singleton obtained via {@link #getInstance()}. NOTE(review): none of its
 * mutable state (the static counters, the output index, the active-target set) is synchronized.
 * Concurrent use from several threads is therefore presumably unsupported.
 */
public class MemPipeline implements Pipeline {
    private static final Log LOG = LogFactory.getLog(MemPipeline.class);

    /** Counters captured into each execution's result; replaced wholesale by {@link #clearCounters()}. */
    private static Counters COUNTERS = new CountersWrapper();

    private static final MemPipeline INSTANCE = new MemPipeline();

    /** Line terminator used by the plain-text output fallback of {@link #write}. */
    private static final String TEXT_LINE_SEPARATOR = "\r\n";

    /** Charset used for plain-text output (Hadoop's {@code Text} is UTF-8). */
    private static final String TEXT_CHARSET = "UTF-8";

    /** Running counter that gives each file written by this pipeline a distinct name. */
    private int outputIndex = 0;

    private Configuration conf = new Configuration();

    /** Targets written since the last {@link #runAsync()}; used to reject accidental double writes. */
    private final Set<Target> activeTargets = Sets.newHashSet();

    /**
     * A {@link PipelineExecution} whose work is already done.
     *
     * <p>Every wait/get call completes the future with a {@code SUCCEEDED} result. That result
     * wraps a single stage named {@code "MemPipelineStage"} carrying the counters that were
     * current when this execution was created.
     */
    public static class MemExecution extends AbstractFuture<PipelineResult> implements PipelineExecution {
        private final PipelineResult res = new PipelineResult(
            ImmutableList.of(new PipelineResult.StageResult("MemPipelineStage", MemPipeline.COUNTERS)),
            PipelineExecution.Status.SUCCEEDED);

        /** There is no execution plan for an in-memory run, so this is always empty. */
        @Override
        public String getPlanDotFile() {
            return "";
        }

        /** Completes the execution immediately; the timeout is never needed. */
        @Override
        public void waitFor(long timeout, TimeUnit unit) throws InterruptedException {
            set(this.res);
        }

        /** Completes the execution immediately. */
        @Override
        public void waitUntilDone() throws InterruptedException {
            set(this.res);
        }

        /** Completes the execution (if not already complete) and returns its result. */
        @Override
        public PipelineResult get() throws ExecutionException, InterruptedException {
            set(this.res);
            return super.get();
        }

        /** Completes the execution (if not already complete) and returns its result. */
        @Override
        public PipelineResult get(long timeout, TimeUnit unit)
                throws InterruptedException, ExecutionException, TimeoutException {
            set(this.res);
            return super.get(timeout, unit);
        }

        /** @return {@code SUCCEEDED} once any wait/get call has run, otherwise {@code READY} */
        @Override
        public PipelineExecution.Status getStatus() {
            return isDone() ? PipelineExecution.Status.SUCCEEDED : PipelineExecution.Status.READY;
        }

        /** @return the result once any wait/get call has run, otherwise {@code null} */
        @Override
        public PipelineResult getResult() {
            return isDone() ? this.res : null;
        }

        /** No-op: there is never any in-flight work to cancel. */
        @Override
        public void kill() {
        }
    }

    /** @return the counters that the next execution will report */
    public static Counters getCounters() {
        return COUNTERS;
    }

    /** Replaces the shared counters with a fresh, empty instance. */
    public static void clearCounters() {
        COUNTERS = new CountersWrapper();
    }

    /** @return the singleton in-memory pipeline */
    public static Pipeline getInstance() {
        return INSTANCE;
    }

    /** Creates an untyped in-memory collection holding an immutable copy of {@code ts}. */
    public static <T> PCollection<T> collectionOf(T... ts) {
        return new MemCollection<T>(ImmutableList.copyOf(ts));
    }

    /** Creates an untyped in-memory collection backed by {@code collect}. */
    public static <T> PCollection<T> collectionOf(Iterable<T> collect) {
        return new MemCollection<T>(collect);
    }

    /** Creates an in-memory collection of the given type holding an immutable copy of {@code ts}. */
    public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, T... ts) {
        return new MemCollection<T>(ImmutableList.copyOf(ts), ptype, null);
    }

    /** Creates an in-memory collection of the given type backed by {@code collect}. */
    public static <T> PCollection<T> typedCollectionOf(PType<T> ptype, Iterable<T> collect) {
        return new MemCollection<T>(collect, ptype, null);
    }

    /**
     * Creates an untyped in-memory table from alternating keys and values.
     *
     * @param s the first key
     * @param t the first value
     * @param more further key/value arguments, alternating key then value
     * @throws IllegalArgumentException if {@code more} has an odd number of elements
     */
    public static <S, T> PTable<S, T> tableOf(S s, T t, Object... more) {
        return new MemTable<S, T>(pairsOf(s, t, more));
    }

    /**
     * Creates a typed in-memory table from alternating keys and values.
     *
     * @param ptype the table's type
     * @param s the first key
     * @param t the first value
     * @param more further key/value arguments, alternating key then value
     * @throws IllegalArgumentException if {@code more} has an odd number of elements
     */
    public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, S s, T t, Object... more) {
        return new MemTable<S, T>(pairsOf(s, t, more), ptype, null);
    }

    /** Creates an untyped in-memory table backed by {@code pairs}. */
    public static <S, T> PTable<S, T> tableOf(Iterable<Pair<S, T>> pairs) {
        return new MemTable<S, T>(pairs);
    }

    /** Creates a typed in-memory table backed by {@code pairs}. */
    public static <S, T> PTable<S, T> typedTableOf(PTableType<S, T> ptype, Iterable<Pair<S, T>> pairs) {
        return new MemTable<S, T>(pairs, ptype, null);
    }

    /**
     * Builds the pair list for the varargs table factories.
     *
     * <p>The casts are unchecked: the caller is trusted to pass keys of type {@code S} and values
     * of type {@code T}.
     */
    @SuppressWarnings("unchecked")
    private static <S, T> List<Pair<S, T>> pairsOf(S s, T t, Object... more) {
        if (more.length % 2 != 0) {
            throw new IllegalArgumentException(
                "Expected an even number of additional key/value arguments, but got " + more.length);
        }
        List<Pair<S, T>> pairs = new ArrayList<Pair<S, T>>(1 + more.length / 2);
        pairs.add(Pair.of(s, t));
        for (int i = 0; i < more.length; i += 2) {
            pairs.add(Pair.of((S) more[i], (T) more[i + 1]));
        }
        return pairs;
    }

    private MemPipeline() {
    }

    @Override
    public void setConfiguration(Configuration configuration) {
        this.conf = configuration;
    }

    @Override
    public Configuration getConfiguration() {
        return this.conf;
    }

    /**
     * Eagerly reads a source into an in-memory collection.
     *
     * @throws IllegalStateException if the source is not a {@link ReadableSource}, or reading it
     *     fails with an {@link IOException} (which becomes the cause)
     */
    @Override
    @SuppressWarnings("unchecked")
    public <T> PCollection<T> read(Source<T> source) {
        if (!(source instanceof ReadableSource)) {
            LOG.error("Source " + source + " is not readable");
            throw new IllegalStateException("Source " + source + " is not readable");
        }
        try {
            Iterable<T> contents = ((ReadableSource<T>) source).read(this.conf);
            return new MemCollection<T>(contents, source.getType(), source.toString());
        } catch (IOException e) {
            LOG.error("Exception reading source: " + source.toString(), e);
            throw new IllegalStateException(e);
        }
    }

    /**
     * Eagerly reads a table source into an in-memory table.
     *
     * @throws IllegalStateException if the source is not a {@link ReadableSource}, or reading it
     *     fails with an {@link IOException} (which becomes the cause)
     */
    @Override
    @SuppressWarnings("unchecked")
    public <K, V> PTable<K, V> read(TableSource<K, V> tableSource) {
        if (!(tableSource instanceof ReadableSource)) {
            LOG.error("Source " + tableSource + " is not readable");
            throw new IllegalStateException("Source " + tableSource + " is not readable");
        }
        try {
            Iterable<Pair<K, V>> contents = ((ReadableSource<Pair<K, V>>) tableSource).read(this.conf);
            return new MemTable<K, V>(contents, tableSource.getTableType(), tableSource.toString());
        } catch (IOException e) {
            LOG.error("Exception reading source: " + tableSource.toString(), e);
            throw new IllegalStateException(e);
        }
    }

    /** Writes {@code pCollection} to {@code target} using {@link Target.WriteMode#DEFAULT}. */
    @Override
    public void write(PCollection<?> pCollection, Target target) {
        write(pCollection, target, Target.WriteMode.DEFAULT);
    }

    /**
     * Materializes {@code pCollection} and writes it to {@code target} immediately.
     *
     * <p>The output file is created under the target's path and named
     * {@code MULTI_OUTPUT_PREFIX + <running index> + <extension>}. The format is chosen by target:
     * <ul>
     *   <li>{@link SeqFileTarget}: a SequenceFile ({@code .seq}). Tables use their key/value
     *       converters; plain collections use {@link NullWritable} keys.</li>
     *   <li>{@link AvroFileTarget} with a non-table collection: an Avro data file.</li>
     *   <li>anything else: a UTF-8 text file ({@code .txt}) with one element per CRLF-terminated
     *       line; table entries are written as {@code key<TAB>value}.</li>
     * </ul>
     *
     * <p>Non-{@link PathTarget} targets are logged and skipped, but are still recorded as
     * written. {@link IOException}s raised while writing are logged, not rethrown.
     *
     * @throws CrunchRuntimeException if {@code target} was already written since the last run and
     *     {@code writeMode} is not {@link Target.WriteMode#APPEND}
     */
    @Override
    public void write(PCollection<?> pCollection, Target target, Target.WriteMode writeMode) {
        // NOTE(review): -1L is presumably "no last-modified timestamp to compare" -- TODO confirm.
        target.handleExisting(writeMode, -1L, getConfiguration());
        if (writeMode != Target.WriteMode.APPEND && this.activeTargets.contains(target)) {
            throw new CrunchRuntimeException("Target " + target + " is already written in the current run. Use WriteMode.APPEND in order to write additional data to it.");
        }
        this.activeTargets.add(target);
        if (!(target instanceof PathTarget)) {
            LOG.error("Target " + target + " is not a PathTarget instance");
            return;
        }
        if (pCollection.getPType() != null) {
            pCollection.getPType().initialize(getConfiguration());
        }
        Path path = ((PathTarget) target).getPath();
        try {
            FileSystem fileSystem = path.getFileSystem(this.conf);
            this.outputIndex++;
            String baseName = PlanningParameters.MULTI_OUTPUT_PREFIX + this.outputIndex;
            if (target instanceof SeqFileTarget) {
                Path outputPath = new Path(path, baseName + ".seq");
                if (pCollection instanceof PTable) {
                    writeSequenceFileFromPTable(fileSystem, outputPath, (PTable<?, ?>) pCollection);
                } else {
                    writeSequenceFileFromPCollection(fileSystem, outputPath, pCollection);
                }
            } else if (target instanceof AvroFileTarget && !(pCollection instanceof PTable)) {
                FSDataOutputStream out = fileSystem.create(new Path(path, baseName + AvroOutputFormat.EXT));
                try {
                    writeAvroFile(out, pCollection);
                } finally {
                    out.close();
                }
            } else {
                LOG.warn("Defaulting to write to a text file from MemPipeline");
                FSDataOutputStream out = fileSystem.create(new Path(path, baseName + ".txt"));
                try {
                    writeTextLines(out, pCollection);
                } finally {
                    out.close();
                }
            }
        } catch (IOException e) {
            LOG.error("Exception writing target: " + target, e);
        }
    }

    /**
     * Writes each element as one UTF-8, CRLF-terminated line; table entries become
     * {@code key<TAB>value}.
     */
    private static void writeTextLines(FSDataOutputStream out, PCollection<?> pCollection) throws IOException {
        boolean isTable = pCollection instanceof PTable;
        for (Object element : pCollection.materialize()) {
            String line;
            if (isTable) {
                Pair<?, ?> pair = (Pair<?, ?>) element;
                line = pair.first().toString() + "\t" + pair.second().toString();
            } else {
                line = element.toString();
            }
            // Encode explicitly: DataOutputStream.writeBytes would drop the high byte of each char.
            out.write((line + TEXT_LINE_SEPARATOR).getBytes(TEXT_CHARSET));
        }
    }

    /**
     * Writes {@code pCollection} as an Avro data file to {@code outputStream}.
     *
     * <p>The caller owns and closes the stream. Closing the Avro writer here also closes the stream
     * once the writer has been opened.
     *
     * @throws IllegalStateException if the collection has no PType or its PType is not Avro
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void writeAvroFile(FSDataOutputStream outputStream, PCollection<?> pCollection) throws IOException {
        PType<?> ptype = pCollection.getPType();
        if (ptype == null) {
            throw new IllegalStateException("Can't write a non-typed Avro collection");
        }
        if (!(ptype instanceof AvroType)) {
            throw new IllegalStateException("Can't write a collection of non-Avro type " + ptype + " to an Avro file");
        }
        AvroType avroType = (AvroType) ptype;
        DataFileWriter dataFileWriter = new DataFileWriter(Avros.newWriter(avroType));
        try {
            dataFileWriter.create(avroType.getSchema(), outputStream);
            for (Object element : pCollection.materialize()) {
                dataFileWriter.append(avroType.getOutputMapFn().map(element));
            }
        } finally {
            dataFileWriter.close();
        }
    }

    /**
     * Writes a table as a SequenceFile.
     *
     * <p>Key and value classes come from the table type's converter; entries pass through the key
     * and value output map functions.
     *
     * @throws IllegalStateException if the table has no PTableType
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void writeSequenceFileFromPTable(FileSystem fileSystem, Path path, PTable<?, ?> pTable) throws IOException {
        PTableType pTableType = pTable.getPTableType();
        if (pTableType == null) {
            throw new IllegalStateException("Can't write a non-typed table to a SequenceFile");
        }
        SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem, fileSystem.getConf(), path,
            pTableType.getConverter().getKeyClass(), pTableType.getConverter().getValueClass());
        try {
            for (Object element : pTable.materialize()) {
                Pair pair = (Pair) element;
                writer.append(pTableType.getKeyType().getOutputMapFn().map(pair.first()),
                    pTableType.getValueType().getOutputMapFn().map(pair.second()));
            }
        } finally {
            writer.close();
        }
    }

    /**
     * Writes a non-table collection as a SequenceFile with {@link NullWritable} keys.
     *
     * @throws IllegalStateException if the collection has no PType
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private void writeSequenceFileFromPCollection(FileSystem fileSystem, Path path, PCollection<?> pCollection) throws IOException {
        PType pType = pCollection.getPType();
        if (pType == null) {
            throw new IllegalStateException("Can't write a non-typed collection to a SequenceFile");
        }
        SequenceFile.Writer writer = new SequenceFile.Writer(fileSystem, fileSystem.getConf(), path,
            NullWritable.class, pType.getConverter().getValueClass());
        try {
            for (Object element : pCollection.materialize()) {
                writer.append(NullWritable.get(), pType.getOutputMapFn().map(element));
            }
        } finally {
            writer.close();
        }
    }

    @Override
    public PCollection<String> readTextFile(String pathName) {
        return read(At.textFile(pathName));
    }

    @Override
    public <T> void writeTextFile(PCollection<T> pCollection, String pathName) {
        write(pCollection, At.textFile(pathName));
    }

    /** @return the collection's own materialized contents */
    @Override
    public <T> Iterable<T> materialize(PCollection<T> pCollection) {
        return pCollection.materialize();
    }

    /** No-op in the in-memory pipeline. */
    @Override
    public <T> void cache(PCollection<T> pCollection, CachingOptions options) {
    }

    /**
     * Forgets which targets have been written, so they may be written again, and returns an
     * execution that completes as soon as it is waited on.
     */
    @Override
    public PipelineExecution runAsync() {
        this.activeTargets.clear();
        return new MemExecution();
    }

    /**
     * Runs the pipeline synchronously.
     *
     * @throws RuntimeException wrapping any failure to obtain the result; if that failure was an
     *     {@link InterruptedException}, the thread's interrupt status is restored first
     */
    @Override
    public PipelineResult run() {
        try {
            return (PipelineResult) runAsync().get();
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(e);
        }
    }

    /** No-op in the in-memory pipeline. */
    @Override
    public void cleanup(boolean force) {
    }

    /** Equivalent to {@link #run()}. */
    @Override
    public PipelineResult done() {
        return run();
    }

    @Override
    public void enableDebug() {
        LOG.info("Note: in-memory pipelines do not have debug logging");
    }

    @Override
    public String getName() {
        return "Memory Pipeline";
    }
}
