package org.apache.giraph.worker;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import net.iharder.Base64;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceWorker;
import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.comm.ServerData;
import org.apache.giraph.comm.WorkerClient;
import org.apache.giraph.comm.WorkerServer;
import org.apache.giraph.comm.aggregators.WorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.messages.queue.AsyncMessageStoreWrapper;
import org.apache.giraph.comm.netty.NettyWorkerAggregatorRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerClient;
import org.apache.giraph.comm.netty.NettyWorkerClientRequestProcessor;
import org.apache.giraph.comm.netty.NettyWorkerServer;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.FinishedSuperstepStats;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.graph.VertexEdgeCount;
import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.io.internal.WrappedEdgeOutputFormat;
import org.apache.giraph.io.internal.WrappedVertexOutputFormat;
import org.apache.giraph.io.superstep_output.SuperstepOutput;
import org.apache.giraph.mapping.translate.TranslateEdge;
import org.apache.giraph.master.MasterInfo;
import org.apache.giraph.master.SuperstepClasses;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
import org.apache.giraph.partition.Partition;
import org.apache.giraph.partition.PartitionExchange;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionStore;
import org.apache.giraph.partition.WorkerGraphPartitioner;
import org.apache.giraph.utils.CallableFactory;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.LoggerUtils;
import org.apache.giraph.utils.MemoryUtils;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.json.JSONArray;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: input_file:org/apache/giraph/worker/BspServiceWorker.class */
public class BspServiceWorker<I extends WritableComparable, V extends Writable, E extends Writable> extends BspService<I, V, E> implements CentralizedServiceWorker<I, V, E>, ResetSuperstepMetricsObserver {
    /** Name under which the "wait for outstanding requests" timer is created in {@link #newSuperstep}. */
    public static final String TIMER_WAIT_REQUESTS = "wait-requests-us";
    /** Class logger. */
    private static final Logger LOG = Logger.getLogger(BspServiceWorker.class);
    /** Path of this worker's health znode as returned by ZooKeeper in registerHealth(); deleted by unregisterHealth(). */
    private String myHealthZnode;
    /** This worker's identity; its socket address and task id are filled in by the constructor. */
    private final WorkerInfo workerInfo;
    /** Worker-side partitioner created from the graph partitioner factory. */
    private final WorkerGraphPartitioner<I, V, E> workerGraphPartitioner;
    /** Worker-local data; provides the mapping store filled while loading mapping input. */
    private final LocalData<I, V, E, ? extends Writable> localData;
    /** Edge translation instance from the configuration; may be null (the constructor null-checks it). */
    private final TranslateEdge<I, E> translateEdge;
    /** Netty client used to send requests; waitAllRequests() is called on it after loading and at superstep end. */
    private final WorkerClient<I, V, E> workerClient;
    /** Netty server; its address becomes this worker's advertised address. */
    private final WorkerServer<I, V, E> workerServer;
    /** Request processor handed to the aggregator handler when preparing and finishing a superstep. */
    private final WorkerAggregatorRequestProcessor workerAggregatorRequestProcessor;
    /** Master info; replaced on every startSuperstep() with the value read from ZooKeeper. */
    private MasterInfo masterInfo;
    /** Worker list; replaced on every startSuperstep() with the value read from ZooKeeper. */
    private List<WorkerInfo> workerInfoList;
    /** Event registered with the BSP service in the constructor. NOTE(review): its consumers are outside this view. */
    private final BspEvent partitionExchangeChildrenChanged;
    /** User worker context created from the configuration. */
    private final WorkerContext workerContext;
    /** Aggregator / global communication handler, also installed on the worker context. */
    private final WorkerAggregatorHandler globalCommHandler;
    /** Superstep output created from the configuration. */
    private final SuperstepOutput<I, V, E> superstepOutput;
    /** Worker observers created from the configuration; notified after each superstep. */
    private final WorkerObserver[] observers;
    /** Progress writer; null when the configuration does not track job progress on the client. */
    private final WorkerProgressWriter workerProgressWriter;
    /** Timer around WorkerContext.postSuperstep(); recreated in {@link #newSuperstep}. */
    private GiraphTimer wcPostSuperstepTimer;
    /** Timer around waiting for outstanding requests; recreated in {@link #newSuperstep}. */
    private GiraphTimer waitRequestsTimer;

    /**
     * Builds the worker-side BSP service: local data, optional edge translation,
     * the partition-exchange event, the graph partitioner, the Netty server and
     * client, aggregator handling, the worker context, superstep output,
     * observers and (optionally) the progress writer. Finally registers this
     * instance as a superstep-reset observer with the metrics singleton.
     *
     * @param context mapper context this worker runs in
     * @param graphTaskManager owning task manager; supplies uncaught-exception
     *        handlers and the job progress tracker
     * @throws IOException declared by the signature; the throwing call is not
     *         identifiable from this view
     * @throws InterruptedException declared by the signature; the throwing call
     *         is not identifiable from this view
     */
    public BspServiceWorker(Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) throws IOException, InterruptedException {
        super(context, graphTaskManager);
        // Placeholders until startSuperstep() replaces them with values read from ZooKeeper.
        this.masterInfo = new MasterInfo();
        this.workerInfoList = Lists.newArrayList();
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        this.localData = new LocalData<>(configuration);
        // Edge translation is optional; initialize it only when the configuration supplies one.
        this.translateEdge = getConfiguration().edgeTranslationInstance();
        if (this.translateEdge != null) {
            this.translateEdge.initialize(this);
        }
        this.partitionExchangeChildrenChanged = new PredicateLock(context);
        registerBspEvent(this.partitionExchangeChildrenChanged);
        this.workerGraphPartitioner = getGraphPartitionerFactory().createWorkerGraphPartitioner();
        this.workerInfo = new WorkerInfo();
        // The server must exist before workerInfo can advertise its address.
        this.workerServer = new NettyWorkerServer(configuration, this, context, graphTaskManager.createUncaughtExceptionHandler());
        this.workerInfo.setInetSocketAddress(this.workerServer.getMyAddress());
        this.workerInfo.setTaskId(getTaskPartition());
        this.workerClient = new NettyWorkerClient(context, configuration, this, graphTaskManager.createUncaughtExceptionHandler());
        this.workerAggregatorRequestProcessor = new NettyWorkerAggregatorRequestProcessor(getContext(), configuration, this);
        this.globalCommHandler = new WorkerAggregatorHandler(this, configuration, context);
        this.workerContext = configuration.createWorkerContext();
        this.workerContext.setWorkerGlobalCommUsage(this.globalCommHandler);
        this.superstepOutput = configuration.createSuperstepOutput(context);
        // Histogram dumpers are added as observer classes before the observers are instantiated below.
        if (configuration.isJMapHistogramDumpEnabled()) {
            configuration.addWorkerObserverClass(JMapHistoDumper.class);
        }
        if (configuration.isReactiveJmapHistogramDumpEnabled()) {
            configuration.addWorkerObserverClass(ReactiveJMapHistoDumper.class);
        }
        this.observers = configuration.createWorkerObservers();
        WorkerProgress.get().setTaskId(getTaskPartition());
        // No progress writer is created unless the client tracks job progress.
        this.workerProgressWriter = configuration.trackJobProgressOnClient() ? new WorkerProgressWriter(graphTaskManager.getJobProgressTracker()) : null;
        GiraphMetrics.get().addSuperstepResetObserver(this);
    }

    /**
     * Recreates the per-superstep timers against the supplied registry.
     *
     * @param superstepMetricsRegistry registry the fresh timers are created with
     */
    @Override
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        final TimeUnit resolution = TimeUnit.MICROSECONDS;
        waitRequestsTimer = new GiraphTimer(superstepMetricsRegistry, TIMER_WAIT_REQUESTS, resolution);
        wcPostSuperstepTimer = new GiraphTimer(superstepMetricsRegistry, "worker-context-post-superstep", resolution);
    }

    /**
     * @return the worker context created by the constructor
     */
    @Override
    public WorkerContext getWorkerContext() {
        return workerContext;
    }

    /**
     * @return the observer array created by the constructor (the array itself, not a copy)
     */
    @Override
    public WorkerObserver[] getWorkerObservers() {
        return observers;
    }

    /**
     * @return the client this worker sends its requests through
     */
    @Override
    public WorkerClient<I, V, E> getWorkerClient() {
        return workerClient;
    }

    /**
     * @return the worker-local data holder created by the constructor
     */
    public LocalData<I, V, E, ? extends Writable> getLocalData() {
        return localData;
    }

    /**
     * @return the edge translation instance, or null when the configuration supplied none
     */
    public TranslateEdge<I, E> getTranslateEdge() {
        return translateEdge;
    }

    /**
     * Reports this worker's health; this implementation always answers healthy.
     *
     * @return always {@code true}
     */
    public boolean isHealthy() {
        // No health probe is performed here: the worker unconditionally reports itself healthy.
        return true;
    }

    /**
     * Runs the input-split callables on a bounded number of threads, sums the
     * counts they return and then waits for all outstanding client requests.
     *
     * @param list paths of all input splits (only its size is used here)
     * @param callableFactory factory producing one loader callable per thread
     * @return accumulated vertex/edge count over all callables
     * @throws KeeperException declared for callers; see the signature
     * @throws InterruptedException declared for callers; see the signature
     */
    private VertexEdgeCount loadInputSplits(List<String> list, CallableFactory<VertexEdgeCount> callableFactory) throws KeeperException, InterruptedException {
        final int configuredThreads = getConfiguration().getNumInputSplitsThreads();
        final int totalSplits = list.size();
        // Thread cap: ((splits - 1) / maxWorkers) + 1, never above the configured count.
        final int threadCap = ((totalSplits - 1) / getConfiguration().getMaxWorkers()) + 1;
        final int numThreads = Math.min(configuredThreads, threadCap);
        if (LOG.isInfoEnabled()) {
            LOG.info("loadInputSplits: Using " + numThreads + " thread(s), originally " + configuredThreads + " threads(s) for " + totalSplits + " total splits.");
        }

        VertexEdgeCount total = new VertexEdgeCount();
        List<VertexEdgeCount> partialCounts = ProgressableUtils.getResultsWithNCallables(callableFactory, numThreads, "load-%d", getContext());
        for (VertexEdgeCount partial : partialCounts) {
            total = total.incrVertexEdgeCount(partial);
        }

        workerClient.waitAllRequests();
        return total;
    }

    /**
     * Loads all mapping input splits and returns the number of entries read.
     *
     * <p>The callables are executed exactly once; their per-thread results are
     * then summed. (The previous loop re-invoked the whole load on every
     * condition check and read from an undefined variable.)
     *
     * @return total number of mapping entries reported by the callables
     * @throws KeeperException if listing the split znodes fails
     * @throws InterruptedException if listing the split znodes is interrupted
     */
    private long loadMapping() throws KeeperException, InterruptedException {
        List<String> splitPaths = getZkExt().getChildrenExt(this.mappingInputSplitsPaths.getPath(), false, false, true);
        MappingInputSplitsCallableFactory mappingInputSplitsCallableFactory = new MappingInputSplitsCallableFactory(
            getConfiguration().createWrappedMappingInputFormat(),
            new InputSplitPathOrganizer(getZkExt(), splitPaths, getWorkerInfo().getHostname(), getConfiguration().useInputSplitLocality()),
            getContext(),
            getConfiguration(),
            this,
            getZkExt());

        // Never start more threads than there are splits to read.
        int numThreads = Math.min(getConfiguration().getNumInputSplitsThreads(), splitPaths.size());
        if (LOG.isInfoEnabled()) {
            LOG.info("loadInputSplits: Using " + numThreads + " thread(s), originally " + getConfiguration().getNumInputSplitsThreads() + " threads(s) for " + splitPaths.size() + " total splits.");
        }

        // Run the load once and iterate over that single result list.
        long entriesLoaded = 0;
        for (Object result : ProgressableUtils.getResultsWithNCallables(mappingInputSplitsCallableFactory, numThreads, "load-mapping-%d", getContext())) {
            entriesLoaded += ((Integer) result).intValue();
        }

        this.localData.getMappingStore().postFilling();
        return entriesLoaded;
    }

    /**
     * Loads the vertex input splits listed under the vertex input split path.
     *
     * @return vertex/edge count accumulated by {@code loadInputSplits}
     * @throws KeeperException if ZooKeeper access fails
     * @throws InterruptedException if interrupted while accessing ZooKeeper
     */
    private VertexEdgeCount loadVertices() throws KeeperException, InterruptedException {
        List<String> splitPaths = getZkExt().getChildrenExt(this.vertexInputSplitsPaths.getPath(), false, false, true);
        return loadInputSplits(
            splitPaths,
            new VertexInputSplitsCallableFactory(
                getConfiguration().createWrappedVertexInputFormat(),
                getContext(),
                getConfiguration(),
                this,
                new InputSplitsHandler(
                    new InputSplitPathOrganizer(getZkExt(), splitPaths, getWorkerInfo().getHostname(), getConfiguration().useInputSplitLocality()),
                    getZkExt(),
                    getContext(),
                    BspService.VERTEX_INPUT_SPLIT_RESERVED_NODE,
                    BspService.VERTEX_INPUT_SPLIT_FINISHED_NODE),
                getZkExt()));
    }

    /**
     * Loads the edge input splits listed under the edge input split path.
     *
     * @return number of edges reported by {@code loadInputSplits}
     * @throws KeeperException if ZooKeeper access fails
     * @throws InterruptedException if interrupted while accessing ZooKeeper
     */
    private long loadEdges() throws KeeperException, InterruptedException {
        List<String> splitPaths = getZkExt().getChildrenExt(this.edgeInputSplitsPaths.getPath(), false, false, true);
        VertexEdgeCount loaded = loadInputSplits(
            splitPaths,
            new EdgeInputSplitsCallableFactory(
                getConfiguration().createWrappedEdgeInputFormat(),
                getContext(),
                getConfiguration(),
                this,
                new InputSplitsHandler(
                    new InputSplitPathOrganizer(getZkExt(), splitPaths, getWorkerInfo().getHostname(), getConfiguration().useInputSplitLocality()),
                    getZkExt(),
                    getContext(),
                    BspService.EDGE_INPUT_SPLIT_RESERVED_NODE,
                    BspService.EDGE_INPUT_SPLIT_FINISHED_NODE),
                getZkExt()));
        // Only the edge half of the count is of interest to the caller.
        return loaded.getEdgeCount();
    }

    /**
     * @return the master info most recently stored by the constructor or {@code startSuperstep()}
     */
    @Override
    public MasterInfo getMasterInfo() {
        return masterInfo;
    }

    /**
     * @return the current worker list (the internal list itself, not a copy)
     */
    @Override
    public List<WorkerInfo> getWorkerInfoList() {
        return workerInfoList;
    }

    /**
     * Blocks until the "all ready" znode of the given input splits exists.
     *
     * <p>The existence check sits inside the {@code try} so that the checked
     * exceptions it throws are translated by the handlers below instead of
     * escaping from the loop condition.
     *
     * @param inputSplitPaths paths of the input split family to wait for
     * @param inputSplitEvents events of the same input split family
     * @throws IllegalStateException if the ZooKeeper check fails or is interrupted
     */
    private void ensureInputSplitsReady(InputSplitPaths inputSplitPaths, InputSplitEvents inputSplitEvents) {
        try {
            // Each poll passes 'true' as the watch argument, then sleeps on the event until it is signalled.
            while (getZkExt().exists(inputSplitPaths.getAllReadyPath(), true) == null) {
                inputSplitEvents.getAllReadyChanged().waitForever();
                inputSplitEvents.getAllReadyChanged().reset();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("ensureInputSplitsReady: InterruptedException waiting on input splits", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("ensureInputSplitsReady: KeeperException waiting on input splits", e);
        }
    }

    /**
     * Creates this worker's "done" znode for the given input splits, then
     * blocks until the "all done" znode exists.
     *
     * <p>Creation and waiting are handled as two separate phases so that a
     * failure is reported with the message of the phase it occurred in
     * (previously a failed existence check was reported as a creation failure).
     *
     * @param inputSplitPaths paths of the input split family being finished
     * @param inputSplitEvents events of the same input split family
     * @throws IllegalStateException if either ZooKeeper phase fails or is interrupted
     */
    private void markCurrentWorkerDoneThenWaitForOthers(InputSplitPaths inputSplitPaths, InputSplitEvents inputSplitEvents) {
        // Phase 1: announce that this worker is done.
        String workerDonePath = inputSplitPaths.getDonePath() + "/" + getWorkerInfo().getHostnameId();
        try {
            getZkExt().createExt(workerDonePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (InterruptedException e) {
            throw new IllegalStateException("markCurrentWorkerDoneThenWaitForOthers: InterruptedException creating worker done splits", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("markCurrentWorkerDoneThenWaitForOthers: KeeperException creating worker done splits", e);
        }

        // Phase 2: wait until the "all done" znode shows up.
        try {
            while (getZkExt().exists(inputSplitPaths.getAllDonePath(), true) == null) {
                inputSplitEvents.getAllDoneChanged().waitForever();
                inputSplitEvents.getAllDoneChanged().reset();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("markCurrentWorkerDoneThenWaitForOthers: InterruptedException waiting on worker done splits", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("markCurrentWorkerDoneThenWaitForOthers: KeeperException waiting on worker done splits", e);
        }
    }

    /**
     * Prepares this worker for computation: either short-circuits for a
     * restart, or starts the input superstep, loads mapping / vertex / edge
     * input, makes sure every owned partition exists, and finishes the
     * superstep with the resulting partition statistics.
     *
     * @return stats of the finished input superstep, or a placeholder stats
     *         object (fifth constructor argument {@code true}) on the two
     *         restart paths. NOTE(review): the meaning of that flag is defined
     *         by FinishedSuperstepStats, outside this view.
     * @throws IllegalStateException if loading is interrupted or ZooKeeper fails
     * @throws RuntimeException if the job state JSON cannot be read
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public FinishedSuperstepStats setup() {
        VertexEdgeCount vertexEdgeCount;
        // Restart path 1: a restarted superstep is already set (Long.MIN_VALUE is the "unset" value tested here).
        if (getRestartedSuperstep() != Long.MIN_VALUE) {
            setCachedSuperstep(getRestartedSuperstep());
            return new FinishedSuperstepStats(0L, false, 0L, 0L, true, CheckpointStatus.NONE);
        }
        // Restart path 2: the job state says START_SUPERSTEP for the superstep we are currently on.
        JSONObject jobState = getJobState();
        if (jobState != null) {
            try {
                if (ApplicationState.valueOf(jobState.getString(BspService.JSONOBJ_STATE_KEY)) == ApplicationState.START_SUPERSTEP && jobState.getLong(BspService.JSONOBJ_SUPERSTEP_KEY) == getSuperstep()) {
                    if (LOG.isInfoEnabled()) {
                        LOG.info("setup: Restarting from an automated checkpointed superstep " + getSuperstep() + ", attempt " + getApplicationAttempt());
                    }
                    setRestartedSuperstep(getSuperstep());
                    return new FinishedSuperstepStats(0L, false, 0L, 0L, true, CheckpointStatus.NONE);
                }
            } catch (JSONException e) {
                throw new RuntimeException("setup: Failed to get key-values from " + jobState.toString(), e);
            }
        }
        // Normal path: obtain partition ownership, then bring up client and aggregator handling.
        Collection<? extends PartitionOwner> startSuperstep = startSuperstep();
        this.workerGraphPartitioner.updatePartitionOwners(getWorkerInfo(), startSuperstep);
        this.workerClient.setup(getConfiguration().authenticate());
        this.globalCommHandler.prepareSuperstep(this.workerAggregatorRequestProcessor);
        // Mapping input (optional): load it, initialize the partitioner factory with it, then sync with the other workers.
        if (getConfiguration().hasMappingInputFormat()) {
            ensureInputSplitsReady(this.mappingInputSplitsPaths, this.mappingInputSplitsEvents);
            getContext().progress();
            try {
                long loadMapping = loadMapping();
                getGraphPartitionerFactory().initialize(this.localData);
                getContext().progress();
                if (LOG.isInfoEnabled()) {
                    LOG.info("setup: Finally loaded a total of " + loadMapping + " entries from inputSplits");
                }
                markCurrentWorkerDoneThenWaitForOthers(this.mappingInputSplitsPaths, this.mappingInputSplitsEvents);
                this.localData.printStats();
            } catch (InterruptedException e2) {
                throw new IllegalStateException("setup: loadMapping failed with InterruptedException", e2);
            } catch (KeeperException e3) {
                throw new IllegalStateException("setup: loadMapping failed with KeeperException", e3);
            }
        }
        // Vertex input (optional): without it the count starts out empty.
        if (getConfiguration().hasVertexInputFormat()) {
            ensureInputSplitsReady(this.vertexInputSplitsPaths, this.vertexInputSplitsEvents);
            getContext().progress();
            try {
                vertexEdgeCount = loadVertices();
                getContext().progress();
            } catch (InterruptedException e4) {
                throw new IllegalStateException("setup: loadVertices failed with InterruptedException", e4);
            } catch (KeeperException e5) {
                throw new IllegalStateException("setup: loadVertices failed with KeeperException", e5);
            }
        } else {
            vertexEdgeCount = new VertexEdgeCount();
        }
        WorkerProgress.get().finishLoadingVertices();
        // Edge input (optional): only the edge count is added to the running total.
        if (getConfiguration().hasEdgeInputFormat()) {
            ensureInputSplitsReady(this.edgeInputSplitsPaths, this.edgeInputSplitsEvents);
            getContext().progress();
            try {
                vertexEdgeCount = vertexEdgeCount.incrVertexEdgeCount(0L, loadEdges());
                getContext().progress();
            } catch (InterruptedException e6) {
                throw new IllegalStateException("setup: loadEdges failed with InterruptedException", e6);
            } catch (KeeperException e7) {
                throw new IllegalStateException("setup: loadEdges failed with KeeperException", e7);
            }
        }
        WorkerProgress.get().finishLoadingEdges();
        if (LOG.isInfoEnabled()) {
            LOG.info("setup: Finally loaded a total of " + vertexEdgeCount);
        }
        // Synchronize with the other workers only after both vertex and edge loading have completed locally.
        if (getConfiguration().hasVertexInputFormat()) {
            markCurrentWorkerDoneThenWaitForOthers(this.vertexInputSplitsPaths, this.vertexInputSplitsEvents);
        }
        if (getConfiguration().hasEdgeInputFormat()) {
            markCurrentWorkerDoneThenWaitForOthers(this.edgeInputSplitsPaths, this.edgeInputSplitsEvents);
        }
        // Create an empty partition for every partition this worker owns but has not received any data for.
        for (PartitionOwner partitionOwner : startSuperstep) {
            if (partitionOwner.getWorkerInfo().equals(getWorkerInfo()) && !getPartitionStore().hasPartition(Integer.valueOf(partitionOwner.getPartitionId()))) {
                getPartitionStore().addPartition(getConfiguration().createPartition(partitionOwner.getPartitionId(), getContext()));
            }
        }
        this.localData.removeMappingStoreIfPossible();
        if (getConfiguration().hasEdgeInputFormat()) {
            getServerData().getEdgeStore().moveEdgesToVertices();
        }
        // Collect per-partition stats; each partition is fetched from and then handed back to the store.
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it2 = getPartitionStore().getPartitionIds().iterator();
        while (it2.hasNext()) {
            Partition<I, V, E> orCreatePartition = getPartitionStore().getOrCreatePartition(it2.next());
            arrayList.add(new PartitionStats(orCreatePartition.getId(), orCreatePartition.getVertexCount(), 0L, orCreatePartition.getEdgeCount(), 0L, 0L));
            getPartitionStore().putPartition(orCreatePartition);
        }
        this.workerGraphPartitioner.finalizePartitionStats(arrayList, getPartitionStore());
        // No timer context is passed for the input superstep.
        return finishSuperstep(arrayList, null);
    }

    /**
     * Creates this worker's ephemeral health znode for the current application
     * attempt and superstep, under the healthy or unhealthy parent depending on
     * {@link #isHealthy()}.
     *
     * <p>If the znode already exists, the method waits for the application
     * attempt to change and then fails on purpose so that the worker restarts
     * in the new attempt.
     *
     * @param j superstep supplied by the caller; not read by this method, which
     *          uses {@code getSuperstep()} instead
     * @throws IllegalStateException if znode creation fails, is interrupted, or
     *         the znode already existed
     */
    private void registerHealth(long j) {
        // The unused JSON host/port array that used to be built here has been removed.
        String healthParentPath = isHealthy()
            ? getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep())
            : getWorkerInfoUnhealthyPath(getApplicationAttempt(), getSuperstep());
        String str = healthParentPath + "/" + this.workerInfo.getHostnameId();
        try {
            this.myHealthZnode = getZkExt().createExt(str, WritableUtils.writeToByteArray(this.workerInfo), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, true);
            if (LOG.isInfoEnabled()) {
                LOG.info("registerHealth: Created my health node for attempt=" + getApplicationAttempt() + ", superstep=" + getSuperstep() + " with " + this.myHealthZnode + " and workerInfo= " + this.workerInfo);
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("Creating " + str + " failed with InterruptedException", e);
        } catch (KeeperException.NodeExistsException e2) {
            // An existing node means we cannot join this attempt: wait for the next one, then fail deliberately.
            LOG.warn("registerHealth: myHealthPath already exists (likely from previous failure): " + str + ".  Waiting for change in attempts to re-join the application");
            getApplicationAttemptChangedEvent().waitForever();
            if (LOG.isInfoEnabled()) {
                LOG.info("registerHealth: Got application attempt changed event, killing self");
            }
            throw new IllegalStateException("registerHealth: Trying to get the new application attempt by killing self", e2);
        } catch (KeeperException e3) {
            throw new IllegalStateException("Creating " + str + " failed with KeeperException", e3);
        }
    }

    /**
     * Logs the failure and deletes this worker's health znode.
     *
     * @throws IllegalStateException if the delete fails or is interrupted
     */
    private void unregisterHealth() {
        final String failureNotice = "unregisterHealth: Got failure, unregistering health on " + myHealthZnode + " on superstep " + getSuperstep();
        LOG.error(failureNotice);
        try {
            // Version -1 is passed, as in the original call.
            getZkExt().deleteExt(myHealthZnode, -1, false);
        } catch (InterruptedException interrupted) {
            throw new IllegalStateException("unregisterHealth: InterruptedException - Couldn't delete " + myHealthZnode, interrupted);
        } catch (KeeperException zkFailure) {
            throw new IllegalStateException("unregisterHealth: KeeperException - Couldn't delete " + myHealthZnode, zkFailure);
        }
    }

    /**
     * Cleans up after a failure by removing this worker's health znode.
     */
    @Override
    public void failureCleanup() {
        // The only cleanup performed here is health de-registration.
        this.unregisterHealth();
    }

    /**
     * Starts a superstep on this worker: prepares the server (except on
     * superstep -1), registers health, waits for the addresses-and-partitions
     * znode, reads it, and refreshes the cached master and worker info.
     *
     * <p>The znode existence check sits inside the {@code try} so that the
     * checked exceptions it throws are translated by the handlers below instead
     * of escaping from the loop condition.
     *
     * @return the partition owners read from ZooKeeper for this superstep
     * @throws IllegalStateException if waiting for the assignments fails or is
     *         interrupted
     */
    @Override
    public Collection<? extends PartitionOwner> startSuperstep() {
        if (getSuperstep() != -1) {
            this.workerServer.prepareSuperstep();
        }
        registerHealth(getSuperstep());
        String addressesAndPartitionsPath = getAddressesAndPartitionsPath(getApplicationAttempt(), getSuperstep());
        AddressesAndPartitionsWritable addressesAndPartitionsWritable = new AddressesAndPartitionsWritable(this.workerGraphPartitioner.createPartitionOwner().getClass());
        try {
            // Poll with a watch requested, sleeping on the event between polls.
            while (getZkExt().exists(addressesAndPartitionsPath, true) == null) {
                getAddressesAndPartitionsReadyChangedEvent().waitForever();
                getAddressesAndPartitionsReadyChangedEvent().reset();
            }
        } catch (InterruptedException e) {
            throw new IllegalStateException("startSuperstep: InterruptedException getting assignments", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("startSuperstep: KeeperException getting assignments", e);
        }
        WritableUtils.readFieldsFromZnode(getZkExt(), addressesAndPartitionsPath, false, null, addressesAndPartitionsWritable);
        // The previous list is emptied before being replaced, exactly as before.
        this.workerInfoList.clear();
        this.workerInfoList = addressesAndPartitionsWritable.getWorkerInfos();
        this.masterInfo = addressesAndPartitionsWritable.getMasterInfo();
        if (LOG.isInfoEnabled()) {
            LOG.info("startSuperstep: " + this.masterInfo);
            LOG.info("startSuperstep: Ready for computation on superstep " + getSuperstep() + " since worker selection and vertex range assignments are done in " + addressesAndPartitionsPath);
        }
        getContext().setStatus("startSuperstep: " + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep());
        if (LOG.isDebugEnabled()) {
            LOG.debug("startSuperstep: addressesAndPartitions" + addressesAndPartitionsWritable.getWorkerInfos());
            for (PartitionOwner partitionOwner : addressesAndPartitionsWritable.getPartitionOwners()) {
                LOG.debug(partitionOwner.getPartitionId() + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + partitionOwner.getWorkerInfo());
            }
        }
        return addressesAndPartitionsWritable.getPartitionOwners();
    }

    /**
     * Finishes the current superstep on this worker: waits for outstanding
     * requests, totals the partition statistics, runs post-superstep callbacks
     * (except on superstep -1), publishes the worker's results to ZooKeeper,
     * waits for the superstep-finished znode and reads the global stats from it.
     *
     * @param list per-partition statistics of this worker
     * @param giraphTimerContext timer to stop once local work is done; may be null
     * @return stats combining the local vertex count with the global stats read
     *         from ZooKeeper
     */
    @Override
    public FinishedSuperstepStats finishSuperstep(List<PartitionStats> list, GiraphTimerContext giraphTimerContext) {
        waitForRequestsToFinish();
        getGraphTaskManager().notifyFinishedCommunication();

        // Totals over all local partitions.
        long messagesSent = 0;
        long messageBytesSent = 0;
        long localVertexCount = 0;
        for (PartitionStats stats : list) {
            messagesSent += stats.getMessagesSentCount();
            messageBytesSent += stats.getMessageBytesSentCount();
            localVertexCount += stats.getVertexCount();
        }

        if (getSuperstep() != -1) {
            postSuperstepCallbacks();
        }
        this.globalCommHandler.finishSuperstep(this.workerAggregatorRequestProcessor);

        // An asynchronous message store has to drain before the superstep counts as finished.
        Object messageStore = getServerData().getIncomingMessageStore();
        if (messageStore instanceof AsyncMessageStoreWrapper) {
            ((AsyncMessageStoreWrapper) messageStore).waitToComplete();
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("finishSuperstep: Superstep " + getSuperstep() + ", messages = " + messagesSent + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + ", message bytes = " + messageBytesSent + " , " + MemoryUtils.getRuntimeMemoryStats());
        }
        if (giraphTimerContext != null) {
            giraphTimerContext.stop();
        }

        writeFinshedSuperstepInfoToZK(list, messagesSent, messageBytesSent);
        LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "finishSuperstep: (waiting for rest of workers) " + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep());

        String finishedPath = getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep());
        waitForOtherWorkers(finishedPath);

        GlobalStats globalStats = new GlobalStats();
        SuperstepClasses nextClasses = new SuperstepClasses();
        WritableUtils.readFieldsFromZnode(getZkExt(), finishedPath, false, null, globalStats, nextClasses);
        if (LOG.isInfoEnabled()) {
            LOG.info("finishSuperstep: Completed superstep " + getSuperstep() + " with global stats " + globalStats + " and classes " + nextClasses);
        }
        getContext().setStatus("finishSuperstep: (all workers done) " + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep());

        incrCachedSuperstep();
        getConfiguration().updateSuperstepClasses(nextClasses);
        return new FinishedSuperstepStats(localVertexCount, globalStats.getHaltComputation(), globalStats.getVertexCount(), globalStats.getEdgeCount(), false, globalStats.getCheckpointStatus());
    }

    /**
     * Runs the timed worker-context post-superstep hook, then notifies every
     * worker observer, reporting progress after each step.
     */
    private void postSuperstepCallbacks() {
        // Only the worker context callback is covered by the timer.
        GiraphTimerContext postSuperstepTiming = wcPostSuperstepTimer.time();
        getWorkerContext().postSuperstep();
        postSuperstepTiming.stop();
        getContext().progress();

        WorkerObserver[] registeredObservers = getWorkerObservers();
        for (int idx = 0; idx < registeredObservers.length; idx++) {
            registeredObservers[idx].postSuperstep(getSuperstep());
            getContext().progress();
        }
    }

    /**
     * Waits, under the wait-requests timer, until the worker client reports
     * that all requests have completed.
     */
    private void waitForRequestsToFinish() {
        if (LOG.isInfoEnabled()) {
            String memoryStats = MemoryUtils.getRuntimeMemoryStats();
            LOG.info("finishSuperstep: Waiting on all requests, superstep " + getSuperstep() + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + memoryStats);
        }
        GiraphTimerContext waitTiming = waitRequestsTimer.time();
        workerClient.waitAllRequests();
        waitTiming.stop();
    }

    /**
     * Blocks until the given superstep-finished znode exists.
     *
     * <p>The existence check sits inside the {@code try} so that the checked
     * exceptions it throws are translated below instead of escaping from the
     * loop condition.
     *
     * @param str path of the superstep-finished znode to wait for
     * @throws IllegalStateException if the ZooKeeper check fails or is interrupted
     */
    private void waitForOtherWorkers(String str) {
        try {
            // Poll with a watch requested, sleeping on the event between polls.
            while (getZkExt().exists(str, true) == null) {
                getSuperstepFinishedEvent().waitForever();
                getSuperstepFinishedEvent().reset();
            }
        } catch (InterruptedException | KeeperException e) {
            // Both failure modes were always reported with the same message.
            throw new IllegalStateException("finishSuperstep: Failed while waiting for master to signal completion of superstep " + getSuperstep(), e);
        }
    }

    /**
     * Publishes this worker's superstep results (finalized partition stats,
     * message counters and metrics) as a JSON document in a persistent znode
     * under the worker-finished path.
     *
     * @param list per-partition statistics of this worker
     * @param j number of messages sent
     * @param j2 number of message bytes sent
     * @throws RuntimeException if the JSON document cannot be assembled
     * @throws IllegalStateException if znode creation fails or is interrupted;
     *         an already existing znode is only logged
     */
    private void writeFinshedSuperstepInfoToZK(List<PartitionStats> list, long j, long j2) {
        // Serialize the finalized partition stats and the current superstep metrics.
        List<PartitionStats> finalizedStats = new ArrayList<PartitionStats>(this.workerGraphPartitioner.finalizePartitionStats(list, getPartitionStore()));
        byte[] partitionStatsBytes = WritableUtils.writeListToByteArray(finalizedStats);
        WorkerSuperstepMetrics superstepMetrics = new WorkerSuperstepMetrics();
        superstepMetrics.readFromRegistry();
        byte[] metricsBytes = WritableUtils.writeToByteArray(superstepMetrics);

        // Assemble the JSON payload.
        JSONObject payload = new JSONObject();
        try {
            payload.put(BspService.JSONOBJ_PARTITION_STATS_KEY, Base64.encodeBytes(partitionStatsBytes));
            payload.put(BspService.JSONOBJ_NUM_MESSAGES_KEY, j);
            payload.put(BspService.JSONOBJ_NUM_MESSAGE_BYTES_KEY, j2);
            payload.put(BspService.JSONOBJ_METRICS_KEY, Base64.encodeBytes(metricsBytes));
        } catch (JSONException jsonFailure) {
            throw new RuntimeException(jsonFailure);
        }

        // Write it to ZooKeeper; a pre-existing node is tolerated.
        String finishedWorkerPath = getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()) + "/" + getHostnamePartitionId();
        try {
            getZkExt().createExt(finishedWorkerPath, payload.toString().getBytes(Charset.defaultCharset()), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (InterruptedException interrupted) {
            throw new IllegalStateException("Creating " + finishedWorkerPath + " failed with InterruptedException", interrupted);
        } catch (KeeperException.NodeExistsException alreadyThere) {
            LOG.warn("finishSuperstep: finished worker path " + finishedWorkerPath + " already exists!");
        } catch (KeeperException zkFailure) {
            throw new IllegalStateException("Creating " + finishedWorkerPath + " failed with KeeperException", zkFailure);
        }
    }

    /**
     * Writes every vertex held by this worker through the configured vertex output format.
     *
     * <p>Returns without writing when no vertex output format class is configured or
     * when output during computation is enabled. Otherwise all partition ids are put on
     * a shared queue that is drained by
     * {@code min(getNumOutputThreads(), numPartitions)} callables, each with its own
     * {@code VertexWriter}. Progress is reported to {@code WorkerProgress} in steps of
     * {@code VERTICES_TO_UPDATE_PROGRESS} vertices plus the per-partition remainder.
     * For pure YARN jobs the task output is committed afterwards; commit failures are
     * logged, not rethrown.
     *
     * @param j vertex count shown in the "Starting to save" status message
     * @throws IOException declared for callers; commit-time IOExceptions are logged
     * @throws InterruptedException declared for callers; commit-time interrupts are logged
     */
    private void saveVertices(long j) throws IOException, InterruptedException {
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        if (configuration.getVertexOutputFormatClass() == null) {
            LOG.warn("saveVertices: " + GiraphConstants.VERTEX_OUTPUT_FORMAT_CLASS + " not specified -- there will be no saved output");
            return;
        }
        if (configuration.doOutputDuringComputation()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("saveVertices: The option for doing output during computation is selected, so there will be no saving of the output in the end of application");
            }
            return;
        }
        int numPartitions = getPartitionStore().getNumPartitions();
        int numThreads = Math.min(getConfiguration().getNumOutputThreads(), numPartitions);
        LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "saveVertices: Starting to save " + j + " vertices using " + numThreads + " threads");
        final WrappedVertexOutputFormat<I, V, E> vertexOutputFormat = getConfiguration().createWrappedVertexOutputFormat();
        // ArrayBlockingQueue rejects a capacity of 0, hence the LinkedList fallback.
        final Queue<Integer> partitionIdQueue = numPartitions == 0 ? new LinkedList<Integer>() : new ArrayBlockingQueue<Integer>(numPartitions);
        Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());

        // Count the vertices up front so that progress can be reported against a total.
        long verticesToStore = 0;
        PartitionStore<I, V, E> partitionStore = getPartitionStore();
        for (Integer partitionId : partitionStore.getPartitionIds()) {
            Partition<I, V, E> partition = partitionStore.getOrCreatePartition(partitionId);
            verticesToStore += partition.getVertexCount();
            partitionStore.putPartition(partition);
        }
        WorkerProgress.get().startStoring(verticesToStore, getPartitionStore().getNumPartitions());

        ProgressableUtils.getResultsWithNCallables(new CallableFactory<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.1
            @Override // org.apache.giraph.utils.CallableFactory
            /* renamed from: newCallable */
            public Callable<Void> newCallable2(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.1.1
                    private static final long VERTICES_TO_UPDATE_PROGRESS = 100000;

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        VertexWriter<I, V, E> vertexWriter = vertexOutputFormat.createVertexWriter(BspServiceWorker.this.getContext());
                        vertexWriter.setConf(BspServiceWorker.this.getConfiguration());
                        vertexWriter.initialize(BspServiceWorker.this.getContext());
                        long nextPrintVertices = 0;
                        long nextPrintMsecs = System.currentTimeMillis() + 15000;
                        int partitionIndex = 0;
                        int totalPartitions = BspServiceWorker.this.getPartitionStore().getNumPartitions();
                        Integer partitionId;
                        while (!partitionIdQueue.isEmpty() && (partitionId = partitionIdQueue.poll()) != null) {
                            Partition<I, V, E> partition = BspServiceWorker.this.getPartitionStore().getOrCreatePartition(partitionId);
                            long verticesWritten = 0;
                            // The written-vertex counter restarts for each partition, so the
                            // progress threshold must restart with it; otherwise every
                            // partition after the first under-reports its stored vertices.
                            long nextUpdateProgressVertices = VERTICES_TO_UPDATE_PROGRESS;
                            for (Vertex<I, V, E> vertex : partition) {
                                vertexWriter.writeVertex(vertex);
                                verticesWritten++;
                                // Status line is throttled by both vertex count and wall-clock time.
                                if (verticesWritten > nextPrintVertices && System.currentTimeMillis() > nextPrintMsecs) {
                                    LoggerUtils.setStatusAndLog(BspServiceWorker.this.getContext(), BspServiceWorker.LOG, Level.INFO, "saveVertices: Saved " + verticesWritten + " out of " + partition.getVertexCount() + " partition vertices, on partition " + partitionIndex + " out of " + totalPartitions);
                                    nextPrintMsecs = System.currentTimeMillis() + 15000;
                                    nextPrintVertices = verticesWritten + 250000;
                                }
                                if (verticesWritten >= nextUpdateProgressVertices) {
                                    WorkerProgress.get().addVerticesStored(VERTICES_TO_UPDATE_PROGRESS);
                                    nextUpdateProgressVertices += VERTICES_TO_UPDATE_PROGRESS;
                                }
                            }
                            BspServiceWorker.this.getPartitionStore().putPartition(partition);
                            partitionIndex++;
                            // Report the vertices not yet covered by a full progress step.
                            WorkerProgress.get().addVerticesStored(verticesWritten % VERTICES_TO_UPDATE_PROGRESS);
                            WorkerProgress.get().incrementPartitionsStored();
                        }
                        vertexWriter.close(BspServiceWorker.this.getContext());
                        return null;
                    }
                };
            }
        }, numThreads, "save-vertices-%d", getContext());
        LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "saveVertices: Done saving vertices.");
        if (!getConfiguration().isPureYarnJob() || getConfiguration().getVertexOutputFormatClass() == null) {
            return;
        }
        try {
            OutputCommitter outputCommitter = vertexOutputFormat.getOutputCommitter(getContext());
            if (outputCommitter.needsTaskCommit(getContext())) {
                LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "OutputCommitter: committing task output.");
                outputCommitter.commitTask(getContext());
            }
        } catch (IOException e) {
            LOG.error("Master task's attempt to commit output has FAILED.", e);
        } catch (InterruptedException e2) {
            LOG.error("Interrupted while attempting to obtain OutputCommitter.", e2);
        }
    }

    /**
     * Writes every out-edge of every vertex held by this worker through the configured
     * edge output format.
     *
     * <p>Returns without writing when no edge output format class is configured.
     * Otherwise all partition ids are put on a shared queue that is drained by
     * {@code min(getNumOutputThreads(), numPartitions)} callables, each with its own
     * {@code EdgeWriter}. For pure YARN jobs the task output is committed afterwards;
     * commit failures are logged, not rethrown.
     *
     * @throws IOException declared for callers; commit-time IOExceptions are logged
     * @throws InterruptedException declared for callers; commit-time interrupts are logged
     */
    private void saveEdges() throws IOException, InterruptedException {
        final ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        if (configuration.getEdgeOutputFormatClass() == null) {
            // Separator added: the option name used to run straight into "Make sure".
            LOG.warn("saveEdges: " + GiraphConstants.EDGE_OUTPUT_FORMAT_CLASS + " not specified. Make sure that the EdgeOutputFormat is not required.");
            return;
        }
        int numPartitions = getPartitionStore().getNumPartitions();
        int numThreads = Math.min(configuration.getNumOutputThreads(), numPartitions);
        LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "saveEdges: Starting to save the edges using " + numThreads + " threads");
        final WrappedEdgeOutputFormat<I, V, E> edgeOutputFormat = configuration.createWrappedEdgeOutputFormat();
        // ArrayBlockingQueue rejects a capacity of 0, hence the LinkedList fallback.
        final Queue<Integer> partitionIdQueue = numPartitions == 0 ? new LinkedList<Integer>() : new ArrayBlockingQueue<Integer>(numPartitions);
        Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
        ProgressableUtils.getResultsWithNCallables(new CallableFactory<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.2
            @Override // org.apache.giraph.utils.CallableFactory
            /* renamed from: newCallable */
            public Callable<Void> newCallable2(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.2.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        EdgeWriter<I, V, E> edgeWriter = edgeOutputFormat.createEdgeWriter(BspServiceWorker.this.getContext());
                        edgeWriter.setConf(configuration);
                        edgeWriter.initialize(BspServiceWorker.this.getContext());
                        long nextPrintVertices = 0;
                        long nextPrintMsecs = System.currentTimeMillis() + 15000;
                        int partitionIndex = 0;
                        int totalPartitions = BspServiceWorker.this.getPartitionStore().getNumPartitions();
                        Integer partitionId;
                        while (!partitionIdQueue.isEmpty() && (partitionId = partitionIdQueue.poll()) != null) {
                            Partition<I, V, E> partition = BspServiceWorker.this.getPartitionStore().getOrCreatePartition(partitionId);
                            long verticesVisited = 0;
                            long edgesWritten = 0;
                            long partitionEdgeCount = partition.getEdgeCount();
                            for (Vertex<I, V, E> vertex : partition) {
                                for (Edge<I, E> edge : vertex.getEdges()) {
                                    edgeWriter.writeEdge(vertex.getId(), vertex.getValue(), edge);
                                    edgesWritten++;
                                }
                                verticesVisited++;
                                // Status line is throttled by both vertex count and wall-clock time.
                                if (verticesVisited > nextPrintVertices && System.currentTimeMillis() > nextPrintMsecs) {
                                    LoggerUtils.setStatusAndLog(BspServiceWorker.this.getContext(), BspServiceWorker.LOG, Level.INFO, "saveEdges: Saved " + edgesWritten + " edges out of " + partitionEdgeCount + " partition edges, on partition " + partitionIndex + " out of " + totalPartitions);
                                    nextPrintMsecs = System.currentTimeMillis() + 15000;
                                    nextPrintVertices = verticesVisited + 250000;
                                }
                            }
                            BspServiceWorker.this.getPartitionStore().putPartition(partition);
                            partitionIndex++;
                        }
                        edgeWriter.close(BspServiceWorker.this.getContext());
                        return null;
                    }
                };
            }
        }, numThreads, "save-edges-%d", getContext());
        LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "saveEdges: Done saving edges.");
        // The commit must depend on the EDGE output format. Checking the vertex output
        // format here skipped the commit for jobs that only write edges.
        if (!configuration.isPureYarnJob() || configuration.getEdgeOutputFormatClass() == null) {
            return;
        }
        try {
            OutputCommitter outputCommitter = edgeOutputFormat.getOutputCommitter(getContext());
            if (outputCommitter.needsTaskCommit(getContext())) {
                LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "OutputCommitter: committing task output.");
                outputCommitter.commitTask(getContext());
            }
        } catch (IOException e) {
            LOG.error("Master task's attempt to commit output has FAILED.", e);
        } catch (InterruptedException e2) {
            LOG.error("Interrupted while attempting to obtain OutputCommitter.", e2);
        }
    }

    /**
     * Shuts this worker down at the end of the application.
     *
     * <p>Closes the client connections, steps the cached superstep back by one and,
     * unless the checkpoint status is {@code CHECKPOINT_AND_HALT}, saves vertices and
     * edges. It then stops progress reporting, shuts down the partition store, creates
     * this worker's node under the cleaned-up path, closes ZooKeeper, optionally dumps
     * metrics to {@code System.err} and closes the worker server. Failures while
     * creating the node or closing ZooKeeper are logged and do not abort the method.
     *
     * @param finishedSuperstepStats supplies the checkpoint status and the local vertex count
     * @throws IOException propagated from saving vertices or edges
     * @throws InterruptedException propagated from saving vertices or edges
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public void cleanup(FinishedSuperstepStats finishedSuperstepStats) throws IOException, InterruptedException {
        this.workerClient.closeConnections();
        setCachedSuperstep(getSuperstep() - 1);
        if (finishedSuperstepStats.getCheckpointStatus() != CheckpointStatus.CHECKPOINT_AND_HALT) {
            saveVertices(finishedSuperstepStats.getLocalVertexCount());
            saveEdges();
        }
        WorkerProgress.get().finishStoring();
        if (this.workerProgressWriter != null) {
            this.workerProgressWriter.stop();
        }
        getPartitionStore().shutdown();
        String workerCleanedUpPath = this.cleanedUpPath + "/" + getTaskPartition() + BspService.WORKER_SUFFIX;
        try {
            String createdPath = getZkExt().createExt(workerCleanedUpPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
            if (LOG.isInfoEnabled()) {
                LOG.info("cleanup: Notifying master its okay to cleanup with " + createdPath);
            }
        } catch (InterruptedException e) {
            LOG.error("cleanup: Got InterruptedException on notification to master about cleanup", e);
        } catch (KeeperException.NodeExistsException e2) {
            if (LOG.isInfoEnabled()) {
                LOG.info("cleanup: Couldn't create finished node '" + workerCleanedUpPath);
            }
        } catch (KeeperException e3) {
            LOG.error("cleanup: Got KeeperException on notification to master about cleanup", e3);
        }
        try {
            getZkExt().close();
        } catch (InterruptedException e4) {
            // Pass the exception as the throwable so the stack trace is logged,
            // consistent with the other error logs in this method.
            LOG.error("cleanup: Zookeeper failed to close", e4);
        }
        if (getConfiguration().metricsEnabled()) {
            GiraphMetrics.get().dumpToStream(System.err);
        }
        this.workerServer.close();
    }

    /**
     * Writes this worker's part of a checkpoint for the current superstep.
     *
     * <p>In order, it writes: the metadata file (partition count followed by the
     * partition ids), the per-partition vertex files, and the data file (worker context,
     * then for each partition its id and current messages, then the worker-to-worker
     * messages). The "valid" marker file is created last, after which a ZooKeeper node
     * for this worker is created under the "wrote checkpoint" path. If that node already
     * exists, only a warning is logged.
     *
     * <p>Both output streams are closed even when a write fails.
     *
     * @throws IOException if any checkpoint file cannot be written
     * @throws IllegalStateException if node creation is interrupted or fails with a
     *         {@code KeeperException} other than "node exists"
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public void storeCheckpoint() throws IOException {
        LoggerUtils.setStatusAndLog(getContext(), LOG, Level.INFO, "storeCheckpoint: Starting checkpoint " + getGraphTaskManager().getGraphFunctions().toString() + " - Attempt=" + getApplicationAttempt() + ", Superstep=" + getSuperstep());
        Path metadataFilePath = createCheckpointFilePathSafe(CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
        Path validFilePath = createCheckpointFilePathSafe(CheckpointingUtils.CHECKPOINT_VALID_POSTFIX);
        Path dataFilePath = createCheckpointFilePathSafe(CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);

        try (FSDataOutputStream metadataStream = getFs().create(metadataFilePath)) {
            metadataStream.writeInt(getPartitionStore().getNumPartitions());
            for (Integer partitionId : getPartitionStore().getPartitionIds()) {
                metadataStream.writeInt(partitionId.intValue());
            }
        }

        storeCheckpointVertices();

        try (FSDataOutputStream dataStream = getFs().create(dataFilePath)) {
            this.workerContext.write(dataStream);
            getContext().progress();
            for (Integer partitionId : getPartitionStore().getPartitionIds()) {
                dataStream.writeInt(partitionId.intValue());
                getServerData().getCurrentMessageStore().writePartition(dataStream, partitionId.intValue());
                getContext().progress();
            }
            WritableUtils.writeList(getServerData().getCurrentWorkerToWorkerMessages(), dataStream);
        }

        // The marker is created only after every other checkpoint file is closed.
        getFs().createNewFile(validFilePath);

        String wroteCheckpointPath = getWorkerWroteCheckpointPath(getApplicationAttempt(), getSuperstep()) + "/" + getHostnamePartitionId();
        try {
            getZkExt().createExt(wroteCheckpointPath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (InterruptedException e) {
            throw new IllegalStateException("Creating " + wroteCheckpointPath + " failed with InterruptedException", e);
        } catch (KeeperException.NodeExistsException e2) {
            LOG.warn("storeCheckpoint: wrote checkpoint worker path " + wroteCheckpointPath + " already exists!");
        } catch (KeeperException e3) {
            throw new IllegalStateException("Creating " + wroteCheckpointPath + " failed with KeeperException", e3);
        }
    }

    /**
     * Builds the path of a checkpoint file for the current superstep and this task
     * partition, deleting any file that already exists at that path.
     *
     * @param str suffix appended after the task partition number
     * @return the (now free) checkpoint file path
     * @throws IOException if the delete call on the file system fails
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Path createCheckpointFilePathSafe(String str) throws IOException {
        String location = getCheckpointBasePath(getSuperstep()) + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER + getTaskPartition() + str;
        Path checkpointFile = new Path(location);
        // Non-recursive delete; 'true' means a stale file was actually removed.
        boolean removedStaleFile = getFs().delete(checkpointFile, false);
        if (removedStaleFile) {
            LOG.warn("storeCheckpoint: Removed " + str + " file " + checkpointFile);
        }
        return checkpointFile;
    }

    /**
     * Builds the path of a previously saved checkpoint file for this task partition.
     *
     * @param j superstep passed to {@code getSavedCheckpointBasePath}
     * @param str suffix appended after the task partition number
     * @return the saved checkpoint file path (the file system is not touched)
     */
    /* JADX INFO: Access modifiers changed from: private */
    public Path getSavedCheckpoint(long j, String str) {
        String location = getSavedCheckpointBasePath(j) + DefaultExpressionEngine.DEFAULT_PROPERTY_DELIMITER + getTaskPartition() + str;
        return new Path(location);
    }

    /**
     * Writes one checkpoint file per partition with that partition's serialized content.
     *
     * <p>Partition ids are put on a shared queue that is drained by
     * {@code min(NUM_CHECKPOINT_IO_THREADS, numPartitions)} callables. The output is
     * wrapped in the codec resolved from {@code CHECKPOINT_COMPRESSION_CODEC}, if one is
     * found. The streams of each file are closed even when writing the partition fails.
     */
    private void storeCheckpointVertices() {
        int numPartitions = getPartitionStore().getNumPartitions();
        int numThreads = Math.min(GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()), numPartitions);
        // ArrayBlockingQueue rejects a capacity of 0, hence the LinkedList fallback.
        final Queue<Integer> partitionIdQueue = numPartitions == 0 ? new LinkedList<Integer>() : new ArrayBlockingQueue<Integer>(numPartitions);
        Iterables.addAll(partitionIdQueue, getPartitionStore().getPartitionIds());
        final CompressionCodec codec = new CompressionCodecFactory(getConfiguration()).getCodec(new Path(GiraphConstants.CHECKPOINT_COMPRESSION_CODEC.get(getConfiguration())));
        long startMsecs = System.currentTimeMillis();
        ProgressableUtils.getResultsWithNCallables(new CallableFactory<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.3
            @Override // org.apache.giraph.utils.CallableFactory
            /* renamed from: newCallable */
            public Callable<Void> newCallable2(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.3.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        Integer partitionId;
                        while (!partitionIdQueue.isEmpty() && (partitionId = partitionIdQueue.poll()) != null) {
                            Path verticesPath = BspServiceWorker.this.createCheckpointFilePathSafe("_" + partitionId + CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
                            // Resources close in reverse order: the (possibly compressing)
                            // wrapper first, then the raw file stream, as before.
                            try (DataOutputStream rawStream = BspServiceWorker.this.getFs().create(verticesPath);
                                 DataOutputStream stream = codec == null ? rawStream : new DataOutputStream(codec.createOutputStream(rawStream))) {
                                Partition<I, V, E> partition = BspServiceWorker.this.getPartitionStore().getOrCreatePartition(partitionId);
                                partition.write(stream);
                                BspServiceWorker.this.getPartitionStore().putPartition(partition);
                            }
                        }
                        return null;
                    }
                };
            }
        }, numThreads, "checkpoint-vertices-%d", getContext());
        LOG.info("Save checkpoint in " + (System.currentTimeMillis() - startMsecs) + " ms, using " + numThreads + " threads");
    }

    /**
     * Reads the per-partition checkpoint files of a saved superstep and adds the
     * resulting partitions to the partition store.
     *
     * <p>The given partition ids are drained from a shared queue by
     * {@code min(NUM_CHECKPOINT_IO_THREADS, list.size())} callables. The input is wrapped
     * in the codec resolved from {@code CHECKPOINT_COMPRESSION_CODEC}, if one is found.
     * The streams of each file are closed even when reading the partition fails.
     *
     * @param j superstep whose checkpoint is loaded
     * @param list ids of the partitions to load
     */
    private void loadCheckpointVertices(final long j, List<Integer> list) {
        int numThreads = Math.min(GiraphConstants.NUM_CHECKPOINT_IO_THREADS.get(getConfiguration()), list.size());
        final Queue<Integer> partitionIdQueue = new ConcurrentLinkedQueue<Integer>(list);
        final CompressionCodec codec = new CompressionCodecFactory(getConfiguration()).getCodec(new Path(GiraphConstants.CHECKPOINT_COMPRESSION_CODEC.get(getConfiguration())));
        long startMsecs = System.currentTimeMillis();
        ProgressableUtils.getResultsWithNCallables(new CallableFactory<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.4
            @Override // org.apache.giraph.utils.CallableFactory
            /* renamed from: newCallable */
            public Callable<Void> newCallable2(int i) {
                return new Callable<Void>() { // from class: org.apache.giraph.worker.BspServiceWorker.4.1
                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // java.util.concurrent.Callable
                    public Void call() throws Exception {
                        Integer partitionId;
                        while (!partitionIdQueue.isEmpty() && (partitionId = partitionIdQueue.poll()) != null) {
                            Path verticesPath = BspServiceWorker.this.getSavedCheckpoint(j, "_" + partitionId + CheckpointingUtils.CHECKPOINT_VERTICES_POSTFIX);
                            // Both streams are managed so the raw file stream is released
                            // even if wrapping it in the codec or reading the partition fails.
                            try (DataInputStream rawStream = BspServiceWorker.this.getFs().open(verticesPath);
                                 DataInputStream stream = codec == null ? rawStream : new DataInputStream(codec.createInputStream(rawStream))) {
                                Partition<I, V, E> partition = BspServiceWorker.this.getConfiguration().createPartition(partitionId.intValue(), BspServiceWorker.this.getContext());
                                partition.readFields(stream);
                                BspServiceWorker.this.getPartitionStore().addPartition(partition);
                            }
                        }
                        return null;
                    }
                };
            }
        }, numThreads, "load-vertices-%d", getContext());
        LOG.info("Loaded checkpoint in " + (System.currentTimeMillis() - startMsecs) + " ms, using " + numThreads + " threads");
    }

    /**
     * Restores this worker's state from the checkpoint saved for the given superstep.
     *
     * <p>Reads the partition ids from the metadata file and loads the partition vertex
     * files. It then reads the worker context from the data file, and the global stats
     * and superstep classes from the finalized checkpoint file. The message stores are
     * reset and refilled per partition from the data file, followed by the
     * worker-to-worker messages. Finally the worker client is set up.
     *
     * <p>All three input streams are closed before returning, including when reading
     * fails.
     *
     * @param j superstep whose checkpoint is loaded
     * @return vertex and edge counts taken from the restored global stats
     * @throws RuntimeException wrapping any {@code IOException} raised while reading
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public VertexEdgeCount loadCheckpoint(long j) {
        Path metadataFilePath = getSavedCheckpoint(j, CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX);
        Path dataFilePath = getSavedCheckpoint(j, CheckpointingUtils.CHECKPOINT_DATA_POSTFIX);
        try {
            int numPartitions;
            try (FSDataInputStream metadataStream = getFs().open(metadataFilePath)) {
                numPartitions = metadataStream.readInt();
                List<Integer> partitionIds = new ArrayList<Integer>(numPartitions);
                for (int i = 0; i < numPartitions; i++) {
                    partitionIds.add(Integer.valueOf(metadataStream.readInt()));
                }
                loadCheckpointVertices(j, partitionIds);
                getContext().progress();
            }

            GlobalStats globalStats = new GlobalStats();
            try (FSDataInputStream dataStream = getFs().open(dataFilePath)) {
                this.workerContext.readFields(dataStream);

                SuperstepClasses superstepClasses = new SuperstepClasses();
                Path finalizedFilePath = new Path(getSavedCheckpointBasePath(j) + CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
                // This stream used to be left open; it is now closed once both
                // records have been read.
                try (FSDataInputStream finalizedStream = getFs().open(finalizedFilePath)) {
                    globalStats.readFields(finalizedStream);
                    superstepClasses.readFields(finalizedStream);
                }
                getConfiguration().updateSuperstepClasses(superstepClasses);
                getServerData().resetMessageStores();

                // The data file holds one (partition id, messages) record per partition.
                for (int i = 0; i < numPartitions; i++) {
                    int partitionId = dataStream.readInt();
                    getServerData().getCurrentMessageStore().readFieldsForPartition(dataStream, partitionId);
                }
                getServerData().getCurrentWorkerToWorkerMessages().addAll(WritableUtils.readList(dataStream));
            }

            if (LOG.isInfoEnabled()) {
                LOG.info("loadCheckpoint: Loaded " + this.workerGraphPartitioner.getPartitionOwners().size() + " total.");
            }
            this.workerClient.setup(getConfiguration().authenticate());
            return new VertexEdgeCount(globalStats.getVertexCount(), globalStats.getEdgeCount());
        } catch (IOException e) {
            throw new RuntimeException("loadCheckpoint: Failed for superstep=" + j, e);
        }
    }

    /**
     * Sends the listed partitions to their new owners and announces completion in
     * ZooKeeper.
     *
     * <p>The destination workers are visited in shuffled order. Each partition is removed
     * from the local store and sent; a missing partition is an error. After flushing and
     * waiting for all requests, a persistent node is created at this worker's partition
     * exchange path.
     *
     * @param map destination worker to the ids of the partitions it should receive
     * @throws IllegalStateException if a partition is missing, the flush fails, or the
     *         ZooKeeper node cannot be created
     */
    private void sendWorkerPartitions(Map<WorkerInfo, List<Integer>> map) {
        List<Map.Entry<WorkerInfo, List<Integer>>> shuffledEntries = new ArrayList<Map.Entry<WorkerInfo, List<Integer>>>(map.entrySet());
        Collections.shuffle(shuffledEntries);
        NettyWorkerClientRequestProcessor requestProcessor = new NettyWorkerClientRequestProcessor(getContext(), getConfiguration(), this);
        for (Map.Entry<WorkerInfo, List<Integer>> destination : shuffledEntries) {
            WorkerInfo targetWorker = destination.getKey();
            for (Integer partitionId : destination.getValue()) {
                Partition<I, V, E> partition = getPartitionStore().removePartition(partitionId);
                if (partition == null) {
                    throw new IllegalStateException("sendWorkerPartitions: Couldn't find partition " + partitionId + " to send to " + targetWorker);
                }
                if (LOG.isInfoEnabled()) {
                    LOG.info("sendWorkerPartitions: Sending worker " + targetWorker + " partition " + partitionId);
                }
                requestProcessor.sendPartitionRequest(targetWorker, partition);
            }
        }
        try {
            requestProcessor.flush();
            this.workerClient.waitAllRequests();
            String myExchangePath = getPartitionExchangeWorkerPath(getApplicationAttempt(), getSuperstep(), getWorkerInfo());
            try {
                getZkExt().createExt(myExchangePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
                if (LOG.isInfoEnabled()) {
                    LOG.info("sendWorkerPartitions: Done sending all my partitions.");
                }
            } catch (KeeperException zkFailure) {
                throw new IllegalStateException("sendWorkerPartitions: KeeperException to create " + myExchangePath, zkFailure);
            } catch (InterruptedException interrupted) {
                throw new IllegalStateException("sendWorkerPartitions: InterruptedException to create " + myExchangePath, interrupted);
            }
        } catch (IOException flushFailure) {
            throw new IllegalStateException("sendWorkerPartitions: Flush failed", flushFailure);
        }
    }

    /**
     * Applies the new partition ownership, sends away partitions this worker no longer
     * owns, and waits until every worker it depends on has finished sending.
     *
     * <p>Completion of other workers is detected through the children of the partition
     * exchange path in ZooKeeper; the method blocks on the "children changed" event
     * between checks. It returns early when there are no dependencies and the local
     * partition store is empty.
     *
     * @param collection the partition owners handed to the worker graph partitioner
     * @throws IllegalStateException if two dependency workers share a hostname id
     * @throws RuntimeException if waiting is interrupted or ZooKeeper access fails
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public final void exchangeVertexPartitions(Collection<? extends PartitionOwner> collection) {
        PartitionExchange exchange = this.workerGraphPartitioner.updatePartitionOwners(getWorkerInfo(), collection);
        this.workerClient.openConnections();

        Map<WorkerInfo, List<Integer>> partitionsToSend = exchange.getSendWorkerPartitionMap();
        if (!getPartitionStore().isEmpty()) {
            sendWorkerPartitions(partitionsToSend);
        }

        Set<WorkerInfo> dependencyWorkers = exchange.getMyDependencyWorkerSet();
        Set<String> pendingHostnameIds = new HashSet<String>();
        for (WorkerInfo dependency : dependencyWorkers) {
            boolean firstOccurrence = pendingHostnameIds.add(dependency.getHostnameId());
            if (!firstOccurrence) {
                throw new IllegalStateException("exchangeVertexPartitions: Duplicate entry " + dependency);
            }
        }

        if (dependencyWorkers.isEmpty() && getPartitionStore().isEmpty()) {
            if (LOG.isInfoEnabled()) {
                LOG.info("exchangeVertexPartitions: Nothing to exchange, exiting early");
            }
            return;
        }

        String exchangePath = getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
        try {
            for (;;) {
                // Every child of the exchange path names a worker that is done sending.
                pendingHostnameIds.removeAll(getZkExt().getChildrenExt(exchangePath, true, false, false));
                if (pendingHostnameIds.isEmpty()) {
                    break;
                }
                if (LOG.isInfoEnabled()) {
                    LOG.info("exchangeVertexPartitions: Waiting for workers " + pendingHostnameIds);
                }
                getPartitionExchangeChildrenChangedEvent().waitForever();
                getPartitionExchangeChildrenChangedEvent().reset();
            }
        } catch (InterruptedException | KeeperException e) {
            throw new RuntimeException("exchangeVertexPartitions: Got runtime exception", e);
        }

        if (LOG.isInfoEnabled()) {
            LOG.info("exchangeVertexPartitions: Done with exchange.");
        }
    }

    /**
     * Returns the event that {@code processEvent} signals when the children of the
     * partition exchange directory change.
     *
     * @return the partition-exchange "children changed" event
     */
    public final BspEvent getPartitionExchangeChildrenChangedEvent() {
        return partitionExchangeChildrenChanged;
    }

    /**
     * Handles the worker-specific ZooKeeper events.
     *
     * <p>Two kinds of {@code NodeChildrenChanged} events are recognized:
     * <ul>
     *   <li>a change under the master job state path: the job state is re-read, and the
     *       JVM exits with status -1 when the state is {@code START_SUPERSTEP} for a
     *       different application attempt than this worker's;</li>
     *   <li>a change under the partition exchange directory: the partition exchange
     *       event is signalled.</li>
     * </ul>
     *
     * @param watchedEvent the ZooKeeper event to inspect
     * @return {@code true} if the event was recognized and handled; {@code false}
     *         otherwise, including when a pure YARN job yields a null job state
     * @throws RuntimeException if the job state JSON cannot be interpreted
     */
    @Override // org.apache.giraph.bsp.BspService
    protected boolean processEvent(WatchedEvent watchedEvent) {
        boolean foundEvent = false;
        if (watchedEvent.getPath().startsWith(this.masterJobStatePath) && watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            if (LOG.isInfoEnabled()) {
                LOG.info("processEvent: Job state changed, checking to see if it needs to restart");
            }
            JSONObject jobState = getJobState();
            if (getConfiguration().isPureYarnJob() && null == jobState) {
                LOG.error("BspServiceWorker#getJobState() came back NULL.");
                return false;
            }
            try {
                if (ApplicationState.valueOf(jobState.getString(BspService.JSONOBJ_STATE_KEY)) == ApplicationState.START_SUPERSTEP && jobState.getLong(BspService.JSONOBJ_APPLICATION_ATTEMPT_KEY) != getApplicationAttempt()) {
                    LOG.fatal("processEvent: Worker will restart from command - " + jobState.toString());
                    System.exit(-1);
                }
                foundEvent = true;
            } catch (JSONException e) {
                // Keep the JSONException as the cause so the parse failure stays diagnosable.
                throw new RuntimeException("processEvent: Couldn't properly get job state from " + jobState.toString(), e);
            }
        } else if (watchedEvent.getPath().contains(BspService.PARTITION_EXCHANGE_DIR) && watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            if (LOG.isInfoEnabled()) {
                LOG.info("processEvent : partitionExchangeChildrenChanged (at least one worker is done sending partitions)");
            }
            this.partitionExchangeChildrenChanged.signal();
            foundEvent = true;
        }
        return foundEvent;
    }

    /**
     * Returns the {@code WorkerInfo} held by this service.
     *
     * @return this worker's info object
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public WorkerInfo getWorkerInfo() {
        return workerInfo;
    }

    /**
     * Returns the partition store owned by the worker server's data.
     *
     * @return the partition store obtained from {@code getServerData()}
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public PartitionStore<I, V, E> getPartitionStore() {
        ServerData<I, V, E> serverData = getServerData();
        return serverData.getPartitionStore();
    }

    /**
     * Looks up the partition owner for a vertex id via the worker graph partitioner.
     *
     * @param i vertex id to look up
     * @return the partition owner reported by the partitioner
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public PartitionOwner getVertexPartitionOwner(I i) {
        PartitionOwner owner = workerGraphPartitioner.getPartitionOwner(i);
        return owner;
    }

    /**
     * Returns the partition owners known to the worker graph partitioner.
     *
     * @return the partitioner's partition owners
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public Iterable<? extends PartitionOwner> getPartitionOwners() {
        return workerGraphPartitioner.getPartitionOwners();
    }

    /**
     * Returns the id of the partition that owns the given vertex id.
     *
     * @param i vertex id to look up
     * @return the partition id taken from the vertex's partition owner
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public int getPartitionId(I i) {
        PartitionOwner owner = getVertexPartitionOwner(i);
        return owner.getPartitionId();
    }

    /**
     * Asks the partition store whether it has the given partition.
     *
     * @param num partition id to check
     * @return the partition store's answer for that id
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public boolean hasPartition(Integer num) {
        PartitionStore<I, V, E> store = getPartitionStore();
        return store.hasPartition(num);
    }

    /**
     * Returns the server data exposed by the worker server.
     *
     * @return the worker server's {@code ServerData}
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public ServerData<I, V, E> getServerData() {
        return workerServer.getServerData();
    }

    /**
     * Returns the handler stored in {@code globalCommHandler}.
     *
     * @return this worker's aggregator handler
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public WorkerAggregatorHandler getAggregatorHandler() {
        return globalCommHandler;
    }

    /**
     * Prepares the global communication handler for the current superstep.
     *
     * <p>Nothing is done while the current superstep is -1.
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public void prepareSuperstep() {
        if (getSuperstep() == -1) {
            return;
        }
        globalCommHandler.prepareSuperstep(workerAggregatorRequestProcessor);
    }

    /**
     * Returns the superstep output object held by this service.
     *
     * @return this worker's {@code SuperstepOutput}
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public SuperstepOutput<I, V, E> getSuperstepOutput() {
        return superstepOutput;
    }

    /**
     * Returns the global stats recorded for the previous superstep.
     *
     * <p>The stats are read from the "superstep finished" znode of superstep
     * {@code getSuperstep() - 1}, but only when the current superstep is greater than
     * both -1 and the restarted superstep. Otherwise a freshly constructed
     * {@code GlobalStats} is returned unchanged.
     *
     * @return the global stats, possibly left at their initial values
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceWorker
    public GlobalStats getGlobalStats() {
        GlobalStats stats = new GlobalStats();
        boolean previousSuperstepHasStats = getSuperstep() > Math.max(-1L, getRestartedSuperstep());
        if (!previousSuperstepHasStats) {
            return stats;
        }
        String finishedPath = getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep() - 1);
        WritableUtils.readFieldsFromZnode(getZkExt(), finishedPath, false, null, stats);
        return stats;
    }
}
