package org.apache.giraph.master;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import it.unimi.dsi.fastutil.ints.Int2ObjectOpenHashMap;
import java.io.ByteArrayOutputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import net.iharder.Base64;
import org.apache.commons.io.FilenameUtils;
import org.apache.giraph.bsp.ApplicationState;
import org.apache.giraph.bsp.BspInputFormat;
import org.apache.giraph.bsp.BspService;
import org.apache.giraph.bsp.CentralizedServiceMaster;
import org.apache.giraph.bsp.CheckpointStatus;
import org.apache.giraph.bsp.SuperstepState;
import org.apache.giraph.comm.MasterClient;
import org.apache.giraph.comm.MasterServer;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.counters.GiraphStats;
import org.apache.giraph.graph.AddressesAndPartitionsWritable;
import org.apache.giraph.graph.GlobalStats;
import org.apache.giraph.graph.GraphFunctions;
import org.apache.giraph.graph.GraphState;
import org.apache.giraph.graph.GraphTaskManager;
import org.apache.giraph.graph.InputSplitEvents;
import org.apache.giraph.graph.InputSplitPaths;
import org.apache.giraph.io.GiraphInputFormat;
import org.apache.giraph.metrics.AggregatedMetrics;
import org.apache.giraph.metrics.GiraphMetrics;
import org.apache.giraph.metrics.GiraphTimer;
import org.apache.giraph.metrics.GiraphTimerContext;
import org.apache.giraph.metrics.ResetSuperstepMetricsObserver;
import org.apache.giraph.metrics.SuperstepMetricsRegistry;
import org.apache.giraph.metrics.WorkerSuperstepMetrics;
import org.apache.giraph.partition.BasicPartitionOwner;
import org.apache.giraph.partition.MasterGraphPartitioner;
import org.apache.giraph.partition.PartitionOwner;
import org.apache.giraph.partition.PartitionStats;
import org.apache.giraph.partition.PartitionUtils;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.giraph.utils.CheckpointingUtils;
import org.apache.giraph.utils.JMapHistoDumper;
import org.apache.giraph.utils.LogStacktraceCallable;
import org.apache.giraph.utils.ProgressableUtils;
import org.apache.giraph.utils.ReactiveJMapHistoDumper;
import org.apache.giraph.utils.WritableUtils;
import org.apache.giraph.worker.WorkerInfo;
import org.apache.giraph.zk.BspEvent;
import org.apache.giraph.zk.PredicateLock;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.Stat;
import org.codehaus.jackson.util.MinimalPrettyPrinter;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: input_file:org/apache/giraph/master/BspServiceMaster.class */
public class BspServiceMaster<I extends WritableComparable, V extends Writable, E extends Writable> extends BspService<I, V, E> implements CentralizedServiceMaster<I, V, E>, ResetSuperstepMetricsObserver {
    /** Presumably caps how many remaining workers get printed; not referenced in the visible code. */
    public static final int MAX_PRINTABLE_REMAINING_WORKERS = 10;
    /** Configuration key: number of threads used to write input splits into ZooKeeper. */
    public static final String NUM_MASTER_ZK_INPUT_SPLIT_THREADS = "giraph.numMasterZkInputSplitThreads";
    /** Intended default for {@link #NUM_MASTER_ZK_INPUT_SPLIT_THREADS}. */
    public static final int DEFAULT_INPUT_SPLIT_THREAD_COUNT = 1;
    /** Shared time source. */
    private static final Time TIME = SystemTime.get();
    /** Class logger. */
    private static final Logger LOG = Logger.getLogger(BspServiceMaster.class);
    /** Whether this task is the master; false until it becomes master. */
    private boolean isMaster;
    // Worker-count bounds and timing knobs, all read from the configuration in the constructor.
    private final int maxWorkers;
    private final int minWorkers;
    private final int maxNumberOfSupersteps;
    /** Percentage of maxWorkers that must respond before checkWorkers stops waiting. */
    private final float minPercentResponded;
    /** How long (msecs) each wait on a ZooKeeper event lasts before re-polling. */
    private final int eventWaitMsecs;
    /** Overall time budget (msecs) checkWorkers waits for workers. */
    private final int maxSuperstepWaitMsecs;
    /** If at most this many workers are missing, checkWorkers logs which ones. */
    private final int partitionLongTailMinPrint;
    /** Last superstep that was checkpointed; -1 until one is. */
    private long lastCheckpointedSuperstep;
    // Events signalled through ZooKeeper watches.
    private final BspEvent workerWroteCheckpoint;
    private final BspEvent superstepStateChanged;
    /** Master-side partitioner; also supplies the PartitionStats class for deserialization. */
    private final MasterGraphPartitioner<I, V, E> masterGraphPartitioner;
    /** Partition stats gathered from workers; cleared and refilled each aggregation. */
    private final List<PartitionStats> allPartitionStatsList;
    private MasterAggregatorHandler globalCommHandler;
    private AggregatorToGlobalCommTranslation aggregatorTranslation;
    private MasterCompute masterCompute;
    private MasterClient masterClient;
    private MasterServer masterServer;
    private MasterInfo masterInfo;
    /** Workers selected by the master; keyed by task id when restoring a checkpoint. */
    private List<WorkerInfo> chosenWorkerInfoList;
    /** Appears to be the max number of preferred split locations recorded per input split. */
    private final int localityLimit = 5;
    /** Master observers created from the configuration; notified in setup(). */
    private final MasterObserver[] observers;
    /** Timer for master compute calls; replaced on every new superstep. */
    private GiraphTimer masterComputeTimer;
    private final int checkpointFrequency;
    private CheckpointStatus checkpointStatus;

    /**
     * Task that serializes one input split and stores it in ZooKeeper at
     * {@code inputSplitsPath + "/" + index}.
     *
     * <p>When {@code writeLocations} is set, the serialized split is prefixed with a
     * tab-separated list of at most {@code localityLimit} preferred host locations.
     * A pre-existing znode for the split is treated as success (logged at INFO).
     */
    public class WriteInputSplit implements Callable<Void> {
        /** Input format used to serialize the split. */
        private final GiraphInputFormat inputFormat;
        /** The split to write. */
        private final InputSplit inputSplit;
        /** Parent znode under which the split znode is created. */
        private final String inputSplitsPath;
        /** Index of the split; used as the znode name. */
        private final int index;
        /** Whether to prefix the data with the split's locations. */
        private final boolean writeLocations;

        public WriteInputSplit(GiraphInputFormat inputFormat, InputSplit inputSplit, String inputSplitsPath, int index, boolean writeLocations) {
            this.inputFormat = inputFormat;
            this.inputSplit = inputSplit;
            this.inputSplitsPath = inputSplitsPath;
            this.index = index;
            this.writeLocations = writeLocations;
        }

        /**
         * Serializes the split and creates its znode.
         *
         * @return always null
         * @throws IllegalStateException on I/O, interruption, or ZooKeeper errors
         *         other than the node already existing
         */
        @Override // java.util.concurrent.Callable
        public Void call() {
            String inputSplitPath = null;
            try {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                DataOutputStream outputStream = new DataOutputStream(byteArrayOutputStream);
                if (this.writeLocations) {
                    Text.writeString(outputStream, formatLocations(this.inputSplit.getLocations()));
                }
                this.inputFormat.writeInputSplit(this.inputSplit, outputStream);
                outputStream.flush();
                inputSplitPath = this.inputSplitsPath + "/" + this.index;
                BspServiceMaster.this.getZkExt().createExt(inputSplitPath, byteArrayOutputStream.toByteArray(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
                if (BspServiceMaster.LOG.isDebugEnabled()) {
                    BspServiceMaster.LOG.debug("call: Created input split with index " + this.index + " serialized as " + byteArrayOutputStream.toString(Charset.defaultCharset().name()));
                }
                return null;
            } catch (IOException e) {
                throw new IllegalStateException("call: IOException", e);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the executor that runs this task.
                Thread.currentThread().interrupt();
                throw new IllegalStateException("call: InterruptedException", e);
            } catch (KeeperException.NodeExistsException e) {
                // Another attempt already wrote this split; nothing more to do.
                if (BspServiceMaster.LOG.isInfoEnabled()) {
                    BspServiceMaster.LOG.info("call: Node " + inputSplitPath + " already exists.");
                }
                return null;
            } catch (KeeperException e) {
                throw new IllegalStateException("call: KeeperException", e);
            }
        }

        /**
         * Joins at most {@code localityLimit} locations with tabs.
         *
         * <p>The previous implementation iterated over every location but stopped
         * emitting separators after the limit, so extra hosts were concatenated into
         * one bogus entry.
         *
         * @param locations split locations, may be null
         * @return tab-separated locations, or "" when there are none
         */
        private String formatLocations(String[] locations) {
            if (locations == null) {
                return "";
            }
            int count = Math.min(locations.length, BspServiceMaster.this.localityLimit);
            StringBuilder joined = new StringBuilder();
            for (int i = 0; i < count; i++) {
                if (i > 0) {
                    joined.append('\t');
                }
                joined.append(locations[i]);
            }
            return joined.toString();
        }
    }

    /**
     * Creates the master service, reading its worker-count and timing limits from the
     * configuration and registering the ZooKeeper-backed events it waits on.
     *
     * @param context mapper context of this task
     * @param graphTaskManager owning task manager
     */
    public BspServiceMaster(Mapper<?, ?, ?, ?>.Context context, GraphTaskManager<I, V, E> graphTaskManager) {
        super(context, graphTaskManager);
        this.isMaster = false;
        this.lastCheckpointedSuperstep = -1L;
        this.allPartitionStatsList = new ArrayList<>();
        this.chosenWorkerInfoList = Lists.newArrayList();
        // localityLimit is initialized at its declaration; assigning a final field
        // that already has an initializer does not compile, so it is not set here.
        this.workerWroteCheckpoint = new PredicateLock(context);
        registerBspEvent(this.workerWroteCheckpoint);
        this.superstepStateChanged = new PredicateLock(context);
        registerBspEvent(this.superstepStateChanged);
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        this.maxWorkers = configuration.getMaxWorkers();
        this.minWorkers = configuration.getMinWorkers();
        this.maxNumberOfSupersteps = configuration.getMaxNumberOfSupersteps();
        this.minPercentResponded = GiraphConstants.MIN_PERCENT_RESPONDED.get(configuration);
        this.eventWaitMsecs = configuration.getEventWaitMsecs();
        this.maxSuperstepWaitMsecs = configuration.getMaxMasterSuperstepWaitMsecs();
        this.partitionLongTailMinPrint = GiraphConstants.PARTITION_LONG_TAIL_MIN_PRINT.get(configuration);
        this.masterGraphPartitioner = getGraphPartitionerFactory().createMasterGraphPartitioner();
        // Optional heap-histogram observers must be registered before observers are created.
        if (configuration.isJMapHistogramDumpEnabled()) {
            configuration.addMasterObserverClass(JMapHistoDumper.class);
        }
        if (configuration.isReactiveJmapHistogramDumpEnabled()) {
            configuration.addMasterObserverClass(ReactiveJMapHistoDumper.class);
        }
        this.observers = configuration.createMasterObservers();
        this.checkpointFrequency = configuration.getCheckpointFrequency();
        this.checkpointStatus = CheckpointStatus.NONE;
        GiraphMetrics.get().addSuperstepResetObserver(this);
        GiraphStats.init(context);
    }

    /**
     * Replaces the master-compute timer with one registered in the new superstep's
     * metrics registry.
     *
     * @param superstepMetricsRegistry registry for the superstep that is starting
     */
    @Override // org.apache.giraph.metrics.ResetSuperstepMetricsObserver
    public void newSuperstep(SuperstepMetricsRegistry superstepMetricsRegistry) {
        GiraphTimer freshTimer = new GiraphTimer(superstepMetricsRegistry, "master-compute-call", TimeUnit.MILLISECONDS);
        masterComputeTimer = freshTimer;
    }

    /**
     * Publishes a new job state; a FAILED state also kills the job.
     *
     * @param applicationState state to publish
     * @param j application attempt
     * @param j2 desired superstep
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void setJobState(ApplicationState applicationState, long j, long j2) {
        final boolean killJobOnFailure = true;
        setJobState(applicationState, j, j2, killJobOnFailure);
    }

    /**
     * Writes the job state as a JSON object to a new sequential znode under
     * {@code masterJobStatePath + "/jobState"}.
     *
     * @param applicationState state to publish
     * @param applicationAttempt application attempt stored in the JSON
     * @param desiredSuperstep superstep stored in the JSON
     * @param killJobOnFailure if true and the state is FAILED, also call failJob
     * @throws RuntimeException if the JSON object cannot be built (cause preserved)
     * @throws IllegalStateException on ZooKeeper errors or interruption
     */
    private void setJobState(ApplicationState applicationState, long applicationAttempt, long desiredSuperstep, boolean killJobOnFailure) {
        JSONObject jobState = new JSONObject();
        try {
            jobState.put(BspService.JSONOBJ_STATE_KEY, applicationState.toString());
            jobState.put(BspService.JSONOBJ_APPLICATION_ATTEMPT_KEY, applicationAttempt);
            jobState.put(BspService.JSONOBJ_SUPERSTEP_KEY, desiredSuperstep);
        } catch (JSONException e) {
            throw new RuntimeException("setJobState: Couldn't put " + applicationState.toString(), e);
        }
        if (LOG.isInfoEnabled()) {
            LOG.info("setJobState: " + jobState.toString() + " on superstep " + getSuperstep());
        }
        try {
            getZkExt().createExt(this.masterJobStatePath + "/jobState", jobState.toString().getBytes(Charset.defaultCharset()), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL, true);
            LOG.info("setJobState: " + jobState);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("setJobState: Unknown InterruptedException for " + this.masterJobStatePath, e);
        } catch (KeeperException.NodeExistsException e) {
            // Sequential nodes get unique names, so this should never happen.
            throw new IllegalStateException("setJobState: Imposible that " + this.masterJobStatePath + " already exists!", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("setJobState: Unknown KeeperException for " + this.masterJobStatePath, e);
        }
        if (applicationState == ApplicationState.FAILED && killJobOnFailure) {
            failJob(new IllegalStateException("FAILED"));
        }
    }

    /**
     * Records a failure with the job progress tracker, publishes FAILED without
     * killing the job through setJobState, then fails the job directly.
     *
     * @param errorMessage human-readable reason for the failure
     */
    private void setJobStateFailed(String errorMessage) {
        getGraphTaskManager().getJobProgressTracker().logFailure(errorMessage);
        final boolean killJobOnFailure = false;
        setJobState(ApplicationState.FAILED, -1L, -1L, killJobOnFailure);
        failJob(new IllegalStateException(errorMessage));
    }

    /**
     * Asks the input format for splits, optionally keeping only a sampled prefix.
     *
     * @param giraphInputFormat input format to split
     * @param numInputThreads total number of input threads, passed to getSplits
     * @param inputSplitType label ("Vertex", "Edge", "Mapping") used in log messages
     * @return the splits, or a copied prefix of them when INPUT_SPLIT_SAMPLE_PERCENT
     *         differs from its default
     * @throws IllegalStateException if split generation fails or is interrupted
     */
    private List<InputSplit> generateInputSplits(GiraphInputFormat giraphInputFormat, int numInputThreads, String inputSplitType) {
        String logPrefix = "generate" + inputSplitType + "InputSplits";
        try {
            List<InputSplit> splits = giraphInputFormat.getSplits(getContext(), numInputThreads);
            float samplePercent = GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT.get(getConfiguration());
            if (samplePercent != GiraphConstants.INPUT_SPLIT_SAMPLE_PERCENT.getDefaultValue()) {
                int lastIndex = (int) ((samplePercent * splits.size()) / 100.0f);
                // Clamp so a percent outside [0, 100] cannot make subList throw.
                lastIndex = Math.max(0, Math.min(lastIndex, splits.size()));
                // Copy so the sample does not keep the full split list alive.
                List<InputSplit> sampleSplits = new ArrayList<>(splits.subList(0, lastIndex));
                LOG.warn(logPrefix + ": Using sampling - Processing only " + sampleSplits.size() + " instead of " + splits.size() + " expected splits.");
                return sampleSplits;
            }
            if (LOG.isInfoEnabled()) {
                LOG.info(logPrefix + ": Got " + splits.size() + " input splits for " + numInputThreads + " input threads");
            }
            return splits;
        } catch (IOException e) {
            throw new IllegalStateException(logPrefix + ": Got IOException", e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(logPrefix + ": Got InterruptedException", e);
        }
    }

    /**
     * Fails the running job.
     *
     * <p>For pure YARN jobs this throws; otherwise it kills the MapReduce job via
     * JobClient. In every case failureCleanup(exc) runs afterwards.
     *
     * @param exc reason for the failure
     * @throws RuntimeException always for pure YARN jobs, or wrapping an IOException
     *         from the job client
     */
    private void failJob(Exception exc) {
        LOG.fatal("failJob: Killing job " + getJobId());
        // Pass the throwable so the stack trace is not lost.
        LOG.fatal("failJob: exception " + exc.toString(), exc);
        try {
            if (getConfiguration().isPureYarnJob()) {
                throw new RuntimeException("BspServiceMaster (YARN profile) is FAILING this task, throwing exception to end job run.", exc);
            }
            RunningJob job = new JobClient(getContext().getConfiguration()).getJob(JobID.forName(getJobId()));
            if (job != null) {
                job.killJob();
            } else {
                LOG.error("Job not found for jobId=" + getJobId());
            }
        } catch (IOException e) {
            throw new RuntimeException("failJob: IOException while killing job " + getJobId(), e);
        } finally {
            failureCleanup(exc);
        }
    }

    /**
     * Reads one WorkerInfo from each child znode of a path.
     *
     * @param str parent znode whose children hold serialized WorkerInfo
     * @param z passed as the watch flag to getChildrenExt
     * @return the deserialized worker infos (never null)
     * @throws IllegalStateException on ZooKeeper errors or interruption
     */
    private List<WorkerInfo> getWorkerInfosFromPath(String str, boolean z) {
        List<WorkerInfo> workerInfos = new ArrayList<>();
        try {
            for (String childPath : getZkExt().getChildrenExt(str, z, false, true)) {
                WorkerInfo workerInfo = new WorkerInfo();
                WritableUtils.readFieldsFromZnode(getZkExt(), childPath, true, null, workerInfo);
                workerInfos.add(workerInfo);
            }
            return workerInfos;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("getWorkers: Got InterruptedException", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("getWorkers: Got KeeperException", e);
        }
    }

    /**
     * Fills the two output lists with the healthy and unhealthy workers registered
     * for a superstep of the current application attempt.
     *
     * <p>Both registration znodes are created first if missing. Only the healthy
     * path is read with a watch.
     *
     * @param j superstep to inspect
     * @param list cleared, then filled with healthy workers
     * @param list2 cleared, then filled with unhealthy workers
     * @throws IllegalStateException on ZooKeeper errors or interruption
     */
    private void getAllWorkerInfos(long j, List<WorkerInfo> list, List<WorkerInfo> list2) {
        String workerInfoHealthyPath = getWorkerInfoHealthyPath(getApplicationAttempt(), j);
        String workerInfoUnhealthyPath = getWorkerInfoUnhealthyPath(getApplicationAttempt(), j);
        try {
            getZkExt().createOnceExt(workerInfoHealthyPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
            getZkExt().createOnceExt(workerInfoUnhealthyPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("getWorkers: InterruptedException", e);
        } catch (KeeperException e) {
            throw new IllegalStateException("getWorkers: KeeperException", e);
        }
        List<WorkerInfo> healthyWorkers = getWorkerInfosFromPath(workerInfoHealthyPath, true);
        List<WorkerInfo> unhealthyWorkers = getWorkerInfosFromPath(workerInfoUnhealthyPath, false);
        list.clear();
        list.addAll(healthyWorkers);
        list2.clear();
        list2.addAll(unhealthyWorkers);
    }

    /**
     * Waits up to maxSuperstepWaitMsecs until enough workers (healthy plus unhealthy,
     * as a percentage of maxWorkers) have responded for the current superstep.
     *
     * @return the healthy workers, or null if too few workers responded in time or
     *         fewer than minWorkers of them are healthy
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public List<WorkerInfo> checkWorkers() {
        final long deadlineMsecs = SystemTime.get().getMilliseconds() + this.maxSuperstepWaitMsecs;
        List<WorkerInfo> healthyWorkers = new ArrayList<WorkerInfo>();
        List<WorkerInfo> unhealthyWorkers = new ArrayList<WorkerInfo>();
        int totalResponses = -1;
        boolean timedOut = true;
        while (SystemTime.get().getMilliseconds() < deadlineMsecs) {
            getContext().progress();
            getAllWorkerInfos(getSuperstep(), healthyWorkers, unhealthyWorkers);
            totalResponses = healthyWorkers.size() + unhealthyWorkers.size();
            float percentResponded = (totalResponses * 100.0f) / this.maxWorkers;
            if (percentResponded >= this.minPercentResponded) {
                timedOut = false;
                break;
            }
            getContext().setStatus(getGraphTaskManager().getGraphFunctions() + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + "checkWorkers: Only found " + totalResponses + " responses of " + this.maxWorkers + " needed to start superstep " + getSuperstep());
            boolean registrationChanged = getWorkerHealthRegistrationChangedEvent().waitMsecs(this.eventWaitMsecs);
            if (registrationChanged) {
                // Re-poll immediately on a registration change instead of reporting.
                if (LOG.isDebugEnabled()) {
                    LOG.debug("checkWorkers: Got event that health registration changed, not using poll attempt");
                }
                getWorkerHealthRegistrationChangedEvent().reset();
                continue;
            }
            if (!LOG.isInfoEnabled()) {
                continue;
            }
            LOG.info("checkWorkers: Only found " + totalResponses + " responses of " + this.maxWorkers + " needed to start superstep " + getSuperstep() + ".  Reporting every " + this.eventWaitMsecs + " msecs, " + (deadlineMsecs - SystemTime.get().getMilliseconds()) + " more msecs left before giving up.");
            // Only name the missing workers when the long tail is short.
            if (this.maxWorkers - totalResponses <= this.partitionLongTailMinPrint) {
                logMissingWorkersOnSuperstep(healthyWorkers, unhealthyWorkers);
            }
        }
        if (timedOut) {
            LOG.error("checkWorkers: Did not receive enough processes in time (only " + totalResponses + " of " + this.minWorkers + " required) after waiting " + this.maxSuperstepWaitMsecs + "msecs).  This occurs if you do not have enough map tasks available simultaneously on your Hadoop instance to fulfill the number of requested workers.");
            return null;
        }
        if (healthyWorkers.size() < this.minWorkers) {
            LOG.error("checkWorkers: Only " + healthyWorkers.size() + " available when " + this.minWorkers + " are required.");
            logMissingWorkersOnSuperstep(healthyWorkers, unhealthyWorkers);
            return null;
        }
        getContext().setStatus(getGraphTaskManager().getGraphFunctions() + MinimalPrettyPrinter.DEFAULT_ROOT_VALUE_SEPARATOR + "checkWorkers: Done - Found " + totalResponses + " responses of " + this.maxWorkers + " needed to start superstep " + getSuperstep());
        return healthyWorkers;
    }

    /**
     * At INFO level, logs every task id in 1..maxWorkers that appears in neither list
     * and is not this task's own partition.
     *
     * @param list healthy workers
     * @param list2 unhealthy workers
     */
    private void logMissingWorkersOnSuperstep(List<WorkerInfo> list, List<WorkerInfo> list2) {
        if (!LOG.isInfoEnabled()) {
            return;
        }
        TreeSet<Integer> respondedTaskIds = new TreeSet<Integer>();
        for (WorkerInfo healthy : list) {
            respondedTaskIds.add(healthy.getTaskId());
        }
        for (WorkerInfo unhealthy : list2) {
            respondedTaskIds.add(unhealthy.getTaskId());
        }
        for (int taskId = 1; taskId <= this.maxWorkers; ++taskId) {
            if (respondedTaskIds.contains(taskId) || taskId == getTaskPartition()) {
                continue;
            }
            LOG.info("logMissingWorkersOnSuperstep: No response from partition " + taskId + " (could be master)");
        }
    }

    /**
     * Generates input splits and writes each one to ZooKeeper under
     * {@code inputSplitPaths.getPath()}. It then creates the "all ready" znode so
     * workers can start loading.
     *
     * @param giraphInputFormat input format to split
     * @param inputSplitPaths znode paths for this kind of input
     * @param str split type label used in log messages ("Vertex", "Edge", "Mapping")
     * @return number of splits written; an integer parsed from the splits znode if it
     *         already existed; or -1 if the job was failed (not enough workers, or no
     *         splits)
     * @throws IllegalStateException on ZooKeeper errors or interruption
     */
    private int createInputSplits(GiraphInputFormat giraphInputFormat, InputSplitPaths inputSplitPaths, String str) {
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        String logPrefix = "create" + str + "InputSplits";
        String inputSplitsPath = inputSplitPaths.getPath();
        try {
            if (getZkExt().exists(inputSplitsPath, false) != null) {
                LOG.info(inputSplitsPath + " already exists, no need to create");
                // NOTE(review): this method never stores a count in this znode's data;
                // the znode appears to be created only implicitly as the parent of the
                // per-split znodes. Confirm this parse cannot fail on re-entry.
                return Integer.parseInt(new String(getZkExt().getData(inputSplitsPath, false, (Stat) null), Charset.defaultCharset()));
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(logPrefix + ": InterruptedException", e);
        } catch (KeeperException.NoNodeException e) {
            if (LOG.isInfoEnabled()) {
                LOG.info(logPrefix + ": Need to create the input splits at " + inputSplitsPath);
            }
        } catch (KeeperException e) {
            throw new IllegalStateException(logPrefix + ": KeeperException", e);
        }
        List<WorkerInfo> healthyWorkers = checkWorkers();
        if (healthyWorkers == null) {
            setJobStateFailed("Not enough healthy workers to create input splits");
            return -1;
        }
        int numInputThreads = healthyWorkers.size() * configuration.getNumInputSplitsThreads();
        List<InputSplit> splits = generateInputSplits(giraphInputFormat, numInputThreads, str);
        if (splits.isEmpty()) {
            LOG.fatal(logPrefix + ": Failing job due to 0 input splits, check input of " + giraphInputFormat.getClass().getName() + "!");
            getContext().setStatus("Failing job due to 0 input splits, check input of " + giraphInputFormat.getClass().getName() + "!");
            setJobStateFailed("Please check your input tables - partitions which you specified are missing. Failing the job!!!");
            // The job has been failed; do not go on to publish an empty "all ready" split set.
            return -1;
        }
        if (numInputThreads > splits.size()) {
            LOG.warn(logPrefix + ": Number of inputSplits=" + splits.size() + " < " + numInputThreads + "=total number of input threads, some threads will be not used");
        }
        int numZkThreads = configuration.getInt(NUM_MASTER_ZK_INPUT_SPLIT_THREADS, DEFAULT_INPUT_SPLIT_THREAD_COUNT);
        if (LOG.isInfoEnabled()) {
            LOG.info(logPrefix + ": Starting to write input split data to zookeeper with " + numZkThreads + " threads");
        }
        ExecutorService splitWriter = Executors.newFixedThreadPool(numZkThreads);
        boolean writeLocations = GiraphConstants.USE_INPUT_SPLIT_LOCALITY.get(configuration);
        try {
            for (int i = 0; i < splits.size(); i++) {
                splitWriter.submit(new LogStacktraceCallable(new WriteInputSplit(giraphInputFormat, splits.get(i), inputSplitsPath, i, writeLocations)));
            }
        } finally {
            // Always release the pool, even if a submit fails.
            splitWriter.shutdown();
        }
        ProgressableUtils.awaitExecutorTermination(splitWriter, getContext());
        if (LOG.isInfoEnabled()) {
            LOG.info(logPrefix + ": Done writing input split data to zookeeper");
        }
        try {
            getZkExt().createExt(inputSplitPaths.getAllReadyPath(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(logPrefix + ": InterruptedException", e);
        } catch (KeeperException.NodeExistsException e) {
            LOG.info(logPrefix + ": Node " + inputSplitPaths.getAllReadyPath() + " already exists.");
        } catch (KeeperException e) {
            throw new IllegalStateException(logPrefix + ": KeeperException", e);
        }
        return splits.size();
    }

    /**
     * Creates mapping input splits if a mapping input format is configured.
     *
     * @return number of splits created, or 0 when no mapping input format is set
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public int createMappingInputSplits() {
        if (!getConfiguration().hasMappingInputFormat()) {
            return 0;
        }
        GiraphInputFormat mappingFormat = getConfiguration().createWrappedMappingInputFormat();
        return createInputSplits(mappingFormat, mappingInputSplitsPaths, "Mapping");
    }

    /**
     * Creates vertex input splits if a vertex input format is configured.
     *
     * @return number of splits created, or 0 when no vertex input format is set
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public int createVertexInputSplits() {
        if (!getConfiguration().hasVertexInputFormat()) {
            return 0;
        }
        GiraphInputFormat vertexFormat = getConfiguration().createWrappedVertexInputFormat();
        return createInputSplits(vertexFormat, vertexInputSplitsPaths, "Vertex");
    }

    /**
     * Creates edge input splits if an edge input format is configured.
     *
     * @return number of splits created, or 0 when no edge input format is set
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public int createEdgeInputSplits() {
        if (!getConfiguration().hasEdgeInputFormat()) {
            return 0;
        }
        GiraphInputFormat edgeFormat = getConfiguration().createWrappedEdgeInputFormat();
        return createInputSplits(edgeFormat, edgeInputSplitsPaths, "Edge");
    }

    /**
     * Returns the chosen workers.
     *
     * @return the live internal list (not a copy)
     */
    @Override // org.apache.giraph.bsp.CentralizedService
    public List<WorkerInfo> getWorkerInfoList() {
        final List<WorkerInfo> chosen = chosenWorkerInfoList;
        return chosen;
    }

    /**
     * Returns the master aggregator handler.
     *
     * @return the handler, or null if it has not been created yet
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public MasterAggregatorHandler getGlobalCommHandler() {
        final MasterAggregatorHandler handler = globalCommHandler;
        return handler;
    }

    /**
     * Returns the aggregator-to-global-comm translation.
     *
     * @return the translation, or null if it has not been created yet
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public AggregatorToGlobalCommTranslation getAggregatorTranslationHandler() {
        final AggregatorToGlobalCommTranslation translation = aggregatorTranslation;
        return translation;
    }

    /**
     * Returns the MasterCompute instance.
     *
     * @return the instance, or null if it has not been created yet
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public MasterCompute getMasterCompute() {
        final MasterCompute compute = masterCompute;
        return compute;
    }

    /**
     * Restores master state from a finalized checkpoint and rebuilds the partition
     * ownership recorded in it.
     *
     * <p>The finalized file is read in this order: GlobalStats, SuperstepClasses,
     * a count of per-task metadata files, the checkpoint file prefix, that many task
     * ids, and then the aggregator handler, aggregator translation and MasterCompute
     * state. Each task's metadata file lists the partitions that task owned.
     *
     * <p>Fixes in this version:
     * <ul>
     *   <li>The decompiled partition loop had no exit.</li>
     *   <li>The worker map insert did not compile.</li>
     *   <li>Streams are now closed even when reading fails.</li>
     * </ul>
     *
     * @param j superstep whose checkpoint is loaded
     * @return partition owners sorted by partition id
     * @throws IOException if a checkpoint file cannot be read
     * @throws KeeperException declared for callers; not thrown by the visible code
     * @throws InterruptedException declared for callers; not thrown by the visible code
     */
    private Collection<PartitionOwner> prepareCheckpointRestart(long j) throws IOException, KeeperException, InterruptedException {
        List<PartitionOwner> partitionOwners = new ArrayList<>();
        FileSystem fs = getFs();
        String finalizedCheckpointPath = getSavedCheckpointBasePath(j) + CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX;
        LOG.info("Loading checkpoint from " + finalizedCheckpointPath);
        try (FSDataInputStream finalizedStream = fs.open(new Path(finalizedCheckpointPath))) {
            GlobalStats globalStats = new GlobalStats();
            globalStats.readFields(finalizedStream);
            updateCounters(globalStats);
            SuperstepClasses superstepClasses = new SuperstepClasses();
            superstepClasses.readFields(finalizedStream);
            getConfiguration().updateSuperstepClasses(superstepClasses);
            int prefixFileCount = finalizedStream.readInt();
            Int2ObjectOpenHashMap<WorkerInfo> workersByTaskId = new Int2ObjectOpenHashMap<>();
            for (WorkerInfo workerInfo : this.chosenWorkerInfoList) {
                workersByTaskId.put(workerInfo.getTaskId(), workerInfo);
            }
            String checkpointFile = finalizedStream.readUTF();
            for (int i = 0; i < prefixFileCount; i++) {
                int mrTaskId = finalizedStream.readInt();
                try (FSDataInputStream metadataStream = fs.open(new Path(checkpointFile + "." + mrTaskId + CheckpointingUtils.CHECKPOINT_METADATA_POSTFIX))) {
                    long partitionCount = metadataStream.readInt();
                    // NOTE(review): null if the task id is not among the chosen workers.
                    WorkerInfo worker = workersByTaskId.get(mrTaskId);
                    for (long p = 0; p < partitionCount; p++) {
                        int partitionId = metadataStream.readInt();
                        PartitionOwner partitionOwner = new BasicPartitionOwner(partitionId, worker);
                        partitionOwners.add(partitionOwner);
                        LOG.info("prepareCheckpointRestart partitionId=" + partitionId + " assigned to " + partitionOwner);
                    }
                }
            }
            Collections.sort(partitionOwners, new Comparator<PartitionOwner>() {
                @Override // java.util.Comparator
                public int compare(PartitionOwner partitionOwner, PartitionOwner partitionOwner2) {
                    return Integer.compare(partitionOwner.getPartitionId(), partitionOwner2.getPartitionId());
                }
            });
            this.globalCommHandler.readFields(finalizedStream);
            this.aggregatorTranslation.readFields(finalizedStream);
            this.masterCompute.readFields(finalizedStream);
        }
        return partitionOwners;
    }

    /**
     * Prepares the master before the application starts.
     *
     * <p>On a restart, sets the superstep counter to the restarted superstep. Then
     * notifies every master observer via preApplication(), reporting progress after
     * each one.
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void setup() {
        boolean restarted = getRestartedSuperstep() != Long.MIN_VALUE;
        if (restarted) {
            GiraphStats.getInstance().getSuperstepCounter().setValue(getRestartedSuperstep());
        }
        for (int idx = 0; idx < observers.length; idx++) {
            observers[idx].preApplication();
            getContext().progress();
        }
    }

    /*
     * NOTE(review): JADX could not decompile becomeMaster(). The body below is a stub
     * that always throws UnsupportedOperationException, so any caller fails at runtime.
     *
     * The recovered fragments in the comments below suggest the successful path:
     *  - records this task partition as the current master task partition;
     *  - creates the MasterAggregatorHandler and AggregatorToGlobalCommTranslation;
     *  - creates MasterCompute, MasterInfo and the Netty master server and client;
     *  - logs "becomeMaster: I am now the master!";
     *  - sets isMaster to true and returns it.
     *
     * How the master election itself is performed is not visible here. Restore this
     * method from the original source before relying on it.
     */
    /* JADX WARN: Code restructure failed: missing block: B:26:0x00f5, code lost:
    
        org.apache.giraph.counters.GiraphStats.getInstance().getCurrentMasterTaskPartition().setValue(getTaskPartition());
        r8.globalCommHandler = new org.apache.giraph.master.MasterAggregatorHandler(getConfiguration(), getContext());
        r8.aggregatorTranslation = new org.apache.giraph.master.AggregatorToGlobalCommTranslation(getConfiguration(), r8.globalCommHandler);
        r8.globalCommHandler.initialize(r8);
        r8.masterCompute = getConfiguration().createMasterCompute();
        r8.masterCompute.setMasterService(r8);
        r8.masterInfo = new org.apache.giraph.master.MasterInfo();
        r8.masterServer = new org.apache.giraph.comm.netty.NettyMasterServer(getConfiguration(), r8, getContext(), getGraphTaskManager().createUncaughtExceptionHandler());
        r8.masterInfo.setInetSocketAddress(r8.masterServer.getMyAddress());
        r8.masterInfo.setTaskId(getTaskPartition());
        r8.masterClient = new org.apache.giraph.comm.netty.NettyMasterClient(getContext(), getConfiguration(), r8, getGraphTaskManager().createUncaughtExceptionHandler());
     */
    /* JADX WARN: Code restructure failed: missing block: B:27:0x01a6, code lost:
    
        if (org.apache.giraph.master.BspServiceMaster.LOG.isInfoEnabled() == false) goto L29;
     */
    /* JADX WARN: Code restructure failed: missing block: B:28:0x01a9, code lost:
    
        org.apache.giraph.master.BspServiceMaster.LOG.info("becomeMaster: I am now the master!");
     */
    /* JADX WARN: Code restructure failed: missing block: B:29:0x01b2, code lost:
    
        r8.isMaster = true;
     */
    /* JADX WARN: Code restructure failed: missing block: B:30:0x01bb, code lost:
    
        return r8.isMaster;
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public boolean becomeMaster() {
        /*
            Method dump skipped, instructions count: 503
            To view this dump add '--comments-level debug' option
        */
        // Stub left by the decompiler: every call fails here.
        throw new UnsupportedOperationException("Method not decompiled: org.apache.giraph.master.BspServiceMaster.becomeMaster():boolean");
    }

    /**
     * Returns this master's MasterInfo.
     *
     * @return the info, or null if it has not been created yet
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public MasterInfo getMasterInfo() {
        final MasterInfo info = masterInfo;
        return info;
    }

    /**
     * Aggregates the statistics that every worker published under the
     * worker-finished znode of superstep {@code j}.
     *
     * <p>For each child znode the JSON payload is decoded and its partition
     * stats, message count, message byte count and (when metrics are enabled
     * and present) superstep metrics are folded together. As side effects,
     * {@code allPartitionStatsList} is cleared and refilled with every
     * partition's stats, and the aggregated metrics are printed either to
     * {@code System.err} (default metrics directory) or to HDFS.
     *
     * @param j superstep whose worker results are aggregated
     * @return the aggregated global statistics
     * @throws IllegalStateException on ZooKeeper, I/O, JSON errors or
     *         interruption (the interrupt flag is restored first)
     */
    private GlobalStats aggregateWorkerStats(long j) {
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        Class<?> cls = this.masterGraphPartitioner.createPartitionStats().getClass();
        GlobalStats globalStats = new GlobalStats();
        try {
            List<String> childrenExt = getZkExt().getChildrenExt(getWorkerFinishedPath(getApplicationAttempt(), j), false, false, true);
            AggregatedMetrics aggregatedMetrics = new AggregatedMetrics();
            this.allPartitionStatsList.clear();
            for (String str : childrenExt) {
                try {
                    JSONObject jSONObject = new JSONObject(new String(getZkExt().getData(str, false, (Stat) null), Charset.defaultCharset()));
                    for (PartitionStats partitionStats : WritableUtils.readListFieldsFromByteArray(Base64.decode(jSONObject.getString(BspService.JSONOBJ_PARTITION_STATS_KEY)), cls, configuration)) {
                        globalStats.addPartitionStats(partitionStats);
                        this.allPartitionStatsList.add(partitionStats);
                    }
                    globalStats.addMessageCount(jSONObject.getLong(BspService.JSONOBJ_NUM_MESSAGES_KEY));
                    globalStats.addMessageBytesCount(jSONObject.getLong(BspService.JSONOBJ_NUM_MESSAGE_BYTES_KEY));
                    if (configuration.metricsEnabled() && jSONObject.has(BspService.JSONOBJ_METRICS_KEY)) {
                        WorkerSuperstepMetrics workerSuperstepMetrics = new WorkerSuperstepMetrics();
                        WritableUtils.readFieldsFromByteArray(Base64.decode(jSONObject.getString(BspService.JSONOBJ_METRICS_KEY)), workerSuperstepMetrics);
                        // The znode's last path component identifies the worker in the metrics report.
                        aggregatedMetrics.add(workerSuperstepMetrics, FilenameUtils.getName(str));
                    }
                } catch (IOException e) {
                    throw new IllegalStateException("aggregateWorkerStats: IOException", e);
                } catch (InterruptedException e2) {
                    // Preserve the interrupt for code further up the stack.
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("aggregateWorkerStats: InterruptedException", e2);
                } catch (KeeperException e3) {
                    throw new IllegalStateException("aggregateWorkerStats: KeeperException", e3);
                } catch (JSONException e4) {
                    throw new IllegalStateException("aggregateWorkerStats: JSONException", e4);
                }
            }
            if (configuration.metricsEnabled()) {
                if (GiraphConstants.METRICS_DIRECTORY.isDefaultValue(configuration)) {
                    aggregatedMetrics.print(j, System.err);
                } else {
                    printAggregatedMetricsToHDFS(j, aggregatedMetrics);
                }
            }
            if (LOG.isInfoEnabled()) {
                LOG.info("aggregateWorkerStats: Aggregation found " + globalStats + " on superstep = " + getSuperstep());
            }
            return globalStats;
        } catch (InterruptedException e5) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("aggregateWorkerStats: InterruptedException", e5);
        } catch (KeeperException e6) {
            throw new IllegalStateException("aggregateWorkerStats: KeeperException", e6);
        }
    }

    /**
     * Writes the aggregated metrics of superstep {@code j} to
     * {@code <METRICS_DIRECTORY>/superstep_<j>.metrics}, creating the metrics
     * directory first if it does not exist.
     *
     * @param j superstep the metrics belong to
     * @param aggregatedMetrics metrics to print
     * @throws RuntimeException if the metrics file already exists or the file
     *         system operations fail
     */
    private void printAggregatedMetricsToHDFS(long j, AggregatedMetrics aggregatedMetrics) {
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        String metricsDirectory = GiraphConstants.METRICS_DIRECTORY.get(configuration);
        Path path = new Path(metricsDirectory);
        Path path2 = new Path(metricsDirectory + "/superstep_" + j + ".metrics");
        try {
            FileSystem fileSystem = FileSystem.get(configuration);
            if (!fileSystem.exists(path)) {
                fileSystem.mkdirs(path);
            }
            if (fileSystem.exists(path2)) {
                throw new RuntimeException("printAggregatedMetricsToHDFS: metrics file exists");
            }
            // try-with-resources guarantees the stream is closed even if print() throws;
            // previously the stream leaked on that path.
            try (PrintStream printStream = new PrintStream((OutputStream) fileSystem.create(path2), false, Charset.defaultCharset().name())) {
                aggregatedMetrics.print(j, printStream);
            }
        } catch (IOException e) {
            throw new RuntimeException("printAggregatedMetricsToHDFS: error creating metrics file", e);
        }
    }

    /**
     * Writes the "finalized" file for the checkpoint of superstep {@code j}.
     *
     * <p>Any previous file at that path is deleted first (failure to delete is
     * only logged). The file then receives, in order: the raw data of the
     * superstep-finished znode of superstep {@code j - 1}, the number of
     * workers, the checkpoint base path, each worker's task id, and the state
     * of the global-comm handler, aggregator translation and master compute.
     * On success {@code j} is recorded as the last checkpointed superstep,
     * both locally and in the Giraph counters.
     *
     * @param j superstep being checkpointed
     * @param list workers that took part in the checkpoint
     * @throws IOException on file system errors
     * @throws KeeperException on ZooKeeper errors
     * @throws InterruptedException if interrupted while reading ZooKeeper
     */
    private void finalizeCheckpoint(long j, List<WorkerInfo> list) throws IOException, KeeperException, InterruptedException {
        Path path = new Path(getCheckpointBasePath(j) + CheckpointingUtils.CHECKPOINT_FINALIZED_POSTFIX);
        try {
            getFs().delete(path, false);
        } catch (IOException e) {
            // The delete failed, so the old file may still be there; keep the cause.
            LOG.warn("finalizeCheckpoint: Failed to remove old file " + path, e);
        }
        // try-with-resources closes the file even if any of the writes below fail.
        try (DataOutputStream create = getFs().create(path)) {
            create.write(getZkExt().getData(getSuperstepFinishedPath(getApplicationAttempt(), j - 1), false, (Stat) null));
            create.writeInt(list.size());
            create.writeUTF(getCheckpointBasePath(j));
            for (WorkerInfo workerInfo : list) {
                create.writeInt(workerInfo.getTaskId());
            }
            this.globalCommHandler.write(create);
            this.aggregatorTranslation.write(create);
            this.masterCompute.write(create);
        }
        this.lastCheckpointedSuperstep = j;
        GiraphStats.getInstance().getLastCheckpointedSuperstep().setValue(j);
    }

    /**
     * Computes the partition owners for the current superstep and publishes
     * them, together with the master info and chosen workers, to the
     * addresses-and-partitions znode.
     *
     * <p>The owners are created from scratch on the input superstep (-1),
     * restored from the checkpoint when restarting at this superstep, and
     * otherwise regenerated from the last partition stats. When the resulting
     * collection is non-empty, the partition-exchange znode is created.
     *
     * @throws IllegalStateException on I/O or ZooKeeper failures, on
     *         interruption (interrupt flag restored), or if no owners could be
     *         created for the input superstep
     */
    private void assignPartitionOwners() {
        Collection<PartitionOwner> partitionOwners;
        if (getSuperstep() == -1) {
            partitionOwners = this.masterGraphPartitioner.createInitialPartitionOwners(this.chosenWorkerInfoList, this.maxWorkers);
            if (partitionOwners.isEmpty()) {
                throw new IllegalStateException("assignAndExchangePartitions: No partition owners set");
            }
        } else if (getRestartedSuperstep() == getSuperstep()) {
            try {
                partitionOwners = prepareCheckpointRestart(getSuperstep());
                this.masterGraphPartitioner.setPartitionOwners(partitionOwners);
            } catch (IOException e) {
                throw new IllegalStateException("assignPartitionOwners: IOException on preparing", e);
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("assignPartitionOwners: InterruptedException on preparing", e2);
            } catch (KeeperException e3) {
                throw new IllegalStateException("assignPartitionOwners: KeeperException on preparing", e3);
            }
        } else {
            partitionOwners = this.masterGraphPartitioner.generateChangedPartitionOwners(this.allPartitionStatsList, this.chosenWorkerInfoList, this.maxWorkers, getSuperstep());
            PartitionUtils.analyzePartitionStats(partitionOwners, this.allPartitionStatsList);
        }
        checkPartitions(this.masterGraphPartitioner.getCurrentPartitionOwners());
        if (!partitionOwners.isEmpty()) {
            String partitionExchangePath = getPartitionExchangePath(getApplicationAttempt(), getSuperstep());
            try {
                getZkExt().createOnceExt(partitionExchangePath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
            } catch (InterruptedException e4) {
                Thread.currentThread().interrupt();
                // Keep the cause so the underlying failure is visible in the stack trace.
                throw new IllegalStateException("assignPartitionOwners: InterruptedException creating " + partitionExchangePath, e4);
            } catch (KeeperException e5) {
                throw new IllegalStateException("assignPartitionOwners: KeeperException creating " + partitionExchangePath, e5);
            }
        }
        WritableUtils.writeToZnode(getZkExt(), getAddressesAndPartitionsPath(getApplicationAttempt(), getSuperstep()), -1, new AddressesAndPartitionsWritable(this.masterInfo, this.chosenWorkerInfoList, partitionOwners));
    }

    /**
     * Verifies that every partition id lies in {@code [0, collection.size())}.
     *
     * @param collection partition owners to validate
     * @throws IllegalStateException for the first owner whose id is out of range
     */
    private void checkPartitions(Collection<PartitionOwner> collection) {
        final int numPartitions = collection.size();
        for (PartitionOwner owner : collection) {
            final int id = owner.getPartitionId();
            final boolean inRange = id >= 0 && id < numPartitions;
            if (!inRange) {
                throw new IllegalStateException("checkPartitions: Invalid partition id " + id + " - partition ids must be values from 0 to (numPartitions - 1)");
            }
        }
    }

    /**
     * Returns the workers from {@code list} that are NOT registered under the
     * healthy-worker path {@code str}. Despite the method name, the result is
     * the chosen workers that appear to have gone away.
     *
     * @param str healthy-worker znode path to read
     * @param list chosen workers to check
     * @return chosen workers missing from the healthy set, in {@code list} order
     * @throws KeeperException on ZooKeeper errors
     * @throws InterruptedException if interrupted while reading ZooKeeper
     */
    private Collection<WorkerInfo> superstepChosenWorkerAlive(String str, List<WorkerInfo> list) throws KeeperException, InterruptedException {
        HashSet<WorkerInfo> healthyWorkers = new HashSet<WorkerInfo>(getWorkerInfosFromPath(str, false));
        List<WorkerInfo> missingWorkers = new ArrayList<WorkerInfo>();
        for (WorkerInfo workerInfo : list) {
            if (!healthyWorkers.contains(workerInfo)) {
                missingWorkers.add(workerInfo);
            }
        }
        return missingWorkers;
    }

    /**
     * Restarts the application from the checkpoint of superstep {@code j}.
     *
     * <p>Deletes the vertex and edge input-split znodes, increments the
     * application attempt, records {@code j} as both the cached and the
     * restarted superstep, and publishes
     * {@link ApplicationState#START_SUPERSTEP} for the new attempt.
     *
     * @param j superstep of the checkpoint to restart from
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void restartFromCheckpoint(long j) {
        // Remove old input-split znodes before starting the next attempt.
        final String vertexSplitsPath = this.vertexInputSplitsPaths.getPath();
        zkDeleteNode(vertexSplitsPath);
        final String edgeSplitsPath = this.edgeInputSplitsPaths.getPath();
        zkDeleteNode(edgeSplitsPath);

        final long nextAttempt = getApplicationAttempt() + 1;
        setApplicationAttempt(nextAttempt);
        setCachedSuperstep(j);
        setRestartedSuperstep(j);
        setJobState(ApplicationState.START_SUPERSTEP, getApplicationAttempt(), j);
    }

    /**
     * Deletes the znode at {@code str} via {@code deleteExt(str, -1, true)}
     * (the {@code true} flag presumably requests recursive deletion — confirm
     * against ZooKeeperExt). A node that no longer exists is only logged.
     *
     * @param str ZooKeeper path to delete
     * @throws RuntimeException if interrupted (interrupt flag restored) or on
     *         any other ZooKeeper error
     */
    private void zkDeleteNode(String str) {
        try {
            getZkExt().deleteExt(str, -1, true);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for callers further up the stack.
            Thread.currentThread().interrupt();
            throw new RuntimeException("zkDeleteNode: InterruptedException", e);
        } catch (KeeperException.NoNodeException e2) {
            LOG.info("zkDeleteNode: node has already been removed " + str);
        } catch (KeeperException e3) {
            throw new RuntimeException("zkDeleteNode: KeeperException", e3);
        }
    }

    /**
     * Returns the last superstep known to have a complete checkpoint.
     *
     * <p>The value is looked up lazily: only when the cached field still holds
     * {@code -1} is {@code getLastCheckpointedSuperstep()} consulted. If that
     * lookup fails, the failure is logged and the job is failed, and the
     * cached field (still {@code -1}) is returned.
     *
     * @return the last checkpointed superstep, or {@code -1} if none was found
     * @throws IOException declared by the interface
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public long getLastGoodCheckpoint() throws IOException {
        if (this.lastCheckpointedSuperstep != -1) {
            return this.lastCheckpointedSuperstep;
        }
        try {
            this.lastCheckpointedSuperstep = getLastCheckpointedSuperstep();
        } catch (IOException lookupFailure) {
            LOG.fatal("getLastGoodCheckpoint: No last good checkpoints can be found, killing the job.", lookupFailure);
            failJob(lookupFailure);
        }
        return this.lastCheckpointedSuperstep;
    }

    /**
     * Waits until every worker in {@code list} has created a child znode
     * (named by its hostname id) under the barrier path {@code str}.
     *
     * <p>The barrier znode is created if needed. Each loop iteration re-reads
     * the children, logs progress at most every 30 seconds, updates the task
     * status, and waits on {@code bspEvent} for half of
     * {@code mapred.task.timeout} (default 600000 ms) before re-checking.
     * Chosen workers that are no longer registered as healthy for the
     * current superstep are tracked as dead (each one recorded once).
     *
     * @param str barrier znode path whose children mark finished workers
     * @param list workers that must reach the barrier
     * @param bspEvent event waited on between checks
     * @param z if {@code false}, any dead worker fails the barrier
     *          immediately; if {@code true}, dead workers are tolerated only
     *          if their result already arrived at the barrier
     * @return {@code true} when all workers reached the barrier,
     *         {@code false} on worker failure
     * @throws IllegalStateException on ZooKeeper errors or interruption
     *         (interrupt flag restored)
     */
    private boolean barrierOnWorkerList(String str, List<WorkerInfo> list, BspEvent bspEvent, boolean z) {
        try {
            getZkExt().createOnceExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
            List<String> hostnameIds = new ArrayList<String>(list.size());
            for (WorkerInfo workerInfo : list) {
                hostnameIds.add(workerInfo.getHostnameId());
            }
            String workerInfoHealthyPath = getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep());
            long nextInfoLogMsecs = System.currentTimeMillis();
            int i = getContext().getConfiguration().getInt("mapred.task.timeout", 600000);
            // superstepChosenWorkerAlive() reports the same dead workers on every poll,
            // so only add workers not already recorded to avoid unbounded duplicates.
            List<WorkerInfo> deadWorkers = new ArrayList<WorkerInfo>();
            while (true) {
                try {
                    List<String> childrenExt = getZkExt().getChildrenExt(str, true, false, false);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("barrierOnWorkerList: Got finished worker list = " + childrenExt + ", size = " + childrenExt.size() + ", worker list = " + list + ", size = " + list.size() + " from " + str);
                    }
                    if (LOG.isInfoEnabled() && System.currentTimeMillis() > nextInfoLogMsecs) {
                        nextInfoLogMsecs = System.currentTimeMillis() + 30000;
                        LOG.info("barrierOnWorkerList: " + childrenExt.size() + " out of " + list.size() + " workers finished on superstep " + getSuperstep() + " on path " + str);
                        if (list.size() - childrenExt.size() < 10) {
                            HashSet<String> waitingOn = Sets.newHashSet(hostnameIds);
                            waitingOn.removeAll(childrenExt);
                            LOG.info("barrierOnWorkerList: Waiting on " + waitingOn);
                        }
                    }
                    getContext().setStatus(getGraphTaskManager().getGraphFunctions() + " - " + childrenExt.size() + " finished out of " + list.size() + " on superstep " + getSuperstep());
                    if (childrenExt.containsAll(hostnameIds)) {
                        return true;
                    }
                    for (WorkerInfo workerInfo : deadWorkers) {
                        if (!childrenExt.contains(workerInfo.getHostnameId())) {
                            LOG.error("barrierOnWorkerList: no results arived from worker that was pronounced dead: " + workerInfo + " on superstep " + getSuperstep());
                            return false;
                        }
                    }
                    bspEvent.waitMsecs(i / 2);
                    bspEvent.reset();
                    getContext().progress();
                    try {
                        for (WorkerInfo dead : superstepChosenWorkerAlive(workerInfoHealthyPath, list)) {
                            if (!deadWorkers.contains(dead)) {
                                deadWorkers.add(dead);
                            }
                        }
                        if (!z && !deadWorkers.isEmpty()) {
                            LOG.error("barrierOnWorkerList: Missing chosen workers " + deadWorkers + " on superstep " + getSuperstep());
                            return false;
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException("barrierOnWorkerList: InterruptedException - Couldn't get " + workerInfoHealthyPath, e);
                    } catch (KeeperException e2) {
                        throw new IllegalStateException("barrierOnWorkerList: KeeperException - Couldn't get " + workerInfoHealthyPath, e2);
                    }
                } catch (InterruptedException e3) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("barrierOnWorkerList: InterruptedException - Couldn't get children of " + str, e3);
                } catch (KeeperException e4) {
                    throw new IllegalStateException("barrierOnWorkerList: KeeperException - Couldn't get children of " + str, e4);
                }
            }
        } catch (InterruptedException e5) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("barrierOnWorkerList: InterruptedException - Couldn't create " + str, e5);
        } catch (KeeperException e6) {
            throw new IllegalStateException("barrierOnWorkerList: KeeperException - Couldn't create " + str, e6);
        }
    }

    /**
     * Deletes the ZooKeeper data of superstep {@code j} for the current
     * application attempt.
     *
     * <p>Nothing is done when ZooKeeper data is configured to be kept or when
     * {@code j} is negative. A node that was already removed is only logged.
     *
     * @param j superstep whose znode should be removed
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     * @throws IllegalStateException on any other ZooKeeper error
     */
    private void cleanUpOldSuperstep(long j) throws InterruptedException {
        if (!GiraphConstants.KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration()) || j < 0) {
            return;
        }
        String str = getSuperstepPath(getApplicationAttempt()) + "/" + j;
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info("coordinateSuperstep: Cleaning up old Superstep " + str);
            }
            getZkExt().deleteExt(str, -1, true);
        } catch (KeeperException.NoNodeException e) {
            LOG.warn("coordinateBarrier: Already cleaned up " + str);
        } catch (KeeperException e2) {
            // Report what actually failed (the old message blamed checkpoint finalization).
            throw new IllegalStateException("cleanUpOldSuperstep: KeeperException on deleting old superstep " + str, e2);
        }
    }

    /**
     * Waits for every chosen worker to finish reading one kind of input split
     * and then creates the corresponding "all done" znode.
     *
     * @param inputSplitPaths znode paths for this kind of input split
     * @param inputSplitEvents events for this kind of input split
     * @param str kind label used in messages (e.g. "Vertex", "Edge", "Mapping")
     * @throws IllegalStateException if a worker failed during input loading,
     *         on interruption (interrupt flag restored), or on ZooKeeper errors
     */
    private void coordinateInputSplits(InputSplitPaths inputSplitPaths, InputSplitEvents inputSplitEvents, String str) {
        String str2 = "coordinate" + str + "InputSplits";
        if (!barrierOnWorkerList(inputSplitPaths.getDonePath(), this.chosenWorkerInfoList, inputSplitEvents.getDoneStateChanged(), false)) {
            throw new IllegalStateException(str2 + ": Worker failed during input split (currently not supported)");
        }
        try {
            getZkExt().createExt(inputSplitPaths.getAllDonePath(), null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Name the exception that actually occurred.
            throw new IllegalStateException(str2 + ": InterruptedException", e);
        } catch (KeeperException.NodeExistsException e2) {
            LOG.info("coordinateInputSplits: Node " + inputSplitPaths.getAllDonePath() + " already exists.");
        } catch (KeeperException e3) {
            throw new IllegalStateException(str2 + ": KeeperException", e3);
        }
    }

    /**
     * Runs the master-compute initialization round before input loading.
     *
     * <p>Prepares the global-comm handler, sets up master compute for the
     * current superstep, calls {@code masterCompute.initialize()}, finishes
     * the aggregator round and sends the aggregator data to its owners.
     *
     * @throws InterruptedException propagated from the calls above
     * @throws RuntimeException wrapping access or instantiation failures
     */
    private void initializeAggregatorInputSuperstep() throws InterruptedException {
        this.globalCommHandler.prepareSuperstep();
        prepareMasterCompute(getSuperstep());
        try {
            this.masterCompute.initialize();
            this.aggregatorTranslation.postMasterCompute();
            this.globalCommHandler.finishSuperstep();
            this.globalCommHandler.sendDataToOwners(this.masterClient);
        } catch (IllegalAccessException accessFailure) {
            final String message = "initializeAggregatorInputSuperstep: Failed in access";
            LOG.fatal(message, accessFailure);
            throw new RuntimeException(message, accessFailure);
        } catch (InstantiationException instantiationFailure) {
            final String message = "initializeAggregatorInputSuperstep: Failed in instantiation";
            LOG.fatal(message, instantiationFailure);
            throw new RuntimeException(message, instantiationFailure);
        }
    }

    /**
     * Gives master compute a fresh {@link GraphState} for superstep {@code j}
     * (vertex and edge totals taken from the Giraph counters) and a fresh
     * {@link SuperstepClasses} built from the configuration.
     *
     * @param j superstep master compute is being prepared for
     * @return the {@link SuperstepClasses} instance handed to master compute
     */
    private SuperstepClasses prepareMasterCompute(long j) {
        final GiraphStats stats = GiraphStats.getInstance();
        final GraphState state = new GraphState(j, stats.getVertices().getValue(), stats.getEdges().getValue(), getContext());
        final SuperstepClasses classes = new SuperstepClasses(getConfiguration());
        this.masterCompute.setGraphState(state);
        this.masterCompute.setSuperstepClasses(classes);
        return classes;
    }

    /**
     * Coordinates one superstep from the master's side.
     *
     * <p>In order, the code below:
     * <ol>
     *   <li>notifies observers ({@code preSuperstep}) and selects the healthy
     *       workers for this superstep, sorted by task id;</li>
     *   <li>for superstep &gt;= 0, finishes the previous master-compute /
     *       aggregator round; opens client connections and assigns partition
     *       owners;</li>
     *   <li>if a checkpoint was requested, waits for all workers to write it
     *       and finalizes it (returning early on CHECKPOINT_AND_HALT);</li>
     *   <li>for superstep &gt;= 0, sends aggregator data to owners; for
     *       superstep -1, initializes aggregators and coordinates the mapping,
     *       vertex and edge input splits that are configured;</li>
     *   <li>waits for all workers to finish, runs master compute, aggregates
     *       worker stats and decides whether to halt (master compute halted,
     *       all vertices finished with no messages, halt znode present, or
     *       max supersteps reached);</li>
     *   <li>writes the superstep-finished znode, updates counters, deletes the
     *       znodes of superstep (current - 1), advances the cached superstep and
     *       writes aggregators.</li>
     * </ol>
     *
     * @return WORKER_FAILURE if a barrier failed, CHECKPOINT_AND_HALT after a
     *         completed checkpoint-and-halt, otherwise ALL_SUPERSTEPS_DONE when
     *         computation halts or THIS_SUPERSTEP_DONE
     * @throws KeeperException on ZooKeeper errors not handled internally
     * @throws InterruptedException if interrupted while talking to ZooKeeper
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public SuperstepState coordinateSuperstep() throws KeeperException, InterruptedException {
        for (MasterObserver masterObserver : this.observers) {
            masterObserver.preSuperstep(getSuperstep());
            getContext().progress();
        }
        this.chosenWorkerInfoList = checkWorkers();
        if (this.chosenWorkerInfoList == null) {
            // NOTE(review): execution continues after setJobStateFailed(); unless that
            // call throws (not visible here), chosenWorkerInfoList.size() below would
            // throw a NullPointerException — confirm.
            setJobStateFailed("coordinateSuperstep: Not enough healthy workers for superstep " + getSuperstep());
        } else {
            // Order the chosen workers by task id.
            Collections.sort(this.chosenWorkerInfoList, new Comparator<WorkerInfo>() { // from class: org.apache.giraph.master.BspServiceMaster.2
                @Override // java.util.Comparator
                public int compare(WorkerInfo workerInfo, WorkerInfo workerInfo2) {
                    return Integer.compare(workerInfo.getTaskId(), workerInfo2.getTaskId());
                }
            });
            Iterator<WorkerInfo> it2 = this.chosenWorkerInfoList.iterator();
            while (it2.hasNext()) {
                String str = getWorkerInfoHealthyPath(getApplicationAttempt(), getSuperstep()) + "/" + it2.next().getHostnameId();
                if (getZkExt().exists(str, true) == null) {
                    // NOTE(review): despite the message, the superstep is not failed here;
                    // only a warning is logged and execution continues.
                    LOG.warn("coordinateSuperstep: Chosen worker " + str + " is no longer valid, failing superstep");
                }
            }
        }
        // Close out the previous master-compute/aggregator round (not on the input superstep).
        if (getSuperstep() >= 0) {
            this.aggregatorTranslation.postMasterCompute();
            this.globalCommHandler.finishSuperstep();
        }
        this.masterClient.openConnections();
        GiraphStats.getInstance().getCurrentWorkers().setValue(this.chosenWorkerInfoList.size());
        assignPartitionOwners();
        if (this.checkpointStatus != CheckpointStatus.NONE) {
            // Dead workers are tolerated at this barrier only for CHECKPOINT_AND_HALT.
            if (!barrierOnWorkerList(getWorkerWroteCheckpointPath(getApplicationAttempt(), getSuperstep()), this.chosenWorkerInfoList, getWorkerWroteCheckpointEvent(), this.checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT)) {
                return SuperstepState.WORKER_FAILURE;
            }
            try {
                finalizeCheckpoint(getSuperstep(), this.chosenWorkerInfoList);
                if (this.checkpointStatus == CheckpointStatus.CHECKPOINT_AND_HALT) {
                    return SuperstepState.CHECKPOINT_AND_HALT;
                }
            } catch (IOException e) {
                throw new IllegalStateException("coordinateSuperstep: IOException on finalizing checkpoint", e);
            }
        }
        if (getSuperstep() >= 0) {
            this.globalCommHandler.sendDataToOwners(this.masterClient);
        }
        // Input superstep: initialize aggregators and coordinate input-split loading.
        if (getSuperstep() == -1) {
            initializeAggregatorInputSuperstep();
            if (getConfiguration().hasMappingInputFormat()) {
                coordinateInputSplits(this.mappingInputSplitsPaths, this.mappingInputSplitsEvents, "Mapping");
            }
            if (getConfiguration().hasVertexInputFormat()) {
                coordinateInputSplits(this.vertexInputSplitsPaths, this.vertexInputSplitsEvents, "Vertex");
            }
            if (getConfiguration().hasEdgeInputFormat()) {
                coordinateInputSplits(this.edgeInputSplitsPaths, this.edgeInputSplitsEvents, "Edge");
            }
        }
        if (!barrierOnWorkerList(getWorkerFinishedPath(getApplicationAttempt(), getSuperstep()), this.chosenWorkerInfoList, getSuperstepStateChangedEvent(), false)) {
            return SuperstepState.WORKER_FAILURE;
        }
        // Master compute runs for the next superstep (getSuperstep() + 1).
        this.globalCommHandler.prepareSuperstep();
        this.aggregatorTranslation.prepareSuperstep();
        SuperstepClasses prepareMasterCompute = prepareMasterCompute(getSuperstep() + 1);
        doMasterCompute();
        GlobalStats aggregateWorkerStats = aggregateWorkerStats(getSuperstep());
        if (this.masterCompute.isHalted() || (aggregateWorkerStats.getFinishedVertexCount() == aggregateWorkerStats.getVertexCount() && aggregateWorkerStats.getMessageCount() == 0)) {
            aggregateWorkerStats.setHaltComputation(true);
        } else if (getZkExt().exists(this.haltComputationPath, false) != null) {
            if (LOG.isInfoEnabled()) {
                LOG.info("Halting computation because halt zookeeper node was created");
            }
            aggregateWorkerStats.setHaltComputation(true);
        }
        if (this.maxNumberOfSupersteps != GiraphConstants.MAX_NUMBER_OF_SUPERSTEPS.getDefaultValue() && getSuperstep() == this.maxNumberOfSupersteps - 1) {
            if (LOG.isInfoEnabled()) {
                LOG.info("coordinateSuperstep: Finished " + this.maxNumberOfSupersteps + " supersteps (max specified by the user), halting");
            }
            aggregateWorkerStats.setHaltComputation(true);
        }
        // Only validate the next superstep's classes if there will be a next superstep.
        if (!aggregateWorkerStats.getHaltComputation()) {
            prepareMasterCompute.verifyTypesMatch(getConfiguration(), getSuperstep() != 0);
        }
        getConfiguration().updateSuperstepClasses(prepareMasterCompute);
        this.checkpointStatus = getCheckpointStatus(getSuperstep() + 1);
        aggregateWorkerStats.setCheckpointStatus(this.checkpointStatus);
        WritableUtils.writeToZnode(getZkExt(), getSuperstepFinishedPath(getApplicationAttempt(), getSuperstep()), -1, aggregateWorkerStats, prepareMasterCompute);
        updateCounters(aggregateWorkerStats);
        cleanUpOldSuperstep(getSuperstep() - 1);
        incrCachedSuperstep();
        if (getSuperstep() > 0) {
            GiraphStats.getInstance().getSuperstepCounter().increment();
        }
        SuperstepState superstepState = aggregateWorkerStats.getHaltComputation() ? SuperstepState.ALL_SUPERSTEPS_DONE : SuperstepState.THIS_SUPERSTEP_DONE;
        this.globalCommHandler.writeAggregators(getSuperstep(), superstepState);
        return superstepState;
    }

    /**
     * Decides whether superstep {@code j} should be checkpointed.
     *
     * <p>The user force-checkpoint znode forces CHECKPOINT_AND_HALT. Otherwise,
     * with a non-zero checkpoint frequency, a checkpoint is taken at every
     * multiple of the frequency counted from superstep 0, or from the
     * restarted superstep after a restart.
     *
     * @param j superstep to evaluate
     * @return the checkpoint decision for {@code j}
     * @throws IllegalStateException on ZooKeeper errors or interruption
     *         (interrupt flag restored)
     */
    private CheckpointStatus getCheckpointStatus(long j) {
        try {
            if (getZkExt().exists(this.basePath + BspService.FORCE_CHECKPOINT_USER_FLAG, false) != null) {
                return CheckpointStatus.CHECKPOINT_AND_HALT;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Messages previously named cleanupZooKeeper (copy-paste) and mislabeled the exception.
            throw new IllegalStateException("getCheckpointStatus: Got InterruptedException", e);
        } catch (KeeperException e2) {
            throw new IllegalStateException("getCheckpointStatus: Got KeeperException", e2);
        }
        if (this.checkpointFrequency == 0) {
            return CheckpointStatus.NONE;
        }
        long firstCheckpoint = getRestartedSuperstep() != Long.MIN_VALUE
                ? getRestartedSuperstep() + this.checkpointFrequency
                : this.checkpointFrequency;
        if (j >= firstCheckpoint && (j - firstCheckpoint) % this.checkpointFrequency == 0) {
            return CheckpointStatus.CHECKPOINT;
        }
        return CheckpointStatus.NONE;
    }

    /**
     * Runs {@code masterCompute.compute()}, timing it with the master-compute
     * timer. The timer is stopped in {@code finally} so the timer context is
     * always closed, even when compute() throws.
     */
    private void doMasterCompute() {
        GiraphTimerContext time = this.masterComputeTimer.time();
        try {
            this.masterCompute.compute();
        } finally {
            time.stop();
        }
    }

    /**
     * Waits until every task has registered under {@code cleanedUpPath} and
     * then, for an external ZooKeeper with data not kept, deletes the whole
     * application base path.
     *
     * <p>The expected child count is the max task count, doubled when the
     * graph functions are ALL or ALL_EXCEPT_ZOOKEEPER. Errors while waiting
     * or deleting are logged and the method returns (best effort).
     *
     * @throws IllegalStateException if creating {@code cleanedUpPath} fails
     *         (interrupt flag restored on interruption)
     */
    private void cleanUpZooKeeper() {
        try {
            getZkExt().createExt(this.cleanedUpPath, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            // Name the exception that actually occurred.
            throw new IllegalStateException("cleanupZooKeeper: Got InterruptedException", e);
        } catch (KeeperException.NodeExistsException e2) {
            if (LOG.isInfoEnabled()) {
                LOG.info("cleanUpZooKeeper: Node " + this.cleanedUpPath + " already exists, no need to create.");
            }
        } catch (KeeperException e3) {
            throw new IllegalStateException("cleanupZooKeeper: Got KeeperException", e3);
        }
        int maxTasks = BspInputFormat.getMaxTasks(getConfiguration());
        if (getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL || getGraphTaskManager().getGraphFunctions() == GraphFunctions.ALL_EXCEPT_ZOOKEEPER) {
            maxTasks *= 2;
        }
        while (true) {
            try {
                List<String> childrenExt = getZkExt().getChildrenExt(this.cleanedUpPath, true, false, true);
                if (LOG.isInfoEnabled()) {
                    LOG.info("cleanUpZooKeeper: Got " + childrenExt.size() + " of " + maxTasks + " desired children from " + this.cleanedUpPath);
                }
                if (childrenExt.size() == maxTasks) {
                    break;
                }
                if (LOG.isInfoEnabled()) {
                    LOG.info("cleanedUpZooKeeper: Waiting for the children of " + this.cleanedUpPath + " to change since only got " + childrenExt.size() + " nodes.");
                }
                getCleanedUpChildrenChangedEvent().waitForever();
                getCleanedUpChildrenChangedEvent().reset();
            } catch (InterruptedException e4) {
                LOG.error("cleanUpZooKeeper: Got InterruptedException, but will continue", e4);
                return;
            } catch (KeeperException e5) {
                LOG.error("cleanUpZooKeeper: Got KeeperException, but will continue", e5);
                return;
            }
        }
        try {
            if (getConfiguration().isZookeeperExternal() && GiraphConstants.KEEP_ZOOKEEPER_DATA.isFalse(getConfiguration())) {
                if (LOG.isInfoEnabled()) {
                    LOG.info("cleanupZooKeeper: Removing the following path and all children - " + this.basePath + " from ZooKeeper list " + getConfiguration().getZookeeperList());
                }
                getZkExt().deleteExt(this.basePath, -1, true);
            }
        } catch (InterruptedException e6) {
            LOG.error("cleanupZooKeeper: Failed to do cleanup of " + this.basePath + " due to InterruptedException", e6);
        } catch (KeeperException e7) {
            LOG.error("cleanupZooKeeper: Failed to do cleanup of " + this.basePath + " due to KeeperException", e7);
        }
    }

    /**
     * Notifies every registered master observer that the application
     * finished, reporting progress to the task context after each one.
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void postApplication() {
        for (MasterObserver observer : this.observers) {
            observer.postApplication();
            // Report progress after each observer.
            getContext().progress();
        }
    }

    /**
     * Notifies every registered master observer that the current superstep
     * finished, reporting progress to the task context after each one.
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void postSuperstep() {
        for (MasterObserver observer : this.observers) {
            final long superstep = getSuperstep();
            observer.postSuperstep(superstep);
            getContext().progress();
        }
    }

    /**
     * Tells every master observer that the application failed.
     *
     * <p>A {@link RuntimeException} from one observer is logged and does not
     * stop the remaining observers from being notified.
     *
     * @param exc the failure being reported
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void failureCleanup(Exception exc) {
        for (MasterObserver observer : this.observers) {
            try {
                observer.applicationFailed(exc);
            } catch (RuntimeException observerFailure) {
                final String observerName = observer.getClass().getName();
                LOG.error(observerFailure.getClass().getName() + " from observer " + observerName, observerFailure);
            }
            getContext().progress();
        }
    }

    /**
     * Final cleanup for this task.
     *
     * <p>Registers a "finished" child under {@code cleanedUpPath}; ZooKeeper
     * errors there are only logged. If this task is the master it also:
     * <ul>
     *   <li>runs the ZooKeeper cleanup;</li>
     *   <li>removes the checkpoint directory after a successful run when
     *       configured to;</li>
     *   <li>on CHECKPOINT_AND_HALT, writes the checkpoint marker file and fails
     *       the job;</li>
     *   <li>closes master communication.</li>
     * </ul>
     * Finally the ZooKeeper connection is closed.
     *
     * @param superstepState final state of the computation
     * @throws IOException on file system errors
     */
    @Override // org.apache.giraph.bsp.CentralizedServiceMaster
    public void cleanup(SuperstepState superstepState) throws IOException {
        ImmutableClassesGiraphConfiguration<I, V, E> configuration = getConfiguration();
        String str = this.cleanedUpPath + "/" + getTaskPartition() + BspService.MASTER_SUFFIX;
        try {
            String createExt = getZkExt().createExt(str, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, true);
            if (LOG.isInfoEnabled()) {
                LOG.info("cleanup: Notifying master its okay to cleanup with " + createExt);
            }
        } catch (InterruptedException e) {
            LOG.error("cleanup: Got InterruptedException, continuing", e);
        } catch (KeeperException.NodeExistsException e2) {
            if (LOG.isInfoEnabled()) {
                LOG.info("cleanup: Couldn't create finished node '" + str + "'");
            }
        } catch (KeeperException e3) {
            LOG.error("cleanup: Got KeeperException, continuing", e3);
        }
        if (this.isMaster) {
            getGraphTaskManager().setIsMaster(true);
            cleanUpZooKeeper();
            if (superstepState == SuperstepState.ALL_SUPERSTEPS_DONE && GiraphConstants.CLEANUP_CHECKPOINTS_AFTER_SUCCESS.get(configuration)) {
                boolean delete = getFs().delete(new Path(this.checkpointBasePath), true);
                if (LOG.isInfoEnabled()) {
                    LOG.info("cleanup: Removed HDFS checkpoint directory (" + this.checkpointBasePath + ") with return = " + delete + " since the job " + getContext().getJobName() + " succeeded ");
                }
            }
            if (superstepState == SuperstepState.CHECKPOINT_AND_HALT) {
                // Only the marker file's existence matters; close the stream right away
                // instead of leaking an open output stream.
                getFs().create(CheckpointingUtils.getCheckpointMarkPath(configuration, getJobId()), true).close();
                failJob(new Exception("Checkpoint and halt requested. Killing this job."));
            }
            this.globalCommHandler.close();
            this.masterClient.closeConnections();
            this.masterServer.close();
        }
        try {
            getZkExt().close();
        } catch (InterruptedException e4) {
            LOG.error("cleanup: Zookeeper failed to close", e4);
        }
    }

    /**
     * Returns the event used while waiting for workers to write a checkpoint.
     *
     * @return the worker-wrote-checkpoint event
     */
    public final BspEvent getWorkerWroteCheckpointEvent() {
        final BspEvent event = workerWroteCheckpoint;
        return event;
    }

    /**
     * Returns the event that is signalled when the superstep state may
     * have changed.
     *
     * @return the superstep-state-changed event
     */
    public final BspEvent getSuperstepStateChangedEvent() {
        final BspEvent event = superstepStateChanged;
        return event;
    }

    /**
     * Reacts to the deletion of a worker's healthy znode.
     *
     * <p>Paths belonging to supersteps older than the current one are
     * ignored. If the worker named by the path owns, or previously owned, any
     * current partition, a warning is logged and
     * {@code superstepStateChanged} is signalled. The scan stops at the first
     * such partition, so the warning and signal happen once per event rather
     * than once per affected partition.
     *
     * @param str path of the deleted healthy-worker znode
     */
    private void checkHealthyWorkerFailure(String str) {
        if (getSuperstepFromPath(str) < getSuperstep()) {
            return;
        }
        Collection<PartitionOwner> currentPartitionOwners = this.masterGraphPartitioner.getCurrentPartitionOwners();
        String healthyHostnameIdFromPath = getHealthyHostnameIdFromPath(str);
        for (PartitionOwner partitionOwner : currentPartitionOwners) {
            WorkerInfo workerInfo = partitionOwner.getWorkerInfo();
            WorkerInfo previousWorkerInfo = partitionOwner.getPreviousWorkerInfo();
            boolean ownedByDeadWorker = workerInfo.getHostnameId().equals(healthyHostnameIdFromPath)
                    || (previousWorkerInfo != null && previousWorkerInfo.getHostnameId().equals(healthyHostnameIdFromPath));
            if (ownedByDeadWorker) {
                LOG.warn("checkHealthyWorkerFailure: at least one healthy worker went down for superstep " + getSuperstep() + " - " + healthyHostnameIdFromPath + ", will try to restart from checkpointed superstep " + this.lastCheckpointedSuperstep);
                this.superstepStateChanged.signal();
                // One warning/signal is enough; avoid repeating it for every owned partition.
                return;
            }
        }
    }

    /**
     * Dispatches master-specific ZooKeeper watch events.
     *
     * <ul>
     *   <li>Deletion of a node under the worker-healthy directory triggers a healthy-worker failure
     *       check and signals the superstep-state-changed event.</li>
     *   <li>A child change under the worker-finished directory signals the superstep-state-changed
     *       event.</li>
     *   <li>A child change under the worker-wrote-checkpoint directory signals the
     *       worker-wrote-checkpoint event.</li>
     * </ul>
     *
     * @param watchedEvent the ZooKeeper event to inspect
     * @return {@code true} if this method handled the event, {@code false} otherwise
     */
    @Override // org.apache.giraph.bsp.BspService
    public boolean processEvent(WatchedEvent watchedEvent) {
        final String eventPath = watchedEvent.getPath();
        final Watcher.Event.EventType eventType = watchedEvent.getType();

        if (eventPath.contains(BspService.WORKER_HEALTHY_DIR) && eventType == Watcher.Event.EventType.NodeDeleted) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("processEvent: Healthy worker died (node deleted) in " + eventPath);
            }
            checkHealthyWorkerFailure(eventPath);
            this.superstepStateChanged.signal();
            return true;
        }

        if (eventPath.contains(BspService.WORKER_FINISHED_DIR) && eventType == Watcher.Event.EventType.NodeChildrenChanged) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("processEvent: Worker finished (node change) event - superstepStateChanged signaled");
            }
            this.superstepStateChanged.signal();
            return true;
        }

        if (eventPath.contains(BspService.WORKER_WROTE_CHECKPOINT_DIR) && eventType == Watcher.Event.EventType.NodeChildrenChanged) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("processEvent: Worker wrote checkpoint (node change) event - workerWroteCheckpoint signaled");
            }
            this.workerWroteCheckpoint.signal();
            return true;
        }

        return false;
    }

    /**
     * Publishes the aggregated statistics of the last superstep to the Giraph job counters.
     *
     * <p>Vertex, finished-vertex, edge and sent-message counters are overwritten with the values
     * from {@code globalStats}. The aggregate message counters are incremented, so they accumulate
     * across supersteps.
     *
     * @param globalStats statistics aggregated from all workers
     */
    private void updateCounters(GlobalStats globalStats) {
        final GiraphStats stats = GiraphStats.getInstance();
        final long messageCount = globalStats.getMessageCount();
        final long messageBytes = globalStats.getMessageBytesCount();

        // Per-superstep snapshot values.
        stats.getVertices().setValue(globalStats.getVertexCount());
        stats.getFinishedVertexes().setValue(globalStats.getFinishedVertexCount());
        stats.getEdges().setValue(globalStats.getEdgeCount());
        stats.getSentMessages().setValue(messageCount);
        stats.getSentMessageBytes().setValue(messageBytes);

        // Running totals over the whole job.
        stats.getAggregateSentMessages().increment(messageCount);
        stats.getAggregateSentMessageBytes().increment(messageBytes);
    }
}
