package org.apache.crunch.impl.spark.fn;

import java.io.IOException;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.Pair;
import org.apache.crunch.impl.spark.IntByteArray;
import org.apache.crunch.impl.spark.SparkRuntimeContext;
import org.apache.crunch.impl.spark.serde.SerDe;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

/* loaded from: input_file:org/apache/crunch/impl/spark/fn/PartitionedMapOutputFunction.class */
/**
 * Spark pair function that turns a Crunch {@link Pair} into a serialized
 * key/value tuple whose key also carries a partition number.
 *
 * <p>For each input pair the partition is obtained from an instance of the
 * configured Hadoop {@link Partitioner} class; the key and the value are
 * converted to bytes with their respective {@link SerDe}s. The resulting tuple
 * is {@code (IntByteArray(partition, keyBytes), valueBytes)}.
 *
 * @param <K> key type of the incoming pairs
 * @param <V> value type of the incoming pairs
 */
public class PartitionedMapOutputFunction<K, V> extends PairFunction<Pair<K, V>, IntByteArray, byte[]> {
    private final SerDe<K> keySerde;
    private final SerDe<V> valueSerde;
    private final PGroupedTableType<K, V> ptype;
    private final Class<? extends Partitioner> partitionerClass;
    private final int numPartitions;
    private final SparkRuntimeContext runtimeContext;

    /** Created on first use by {@link #getPartitioner()}; transient, so it is not serialized with this object. */
    private transient Partitioner partitioner;

    /**
     * Stores the collaborators needed to serialize and partition map output.
     *
     * @param serDe               serializer applied to the key of each pair
     * @param serDe2              serializer applied to the value of each pair
     * @param pGroupedTableType   grouped table type that is initialized and asked to configure the shuffle
     * @param cls                 partitioner class to instantiate reflectively
     * @param i                   number of partitions handed to the partitioner
     * @param sparkRuntimeContext source of the configuration used when building the partitioner
     */
    public PartitionedMapOutputFunction(SerDe<K> serDe, SerDe<V> serDe2, PGroupedTableType<K, V> pGroupedTableType, Class<? extends Partitioner> cls, int i, SparkRuntimeContext sparkRuntimeContext) {
        this.keySerde = serDe;
        this.valueSerde = serDe2;
        this.ptype = pGroupedTableType;
        this.partitionerClass = cls;
        this.numPartitions = i;
        this.runtimeContext = sparkRuntimeContext;
    }

    /**
     * Computes the partition for {@code pair} and serializes its key and value.
     *
     * @param pair the key/value pair to convert
     * @return a tuple of (partition number plus serialized key, serialized value)
     * @throws Exception if partitioning or serialization fails
     */
    public Tuple2<IntByteArray, byte[]> call(Pair<K, V> pair) throws Exception {
        // Resolve the partitioner before touching the pair, then serialize key before value.
        Partitioner activePartitioner = getPartitioner();
        K key = pair.first();
        V value = pair.second();
        int partition = activePartitioner.getPartition(key, value, this.numPartitions);

        byte[] keyBytes = this.keySerde.toBytes(key);
        IntByteArray partitionedKey = new IntByteArray(partition, keyBytes);
        byte[] valueBytes = this.valueSerde.toBytes(value);
        return new Tuple2<>(partitionedKey, valueBytes);
    }

    /**
     * Returns the partitioner, building and caching it on the first call.
     *
     * <p>Building it initializes the table type with the runtime configuration,
     * lets the table type configure the shuffle on a fresh {@link Job}, and then
     * instantiates the partitioner class against that job's configuration.
     *
     * @return the cached partitioner instance
     * @throws CrunchRuntimeException if an {@link IOException} occurs during setup
     */
    private Partitioner getPartitioner() {
        if (this.partitioner != null) {
            return this.partitioner;
        }
        try {
            this.ptype.initialize(this.runtimeContext.getConfiguration());
            Job shuffleJob = new Job(this.runtimeContext.getConfiguration());
            GroupingOptions groupingOptions = GroupingOptions.builder()
                    .partitionerClass(this.partitionerClass)
                    .build();
            this.ptype.configureShuffle(shuffleJob, groupingOptions);
            this.partitioner = (Partitioner) ReflectionUtils.newInstance(this.partitionerClass, shuffleJob.getConfiguration());
        } catch (IOException ioe) {
            throw new CrunchRuntimeException("Error configuring partitioner", ioe);
        }
        return this.partitioner;
    }
}
