package org.apache.crunch.impl.mr.collect;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.crunch.Aggregator;
import org.apache.crunch.CombineFn;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.GroupingOptions;
import org.apache.crunch.MapFn;
import org.apache.crunch.PGroupedTable;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ReadableData;
import org.apache.crunch.SourceTarget;
import org.apache.crunch.fn.Aggregators;
import org.apache.crunch.impl.mr.collect.PCollectionImpl;
import org.apache.crunch.impl.mr.plan.DoNode;
import org.apache.crunch.lib.PTables;
import org.apache.crunch.types.PGroupedTableType;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PType;
import org.apache.crunch.util.PartitionUtils;
import org.apache.hadoop.mapreduce.Job;

/* loaded from: input_file:lib/crunch-core-0.8.1.jar:org/apache/crunch/impl/mr/collect/PGroupedTableImpl.class */
public class PGroupedTableImpl<K, V> extends PCollectionImpl<Pair<K, Iterable<V>>> implements PGroupedTable<K, V> {
    private static final Log LOG = LogFactory.getLog(PGroupedTableImpl.class);
    private final PTableBase<K, V> parent;
    private final GroupingOptions groupingOptions;
    private final PGroupedTableType<K, V> ptype;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/crunch-core-0.8.1.jar:org/apache/crunch/impl/mr/collect/PGroupedTableImpl$Ungroup.class */
    public static class Ungroup<K, V> extends DoFn<Pair<K, Iterable<V>>, Pair<K, V>> {
        private Ungroup() {
        }

        @Override // org.apache.crunch.DoFn
        public void process(Pair<K, Iterable<V>> pair, Emitter<Pair<K, V>> emitter) {
            Iterator<V> it2 = pair.second().iterator();
            while (it2.hasNext()) {
                emitter.emit(Pair.of(pair.first(), it2.next()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PGroupedTableImpl(PTableBase<K, V> pTableBase) {
        this(pTableBase, null);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PGroupedTableImpl(PTableBase<K, V> pTableBase, GroupingOptions groupingOptions) {
        super("GBK");
        this.parent = pTableBase;
        this.groupingOptions = groupingOptions;
        this.ptype = pTableBase.getPTableType().getGroupedTableType();
    }

    public void configureShuffle(Job job) {
        this.ptype.configureShuffle(job, this.groupingOptions);
        if (this.groupingOptions == null || this.groupingOptions.getNumReducers() <= 0) {
            int recommendedPartitions = PartitionUtils.getRecommendedPartitions(this, getPipeline().getConfiguration());
            if (recommendedPartitions <= 0) {
                LOG.warn("Attempted to set a negative number of reduce tasks");
            } else {
                job.setNumReduceTasks(recommendedPartitions);
                LOG.info(String.format("Setting num reduce tasks to %d", Integer.valueOf(recommendedPartitions)));
            }
        }
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    protected ReadableData<Pair<K, Iterable<V>>> getReadableDataInternal() {
        throw new UnsupportedOperationException("PGroupedTable does not currently support readability");
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    protected long getSizeInternal() {
        return this.parent.getSizeInternal();
    }

    @Override // org.apache.crunch.PCollection
    public PType<Pair<K, Iterable<V>>> getPType() {
        return this.ptype;
    }

    @Override // org.apache.crunch.PGroupedTable
    public PTable<K, V> combineValues(CombineFn<K, V> combineFn, CombineFn<K, V> combineFn2) {
        return new DoTableImpl("combine", getChainingCollection(), combineFn, combineFn2, this.parent.getPTableType());
    }

    @Override // org.apache.crunch.PGroupedTable
    public PTable<K, V> combineValues(CombineFn<K, V> combineFn) {
        return combineValues(combineFn, combineFn);
    }

    @Override // org.apache.crunch.PGroupedTable
    public PTable<K, V> combineValues(Aggregator<V> aggregator) {
        return combineValues(Aggregators.toCombineFn(aggregator));
    }

    @Override // org.apache.crunch.PGroupedTable
    public PTable<K, V> combineValues(Aggregator<V> aggregator, Aggregator<V> aggregator2) {
        return combineValues(Aggregators.toCombineFn(aggregator), Aggregators.toCombineFn(aggregator2));
    }

    @Override // org.apache.crunch.PGroupedTable
    public PTable<K, V> ungroup() {
        return parallelDo("ungroup", (DoFn) new Ungroup(), (PTableType) this.parent.getPTableType());
    }

    @Override // org.apache.crunch.PGroupedTable
    public <U> PTable<K, U> mapValues(MapFn<Iterable<V>, U> mapFn, PType<U> pType) {
        return PTables.mapValues(this, mapFn, pType);
    }

    @Override // org.apache.crunch.PGroupedTable
    public <U> PTable<K, U> mapValues(String str, MapFn<Iterable<V>, U> mapFn, PType<U> pType) {
        return PTables.mapValues(str, this, mapFn, pType);
    }

    @Override // org.apache.crunch.PGroupedTable
    public PGroupedTableType<K, V> getGroupedTableType() {
        return this.ptype;
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    protected void acceptInternal(PCollectionImpl.Visitor visitor) {
        visitor.visitGroupedTable(this);
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    public Set<SourceTarget<?>> getTargetDependencies() {
        HashSet newHashSet = Sets.newHashSet(super.getTargetDependencies());
        if (this.groupingOptions != null) {
            newHashSet.addAll(this.groupingOptions.getSourceTargets());
        }
        return ImmutableSet.copyOf((Collection) newHashSet);
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    public List<PCollectionImpl<?>> getParents() {
        return ImmutableList.of(this.parent);
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    public DoNode createDoNode() {
        return DoNode.createFnNode(getName(), this.ptype.getInputMapFn(), this.ptype, this.doOptions);
    }

    public DoNode getGroupingNode() {
        return DoNode.createGroupingNode("", this.ptype);
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    public long getLastModifiedAt() {
        return this.parent.getLastModifiedAt();
    }

    @Override // org.apache.crunch.impl.mr.collect.PCollectionImpl
    protected PCollectionImpl<Pair<K, Iterable<V>>> getChainingCollection() {
        return new PGroupedTableImpl(this.parent, this.groupingOptions);
    }
}
