package org.apache.reef.io.data.loading.impl;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.reef.annotations.audience.DriverSide;

/**
 * Hands out input splits to evaluators, preferring splits that are hosted at
 * the location the evaluator reports.
 *
 * <p>Every split is registered in a global queue of unallocated splits and in
 * one queue per location returned by {@code InputSplit.getLocations()}. A split
 * is considered claimed once it has been removed from the global queue; stale
 * copies left in the per-location queues are skipped during allocation.
 *
 * <p>Allocation in {@link #getInputSplit(String, String)} is serialized by
 * synchronizing on the evaluator-to-split map.
 *
 * @param <V> the concrete split type
 */
@DriverSide
public class EvaluatorToPartitionMapper<V extends InputSplit> {
    private static final Logger LOG = Logger.getLogger(EvaluatorToPartitionMapper.class.getName());

    /** Splits grouped by each location that reports hosting them. */
    private final ConcurrentMap<String, BlockingQueue<NumberedSplit<V>>> locationToSplits = new ConcurrentHashMap<>();

    /** The split already handed to each evaluator, keyed by evaluator identifier. */
    private final ConcurrentMap<String, NumberedSplit<V>> evaluatorToSplits = new ConcurrentHashMap<>();

    /** All splits that have not been handed to any evaluator yet. */
    private final BlockingQueue<NumberedSplit<V>> unallocatedSplits = new LinkedBlockingQueue<>();

    /**
     * Indexes the given splits both globally and by location.
     *
     * @param vArr the splits to distribute; the array position becomes the split index
     * @throws RuntimeException if the locations of a split cannot be read
     */
    public EvaluatorToPartitionMapper(V[] vArr) {
        for (int i = 0; i < vArr.length; i++) {
            LOG.log(Level.FINE, "Processing split: " + i);
            final V split = vArr[i];
            final String[] locations;
            try {
                locations = split.getLocations();
            } catch (IOException e) {
                throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
            }
            final NumberedSplit<V> numberedSplit = new NumberedSplit<>(split, i);
            this.unallocatedSplits.add(numberedSplit);
            for (final String location : locations) {
                final BlockingQueue<NumberedSplit<V>> freshQueue = new LinkedBlockingQueue<>();
                final BlockingQueue<NumberedSplit<V>> existingQueue =
                        this.locationToSplits.putIfAbsent(location, freshQueue);
                // putIfAbsent returns the previously mapped queue, or null when ours was installed.
                final BlockingQueue<NumberedSplit<V>> queue = existingQueue != null ? existingQueue : freshQueue;
                queue.add(numberedSplit);
            }
        }
        // Skip walking the whole map when nobody listens at FINE level.
        if (LOG.isLoggable(Level.FINE)) {
            for (Map.Entry<String, BlockingQueue<NumberedSplit<V>>> entry : this.locationToSplits.entrySet()) {
                LOG.log(Level.FINE, entry.getKey() + ": " + entry.getValue().toString());
            }
        }
    }

    /**
     * Returns the split assigned to an evaluator, allocating one on first request.
     *
     * <p>A previously assigned split is returned as is. Otherwise a split hosted at
     * {@code str} is preferred; if none is available there, the next unallocated
     * split is used.
     *
     * @param str  the location to look up in the per-location index
     * @param str2 the identifier of the evaluator requesting a split
     * @return the split assigned to the evaluator, never {@code null}
     * @throws RuntimeException if no unallocated split is left
     */
    public NumberedSplit<V> getInputSplit(String str, String str2) {
        synchronized (this.evaluatorToSplits) {
            final NumberedSplit<V> alreadyAssigned = this.evaluatorToSplits.get(str2);
            if (alreadyAssigned != null) {
                LOG.log(Level.FINE, "Found an already allocated partition");
                logAssignments();
                return alreadyAssigned;
            }
            LOG.log(Level.FINE, "allocated partition not found");

            final BlockingQueue<NumberedSplit<V>> localSplits = this.locationToSplits.get(str);
            if (localSplits != null) {
                LOG.log(Level.FINE, "Found partitions possibly hosted for " + str2 + " at " + str);
                final NumberedSplit<V> localSplit = allocateSplit(str2, localSplits);
                logAssignments();
                if (localSplit != null) {
                    return localSplit;
                }
            }

            LOG.log(Level.FINE, str + " does not host any partitions or someone else took partitions hosted here. Picking a random one");
            final NumberedSplit<V> anySplit = allocateSplit(str2, this.unallocatedSplits);
            logAssignments();
            if (anySplit != null) {
                return anySplit;
            }
            throw new RuntimeException("Unable to find an input partition to evaluator " + str2);
        }
    }

    /**
     * Takes the first still-unclaimed split from {@code candidates} and records it
     * as belonging to the evaluator.
     *
     * @param evaluatorId the evaluator receiving the split
     * @param candidates  the queue to draw from; may be {@code null}
     * @return the allocated split, or {@code null} if the queue is {@code null} or
     *         holds no claimable split
     * @throws RuntimeException if the evaluator already has a split recorded
     */
    private NumberedSplit<V> allocateSplit(String evaluatorId, BlockingQueue<NumberedSplit<V>> candidates) {
        if (candidates == null) {
            LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
            return null;
        }
        while (true) {
            final NumberedSplit<V> candidate = candidates.poll();
            if (candidate == null) {
                return null;
            }
            // A split polled from the global queue is claimed by the poll itself. One taken
            // from a per-location queue must still be removed from the global queue; if that
            // fails, it was already handed out and the entry is stale, so try the next one.
            if (candidates != this.unallocatedSplits && !this.unallocatedSplits.remove(candidate)) {
                continue;
            }
            LOG.log(Level.FINE, "Found split-" + candidate.getIndex() + " in the queue");
            if (this.evaluatorToSplits.putIfAbsent(evaluatorId, candidate) != null) {
                LOG.severe("Trying to assign different partitions to the same evaluator is not supported");
                throw new RuntimeException("Trying to assign different partitions to the same evaluator is not supported");
            }
            LOG.log(Level.FINE, "Returning " + candidate.getIndex());
            return candidate;
        }
    }

    /** Logs the current evaluator-to-split assignments, rendering the map only when FINE is enabled. */
    private void logAssignments() {
        if (LOG.isLoggable(Level.FINE)) {
            LOG.log(Level.FINE, this.evaluatorToSplits.toString());
        }
    }
}
