package org.apache.crunch.lib.join;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
import java.io.IOException;
import java.util.Iterator;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Target;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.impl.mr.run.CrunchRuntimeException;
import org.apache.crunch.io.ReadableSourceTarget;
import org.apache.crunch.io.impl.SourcePathTargetImpl;
import org.apache.crunch.types.PType;
import org.apache.crunch.types.PTypeFamily;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:lib/crunch-0.4.0-incubating.jar:org/apache/crunch/lib/join/MapsideJoin.class */
/**
 * Inner join of two {@link PTable}s performed inside the map phase.
 *
 * <p>The right-side table is materialized, shipped to the tasks through the Hadoop
 * {@link DistributedCache}, and loaded completely into an in-memory multimap by each task.
 * It must therefore be small enough to fit in the memory available to a task.
 */
public class MapsideJoin {

    /**
     * {@link DoFn} that joins each left-side record against an in-memory copy of the
     * right-side table read from the distributed cache.
     *
     * @param <K> join key type
     * @param <U> left-side value type
     * @param <V> right-side value type
     */
    static class MapsideJoinDoFn<K, U, V> extends DoFn<Pair<K, U>, Pair<K, Pair<U, V>>> {
        /** Name suffix used to pick the right-side file out of the local cache files. */
        private final String inputPath;
        /** Type of the right-side records; used to obtain a reader for the cached file. */
        private final PType<Pair<K, V>> ptype;
        /** Right-side table keyed by join key; populated in {@link #initialize()}. */
        private Multimap<K, V> joinMap;

        /**
         * @param str   suffix (in {@link MapsideJoin#join} the file name) of the cached
         *              right-side path to look for among the local cache files
         * @param pType type of the right-side key/value pairs
         */
        public MapsideJoinDoFn(String str, PType<Pair<K, V>> pType) {
            this.inputPath = str;
            this.ptype = pType;
        }

        /**
         * Locates the local copy of the right-side file among the task's cache files.
         *
         * @return the first local cache path whose string form ends with {@code inputPath},
         *         qualified against the local file system
         * @throws CrunchRuntimeException if no such file exists or the lookup fails with an
         *                                {@link IOException}
         */
        private Path getCacheFilePath() {
            try {
                Path[] localFiles = DistributedCache.getLocalCacheFiles(getConfiguration());
                // Hadoop returns null (not an empty array) when no local cache files are
                // configured; guard so the caller gets the descriptive error below, not an NPE.
                if (localFiles != null) {
                    for (Path localFile : localFiles) {
                        if (localFile.toString().endsWith(inputPath)) {
                            return localFile.makeQualified(FileSystem.getLocal(getConfiguration()));
                        }
                    }
                }
            } catch (IOException e) {
                throw new CrunchRuntimeException(e);
            }
            throw new CrunchRuntimeException("Can't find local cache file for '" + inputPath + "'");
        }

        /**
         * Reads every right-side record from the cached file into {@code joinMap}.
         *
         * @throws CrunchRuntimeException if the cached file cannot be found or read
         */
        @Override
        public void initialize() {
            super.initialize();

            // The default file source must be readable outside of a job; if it is not,
            // this cast fails with a ClassCastException.
            @SuppressWarnings("unchecked")
            ReadableSourceTarget<Pair<K, V>> cachedSource =
                    (ReadableSourceTarget<Pair<K, V>>) ptype.getDefaultFileSource(getCacheFilePath());

            Iterable<Pair<K, V>> rightSide;
            try {
                rightSide = cachedSource.read(getConfiguration());
            } catch (IOException e) {
                throw new CrunchRuntimeException("Error reading right-side of map side join: ", e);
            }

            joinMap = ArrayListMultimap.create();
            for (Pair<K, V> entry : rightSide) {
                joinMap.put(entry.first(), entry.second());
            }
        }

        /**
         * Emits one {@code (key, (leftValue, rightValue))} pair for every right-side value
         * stored under the left record's key; emits nothing when the key has no match.
         */
        @Override
        public void process(Pair<K, U> pair, Emitter<Pair<K, Pair<U, V>>> emitter) {
            K key = pair.first();
            U leftValue = pair.second();
            for (V rightValue : joinMap.get(key)) {
                emitter.emit(Pair.of(key, Pair.of(leftValue, rightValue)));
            }
        }
    }

    /**
     * Joins two tables on their keys using a map-side (in-memory) inner join.
     *
     * <p>This call is eager: it materializes {@code pTable2} and runs its pipeline before
     * returning, then registers the materialized path in the distributed cache.
     *
     * @param pTable  left-side table; streamed through the join function
     * @param pTable2 right-side table; loaded fully into memory by each task
     * @param <K>     join key type
     * @param <U>     left-side value type
     * @param <V>     right-side value type
     * @return a table of {@code (key, (leftValue, rightValue))} for every matching pair
     * @throws CrunchRuntimeException if {@code pTable2} does not belong to an
     *                                {@link MRPipeline}, or its materialized form is not
     *                                backed by a path
     */
    public static <K, U, V> PTable<K, Pair<U, V>> join(PTable<K, U> pTable, PTable<K, V> pTable2) {
        if (!(pTable2.getPipeline() instanceof MRPipeline)) {
            throw new CrunchRuntimeException("Map-side join is only supported within a MapReduce context");
        }
        MRPipeline pipeline = (MRPipeline) pTable2.getPipeline();

        // The right side has to exist on the file system before it can be added to the cache.
        pipeline.materialize(pTable2);
        pipeline.run();

        Target materialized = pipeline.getMaterializeSourceTarget(pTable2);
        if (!(materialized instanceof SourcePathTargetImpl)) {
            throw new CrunchRuntimeException("Right-side contents can't be read from a path");
        }
        Path rightPath = ((SourcePathTargetImpl<?>) materialized).getPath();
        DistributedCache.addCacheFile(rightPath.toUri(), pipeline.getConfiguration());

        // Only the file name is handed to the DoFn: on the task side the cached copy lives
        // under a different directory, so it is matched by path suffix.
        MapsideJoinDoFn<K, U, V> joinFn =
                new MapsideJoinDoFn<K, U, V>(rightPath.getName(), pTable2.getPType());
        PTypeFamily typeFamily = pTable.getTypeFamily();
        return pTable.parallelDo(
                "mapjoin",
                joinFn,
                typeFamily.tableOf(
                        pTable.getKeyType(),
                        typeFamily.pairs(pTable.getValueType(), pTable2.getValueType())));
    }
}
