package org.apache.reef.io.data.loading.impl;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.lang.Validate;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.annotations.Unstable;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.catalog.NodeDescriptor;
import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;

/**
 * Base class for strategies that assign Hadoop {@link InputSplit}s to evaluators.
 *
 * <p>On construction every serialized partition is deserialized, its input splits are
 * computed, and each split is wrapped in a {@link NumberedSplit} and placed in
 * {@link #unallocatedSplits}. Subclasses decide how splits are indexed by location
 * ({@link #updateLocations(NumberedSplit)}) and how a split is chosen when nothing suitable
 * is found under the requesting node's name
 * ({@link #tryAllocate(NodeDescriptor, String)}).
 *
 * <p>{@link #getInputSplit(NodeDescriptor, String)} serializes allocations by synchronizing
 * on {@link #evaluatorToSplits}.
 */
@DriverSide
@Unstable
public abstract class AbstractEvaluatorToPartitionStrategy implements EvaluatorToPartitionStrategy<InputSplit> {
    private static final Logger LOG = Logger.getLogger(AbstractEvaluatorToPartitionStrategy.class.getName());

    /**
     * Queues of splits, looked up by node name in {@link #getInputSplit(NodeDescriptor, String)}.
     * This class never writes to the map itself; presumably subclasses fill it from
     * {@link #updateLocations(NumberedSplit)} - confirm against the concrete strategies.
     */
    protected final ConcurrentMap<String, BlockingQueue<NumberedSplit<InputSplit>>> locationToSplits;

    /** The split already handed to each evaluator, keyed by evaluator identifier. */
    protected final ConcurrentMap<String, NumberedSplit<InputSplit>> evaluatorToSplits;

    /** Every split that has not yet been assigned to an evaluator. */
    protected final BlockingQueue<NumberedSplit<InputSplit>> unallocatedSplits;

    /** Sum of the number of splits computed for each partition in the constructor. */
    private int totalNumberOfSplits;

    /**
     * Computes the input splits of all given partitions and registers them as unallocated.
     *
     * <p>Note that {@link #setUp()} and {@link #updateLocations(NumberedSplit)} are invoked
     * from this constructor, i.e. before any subclass constructor body has run.
     *
     * @param str handed to {@code JobConfExternalConstructor} together with each partition's
     *            path; presumably the input format class name - TODO confirm. Must not be empty.
     * @param set serialized {@code DistributedDataSetPartition}s; must not be empty.
     * @throws RuntimeException if the input format fails with an {@link IOException} while
     *                          computing the splits
     */
    public AbstractEvaluatorToPartitionStrategy(String str, Set<String> set) {
        LOG.fine("AbstractEvaluatorToPartitionStrategy injected");
        Validate.notEmpty(str);
        Validate.notEmpty(set);
        this.locationToSplits = new ConcurrentHashMap<>();
        this.evaluatorToSplits = new ConcurrentHashMap<>();
        this.unallocatedSplits = new LinkedBlockingQueue<>();
        // Subclass hook; runs before any split has been computed.
        setUp();

        Map<DistributedDataSetPartition, InputSplit[]> splitsPerPartition = new HashMap<>();
        for (String serializedPartition : set) {
            DistributedDataSetPartition partition =
                    DistributedDataSetPartitionSerializer.deserialize(serializedPartition);
            try {
                JobConf jobConf = (JobConf) new JobConfExternalConstructor(str, partition.getPath()).newInstance();
                InputSplit[] splits = jobConf.getInputFormat().getSplits(jobConf, partition.getDesiredSplits());
                if (LOG.isLoggable(Level.FINEST)) {
                    LOG.log(Level.FINEST, "Splits for partition: {0} {1}",
                            new Object[]{partition, Arrays.toString(splits)});
                }
                this.totalNumberOfSplits += splits.length;
                splitsPerPartition.put(partition, splits);
            } catch (IOException e) {
                throw new RuntimeException("Unable to get InputSplits using the specified InputFormat", e);
            }
        }
        init(splitsPerPartition);
        LOG.log(Level.FINE, "Total Number of splits: {0}", Integer.valueOf(this.totalNumberOfSplits));
    }

    /**
     * Numbers all splits, queues them as unallocated and lets the subclass index them.
     *
     * @param map the splits computed for each partition
     */
    private void init(Map<DistributedDataSetPartition, InputSplit[]> map) {
        Pair<InputSplit[], DistributedDataSetPartition[]> flattened = getSplitsAndPartitions(map);
        InputSplit[] splits = flattened.getFirst();
        DistributedDataSetPartition[] partitions = flattened.getSecond();
        Validate.isTrue(splits.length == partitions.length);

        // Hoisted so the per-split message is only built when it will actually be emitted.
        final boolean fineEnabled = LOG.isLoggable(Level.FINE);
        for (int i = 0; i < splits.length; i++) {
            if (fineEnabled) {
                LOG.log(Level.FINE, "Processing split: " + i);
            }
            // The array position doubles as the split's index.
            NumberedSplit<InputSplit> numberedSplit = new NumberedSplit<>(splits[i], i, partitions[i]);
            this.unallocatedSplits.add(numberedSplit);
            updateLocations(numberedSplit);
        }
        if (fineEnabled) {
            for (Map.Entry<String, BlockingQueue<NumberedSplit<InputSplit>>> entry : this.locationToSplits.entrySet()) {
                LOG.log(Level.FINE, entry.getKey() + ": " + entry.getValue().toString());
            }
        }
    }

    /**
     * Called once for every split during construction, right after the split has been added
     * to {@link #unallocatedSplits}.
     *
     * @param numberedSplit the newly registered split
     */
    protected abstract void updateLocations(NumberedSplit<InputSplit> numberedSplit);

    /**
     * Fallback used by {@link #getInputSplit(NodeDescriptor, String)} when no split could be
     * taken from the queue registered under the node's name.
     *
     * @param nodeDescriptor the node the evaluator runs on
     * @param str            the evaluator identifier
     * @return the allocated split, or {@code null} if none could be allocated
     */
    protected abstract NumberedSplit<InputSplit> tryAllocate(NodeDescriptor nodeDescriptor, String str);

    /**
     * Hook invoked at the start of construction, after the collections have been created and
     * before any split is computed. The default implementation does nothing.
     */
    protected void setUp() {
    }

    /**
     * Returns the split assigned to the given evaluator, allocating one if necessary.
     *
     * <p>An evaluator that already holds a split gets the same split again. Otherwise the queue
     * registered under the node's name is tried first, then
     * {@link #tryAllocate(NodeDescriptor, String)}.
     *
     * @param nodeDescriptor the node the evaluator runs on
     * @param str            the evaluator identifier
     * @return the split assigned to the evaluator, never {@code null}
     * @throws RuntimeException if no split could be allocated
     */
    @Override // org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy
    public NumberedSplit<InputSplit> getInputSplit(NodeDescriptor nodeDescriptor, String str) {
        synchronized (this.evaluatorToSplits) {
            final boolean fineEnabled = LOG.isLoggable(Level.FINE);

            // The map is a ConcurrentHashMap, which never holds null values, so a single
            // lookup is equivalent to containsKey followed by get.
            NumberedSplit<InputSplit> alreadyAllocated = this.evaluatorToSplits.get(str);
            if (alreadyAllocated != null) {
                if (fineEnabled) {
                    LOG.log(Level.FINE, "Found an already allocated split, {0}", this.evaluatorToSplits.toString());
                }
                return alreadyAllocated;
            }

            String name = nodeDescriptor.getName();
            LOG.log(Level.FINE, "Allocated split not found, trying on {0}", name);
            BlockingQueue<NumberedSplit<InputSplit>> hostedSplits = this.locationToSplits.get(name);
            if (hostedSplits != null) {
                LOG.log(Level.FINE, "Found splits possibly hosted for {0} at {1}", new Object[]{str, name});
                NumberedSplit<InputSplit> local = allocateSplit(str, hostedSplits);
                if (local != null) {
                    return local;
                }
            }

            LOG.log(Level.FINE, "{0} does not host any splits or someone else took splits hosted here. Picking other ones", name);
            NumberedSplit<InputSplit> fallback = tryAllocate(nodeDescriptor, str);
            if (fallback == null) {
                throw new RuntimeException("Unable to find an input split to evaluator " + str);
            }
            if (fineEnabled) {
                LOG.log(Level.FINE, this.evaluatorToSplits.toString());
            }
            return fallback;
        }
    }

    /**
     * @return the total number of splits computed for all partitions at construction time
     */
    @Override // org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy
    public int getNumberOfSplits() {
        return this.totalNumberOfSplits;
    }

    /**
     * Flattens the per-partition split arrays into two parallel arrays: element {@code i} of
     * the second array is the partition that element {@code i} of the first belongs to.
     *
     * @param map the splits computed for each partition
     * @return the pair (all splits, owning partition of each split)
     */
    private Pair<InputSplit[], DistributedDataSetPartition[]> getSplitsAndPartitions(Map<DistributedDataSetPartition, InputSplit[]> map) {
        ArrayList<InputSplit> allSplits = new ArrayList<>();
        ArrayList<DistributedDataSetPartition> owners = new ArrayList<>();
        for (Map.Entry<DistributedDataSetPartition, InputSplit[]> entry : map.entrySet()) {
            DistributedDataSetPartition partition = entry.getKey();
            for (InputSplit split : entry.getValue()) {
                allSplits.add(split);
                owners.add(partition);
            }
        }
        return new Pair<>(
                allSplits.toArray(new InputSplit[allSplits.size()]),
                owners.toArray(new DistributedDataSetPartition[owners.size()]));
    }

    /**
     * Takes the first still-unallocated split from the given queue and assigns it to the
     * evaluator.
     *
     * <p>Splits polled from a queue other than {@link #unallocatedSplits} are only accepted if
     * they can also be removed from {@link #unallocatedSplits}; otherwise they are discarded
     * and the next element of the queue is tried.
     *
     * @param str           the evaluator identifier
     * @param blockingQueue the queue to take a split from; may be {@code null}
     * @return the allocated split, or {@code null} if the queue is {@code null} or holds no
     *         unallocated split
     * @throws RuntimeException if the evaluator already has a split assigned
     */
    public NumberedSplit<InputSplit> allocateSplit(String str, BlockingQueue<NumberedSplit<InputSplit>> blockingQueue) {
        if (blockingQueue == null) {
            LOG.log(Level.FINE, "Queue of splits can't be empty. Returning null");
            return null;
        }

        NumberedSplit<InputSplit> candidate;
        while (true) {
            candidate = blockingQueue.poll();
            if (candidate == null) {
                // Queue exhausted without finding an unallocated split.
                return null;
            }
            // Polling the unallocated queue itself already claimed the split; for any other
            // queue the split must still be claimable from the unallocated queue.
            if (blockingQueue == this.unallocatedSplits || this.unallocatedSplits.remove(candidate)) {
                break;
            }
        }

        final boolean fineEnabled = LOG.isLoggable(Level.FINE);
        if (fineEnabled) {
            LOG.log(Level.FINE, "Found split-" + candidate.getIndex() + " in the queue");
        }
        if (this.evaluatorToSplits.putIfAbsent(str, candidate) != null) {
            throw new RuntimeException("Trying to assign different splits to the same evaluator is not supported");
        }
        if (fineEnabled) {
            LOG.log(Level.FINE, "Returning " + candidate.getIndex());
        }
        return candidate;
    }
}
