package org.apache.crunch.impl.spark.collect;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import org.apache.crunch.CreateOptions;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.MapFn;
import org.apache.crunch.Pair;
import org.apache.crunch.ReadableData;
import org.apache.crunch.impl.dist.collect.PCollectionImpl;
import org.apache.crunch.impl.dist.collect.PTableBase;
import org.apache.crunch.impl.spark.ByteArray;
import org.apache.crunch.impl.spark.SparkCollection;
import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.crunch.impl.spark.SparkRuntime;
import org.apache.crunch.impl.spark.SparkRuntimeContext;
import org.apache.crunch.impl.spark.serde.SerDe;
import org.apache.crunch.impl.spark.serde.SerDeFactory;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.storage.StorageLevel;
import scala.Tuple2;

/* loaded from: input_file:org/apache/crunch/impl/spark/collect/CreatedTable.class */
public class CreatedTable<K, V> extends PTableBase<K, V> implements SparkCollection {
    private final Iterable<Pair<K, V>> contents;
    private final PTableType<K, V> ptype;
    private final int parallelism;
    private JavaPairRDD<K, V> rdd;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/crunch/impl/spark/collect/CreatedTable$MapPairInputFn.class */
    public static class MapPairInputFn<K, V> implements PairFunction<Tuple2<ByteArray, ByteArray>, K, V> {
        private final SerDe keySerde;
        private final SerDe valueSerde;
        private final MapFn<Object, K> keyFn;
        private final MapFn<Object, V> valueFn;
        private final SparkRuntimeContext context;
        private boolean initialized = false;

        public MapPairInputFn(SerDe serDe, SerDe serDe2, MapFn<Object, K> mapFn, MapFn<Object, V> mapFn2, SparkRuntimeContext sparkRuntimeContext) {
            this.keySerde = serDe;
            this.valueSerde = serDe2;
            this.keyFn = mapFn;
            this.valueFn = mapFn2;
            this.context = sparkRuntimeContext;
        }

        public Tuple2<K, V> call(Tuple2<ByteArray, ByteArray> tuple2) throws Exception {
            if (!this.initialized) {
                this.context.initialize(this.keyFn, -1);
                this.context.initialize(this.valueFn, -1);
                this.initialized = true;
            }
            return new Tuple2<>(this.keyFn.map(this.keySerde.fromBytes(((ByteArray) tuple2._1()).value)), this.valueFn.map(this.valueSerde.fromBytes(((ByteArray) tuple2._2()).value)));
        }
    }

    public CreatedTable(SparkPipeline sparkPipeline, Iterable<Pair<K, V>> iterable, PTableType<K, V> pTableType, CreateOptions createOptions) {
        super(createOptions.getName(), sparkPipeline);
        this.contents = iterable;
        this.ptype = pTableType;
        this.parallelism = createOptions.getParallelism();
    }

    protected void acceptInternal(PCollectionImpl.Visitor visitor) {
    }

    public List<PCollectionImpl<?>> getParents() {
        return ImmutableList.of();
    }

    protected ReadableData<Pair<K, V>> getReadableDataInternal() {
        try {
            return this.ptype.createSourceTarget(this.pipeline.getConfiguration(), this.pipeline.createTempPath(), this.contents, this.parallelism).asReadable();
        } catch (IOException e) {
            throw new CrunchRuntimeException(e);
        }
    }

    protected long getSizeInternal() {
        return Iterables.size(this.contents);
    }

    public long getLastModifiedAt() {
        return -1L;
    }

    public PTableType<K, V> getPTableType() {
        return this.ptype;
    }

    public PType<Pair<K, V>> getPType() {
        return this.ptype;
    }

    @Override // org.apache.crunch.impl.spark.SparkCollection
    public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime sparkRuntime) {
        if (!sparkRuntime.isValid(this.rdd)) {
            this.rdd = getJavaRDDLikeInternal(sparkRuntime);
            this.rdd.rdd().setName(getName());
            StorageLevel storageLevel = sparkRuntime.getStorageLevel(this);
            if (storageLevel != null) {
                this.rdd.rdd().persist(storageLevel);
            }
        }
        return this.rdd;
    }

    private JavaPairRDD<K, V> getJavaRDDLikeInternal(SparkRuntime sparkRuntime) {
        this.ptype.initialize(sparkRuntime.getConfiguration());
        PType keyType = this.ptype.getKeyType();
        PType valueType = this.ptype.getValueType();
        SerDe create = SerDeFactory.create(keyType, sparkRuntime.getConfiguration());
        SerDe create2 = SerDeFactory.create(valueType, sparkRuntime.getConfiguration());
        LinkedList newLinkedList = Lists.newLinkedList();
        try {
            for (Pair<K, V> pair : this.contents) {
                newLinkedList.add(new Tuple2(create.toBytes(keyType.getOutputMapFn().map(pair.first())), create2.toBytes(valueType.getOutputMapFn().map(pair.second()))));
            }
            return sparkRuntime.getSparkContext().parallelizePairs(newLinkedList, this.parallelism).mapToPair(new MapPairInputFn(create, create2, keyType.getInputMapFn(), valueType.getInputMapFn(), sparkRuntime.getRuntimeContext()));
        } catch (Exception e) {
            throw new CrunchRuntimeException(e);
        }
    }
}
