package org.apache.crunch.lib.join;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.Collection;
import java.util.Iterator;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.MapFn;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.ParallelDoOptions;
import org.apache.crunch.ReadableData;
import org.apache.crunch.types.PTableType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.conf.Configuration;

/* loaded from: input_file:lib/crunch-core-0.10.0-hadoop2.jar:org/apache/crunch/lib/join/MapsideJoinStrategy.class */
public class MapsideJoinStrategy<K, U, V> implements JoinStrategy<K, U, V> {
    private boolean materialize;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:lib/crunch-core-0.10.0-hadoop2.jar:org/apache/crunch/lib/join/MapsideJoinStrategy$LoadLeftSideMapsideJoinStrategy.class */
    public static class LoadLeftSideMapsideJoinStrategy<K, U, V> extends MapsideJoinStrategy<K, U, V> {
        private MapsideJoinStrategy<K, V, U> mapsideJoinStrategy;

        public LoadLeftSideMapsideJoinStrategy(boolean z) {
            this.mapsideJoinStrategy = new MapsideJoinStrategy<>(z);
        }

        @Override // org.apache.crunch.lib.join.MapsideJoinStrategy, org.apache.crunch.lib.join.JoinStrategy
        public PTable<K, Pair<U, V>> join(PTable<K, U> pTable, PTable<K, V> pTable2, JoinType joinType) {
            JoinType joinType2;
            switch (joinType) {
                case INNER_JOIN:
                    joinType2 = JoinType.INNER_JOIN;
                    break;
                case RIGHT_OUTER_JOIN:
                    joinType2 = JoinType.LEFT_OUTER_JOIN;
                    break;
                default:
                    throw new UnsupportedOperationException("Join type " + joinType + " is not supported");
            }
            return this.mapsideJoinStrategy.join(pTable2, pTable, joinType2).mapValues("Reverse order out output table values", new ReversePairOrderFn(), pTable.getTypeFamily().pairs(pTable.getValueType(), pTable2.getValueType()));
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:lib/crunch-core-0.10.0-hadoop2.jar:org/apache/crunch/lib/join/MapsideJoinStrategy$MapsideJoinDoFn.class */
    public static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
        private final ReadableData<Pair<K, V>> readable;
        private final PTableType<K, V> tableType;
        private final boolean includeUnmatched;
        private Multimap<K, V> joinMap;

        public MapsideJoinDoFn(ReadableData<Pair<K, V>> readableData, PTableType<K, V> pTableType, boolean z) {
            this.readable = readableData;
            this.tableType = pTableType;
            this.includeUnmatched = z;
        }

        @Override // org.apache.crunch.DoFn
        public void configure(Configuration configuration) {
            this.readable.configure(configuration);
        }

        @Override // org.apache.crunch.DoFn
        public void initialize() {
            super.initialize();
            this.tableType.initialize(getConfiguration());
            this.joinMap = ArrayListMultimap.create();
            try {
                Iterator<Pair<K, V>> it = this.readable.read(getContext()).iterator();
                while (it.hasNext()) {
                    Pair<K, V> detachedValue = this.tableType.getDetachedValue(it.next());
                    this.joinMap.put(detachedValue.first(), detachedValue.second());
                }
            } catch (IOException e) {
                throw new CrunchRuntimeException("Error reading map-side join data", e);
            }
        }

        @Override // org.apache.crunch.DoFn
        public void process(Pair<K, U> pair, Emitter<Pair<K, Pair<U, V>>> emitter) {
            K first = pair.first();
            U second = pair.second();
            Collection<V> collection = this.joinMap.get(first);
            if (this.includeUnmatched && collection.isEmpty()) {
                emitter.emit(Pair.of(first, Pair.of(second, null)));
                return;
            }
            Iterator<V> it = collection.iterator();
            while (it.hasNext()) {
                emitter.emit(Pair.of(first, Pair.of(second, it.next())));
            }
        }
    }

    /* loaded from: input_file:lib/crunch-core-0.10.0-hadoop2.jar:org/apache/crunch/lib/join/MapsideJoinStrategy$ReversePairOrderFn.class */
    private static class ReversePairOrderFn<V, U> extends MapFn<Pair<V, U>, Pair<U, V>> {
        private ReversePairOrderFn() {
        }

        @Override // org.apache.crunch.MapFn
        public Pair<U, V> map(Pair<V, U> pair) {
            return Pair.of(pair.second(), pair.first());
        }
    }

    @Deprecated
    public MapsideJoinStrategy() {
        this(true);
    }

    @Deprecated
    public MapsideJoinStrategy(boolean z) {
        this.materialize = z;
    }

    public static <K, U, V> MapsideJoinStrategy<K, U, V> create() {
        return create(true);
    }

    public static <K, U, V> MapsideJoinStrategy<K, U, V> create(boolean z) {
        return new LoadLeftSideMapsideJoinStrategy(z);
    }

    @Override // org.apache.crunch.lib.join.JoinStrategy
    public PTable<K, Pair<U, V>> join(PTable<K, U> pTable, PTable<K, V> pTable2, JoinType joinType) {
        switch (joinType) {
            case INNER_JOIN:
                return joinInternal(pTable, pTable2, false);
            case LEFT_OUTER_JOIN:
                return joinInternal(pTable, pTable2, true);
            default:
                throw new UnsupportedOperationException("Join type " + joinType + " not supported by MapsideJoinStrategy");
        }
    }

    private PTable<K, Pair<U, V>> joinInternal(PTable<K, U> pTable, PTable<K, V> pTable2, boolean z) {
        PTypeFamily typeFamily = pTable.getTypeFamily();
        ReadableData<Pair<K, V>> asReadable = pTable2.asReadable(this.materialize);
        return pTable.parallelDo("mapjoin", (DoFn<S, Pair<K, U>>) new MapsideJoinDoFn(asReadable, pTable2.getPTableType(), z), typeFamily.tableOf(pTable.getKeyType(), typeFamily.pairs(pTable.getValueType(), pTable2.getValueType())), ParallelDoOptions.builder().sourceTargets(asReadable.getSourceTargets()).build());
    }
}
