package org.apache.reef.io.data.loading.impl;

import java.util.Random;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.reef.annotations.audience.DriverSide;
import org.apache.reef.driver.context.ActiveContext;
import org.apache.reef.driver.context.ContextConfiguration;
import org.apache.reef.driver.context.ServiceConfiguration;
import org.apache.reef.driver.evaluator.AllocatedEvaluator;
import org.apache.reef.io.data.loading.api.DataLoadingRequestBuilder;
import org.apache.reef.io.data.loading.api.DataLoadingService;
import org.apache.reef.io.data.loading.api.DataSet;
import org.apache.reef.io.data.loading.api.EvaluatorToPartitionStrategy;
import org.apache.reef.io.data.loading.impl.InputSplitExternalConstructor;
import org.apache.reef.io.data.loading.impl.JobConfExternalConstructor;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Tang;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.exceptions.BindException;
import org.apache.reef.tang.formats.Param;
import org.apache.reef.wake.WakeParameters;

/**
 * Driver-side {@link DataLoadingService} that hands out Hadoop {@link InputSplit}s to evaluators.
 *
 * <p>The split assigned to an evaluator is chosen by the injected
 * {@link EvaluatorToPartitionStrategy}. For each allocated evaluator this class produces a context
 * configuration (whose identifier embeds the split index) and a service configuration that binds
 * a {@link DataSet} implementation together with the serialized split.
 *
 * @param <K> not referenced by the code in this class
 * @param <V> not referenced by the code in this class
 */
@DriverSide
public class InputFormatLoadingService<K, V> implements DataLoadingService {
    private static final Logger LOG = Logger.getLogger(InputFormatLoadingService.class.getName());

    /** Identifier prefix given to every context created by {@link #getContextConfiguration}. */
    private static final String DATA_LOAD_CONTEXT_PREFIX = "DataLoadContext-";

    /**
     * Identifier prefix reported for compute contexts. The {@link Random} is created with a fixed
     * seed, so the numeric part is not expected to differ between runs.
     *
     * <p>NOTE(review): the bound and the trailing separator come from constants of unrelated
     * libraries ({@code WakeParameters.MAX_FRAME_LENGTH}, {@code HelpFormatter.DEFAULT_OPT_PREFIX}).
     * This looks like a decompiler substituting named constants for literals - confirm the
     * intended values before relying on them.
     */
    private static final String COMPUTE_CONTEXT_PREFIX = "ComputeContext-" + new Random(3381).nextInt(WakeParameters.MAX_FRAME_LENGTH) + HelpFormatter.DEFAULT_OPT_PREFIX;

    private final EvaluatorToPartitionStrategy<InputSplit> evaluatorToPartitionStrategy;
    private final boolean inMemory;
    private final String inputFormatClass;

    /**
     * Creates the service; intended to be invoked through injection.
     *
     * @param evaluatorToPartitionStrategy strategy used to pick the split for an evaluator
     * @param inMemory whether {@link InMemoryInputFormatDataSet} (true) or
     *     {@link InputFormatDataSet} (false) is bound as the {@link DataSet} implementation
     * @param inputFormatClass value forwarded to the
     *     {@code JobConfExternalConstructor.InputFormatClass} named parameter
     */
    @Inject
    public InputFormatLoadingService(
            EvaluatorToPartitionStrategy<InputSplit> evaluatorToPartitionStrategy,
            @Parameter(DataLoadingRequestBuilder.LoadDataIntoMemory.class) boolean inMemory,
            @Parameter(JobConfExternalConstructor.InputFormatClass.class) String inputFormatClass) {
        this.evaluatorToPartitionStrategy = evaluatorToPartitionStrategy;
        this.inMemory = inMemory;
        this.inputFormatClass = inputFormatClass;
    }

    /**
     * Returns the number of splits reported by the partition strategy.
     *
     * @return the strategy's split count
     */
    @Override // org.apache.reef.io.data.loading.api.DataLoadingService
    public int getNumberOfPartitions() {
        return this.evaluatorToPartitionStrategy.getNumberOfSplits();
    }

    /**
     * Builds the context configuration for a data-loading context on the given evaluator.
     *
     * @param allocatedEvaluator evaluator that will host the context
     * @return a context configuration whose identifier is {@code "DataLoadContext-"} followed by
     *     the index of the split assigned to the evaluator
     */
    @Override // org.apache.reef.io.data.loading.api.DataLoadingService
    public Configuration getContextConfiguration(AllocatedEvaluator allocatedEvaluator) {
        final NumberedSplit<InputSplit> split = splitFor(allocatedEvaluator);
        final String contextId = DATA_LOAD_CONTEXT_PREFIX + split.getIndex();
        return ContextConfiguration.CONF
                .set(ContextConfiguration.IDENTIFIER, contextId)
                .build();
    }

    /**
     * Builds the service configuration that exposes the evaluator's split as a {@link DataSet}.
     *
     * @param allocatedEvaluator evaluator that will host the service
     * @return the service configuration carrying the input format class, input path and
     *     serialized split
     * @throws RuntimeException wrapping the {@link BindException} if the configuration cannot be
     *     assembled; the failure is also logged at WARNING level
     */
    @Override // org.apache.reef.io.data.loading.api.DataLoadingService
    public Configuration getServiceConfiguration(AllocatedEvaluator allocatedEvaluator) {
        try {
            final NumberedSplit<InputSplit> split = splitFor(allocatedEvaluator);

            // The same implementation is registered as the service and bound as the DataSet.
            final Class<? extends DataSet> dataSetClass =
                    this.inMemory ? InMemoryInputFormatDataSet.class : InputFormatDataSet.class;

            final Configuration serviceConfiguration = ServiceConfiguration.CONF
                    .set(ServiceConfiguration.SERVICES, dataSetClass)
                    .build();

            return Tang.Factory.getTang().newConfigurationBuilder(serviceConfiguration)
                    .bindImplementation(DataSet.class, dataSetClass)
                    .bindNamedParameter(JobConfExternalConstructor.InputFormatClass.class, this.inputFormatClass)
                    .bindNamedParameter(JobConfExternalConstructor.InputPath.class, split.getPath())
                    .bindNamedParameter(InputSplitExternalConstructor.SerializedInputSplit.class,
                            WritableSerializer.serialize(split.getEntry()))
                    .bindConstructor(InputSplit.class, InputSplitExternalConstructor.class)
                    .bindConstructor(JobConf.class, JobConfExternalConstructor.class)
                    .build();
        } catch (BindException e) {
            final String message = "Unable to create configuration for evaluator " + allocatedEvaluator.getId();
            LOG.log(Level.WARNING, message, e);
            throw new RuntimeException(message, e);
        }
    }

    /**
     * Returns the identifier prefix used for compute contexts.
     *
     * @return the compute-context prefix
     */
    @Override // org.apache.reef.io.data.loading.api.DataLoadingService
    public String getComputeContextIdPrefix() {
        return COMPUTE_CONTEXT_PREFIX;
    }

    /**
     * Tells whether the context's identifier starts with the compute-context prefix.
     *
     * @param activeContext context to inspect
     * @return {@code true} if the identifier starts with the compute-context prefix
     */
    @Override // org.apache.reef.io.data.loading.api.DataLoadingService
    public boolean isComputeContext(ActiveContext activeContext) {
        return activeContext.getId().startsWith(COMPUTE_CONTEXT_PREFIX);
    }

    /**
     * Tells whether the context's identifier starts with the data-load prefix.
     *
     * @param activeContext context to inspect
     * @return {@code true} if the identifier starts with {@code "DataLoadContext-"}
     */
    @Override // org.apache.reef.io.data.loading.api.DataLoadingService
    public boolean isDataLoadedContext(ActiveContext activeContext) {
        return activeContext.getId().startsWith(DATA_LOAD_CONTEXT_PREFIX);
    }

    /** Asks the partition strategy for the split assigned to the given evaluator. */
    private NumberedSplit<InputSplit> splitFor(AllocatedEvaluator allocatedEvaluator) {
        return this.evaluatorToPartitionStrategy.getInputSplit(
                allocatedEvaluator.getEvaluatorDescriptor().getNodeDescriptor(),
                allocatedEvaluator.getId());
    }
}
