package org.apache.reef.io.data.loading.api;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.driver.evaluator.EvaluatorRequest;
import org.apache.reef.driver.evaluator.EvaluatorRequestor;
import org.apache.reef.driver.evaluator.FailedEvaluator;
import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
import org.apache.reef.io.data.loading.impl.EvaluatorRequestSerializer;
import org.apache.reef.io.network.util.Pair;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.annotations.Unit;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.wake.EventHandler;
import org.apache.reef.wake.impl.SingleThreadStage;
import org.apache.reef.wake.time.Clock;
import org.apache.reef.wake.time.event.Alarm;
import org.apache.reef.wake.time.event.StartTime;

@DriverSide
@Unit
/* loaded from: input_file:org/apache/reef/io/data/loading/api/DataLoader.class */
public class DataLoader {
    private static final Logger LOG = Logger.getLogger(DataLoader.class.getName());
    private final ConcurrentMap<String, Pair<Configuration, Configuration>> submittedDataEvalConfigs = new ConcurrentHashMap();
    private final ConcurrentMap<String, Configuration> submittedComputeEvalConfigs = new ConcurrentHashMap();
    private final BlockingQueue<Configuration> failedComputeEvalConfigs = new LinkedBlockingQueue();
    private final BlockingQueue<Pair<Configuration, Configuration>> failedDataEvalConfigs = new LinkedBlockingQueue();
    private final AtomicInteger numComputeRequestsToSubmit = new AtomicInteger(0);
    private final DataLoadingService dataLoadingService;
    private final int dataEvalMemoryMB;
    private final int dataEvalCore;
    private final EvaluatorRequest computeRequest;
    private final SingleThreadStage<EvaluatorRequest> resourceRequestStage;
    private final ResourceRequestHandler resourceRequestHandler;
    private final int computeEvalMemoryMB;
    private final int computeEvalCore;
    private final EvaluatorRequestor requestor;

    /* loaded from: input_file:org/apache/reef/io/data/loading/api/DataLoader$EvaluatorAllocatedHandler.class */
    public class EvaluatorAllocatedHandler implements EventHandler<AllocatedEvaluator> {
        public EvaluatorAllocatedHandler() {
        }

        public void onNext(AllocatedEvaluator allocatedEvaluator) {
            String id = allocatedEvaluator.getId();
            DataLoader.LOG.log(Level.FINEST, "Allocated evaluator: {0}", id);
            if (!DataLoader.this.failedComputeEvalConfigs.isEmpty()) {
                DataLoader.LOG.log(Level.FINE, "Failed Compute requests need to be satisfied for {0}", id);
                Configuration configuration = (Configuration) DataLoader.this.failedComputeEvalConfigs.poll();
                if (configuration != null) {
                    DataLoader.LOG.log(Level.FINE, "Satisfying failed configuration for {0}", id);
                    allocatedEvaluator.submitContext(configuration);
                    DataLoader.this.submittedComputeEvalConfigs.put(id, configuration);
                    return;
                }
            }
            if (!DataLoader.this.failedDataEvalConfigs.isEmpty()) {
                DataLoader.LOG.log(Level.FINE, "Failed Data requests need to be satisfied for {0}", id);
                Pair pair = (Pair) DataLoader.this.failedDataEvalConfigs.poll();
                if (pair != null) {
                    DataLoader.LOG.log(Level.FINE, "Satisfying failed configuration for {0}", id);
                    allocatedEvaluator.submitContextAndService((Configuration) pair.first, (Configuration) pair.second);
                    DataLoader.this.submittedDataEvalConfigs.put(id, pair);
                    return;
                }
            }
            int decrementAndGet = DataLoader.this.numComputeRequestsToSubmit.decrementAndGet();
            DataLoader.LOG.log(Level.FINE, "Evaluators for compute request: {0}", Integer.valueOf(decrementAndGet));
            if (decrementAndGet < 0) {
                Pair pair2 = new Pair(DataLoader.this.dataLoadingService.getContextConfiguration(allocatedEvaluator), DataLoader.this.dataLoadingService.getServiceConfiguration(allocatedEvaluator));
                DataLoader.LOG.log(Level.FINE, "Submitting data loading context to {0}", id);
                allocatedEvaluator.submitContextAndService((Configuration) pair2.first, (Configuration) pair2.second);
                DataLoader.this.submittedDataEvalConfigs.put(allocatedEvaluator.getId(), pair2);
                return;
            }
            try {
                Configuration build = ContextConfiguration.CONF.set(ContextConfiguration.IDENTIFIER, DataLoader.this.dataLoadingService.getComputeContextIdPrefix() + decrementAndGet).build();
                DataLoader.LOG.log(Level.FINE, "Submitting Compute Context to {0}", id);
                allocatedEvaluator.submitContext(build);
                DataLoader.this.submittedComputeEvalConfigs.put(allocatedEvaluator.getId(), build);
                if (decrementAndGet == 0) {
                    DataLoader.LOG.log(Level.FINE, "All Compute requests satisfied. Releasing gate");
                    DataLoader.this.resourceRequestHandler.releaseResourceRequestGate();
                }
            } catch (BindException e) {
                throw new RuntimeException("Unable to bind context id for Compute request", e);
            }
        }
    }

    /* loaded from: input_file:org/apache/reef/io/data/loading/api/DataLoader$EvaluatorFailedHandler.class */
    public class EvaluatorFailedHandler implements EventHandler<FailedEvaluator> {
        public EvaluatorFailedHandler() {
        }

        public void onNext(FailedEvaluator failedEvaluator) {
            String id = failedEvaluator.getId();
            Configuration configuration = (Configuration) DataLoader.this.submittedComputeEvalConfigs.remove(id);
            if (configuration != null) {
                DataLoader.LOG.log(Level.INFO, "Received failed compute evaluator: {0}", id);
                DataLoader.this.failedComputeEvalConfigs.add(configuration);
                DataLoader.this.requestor.submit(EvaluatorRequest.newBuilder().setMemory(DataLoader.this.computeEvalMemoryMB).setNumber(1).setNumberOfCores(DataLoader.this.computeEvalCore).build());
                return;
            }
            Pair pair = (Pair) DataLoader.this.submittedDataEvalConfigs.remove(id);
            if (pair == null) {
                DataLoader.LOG.log(Level.SEVERE, "Received unknown failed evaluator " + id, (Throwable) failedEvaluator.getEvaluatorException());
                throw new RuntimeException("Received failed evaluator that I did not submit: " + id);
            }
            DataLoader.LOG.log(Level.INFO, "Received failed data evaluator: {0}", id);
            DataLoader.this.failedDataEvalConfigs.add(pair);
            DataLoader.this.requestor.submit(EvaluatorRequest.newBuilder().setMemory(DataLoader.this.dataEvalMemoryMB).setNumber(1).setNumberOfCores(DataLoader.this.dataEvalCore).build());
        }
    }

    /* loaded from: input_file:org/apache/reef/io/data/loading/api/DataLoader$StartHandler.class */
    public class StartHandler implements EventHandler<StartTime> {
        public StartHandler() {
        }

        public void onNext(StartTime startTime) {
            DataLoader.LOG.log(Level.INFO, "StartTime: {0}", startTime);
            DataLoader.this.resourceRequestHandler.releaseResourceRequestGate();
        }
    }

    @Inject
    public DataLoader(Clock clock, EvaluatorRequestor evaluatorRequestor, DataLoadingService dataLoadingService, @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorMemoryMB.class) int i, @Parameter(DataLoadingRequestBuilder.DataLoadingEvaluatorNumberOfCores.class) int i2, @Parameter(DataLoadingRequestBuilder.DataLoadingComputeRequest.class) String str) {
        clock.scheduleAlarm(30000, new EventHandler<Alarm>() { // from class: org.apache.reef.io.data.loading.api.DataLoader.1
            public void onNext(Alarm alarm) {
                DataLoader.LOG.log(Level.FINE, "Received Alarm: {0}", alarm);
            }
        });
        this.requestor = evaluatorRequestor;
        this.dataLoadingService = dataLoadingService;
        this.dataEvalMemoryMB = i;
        this.dataEvalCore = i2;
        this.resourceRequestHandler = new ResourceRequestHandler(evaluatorRequestor);
        this.resourceRequestStage = new SingleThreadStage<>(this.resourceRequestHandler, 2);
        if (str.equals("NULL")) {
            this.computeRequest = null;
            this.computeEvalMemoryMB = -1;
            this.computeEvalCore = 1;
        } else {
            this.computeRequest = EvaluatorRequestSerializer.deserialize(str);
            this.computeEvalMemoryMB = this.computeRequest.getMegaBytes();
            this.computeEvalCore = this.computeRequest.getNumberOfCores();
            this.numComputeRequestsToSubmit.set(this.computeRequest.getNumber());
            this.resourceRequestStage.onNext(this.computeRequest);
        }
        this.resourceRequestStage.onNext(getDataLoadingRequest());
    }

    private EvaluatorRequest getDataLoadingRequest() {
        return EvaluatorRequest.newBuilder().setNumber(this.dataLoadingService.getNumberOfPartitions()).setMemory(this.dataEvalMemoryMB).setNumberOfCores(this.dataEvalCore).build();
    }
}
