package org.apache.giraph.utils;

import com.google.common.base.Charsets;
import com.google.common.collect.ImmutableList;
import com.google.common.io.Files;
import java.io.File;
import java.io.IOException;
import java.net.ServerSocket;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.antlr.tool.GrammarReport;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.giraph.io.formats.GiraphFileInputFormat;
import org.apache.giraph.io.formats.InMemoryVertexOutputFormat;
import org.apache.giraph.job.GiraphJob;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.log4j.Logger;
import org.apache.zookeeper.server.ServerConfig;
import org.apache.zookeeper.server.ZooKeeperServerMain;
import org.apache.zookeeper.server.quorum.QuorumPeerConfig;

/* loaded from: input_file:org/apache/giraph/utils/InternalVertexRunner.class */
public class InternalVertexRunner {
    public static final int LOCAL_ZOOKEEPER_PORT_FROM = 22182;
    public static final int LOCAL_ZOOKEEPER_PORT_TO = 65535;
    private static final Logger LOG = Logger.getLogger(InternalVertexRunner.class);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/giraph/utils/InternalVertexRunner$InternalZooKeeper.class */
    public static class InternalZooKeeper extends ZooKeeperServerMain {
        private InternalZooKeeper() {
        }

        void end() {
            shutdown();
        }
    }

    private InternalVertexRunner() {
    }

    public static Iterable<String> run(GiraphConfiguration giraphConfiguration, String[] strArr) throws Exception {
        return run(giraphConfiguration, strArr, null);
    }

    private static boolean runZooKeeperAndJob(QuorumPeerConfig quorumPeerConfig, GiraphJob giraphJob) {
        final InternalZooKeeper internalZooKeeper = new InternalZooKeeper();
        final ServerConfig serverConfig = new ServerConfig();
        serverConfig.readFrom(quorumPeerConfig);
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        newSingleThreadExecutor.execute(new Runnable() { // from class: org.apache.giraph.utils.InternalVertexRunner.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    InternalZooKeeper.this.runFromConfig(serverConfig);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        });
        try {
            try {
                boolean run = giraphJob.run(true);
                internalZooKeeper.end();
                newSingleThreadExecutor.shutdown();
                try {
                    newSingleThreadExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                } catch (InterruptedException e) {
                    LOG.error("runZooKeeperAndJob: Interrupted on waiting", e);
                }
                return run;
            } catch (Throwable th) {
                internalZooKeeper.end();
                newSingleThreadExecutor.shutdown();
                try {
                    newSingleThreadExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                } catch (InterruptedException e2) {
                    LOG.error("runZooKeeperAndJob: Interrupted on waiting", e2);
                }
                throw th;
            }
        } catch (IOException | ClassNotFoundException | InterruptedException e3) {
            LOG.error("runZooKeeperAndJob: Got exception on running", e3);
            internalZooKeeper.end();
            newSingleThreadExecutor.shutdown();
            try {
                newSingleThreadExecutor.awaitTermination(1L, TimeUnit.MINUTES);
                return false;
            } catch (InterruptedException e4) {
                LOG.error("runZooKeeperAndJob: Interrupted on waiting", e4);
                return false;
            }
        }
    }

    public static Iterable<String> run(GiraphConfiguration giraphConfiguration, String[] strArr, String[] strArr2) throws Exception {
        File createTestDir = FileUtils.createTestDir(giraphConfiguration.getComputationName());
        try {
            Iterable<String> run = run(giraphConfiguration, strArr, strArr2, null, createTestDir);
            FileUtils.delete(createTestDir);
            return run;
        } catch (Throwable th) {
            FileUtils.delete(createTestDir);
            throw th;
        }
    }

    public static Iterable<String> run(GiraphConfiguration giraphConfiguration, String[] strArr, String[] strArr2, String str, File file) throws Exception {
        File file2 = null;
        File file3 = null;
        if (giraphConfiguration.hasVertexInputFormat()) {
            file2 = FileUtils.createTempFile(file, "vertices.txt");
        }
        if (giraphConfiguration.hasEdgeInputFormat()) {
            file3 = FileUtils.createTempFile(file, "edges.txt");
        }
        File createTempDir = FileUtils.createTempDir(file, "output");
        File createTempDir2 = FileUtils.createTempDir(file, "_bspZooKeeper");
        File createTempDir3 = FileUtils.createTempDir(file, "_defaultZkManagerDir");
        if (giraphConfiguration.hasVertexInputFormat()) {
            FileUtils.writeLines(file2, strArr);
        }
        if (giraphConfiguration.hasEdgeInputFormat()) {
            FileUtils.writeLines(file3, strArr2);
        }
        int findAvailablePort = findAvailablePort();
        giraphConfiguration.setWorkerConfiguration(1, 1, 100.0f);
        GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConfiguration, false);
        GiraphConstants.LOCAL_TEST_MODE.set(giraphConfiguration, true);
        giraphConfiguration.setZookeeperList("localhost:" + String.valueOf(findAvailablePort));
        giraphConfiguration.set(GiraphConstants.ZOOKEEPER_DIR, createTempDir2.toString());
        GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(giraphConfiguration, createTempDir3.toString());
        if (str == null) {
            str = FileUtils.createTempDir(file, "_checkpoints").toString();
        }
        GiraphConstants.CHECKPOINT_DIRECTORY.set(giraphConfiguration, str);
        GiraphJob giraphJob = new GiraphJob(giraphConfiguration, giraphConfiguration.getComputationName());
        Job internalJob = giraphJob.getInternalJob();
        if (giraphConfiguration.hasVertexInputFormat()) {
            GiraphFileInputFormat.setVertexInputPath(internalJob.getConfiguration(), new Path(file2.toString()));
        }
        if (giraphConfiguration.hasEdgeInputFormat()) {
            GiraphFileInputFormat.setEdgeInputPath(internalJob.getConfiguration(), new Path(file3.toString()));
        }
        FileOutputFormat.setOutputPath(giraphJob.getInternalJob(), new Path(createTempDir.toString()));
        Properties configLocalZooKeeper = configLocalZooKeeper(createTempDir2, findAvailablePort);
        QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
        quorumPeerConfig.parseProperties(configLocalZooKeeper);
        if (!runZooKeeperAndJob(quorumPeerConfig, giraphJob)) {
            return null;
        }
        File file4 = new File(createTempDir, "part-m-00000");
        return (giraphConfiguration.hasVertexOutputFormat() && file4.canRead()) ? Files.readLines(file4, Charsets.UTF_8) : ImmutableList.of();
    }

    public static <I extends WritableComparable, V extends Writable, E extends Writable> void run(GiraphConfiguration giraphConfiguration, TestGraph<I, V, E> testGraph) throws Exception {
        File createTestDir = FileUtils.createTestDir(giraphConfiguration.getComputationName());
        try {
            run(giraphConfiguration, testGraph, createTestDir, null);
            FileUtils.delete(createTestDir);
        } catch (Throwable th) {
            FileUtils.delete(createTestDir);
            throw th;
        }
    }

    public static <I extends WritableComparable, V extends Writable, E extends Writable> void run(GiraphConfiguration giraphConfiguration, TestGraph<I, V, E> testGraph, File file, String str) throws Exception {
        File createTempDir = FileUtils.createTempDir(file, "_bspZooKeeper");
        File createTempDir2 = FileUtils.createTempDir(file, "_defaultZkManagerDir");
        if (str == null) {
            str = FileUtils.createTempDir(file, "_checkpoints").toString();
        }
        giraphConfiguration.setVertexInputFormatClass(InMemoryVertexInputFormat.class);
        GiraphJob giraphJob = new GiraphJob(giraphConfiguration, giraphConfiguration.getComputationName());
        InMemoryVertexInputFormat.setGraph(testGraph);
        int findAvailablePort = findAvailablePort();
        giraphConfiguration.setWorkerConfiguration(1, 1, 100.0f);
        GiraphConstants.SPLIT_MASTER_WORKER.set(giraphConfiguration, false);
        GiraphConstants.LOCAL_TEST_MODE.set(giraphConfiguration, true);
        GiraphConstants.ZOOKEEPER_LIST.set(giraphConfiguration, "localhost:" + String.valueOf(findAvailablePort));
        giraphConfiguration.set(GiraphConstants.ZOOKEEPER_DIR, createTempDir.toString());
        GiraphConstants.ZOOKEEPER_MANAGER_DIRECTORY.set(giraphConfiguration, createTempDir2.toString());
        GiraphConstants.CHECKPOINT_DIRECTORY.set(giraphConfiguration, str);
        Properties configLocalZooKeeper = configLocalZooKeeper(createTempDir, findAvailablePort);
        QuorumPeerConfig quorumPeerConfig = new QuorumPeerConfig();
        quorumPeerConfig.parseProperties(configLocalZooKeeper);
        runZooKeeperAndJob(quorumPeerConfig, giraphJob);
    }

    public static <I extends WritableComparable, V extends Writable, E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(GiraphConfiguration giraphConfiguration, TestGraph<I, V, E> testGraph) throws Exception {
        File createTestDir = FileUtils.createTestDir(giraphConfiguration.getComputationName());
        try {
            TestGraph<I, V, E> runWithInMemoryOutput = runWithInMemoryOutput(giraphConfiguration, testGraph, createTestDir, null);
            FileUtils.delete(createTestDir);
            return runWithInMemoryOutput;
        } catch (Throwable th) {
            FileUtils.delete(createTestDir);
            throw th;
        }
    }

    public static <I extends WritableComparable, V extends Writable, E extends Writable> TestGraph<I, V, E> runWithInMemoryOutput(GiraphConfiguration giraphConfiguration, TestGraph<I, V, E> testGraph, File file, String str) throws Exception {
        giraphConfiguration.setVertexOutputFormatClass(InMemoryVertexOutputFormat.class);
        InMemoryVertexOutputFormat.initializeOutputGraph(giraphConfiguration);
        run(giraphConfiguration, testGraph, file, str);
        return InMemoryVertexOutputFormat.getOutputGraph();
    }

    private static Properties configLocalZooKeeper(File file, int i) {
        Properties properties = new Properties();
        properties.setProperty("tickTime", "2000");
        properties.setProperty("dataDir", file.getAbsolutePath());
        properties.setProperty("clientPort", String.valueOf(i));
        properties.setProperty("maxClientCnxns", "10000");
        properties.setProperty("minSessionTimeout", "10000");
        properties.setProperty("maxSessionTimeout", "100000");
        properties.setProperty("initLimit", "10");
        properties.setProperty("syncLimit", GrammarReport.Version);
        properties.setProperty("snapCount", "50000");
        return properties;
    }

    private static int findAvailablePort() {
        for (int i = 22182; i < 65535; i++) {
            ServerSocket serverSocket = null;
            try {
                try {
                    serverSocket = new ServerSocket(i);
                    serverSocket.setReuseAddress(true);
                    int i2 = i;
                    if (serverSocket != null && !serverSocket.isClosed()) {
                        try {
                            serverSocket.close();
                        } catch (IOException e) {
                            LOG.info("findAvailablePort: can't close test socket", e);
                        }
                    }
                    return i2;
                } catch (IOException e2) {
                    LOG.info("findAvailablePort: port " + i + " is in use.");
                    if (serverSocket != null && !serverSocket.isClosed()) {
                        try {
                            serverSocket.close();
                        } catch (IOException e3) {
                            LOG.info("findAvailablePort: can't close test socket", e3);
                        }
                    }
                }
            } catch (Throwable th) {
                if (serverSocket != null && !serverSocket.isClosed()) {
                    try {
                        serverSocket.close();
                    } catch (IOException e4) {
                        LOG.info("findAvailablePort: can't close test socket", e4);
                    }
                }
                throw th;
            }
        }
        throw new RuntimeException("No port found in the range [ 22182, 65535)");
    }
}
