package org.apache.giraph.zk;

import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import java.io.File;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.commons.io.FileUtils;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.time.SystemTime;
import org.apache.giraph.time.Time;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/zk/ZooKeeperManager.class */
public class ZooKeeperManager {
    /** Class logger. */
    private static final Logger LOG = Logger.getLogger(ZooKeeperManager.class);
    /**
     * Separator between the components encoded in filestamp names
     * (hostname, task partition and, for server stamps, port). The methods
     * of this class currently spell the same single space out as a literal.
     */
    private static final String HOSTNAME_TASK_SEPARATOR = " ";
    /** Name prefix of the file in the base directory that names the chosen server. */
    private static final String ZOOKEEPER_SERVER_LIST_FILE_PREFIX = "zkServerList_";
    /** Task context; used to report progress while waiting for other tasks. */
    private Mapper<?, ?, ?, ?>.Context context;
    /** Job configuration. */
    private final ImmutableClassesGiraphConfiguration conf;
    /** Partition number of this task, read from the configuration. */
    private final int taskPartition;
    /** Shared (FileSystem) directory under which all coordination files live. */
    private final Path baseDirectory;
    /** "_task" sub-directory holding candidate stamps and computation-done stamps. */
    private final Path taskDirectory;
    /** "_zkServer" sub-directory holding the ready stamp of the running server. */
    private final Path serverDirectory;
    /** Stamp created by this task once it has finished (see offlineZooKeeperServers). */
    private final Path myClosedPath;
    /** Sleep interval, in milliseconds, between polls of the shared directories. */
    private final int pollMsecs;
    /** File system obtained from the configuration; hosts the shared directories. */
    private final FileSystem fs;
    /** Non-null only while this task runs the ZooKeeper server; accessed under synchronized (this). */
    private ZooKeeperRunner zkRunner;
    /** Local directory used as ZooKeeper data and data-log directory. */
    private final String zkDir;
    /** Server settings filled in by generateZooKeeperConfig(). */
    private final ZookeeperConfig config;
    /** Hostname of the chosen server, parsed from the server list file name. */
    private String zkServerHost;
    /** Task partition of the chosen server, parsed from the server list file name. */
    private int zkServerTask;
    /**
     * Client port. Starts as the configured port and is overwritten by the
     * port returned from the runner or parsed from the server's ready stamp.
     */
    private int zkBasePort;
    /** "host:port" string rebuilt by updateZkPortString(). */
    private String zkServerPortString;
    /** Local hostname as reported by the configuration. */
    private String myHostname;
    /** Job id, used to build the default base directory. */
    private final String jobId;
    /** Clock used for the deadline in waitUntilAllTasksDone(). */
    private final Time time = SystemTime.get();

    /* loaded from: input_file:org/apache/giraph/zk/ZooKeeperManager$State.class */
    /**
     * Final state of the task, passed to
     * {@link #offlineZooKeeperServers(State)}. Only {@link #FINISHED} makes
     * that method create this task's closed stamp.
     */
    public enum State {
        /** The task did not complete successfully; no closed stamp is written. */
        FAILED,
        /** The task completed; a closed stamp is written for it. */
        FINISHED
    }

    /**
     * Creates the manager for one task: reads the task partition, job id,
     * poll interval, base port and local hostname from the configuration,
     * derives the shared coordination paths, and opens the file system.
     *
     * @param context task context, later used to report progress
     * @param immutableClassesGiraphConfiguration job configuration
     * @throws IOException if the file system cannot be obtained
     */
    public ZooKeeperManager(Mapper<?, ?, ?, ?>.Context context, ImmutableClassesGiraphConfiguration immutableClassesGiraphConfiguration) throws IOException {
        this.context = context;
        conf = immutableClassesGiraphConfiguration;
        taskPartition = conf.getTaskPartition();
        jobId = conf.getJobId();

        // Shared directory layout: <base>/_task and <base>/_zkServer.
        String defaultBase = getFinalZooKeeperPath();
        baseDirectory = new Path(GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.getWithDefault(conf, defaultBase));
        taskDirectory = new Path(baseDirectory, "_task");
        serverDirectory = new Path(baseDirectory, "_zkServer");
        myClosedPath = new Path(taskDirectory, new ComputationDoneName(taskPartition).getName());
        pollMsecs = GiraphConstants.ZOOKEEPER_SERVERLIST_POLL_MSECS.get(conf);

        // Local ZooKeeper directory: prefer the job-local directory when the
        // framework provides one, otherwise fall back to the working directory.
        String jobLocalDir = conf.get("job.local.dir");
        String defaultZkDir;
        if (jobLocalDir != null) {
            defaultZkDir = jobLocalDir + "/_bspZooKeeper";
        } else {
            defaultZkDir = System.getProperty("user.dir") + "/" + GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
        }
        zkDir = conf.get(GiraphConstants.ZOOKEEPER_DIR, defaultZkDir);

        config = new ZookeeperConfig();
        zkBasePort = GiraphConstants.ZOOKEEPER_SERVER_PORT.get(conf);
        myHostname = conf.getLocalHostname();
        fs = FileSystem.get(conf);
    }

    /**
     * Builds the default base directory for this job.
     *
     * @return the default manager directory followed by "/" and the job id
     */
    private String getFinalZooKeeperPath() {
        String defaultRoot = GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.getDefaultValue();
        return defaultRoot + "/" + jobId;
    }

    /**
     * Reads the configured base znode.
     *
     * @param configuration configuration to read the base znode key from
     * @return the configured value, or the empty string when it is not set
     * @throws IllegalArgumentException if a non-empty value does not start
     *         with "/"
     */
    public static String getBasePath(Configuration configuration) {
        String baseZnode = configuration.get(GiraphConstants.BASE_ZNODE_KEY, "");
        boolean wellFormed = baseZnode.isEmpty() || baseZnode.startsWith("/");
        if (!wellFormed) {
            throw new IllegalArgumentException("Value for giraph.zkBaseZNode must start with /: " + baseZnode);
        }
        return baseZnode;
    }

    /**
     * Registers this task as a server candidate and then resolves which
     * task was chosen to host the ZooKeeper server.
     *
     * @throws IOException if the shared file system cannot be read
     * @throws InterruptedException if interrupted while waiting for
     *         candidate stamps
     */
    public void setup() throws IOException, InterruptedException {
        // Order matters: the candidate stamp must exist before the server
        // list is derived from the task directory.
        createCandidateStamp();
        getZooKeeperServerList();
    }

    /**
     * Creates the shared base and server directories (best effort) and then
     * drops this task's candidate stamp, a file named
     * "&lt;hostname&gt; &lt;taskPartition&gt;", into the task directory.
     *
     * <p>Failures to create the directories or the stamp are logged, with
     * their cause, and otherwise ignored; only an unusable base directory
     * is fatal.
     *
     * @throws IllegalArgumentException if the base directory is not a
     *         directory or its status cannot be read
     */
    public void createCandidateStamp() {
        try {
            fs.mkdirs(baseDirectory);
            LOG.info("createCandidateStamp: Made the directory " + baseDirectory);
        } catch (IOException e) {
            // Keep going: the status check below decides whether this is fatal.
            LOG.error("createCandidateStamp: Failed to mkdirs " + baseDirectory, e);
        }
        try {
            fs.mkdirs(serverDirectory);
            LOG.info("createCandidateStamp: Made the directory " + serverDirectory);
        } catch (IOException e) {
            LOG.error("createCandidateStamp: Failed to mkdirs " + serverDirectory, e);
        }

        boolean baseIsDirectory;
        try {
            baseIsDirectory = fs.getFileStatus(baseDirectory).isDir();
        } catch (IOException e) {
            throw new IllegalArgumentException("createCandidateStamp: Couldn't get file status for base directory " + baseDirectory + ".  If there is an issue with this directory, please set an accesible base directory with the Hadoop configuration option " + GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.getKey(), e);
        }
        if (!baseIsDirectory) {
            throw new IllegalArgumentException("createCandidateStamp: " + baseDirectory + " is not a directory, but should be.");
        }

        Path myCandidacyPath = new Path(taskDirectory, myHostname + " " + taskPartition);
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info("createCandidateStamp: Creating my filestamp " + myCandidacyPath);
            }
            fs.createNewFile(myCandidacyPath);
        } catch (IOException e) {
            // A stamp left behind by an earlier attempt of this task is tolerated.
            LOG.error("createCandidateStamp: Failed (maybe previous task failed) to create filestamp " + myCandidacyPath, e);
        }
    }

    /**
     * Creates an empty file, retrying on {@link IOException}.
     *
     * @param fileSystem file system to create the file on
     * @param path file to create
     * @param i maximum number of attempts
     * @param i2 wait between attempts, in milliseconds
     * @throws IllegalStateException if every attempt failed; the last
     *         {@link IOException} is attached as the cause
     */
    private static void createNewFileWithRetries(FileSystem fileSystem, Path path, int i, int i2) {
        IOException lastFailure = null;
        int attempt = 0;
        while (attempt < i) {
            try {
                fileSystem.createNewFile(path);
                return;
            } catch (IOException e) {
                lastFailure = e;
                attempt++;
                // Attempts are reported 1-based so "attempt 3 of 3" is the last one.
                LOG.warn("createNewFileWithRetries: Failed to create file at path " + path + " on attempt " + attempt + " of " + i + ".", e);
                // No point in waiting once there is no further attempt to make.
                if (attempt < i) {
                    Uninterruptibles.sleepUninterruptibly(i2, TimeUnit.MILLISECONDS);
                }
            }
        }
        throw new IllegalStateException("createNewFileWithRetries: Failed to create file at path " + path + " after " + attempt + " attempts", lastFailure);
    }

    /**
     * Writes this task's closed stamp, using the retry count and retry wait
     * configured for file creation.
     */
    private void createZooKeeperClosedStamp() {
        LOG.info("createZooKeeperClosedStamp: Creating my filestamp " + myClosedPath);
        int maxAttempts = conf.getHdfsFileCreationRetries();
        int retryWaitMs = conf.getHdfsFileCreationRetryWaitMs();
        createNewFileWithRetries(fs, myClosedPath, maxAttempts, retryWaitMs);
    }

    /**
     * Tells whether this task's closed stamp exists.
     *
     * @return true if the closed stamp file is present on the file system
     * @throws RuntimeException wrapping the {@link IOException} if the
     *         check itself fails
     */
    public boolean computationDone() {
        boolean stampPresent;
        try {
            stampPresent = fs.exists(myClosedPath);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        return stampPresent;
    }

    /**
     * Waits until the task directory has at least one entry, takes the
     * first one as the server, and publishes the choice as an empty file
     * named "zkServerList_&lt;hostname&gt; &lt;task&gt;" in the base
     * directory.
     *
     * @throws IOException if listing the directory or creating the file fails
     * @throws InterruptedException if interrupted while waiting
     * @throws IllegalStateException if the chosen entry's name does not
     *         split into exactly two space-separated parts
     */
    private void createZooKeeperServerList() throws IOException, InterruptedException {
        FileStatus[] candidates = fs.listStatus(taskDirectory);
        while (candidates.length <= 0) {
            Thread.sleep(pollMsecs);
            candidates = fs.listStatus(taskDirectory);
        }

        // The first listed entry wins.
        FileStatus chosen = candidates[0];
        String[] hostAndTask = chosen.getPath().getName().split(" ");
        Preconditions.checkState(hostAndTask.length == 2, "createZooKeeperServerList: Task 0 failed to parse " + chosen.getPath().getName());

        String listFileName = ZOOKEEPER_SERVER_LIST_FILE_PREFIX + hostAndTask[0] + " " + hostAndTask[1];
        Path listFilePath = new Path(baseDirectory, listFileName);
        if (LOG.isInfoEnabled()) {
            LOG.info("createZooKeeperServerList: Creating the final ZooKeeper file '" + listFilePath + "'");
        }
        fs.createNewFile(listFilePath);
    }

    /**
     * Looks in the base directory for the server list file.
     *
     * @return the name of the first entry whose name starts with the server
     *         list prefix, or null if there is none
     * @throws IOException if the base directory cannot be listed
     */
    private String getServerListFile() throws IOException {
        for (FileStatus entry : fs.listStatus(baseDirectory)) {
            if (entry.getPath().getName().startsWith(ZOOKEEPER_SERVER_LIST_FILE_PREFIX)) {
                return entry.getPath().getName();
            }
        }
        return null;
    }

    /**
     * Resolves the chosen server. Task 0 creates the server list file when
     * none exists yet; every task then polls until the file is visible and
     * parses the server hostname and task partition out of its name.
     *
     * @throws IOException if the shared file system cannot be read
     * @throws InterruptedException if task 0 is interrupted while creating
     *         the server list
     */
    private void getZooKeeperServerList() throws IOException, InterruptedException {
        if (taskPartition == 0 && getServerListFile() == null) {
            createZooKeeperServerList();
        }

        String listFileName;
        boolean waitBeforePoll = false;
        do {
            if (waitBeforePoll) {
                try {
                    Thread.sleep(pollMsecs);
                } catch (InterruptedException e) {
                    LOG.warn("getZooKeeperServerList: Strange interrupted exception " + e.getMessage());
                }
            }
            listFileName = getServerListFile();
            if (LOG.isInfoEnabled()) {
                LOG.info("getZooKeeperServerList: For task " + taskPartition + ", got file '" + listFileName + "' (polling period is " + pollMsecs + ")");
            }
            waitBeforePoll = true;
        } while (listFileName == null);

        // File name layout: <prefix><hostname> <task>.
        String encoded = listFileName.substring(ZOOKEEPER_SERVER_LIST_FILE_PREFIX.length());
        String[] hostAndTask = encoded.split(" ");
        if (LOG.isInfoEnabled()) {
            LOG.info("getZooKeeperServerList: Found " + Arrays.toString(hostAndTask) + " hosts in filename '" + listFileName + "'");
        }
        zkServerHost = hostAndTask[0];
        zkServerTask = Integer.parseInt(hostAndTask[1]);
        updateZkPortString();
    }

    /** Rebuilds the "host:port" string from the current server host and port. */
    private void updateZkPortString() {
        String hostAndPort = zkServerHost + ":" + zkBasePort;
        zkServerPortString = hostAndPort;
    }

    /**
     * Returns the server address as last computed.
     *
     * @return the "host:port" string, or null if it has not been computed yet
     */
    public String getZooKeeperServerPortString() {
        return zkServerPortString;
    }

    /**
     * Creates the local ZooKeeper directory, sets the ZooKeeper system
     * properties (snap count, force sync, skip ACL) and fills in the server
     * configuration: data directories, client port and session timeouts.
     */
    private void generateZooKeeperConfig() {
        if (LOG.isInfoEnabled()) {
            LOG.info("generateZooKeeperConfig: with base port " + zkBasePort);
        }

        File localDir = new File(zkDir);
        boolean created = localDir.mkdirs();
        if (LOG.isInfoEnabled()) {
            LOG.info("generateZooKeeperConfigFile: Make directory of " + localDir.getName() + " = " + created);
        }

        // These settings are passed as JVM-wide system properties.
        System.setProperty("zookeeper.snapCount", String.valueOf(50000));
        String forceSync;
        if (GiraphConstants.ZOOKEEPER_FORCE_SYNC.get(conf)) {
            forceSync = "yes";
        } else {
            forceSync = "no";
        }
        System.setProperty("zookeeper.forceSync", forceSync);
        String skipAcl;
        if (GiraphConstants.ZOOKEEPER_SKIP_ACL.get(conf)) {
            skipAcl = "yes";
        } else {
            skipAcl = "no";
        }
        System.setProperty("zookeeper.skipACL", skipAcl);

        // Snapshots and transaction logs share the same local directory.
        config.setDataDir(zkDir);
        config.setDataLogDir(zkDir);
        config.setClientPortAddress(new InetSocketAddress(zkBasePort));
        config.setMinSessionTimeout(conf.getZooKeeperMinSessionTimeout());
        config.setMaxSessionTimeout(conf.getZooKeeperMaxSessionTimeout());
    }

    /**
     * Brings the ZooKeeper server online from this task's point of view.
     *
     * <p>On the chosen server task this starts the server, waits until its
     * client port accepts connections and publishes a ready stamp. On every
     * other task it blocks until a ready stamp for the chosen host shows up
     * and adopts the port recorded in it.
     *
     * @throws IOException declared for callers; failures while reading the
     *         server directory are rethrown as {@link RuntimeException}
     * @throws IllegalStateException if the local server never accepted a
     *         connection within the configured number of attempts
     */
    public void onlineZooKeeperServer() throws IOException {
        if (zkServerTask == taskPartition) {
            startLocalZooKeeperServer();
            awaitLocalZooKeeperServer();
            createServerReadyStamp();
        } else {
            awaitRemoteZooKeeperServer();
        }
    }

    /** Wipes the local data directory, builds the config and starts the server. */
    private void startLocalZooKeeperServer() {
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info("onlineZooKeeperServers: Trying to delete old directory " + zkDir);
            }
            FileUtils.deleteDirectory(new File(zkDir));
        } catch (IOException e) {
            LOG.warn("onlineZooKeeperServers: Failed to delete directory " + zkDir, e);
        }
        generateZooKeeperConfig();
        synchronized (this) {
            zkRunner = createRunner();
            int actualPort = zkRunner.start(zkDir, config);
            // A positive return value is the port the server really bound to.
            if (actualPort > 0) {
                zkBasePort = actualPort;
                updateZkPortString();
            }
        }
    }

    /** Polls the local client port until it accepts a connection or attempts run out. */
    private void awaitLocalZooKeeperServer() {
        int maxAttempts = conf.getZookeeperConnectionAttempts();
        boolean connected = false;
        int attempt = 0;
        while (attempt < maxAttempts) {
            if (LOG.isInfoEnabled()) {
                LOG.info("onlineZooKeeperServers: Connect attempt " + attempt + " of " + maxAttempts + " max trying to connect to " + myHostname + ":" + zkBasePort + " with poll msecs = " + pollMsecs);
            }
            if (probeZooKeeperPort(new InetSocketAddress(myHostname, zkBasePort))) {
                connected = true;
                break;
            }
            attempt++;
            try {
                Thread.sleep(pollMsecs);
            } catch (InterruptedException e) {
                // The wait is cut short but the attempt budget still bounds the loop.
                LOG.warn("onlineZooKeeperServers: Sleep of " + pollMsecs + " interrupted - " + e.getMessage());
            }
        }
        if (!connected) {
            throw new IllegalStateException("onlineZooKeeperServers: Failed to connect in " + attempt + " tries!");
        }
    }

    /**
     * Opens and immediately closes a test connection to the given address.
     * The probe socket is always closed so repeated attempts do not leak
     * file descriptors; a failure while closing is reported like any other
     * I/O failure and counts as an unsuccessful probe.
     */
    private boolean probeZooKeeperPort(InetSocketAddress address) {
        try (Socket probe = new Socket()) {
            probe.connect(address, 5000);
            if (LOG.isInfoEnabled()) {
                LOG.info("onlineZooKeeperServers: Connected to " + address + "!");
            }
            return true;
        } catch (ConnectException e) {
            LOG.warn("onlineZooKeeperServers: Got ConnectException", e);
        } catch (SocketTimeoutException e) {
            LOG.warn("onlineZooKeeperServers: Got SocketTimeoutException", e);
        } catch (IOException e) {
            LOG.warn("onlineZooKeeperServers: Got IOException", e);
        }
        return false;
    }

    /** Publishes "&lt;hostname&gt; &lt;task&gt; &lt;port&gt;" in the server directory (best effort). */
    private void createServerReadyStamp() {
        Path readyPath = new Path(serverDirectory, myHostname + " " + taskPartition + " " + zkBasePort);
        try {
            if (LOG.isInfoEnabled()) {
                LOG.info("onlineZooKeeperServers: Creating my filestamp " + readyPath);
            }
            fs.createNewFile(readyPath);
        } catch (IOException e) {
            LOG.error("onlineZooKeeperServers: Failed (maybe previous task failed) to create filestamp " + readyPath, e);
        }
    }

    /**
     * Polls the server directory until the stamp read last names the chosen
     * server host. Each stamp read overwrites the port, so the last listed
     * stamp decides both the host that is compared and the port in use.
     */
    private void awaitRemoteZooKeeperServer() {
        int attempt = 0;
        while (true) {
            try {
                FileStatus[] stamps = fs.listStatus(serverDirectory);
                if (stamps != null && stamps.length > 0) {
                    String foundServer = null;
                    for (FileStatus stamp : stamps) {
                        String stampName = stamp.getPath().getName();
                        String[] parts = stampName.split(" ");
                        if (parts.length != 3) {
                            throw new RuntimeException("getZooKeeperServerList: Task 0 failed to parse " + stampName);
                        }
                        foundServer = parts[0];
                        zkBasePort = Integer.parseInt(parts[2]);
                        updateZkPortString();
                    }
                    if (LOG.isInfoEnabled()) {
                        LOG.info("onlineZooKeeperServers: Got " + foundServer + " on port " + zkBasePort + " (polling period is " + pollMsecs + ") on attempt " + attempt);
                    }
                    if (zkServerHost.equals(foundServer)) {
                        return;
                    }
                } else if (LOG.isInfoEnabled()) {
                    LOG.info("onlineZooKeeperServers: Empty directory " + serverDirectory + ", waiting " + pollMsecs + " msecs.");
                }
                Thread.sleep(pollMsecs);
                attempt++;
            } catch (IOException e) {
                throw new RuntimeException(e);
            } catch (InterruptedException e) {
                // Logged and ignored: re-asserting the interrupt here would
                // make every following sleep fail and turn this into a busy loop.
                LOG.warn("onlineZooKeeperServers: Strange interrupt from " + e.getMessage(), e);
            }
        }
    }

    /**
     * Blocks until at least {@code i} computation-done stamps are present in
     * the task directory, polling every {@code pollMsecs} milliseconds and
     * reporting progress to the task context between polls.
     *
     * @param i number of tasks that must have written their done stamp
     * @throws IllegalStateException if the configured wait timeout elapses
     *         before enough stamps are seen
     */
    private void waitUntilAllTasksDone(int i) {
        int attempt = 0;
        long deadlineMs = time.getMilliseconds() + conf.getWaitTaskDoneTimeoutMs();
        do {
            boolean[] taskDone = new boolean[i];
            // Reset on every poll so a failed listing counts as "nothing done yet".
            int doneCount = 0;
            try {
                FileStatus[] entries = fs.listStatus(taskDirectory);
                if (entries != null) {
                    for (FileStatus entry : entries) {
                        String name = entry.getPath().getName();
                        if (!ComputationDoneName.isName(name)) {
                            continue;
                        }
                        doneCount++;
                        int workerId = ComputationDoneName.fromName(name).getWorkerId();
                        if (workerId >= 0 && workerId < taskDone.length) {
                            taskDone[workerId] = true;
                        } else {
                            // Still counted, but it cannot be tracked per task.
                            LOG.warn("waitUntilAllTasksDone: Ignoring out of range worker id " + workerId + " from " + name);
                        }
                    }
                }
                if (LOG.isInfoEnabled()) {
                    LOG.info("waitUntilAllTasksDone: Got " + doneCount + " and " + i + " desired (polling period is " + pollMsecs + ") on attempt " + attempt);
                }
            } catch (IOException e) {
                LOG.warn("waitUntilAllTasksDone: Got IOException.", e);
            }
            if (doneCount >= i) {
                return;
            }

            StringBuilder pending = new StringBuilder();
            for (int task = 0; task < taskDone.length; task++) {
                if (!taskDone[task]) {
                    pending.append(task).append(", ");
                }
            }
            LOG.info("waitUntilAllTasksDone: Still waiting on tasks " + pending.toString());

            attempt++;
            try {
                Thread.sleep(pollMsecs);
            } catch (InterruptedException e) {
                // Logged only; the deadline below still bounds the total wait.
                LOG.warn("waitUntilAllTasksDone: Got InterruptedException", e);
            }
            context.progress();
        } while (time.getMilliseconds() <= deadlineMs);
        throw new IllegalStateException("waitUntilAllTasksDone: Tasks did not finish by the maximum time of " + conf.getWaitTaskDoneTimeoutMs() + " milliseconds");
    }

    /**
     * Marks this task as closed (only when {@code state} is
     * {@link State#FINISHED}) and, if this task runs the ZooKeeper server,
     * waits for all tasks to finish, stops the server and removes its local
     * directory.
     *
     * @param state final state of this task
     * @throws IllegalStateException if the closed stamp cannot be created
     *         or the other tasks do not finish within the configured timeout
     */
    public void offlineZooKeeperServers(State state) {
        if (state == State.FINISHED) {
            createZooKeeperClosedStamp();
        }
        synchronized (this) {
            if (zkRunner == null) {
                // This task does not host the server; nothing to shut down.
                return;
            }
            boolean pureYarn = GiraphConstants.IS_PURE_YARN_JOB.get(conf);
            int tasksToWaitFor = conf.getMapTasks();
            if (pureYarn) {
                // Pure YARN jobs wait for the configured maximum workers plus one more task.
                tasksToWaitFor = conf.getInt(GiraphConstants.MAX_WORKERS, 0) + 1;
            }
            LOG.info("offlineZooKeeperServers: Will wait for " + tasksToWaitFor + " tasks");
            waitUntilAllTasksDone(tasksToWaitFor);
            zkRunner.stop();
            try {
                FileUtils.deleteDirectory(new File(zkDir));
                // Report success only when the deletion really succeeded.
                if (LOG.isInfoEnabled()) {
                    LOG.info("offlineZooKeeperServers: deleted directory " + zkDir);
                }
            } catch (IOException e) {
                LOG.warn("offlineZooKeeperSevers: IOException, but continuing", e);
            }
            zkRunner = null;
        }
    }

    /**
     * Creates the runner used to host the server.
     *
     * @return a new in-process runner with this manager's configuration set
     */
    private ZooKeeperRunner createRunner() {
        InProcessZooKeeperRunner runner = new InProcessZooKeeperRunner();
        runner.setConf(conf);
        return runner;
    }

    /**
     * Tells whether this task currently hosts the ZooKeeper server.
     *
     * @return true while a runner is held by this manager
     */
    public boolean runsZooKeeper() {
        synchronized (this) {
            return zkRunner != null;
        }
    }

    /**
     * Registers the base directory for deletion when the file system is
     * closed. A failure to register is logged, with its cause, and ignored.
     */
    public void cleanupOnExit() {
        try {
            fs.deleteOnExit(baseDirectory);
        } catch (IOException e) {
            LOG.error("cleanupOnExit: Failed to delete on exit " + baseDirectory, e);
        }
    }

    /** Asks the runner, if this task holds one, to clean up after itself. */
    public void cleanup() {
        synchronized (this) {
            if (zkRunner == null) {
                return;
            }
            zkRunner.cleanup();
        }
    }
}
