package org.apache.crunch.impl.spark.collect;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import org.apache.crunch.CreateOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.ReadableData;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.spark.ByteArray;
import org.apache.crunch.impl.spark.SparkCollection;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.impl.spark.SparkRuntime;
import org.apache.crunch.impl.spark.SparkRuntimeContext;
import org.apache.crunch.impl.spark.serde.SerDe;
import org.apache.crunch.impl.spark.serde.SerDeFactory;
import org.apache.crunch.types.PType;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.storage.StorageLevel;

/* loaded from: input_file:org/apache/crunch/impl/spark/collect/CreatedCollection.class */
public class CreatedCollection<T> extends PCollectionImpl<T> implements SparkCollection {
    private final Iterable<T> contents;
    private final PType<T> ptype;
    private final int parallelism;
    private JavaRDD<T> rdd;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/crunch/impl/spark/collect/CreatedCollection$MapInputFn.class */
    public static class MapInputFn<T> implements Function<ByteArray, T> {
        private final SerDe serde;
        private final MapFn<Object, T> fn;
        private final SparkRuntimeContext context;
        private boolean initialized = false;

        public MapInputFn(SerDe serDe, MapFn<Object, T> mapFn, SparkRuntimeContext sparkRuntimeContext) {
            this.serde = serDe;
            this.fn = mapFn;
            this.context = sparkRuntimeContext;
        }

        public T call(ByteArray byteArray) throws Exception {
            if (!this.initialized) {
                this.context.initialize(this.fn, -1);
                this.initialized = true;
            }
            return (T) this.fn.map(this.serde.fromBytes(byteArray.value));
        }
    }

    public CreatedCollection(SparkPipeline sparkPipeline, Iterable<T> iterable, PType<T> pType, CreateOptions createOptions) {
        super(createOptions.getName(), sparkPipeline);
        this.contents = iterable;
        this.ptype = pType;
        this.parallelism = createOptions.getParallelism();
    }

    protected void acceptInternal(PCollectionImpl.Visitor visitor) {
    }

    public List<PCollectionImpl<?>> getParents() {
        return ImmutableList.of();
    }

    protected ReadableData<T> getReadableDataInternal() {
        try {
            return this.ptype.createSourceTarget(getPipeline().getConfiguration(), getPipeline().createTempPath(), this.contents, this.parallelism).asReadable();
        } catch (IOException e) {
            throw new CrunchRuntimeException(e);
        }
    }

    protected long getSizeInternal() {
        return Iterables.size(this.contents);
    }

    public long getLastModifiedAt() {
        return -1L;
    }

    public PType<T> getPType() {
        return this.ptype;
    }

    @Override // org.apache.crunch.impl.spark.SparkCollection
    public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime sparkRuntime) {
        if (!sparkRuntime.isValid(this.rdd)) {
            this.rdd = getJavaRDDLikeInternal(sparkRuntime);
            this.rdd.rdd().setName(getName());
            StorageLevel storageLevel = sparkRuntime.getStorageLevel(this);
            if (storageLevel != null) {
                this.rdd.rdd().persist(storageLevel);
            }
        }
        return this.rdd;
    }

    /* JADX WARN: Multi-variable type inference failed */
    private JavaRDD<T> getJavaRDDLikeInternal(SparkRuntime sparkRuntime) {
        SerDe create = SerDeFactory.create(this.ptype, sparkRuntime.getConfiguration());
        this.ptype.initialize(sparkRuntime.getConfiguration());
        LinkedList newLinkedList = Lists.newLinkedList();
        try {
            Iterator<T> it = this.contents.iterator();
            while (it.hasNext()) {
                newLinkedList.add(create.toBytes(this.ptype.getOutputMapFn().map(it.next())));
            }
            return sparkRuntime.getSparkContext().parallelize(newLinkedList, this.parallelism).map(new MapInputFn(create, this.ptype.getInputMapFn(), sparkRuntime.getRuntimeContext()));
        } catch (Exception e) {
            throw new CrunchRuntimeException(e);
        }
    }
}
