package org.apache.crunch.impl.spark.collect;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.CombineFn;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.impl.dist.collect.BaseGroupedTable;
import org.apache.crunch.impl.dist.collect.PTableBase;
import org.apache.crunch.impl.spark.SparkCollection;
import org.apache.crunch.impl.spark.SparkComparator;
import org.apache.crunch.impl.spark.SparkPartitioner;
import org.apache.crunch.impl.spark.SparkRuntime;
import org.apache.crunch.impl.spark.fn.CombineMapsideFunction;
import org.apache.crunch.impl.spark.fn.MapOutputFunction;
import org.apache.crunch.impl.spark.fn.PairMapFunction;
import org.apache.crunch.impl.spark.fn.PairMapIterableFunction;
import org.apache.crunch.impl.spark.fn.PartitionedMapOutputFunction;
import org.apache.crunch.impl.spark.fn.ReduceGroupingFunction;
import org.apache.crunch.impl.spark.fn.ReduceInputFunction;
import org.apache.crunch.impl.spark.serde.AvroSerDe;
import org.apache.crunch.impl.spark.serde.SerDe;
import org.apache.crunch.impl.spark.serde.WritableSerDe;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.avro.AvroMode;
import org.apache.crunch.types.avro.AvroType;
import org.apache.crunch.util.PartitionUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDDLike;
import org.apache.spark.storage.StorageLevel;

/* loaded from: input_file:org/apache/crunch/impl/spark/collect/PGroupedTableImpl.class */
/**
 * Spark implementation of a grouped table.
 *
 * <p>Builds (and caches) an RDD that serializes the parent table's key/value pairs, shuffles
 * them by key according to the {@link GroupingOptions}, optionally sorts and re-groups them, and
 * finally maps them back through the grouped type's input map function.
 *
 * @param <K> key type
 * @param <V> value type
 */
public class PGroupedTableImpl<K, V> extends BaseGroupedTable<K, V> implements SparkCollection {
    private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);

    /** Cached grouped RDD; rebuilt whenever {@link SparkRuntime#isValid} rejects it. */
    private JavaRDDLike<?, ?> rdd;

    /**
     * Creates a grouped view of {@code pTableBase}.
     *
     * @param pTableBase the table to group
     * @param groupingOptions partitioning, sorting and reducer-count options for the shuffle
     */
    // NOTE(review): the decompiler reports this constructor was package-private in the original
    // source; it is kept public here so existing callers keep compiling.
    public PGroupedTableImpl(PTableBase<K, V> pTableBase, GroupingOptions groupingOptions) {
        super(pTableBase, groupingOptions);
    }

    /**
     * Returns the grouped RDD for this collection, building it on first use (or when the runtime
     * reports the cached one as invalid). A freshly built RDD is named after this collection and
     * persisted if the runtime supplies a storage level for it.
     *
     * @param sparkRuntime the runtime executing the pipeline
     * @return the grouped RDD
     */
    @Override // org.apache.crunch.impl.spark.SparkCollection
    public JavaRDDLike<?, ?> getJavaRDDLike(SparkRuntime sparkRuntime) {
        if (!sparkRuntime.isValid(this.rdd)) {
            this.rdd = getJavaRDDLikeInternal(sparkRuntime, sparkRuntime.getCombineFn());
            this.rdd.rdd().setName(getName());
            StorageLevel storageLevel = sparkRuntime.getStorageLevel(this);
            if (storageLevel != null) {
                this.rdd.rdd().persist(storageLevel);
            }
        }
        return this.rdd;
    }

    /** Builds an Avro serializer for {@code pType}, configured from {@code configuration}. */
    private AvroSerDe getAvroSerde(PType pType, Configuration configuration) {
        AvroType avroType = (AvroType) pType;
        return new AvroSerDe(avroType, AvroMode.fromType(avroType).withFactoryFromConfiguration(configuration).getModeProperties());
    }

    /**
     * Chooses the shuffle serializer for one side (key or value) of the table.
     *
     * @param pType the key or value type to serialize
     * @param avroTable whether the enclosing table type is an {@link AvroType}
     * @param sparkRuntime source of the configuration used for Avro serdes
     * @return an {@link AvroSerDe} for Avro tables, otherwise a {@link WritableSerDe}
     */
    private SerDe createSerDe(PType pType, boolean avroTable, SparkRuntime sparkRuntime) {
        if (avroTable) {
            return getAvroSerde(pType, sparkRuntime.getConfiguration());
        }
        return new WritableSerDe(pType.getSerializationClass());
    }

    /**
     * Resolves the number of shuffle partitions.
     *
     * @return the reducer count from the grouping options when positive, otherwise the value
     *     recommended by {@link PartitionUtils}; never less than 1
     */
    private int getNumPartitions() {
        int numPartitions = this.groupingOptions.getNumReducers() > 0
                ? this.groupingOptions.getNumReducers()
                : PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
        if (numPartitions <= 0) {
            LOG.warn("Attempted to set a non-positive number of partitions");
            numPartitions = 1;
        }
        return numPartitions;
    }

    /**
     * Builds the grouped RDD from the single parent collection.
     *
     * @param sparkRuntime the runtime executing the pipeline
     * @param combineFn optional combiner applied per partition before the shuffle; may be null
     * @return the grouped, input-mapped RDD
     */
    private JavaRDDLike<?, ?> getJavaRDDLikeInternal(SparkRuntime sparkRuntime, CombineFn<K, V> combineFn) {
        // The decompiled form dropped both casts, which leaves this line ill-typed.
        // NOTE(review): getOnlyParent() is presumably typed as a generic collection in the base
        // class, so the SparkCollection view must be requested explicitly.
        JavaPairRDD parentRdd = (JavaPairRDD) ((SparkCollection) getOnlyParent()).getJavaRDDLike(sparkRuntime);
        if (combineFn != null) {
            // Apply the combiner per partition (map side), before any data is shuffled.
            parentRdd = parentRdd.mapPartitions(new CombineMapsideFunction(combineFn, sparkRuntime.getRuntimeContext()));
        }

        PTableType tableType = this.ptype.getTableType();
        boolean avroTable = tableType instanceof AvroType;
        SerDe keySerDe = createSerDe(tableType.getKeyType(), avroTable, sparkRuntime);
        SerDe valueSerDe = createSerDe(tableType.getValueType(), avroTable, sparkRuntime);
        int numPartitions = getNumPartitions();

        // Both shuffle variants start by applying the table's output map function.
        PairMapFunction outputMapFunction = new PairMapFunction(this.ptype.getOutputMapFn(), sparkRuntime.getRuntimeContext());
        JavaPairRDD grouped;
        if (this.groupingOptions.getPartitionerClass() != null) {
            // A custom partitioner is configured: compute partitions during serialization and
            // shuffle with a SparkPartitioner over the same partition count.
            grouped = parentRdd.map(outputMapFunction)
                    .map(new PartitionedMapOutputFunction(keySerDe, valueSerDe, this.ptype,
                            this.groupingOptions.getPartitionerClass(), numPartitions,
                            sparkRuntime.getRuntimeContext()))
                    .groupByKey(new SparkPartitioner(numPartitions));
        } else {
            grouped = parentRdd.map(outputMapFunction)
                    .map(new MapOutputFunction(keySerDe, valueSerDe))
                    .groupByKey(numPartitions);
        }

        if (this.groupingOptions.requireSortedKeys() || this.groupingOptions.getSortComparatorClass() != null) {
            grouped = grouped.sortByKey(new SparkComparator(this.groupingOptions, this.ptype, sparkRuntime.getRuntimeContext()));
        }
        if (this.groupingOptions.getGroupingComparatorClass() != null) {
            grouped = grouped.mapPartitions(new ReduceGroupingFunction(this.groupingOptions, this.ptype, sparkRuntime.getRuntimeContext()));
        }
        // Deserialize the grouped bytes and map back through the grouped type's input map fn.
        return grouped.map(new ReduceInputFunction(keySerDe, valueSerDe))
                .map(new PairMapIterableFunction(this.ptype.getInputMapFn(), sparkRuntime.getRuntimeContext()));
    }
}
