package org.apache.gobblin.cluster.suite;

import com.google.common.base.Charsets;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.io.Resources;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import com.typesafe.config.ConfigParseOptions;
import com.typesafe.config.ConfigRenderOptions;
import com.typesafe.config.ConfigSyntax;
import java.io.DataOutputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.curator.test.TestingServer;
import org.apache.gobblin.cluster.GobblinClusterManager;
import org.apache.gobblin.cluster.GobblinTaskRunner;
import org.apache.gobblin.cluster.HelixUtils;
import org.apache.gobblin.cluster.TestHelper;
import org.apache.gobblin.metrics.GobblinMetricsRegistry;
import org.apache.gobblin.testing.AssertWithBackoff;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

/* loaded from: input_file:org/apache/gobblin/cluster/suite/IntegrationBasicSuite.class */
public class IntegrationBasicSuite {
    public static final String JOB_NAME = "HelloWorldTestJob";
    public static final String JOB_CONF_NAME = "HelloWorldJob.conf";
    public static final String WORKER_INSTANCE_0 = "WorkerInstance_0";
    public static final String TEST_INSTANCE_NAME_KEY = "worker.instance.name";
    protected final Config jobConfigOverrides;
    protected Config managerConfig;
    protected Collection<Config> taskDriverConfigs;
    protected Collection<Config> workerConfigs;
    protected Collection<GobblinTaskRunner> workers;
    protected Collection<GobblinTaskRunner> taskDrivers;
    protected GobblinClusterManager manager;
    protected Path workPath;
    protected Path jobConfigPath;
    protected Path jobOutputBasePath;
    protected URL jobConfResourceUrl;
    protected TestingServer testingZKServer;
    private static final Logger log = LoggerFactory.getLogger(IntegrationBasicSuite.class);
    public static Path jobLogOutputFile = Paths.get("gobblin-integration-test-log-dir/gobblin-cluster-test.log", new String[0]);

    public IntegrationBasicSuite() {
        this(ConfigFactory.empty());
    }

    public IntegrationBasicSuite(Config config) {
        this.taskDriverConfigs = Lists.newArrayList();
        this.workerConfigs = Lists.newArrayList();
        this.workers = Lists.newArrayList();
        this.taskDrivers = Lists.newArrayList();
        this.jobConfigOverrides = config;
        try {
            initWorkDir();
            initJobOutputDir();
            initZooKeeper();
            initConfig();
            initJobConfDir();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void initConfig() {
        this.managerConfig = getManagerConfig();
        this.taskDriverConfigs = getTaskDriverConfigs();
        this.workerConfigs = getWorkerConfigs();
    }

    private void initZooKeeper() throws Exception {
        this.testingZKServer = new TestingServer(false);
        log.info("Created testing ZK Server. Connection string : " + this.testingZKServer.getConnectString());
    }

    private void initJobConfDir() throws IOException {
        this.jobConfigPath = Paths.get(this.managerConfig.getString("gobblin.cluster.job.conf.path"), new String[0]);
        Files.createDirectories(this.jobConfigPath, new FileAttribute[0]);
        this.jobConfResourceUrl = Resources.getResource(JOB_CONF_NAME);
        copyJobConfFromResource();
    }

    private void initJobOutputDir() throws IOException {
        this.jobOutputBasePath = Paths.get(this.workPath + "/job-output", new String[0]);
        Files.createDirectory(this.jobOutputBasePath, new FileAttribute[0]);
    }

    private void initWorkDir() throws IOException {
        this.workPath = Paths.get(ConfigFactory.parseURL(Resources.getResource("BasicCluster.conf")).getString("gobblin.cluster.workDir"), new String[0]);
        log.info("Created a new work directory: " + this.workPath.toAbsolutePath());
        deleteWorkDir();
        Files.createDirectory(this.workPath, new FileAttribute[0]);
    }

    public void deleteWorkDir() throws IOException {
        if (this.workPath == null || !Files.exists(this.workPath, new LinkOption[0])) {
            return;
        }
        FileUtils.deleteDirectory(this.workPath.toFile());
    }

    private void copyJobConfFromResource() throws IOException {
        InputStream openStream = this.jobConfResourceUrl.openStream();
        Throwable th = null;
        try {
            overrideJobConfigs(ConfigFactory.parseReader(new InputStreamReader(openStream), ConfigParseOptions.defaults().setSyntax(ConfigSyntax.CONF))).forEach((str, config) -> {
                try {
                    writeJobConf(str, config);
                } catch (IOException e) {
                    log.error("Job " + str + " config cannot be written.");
                }
            });
            if (openStream != null) {
                if (0 == 0) {
                    openStream.close();
                    return;
                }
                try {
                    openStream.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (openStream != null) {
                if (0 != 0) {
                    try {
                        openStream.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    openStream.close();
                }
            }
            throw th3;
        }
    }

    protected Map<String, Config> overrideJobConfigs(Config config) {
        return ImmutableMap.of("HelloWorldTestJob", this.jobConfigOverrides.withFallback(config));
    }

    private void writeJobConf(String str, Config config) throws IOException {
        String str2 = this.jobConfigPath + "/" + str + ".conf";
        String render = config.root().render(ConfigRenderOptions.defaults());
        DataOutputStream dataOutputStream = new DataOutputStream(new FileOutputStream(str2));
        Throwable th = null;
        try {
            OutputStreamWriter outputStreamWriter = new OutputStreamWriter(dataOutputStream, Charsets.UTF_8);
            Throwable th2 = null;
            try {
                try {
                    outputStreamWriter.write(render);
                    if (outputStreamWriter != null) {
                        if (0 != 0) {
                            try {
                                outputStreamWriter.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            outputStreamWriter.close();
                        }
                    }
                    if (dataOutputStream != null) {
                        if (0 == 0) {
                            dataOutputStream.close();
                            return;
                        }
                        try {
                            dataOutputStream.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    th2 = th5;
                    throw th5;
                }
            } catch (Throwable th6) {
                if (outputStreamWriter != null) {
                    if (th2 != null) {
                        try {
                            outputStreamWriter.close();
                        } catch (Throwable th7) {
                            th2.addSuppressed(th7);
                        }
                    } else {
                        outputStreamWriter.close();
                    }
                }
                throw th6;
            }
        } catch (Throwable th8) {
            if (dataOutputStream != null) {
                if (0 != 0) {
                    try {
                        dataOutputStream.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    dataOutputStream.close();
                }
            }
            throw th8;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Config getClusterConfig() {
        Config parseURL = ConfigFactory.parseURL(Resources.getResource("BasicCluster.conf"));
        HashMap hashMap = new HashMap();
        hashMap.put("gobblin.cluster.zk.connection.string", this.testingZKServer.getConnectString());
        hashMap.put("gobblin.cluster.appWorkDir", this.workPath.toString());
        return ConfigFactory.parseMap(hashMap).withFallback(parseURL);
    }

    public Config getManagerConfig() {
        return ConfigFactory.parseURL(Resources.getResource("BasicManager.conf")).withFallback(getClusterConfig()).resolve();
    }

    protected Collection<Config> getTaskDriverConfigs() {
        return new ArrayList();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Collection<Config> getWorkerConfigs() {
        return Lists.newArrayList(new Config[]{ConfigFactory.parseURL(Resources.getResource("BasicWorker.conf")).withFallback(getClusterConfig()).resolve()});
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Config addInstanceName(Config config, String str) {
        HashMap hashMap = new HashMap();
        hashMap.put(TEST_INSTANCE_NAME_KEY, str);
        return ConfigFactory.parseMap(hashMap).withFallback(config).resolve();
    }

    public void waitForAndVerifyOutputFiles() throws Exception {
        AssertWithBackoff.create().logger(log).timeoutMs(120000L).maxSleepMs(100L).backoffFactor(1.5d).assertTrue(this::hasExpectedFilesBeenCreated, "Waiting for job-completion");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public static boolean verifyFileForMessage(Path path, String str) throws IOException {
        return new String(Files.readAllBytes(path)).contains(str);
    }

    protected boolean hasExpectedFilesBeenCreated(Void r4) {
        return getNumOfOutputFiles(this.jobOutputBasePath) == 1;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int getNumOfOutputFiles(Path path) {
        return FileUtils.listFiles(path.toFile(), new String[]{"txt"}, true).size();
    }

    public void startCluster() throws Exception {
        this.testingZKServer.start();
        createHelixCluster();
        startWorker();
        startTaskDriver();
        startManager();
    }

    private void startManager() throws Exception {
        this.manager = new GobblinClusterManager(TestHelper.TEST_APPLICATION_NAME, "1", this.managerConfig, Optional.absent());
        this.manager.start();
    }

    private void startTaskDriver() throws Exception {
        for (Config config : this.taskDriverConfigs) {
            GobblinTaskRunner gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, config.getString(TEST_INSTANCE_NAME_KEY), "1", "1", config, Optional.absent());
            this.taskDrivers.add(gobblinTaskRunner);
            gobblinTaskRunner.getClass();
            new Thread(gobblinTaskRunner::start).start();
        }
    }

    private void startWorker() throws Exception {
        if (this.workerConfigs.size() == 1) {
            this.workers.add(new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, WORKER_INSTANCE_0, "1", "1", this.workerConfigs.iterator().next(), Optional.absent()));
            GobblinTaskRunner next = this.workers.iterator().next();
            next.getClass();
            new Thread(next::start).start();
            return;
        }
        for (Config config : this.workerConfigs) {
            GobblinTaskRunner gobblinTaskRunner = new GobblinTaskRunner(TestHelper.TEST_APPLICATION_NAME, config.getString(TEST_INSTANCE_NAME_KEY), "1", "1", config, Optional.absent());
            this.workers.add(gobblinTaskRunner);
            gobblinTaskRunner.getClass();
            new Thread(gobblinTaskRunner::start).start();
        }
    }

    public void verifyMetricsCleaned() {
        Assert.assertEquals(GobblinMetricsRegistry.getInstance().getMetricsByPattern(".*HelloWorldTestJob.*").size(), 0);
    }

    public void shutdownCluster() throws InterruptedException, IOException {
        this.workers.forEach(gobblinTaskRunner -> {
            gobblinTaskRunner.stop();
        });
        this.taskDrivers.forEach(gobblinTaskRunner2 -> {
            gobblinTaskRunner2.stop();
        });
        this.manager.stop();
        this.testingZKServer.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void createHelixCluster() throws Exception {
        HelixUtils.createGobblinHelixCluster(this.managerConfig.getString("gobblin.cluster.zk.connection.string"), this.managerConfig.getString("gobblin.cluster.helix.cluster.name"));
    }
}
