package org.apache.giraph.worker;

import com.yammer.metrics.core.Counter;
import com.yammer.metrics.core.Meter;
import com.yammer.metrics.util.PercentGauge;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.concurrent.Callable;
import org.apache.giraph.comm.WorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphMetricsRegistry;
import org.apache.giraph.metrics.MeterDesc;
import org.apache.giraph.metrics.MetricNames;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.time.Times;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.util.MinimalPrettyPrinter;

/* loaded from: input_file:org/apache/giraph/worker/InputSplitsCallable.class */
public abstract class InputSplitsCallable<I extends WritableComparable, V extends Writable, E extends Writable> implements Callable<VertexEdgeCount> {
    private static final Logger LOG = Logger.getLogger(InputSplitsCallable.class);
    private static final Time TIME = SystemTime.get();
    protected final ImmutableClassesGiraphConfiguration<I, V, E> configuration;
    protected final Mapper<?, ?, ?, ?>.Context context;
    protected final WorkerClientRequestProcessor<I, V, E> workerClientRequestProcessor;
    private final InputSplitsHandler splitsHandler;
    private final ZooKeeperExt zooKeeperExt;
    private final long startNanos = TIME.getNanoseconds();
    private final boolean useLocality;

    public InputSplitsCallable(Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration<I, V, E> immutableClassesGiraphConfiguration, BspServiceWorker<I, V, E> bspServiceWorker, InputSplitsHandler inputSplitsHandler, ZooKeeperExt zooKeeperExt) {
        this.zooKeeperExt = zooKeeperExt;
        this.context = context;
        this.workerClientRequestProcessor = new NettyWorkerClientRequestProcessor(context, immutableClassesGiraphConfiguration, bspServiceWorker);
        this.useLocality = immutableClassesGiraphConfiguration.useInputSplitLocality();
        this.splitsHandler = inputSplitsHandler;
        this.configuration = immutableClassesGiraphConfiguration;
    }

    public abstract GiraphInputFormat getInputFormat();

    public static Meter getTotalEdgesLoadedMeter() {
        return GiraphMetrics.get().perJobRequired().getMeter(MeterDesc.EDGES_LOADED);
    }

    public static Counter getTotalEdgesFilteredCounter() {
        return GiraphMetrics.get().perJobRequired().getCounter(MetricNames.EDGES_FILTERED);
    }

    public static Meter getTotalVerticesLoadedMeter() {
        return GiraphMetrics.get().perJobRequired().getMeter(MeterDesc.VERTICES_LOADED);
    }

    public static Counter getTotalVerticesFilteredCounter() {
        return GiraphMetrics.get().perJobRequired().getCounter(MetricNames.VERTICES_FILTERED);
    }

    public static void initMetrics() {
        GiraphMetricsRegistry perJobRequired = GiraphMetrics.get().perJobRequired();
        final Counter totalEdgesFilteredCounter = getTotalEdgesFilteredCounter();
        final Meter totalEdgesLoadedMeter = getTotalEdgesLoadedMeter();
        perJobRequired.getGauge(MetricNames.EDGES_FILTERED_PCT, new PercentGauge() { // from class: org.apache.giraph.worker.InputSplitsCallable.1
            @Override // com.yammer.metrics.util.RatioGauge
            protected double getNumerator() {
                return Counter.this.count();
            }

            @Override // com.yammer.metrics.util.RatioGauge
            protected double getDenominator() {
                return totalEdgesLoadedMeter.count();
            }
        });
        final Counter totalVerticesFilteredCounter = getTotalVerticesFilteredCounter();
        final Meter totalVerticesLoadedMeter = getTotalVerticesLoadedMeter();
        perJobRequired.getGauge(MetricNames.VERTICES_FILTERED_PCT, new PercentGauge() { // from class: org.apache.giraph.worker.InputSplitsCallable.2
            @Override // com.yammer.metrics.util.RatioGauge
            protected double getNumerator() {
                return Counter.this.count();
            }

            @Override // com.yammer.metrics.util.RatioGauge
            protected double getDenominator() {
                return totalVerticesLoadedMeter.count();
            }
        });
    }

    protected abstract VertexEdgeCount readInputSplit(InputSplit inputSplit) throws IOException, InterruptedException;

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public VertexEdgeCount call() {
        VertexEdgeCount vertexEdgeCount = new VertexEdgeCount();
        int i = 0;
        while (true) {
            try {
                String reserveInputSplit = this.splitsHandler.reserveInputSplit();
                if (reserveInputSplit == null) {
                    break;
                }
                vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(loadInputSplit(reserveInputSplit));
                this.context.progress();
                i++;
            } catch (IOException e) {
                throw new IllegalStateException("call: IOException", e);
            } catch (ClassNotFoundException e2) {
                throw new IllegalStateException("call: ClassNotFoundException", e2);
            } catch (IllegalAccessException e3) {
                throw new IllegalStateException("call: IllegalAccessException", e3);
            } catch (InstantiationException e4) {
                throw new IllegalStateException("call: InstantiationException", e4);
            } catch (InterruptedException e5) {
                throw new IllegalStateException("call: InterruptedException", e5);
            } catch (KeeperException e6) {
                throw new IllegalStateException("call: KeeperException", e6);
            }
        }
        if (LOG.isInfoEnabled()) {
            float nanosSince = ((float) Times.getNanosSince(TIME, this.startNanos)) / 1.0E9f;
            LOG.info("call: Loaded " + i + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + "input splits in " + nanosSince + " secs, " + vertexEdgeCount + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + (((float) vertexEdgeCount.getVertexCount()) / nanosSince) + " vertices/sec, " + (((float) vertexEdgeCount.getEdgeCount()) / nanosSince) + " edges/sec");
        }
        try {
            this.workerClientRequestProcessor.flush();
            return vertexEdgeCount;
        } catch (IOException e7) {
            throw new IllegalStateException("call: Flushing failed.", e7);
        }
    }

    private VertexEdgeCount loadInputSplit(String str) throws IOException, ClassNotFoundException, InterruptedException, InstantiationException, IllegalAccessException {
        VertexEdgeCount readInputSplit = readInputSplit(getInputSplit(str));
        if (LOG.isInfoEnabled()) {
            LOG.info("loadFromInputSplit: Finished loading " + str + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + readInputSplit);
        }
        this.splitsHandler.markInputSplitPathFinished(str);
        return readInputSplit;
    }

    protected InputSplit getInputSplit(String str) throws IOException, ClassNotFoundException {
        try {
            byte[] data = this.zooKeeperExt.getData(str, false, (Stat) null);
            this.context.progress();
            DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(data));
            if (this.useLocality) {
                Text.readString(dataInputStream);
            }
            InputSplit readInputSplit = getInputFormat().readInputSplit(dataInputStream);
            if (LOG.isInfoEnabled()) {
                LOG.info("getInputSplit: Reserved " + str + " from ZooKeeper and got input split '" + readInputSplit.toString() + "'");
            }
            return readInputSplit;
        } catch (InterruptedException e) {
            throw new IllegalStateException("getInputSplit: IllegalStateException on " + str, e);
        } catch (KeeperException e2) {
            throw new IllegalStateException("getInputSplit: KeeperException on " + str, e2);
        }
    }
}
