package org.apache.flink.connectors.hive;

import com.klarna.hiverunner.HiveServerContext;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import org.apache.flink.shaded.guava30.com.google.common.base.Joiner;
import org.apache.flink.shaded.guava30.com.google.common.base.Throwables;
import org.apache.flink.table.catalog.hive.HiveTestUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStore;
import org.junit.rules.ExternalResource;
import org.junit.rules.TestRule;
import org.junit.runners.model.InitializationError;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner.class */
public class FlinkStandaloneHiveRunner extends FlinkEmbeddedHiveRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkStandaloneHiveRunner.class);
    private static final Duration HMS_START_TIMEOUT = Duration.ofSeconds(90);
    private Future<Void> hmsWatcher;
    private int hmsPort;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/connectors/hive/FlinkStandaloneHiveRunner$LogRedirect.class */
    public static class LogRedirect implements Runnable {
        private final InputStream inputStream;
        private final Logger logger;

        LogRedirect(InputStream inputStream, Logger logger) {
            this.inputStream = inputStream;
            this.logger = logger;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Stream<String> lines = new BufferedReader(new InputStreamReader(this.inputStream)).lines();
                Logger logger = this.logger;
                logger.getClass();
                lines.forEach(logger::info);
            } catch (Exception e) {
                this.logger.error(Throwables.getStackTraceAsString(e));
            }
        }
    }

    public FlinkStandaloneHiveRunner(Class<?> cls) throws InitializationError {
        super(cls);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.connectors.hive.FlinkEmbeddedHiveRunner
    public List<TestRule> classRules() {
        try {
            this.hmsPort = HiveTestUtils.getFreePort();
            List<TestRule> classRules = super.classRules();
            classRules.add(classRules.size() - 1, new ExternalResource() { // from class: org.apache.flink.connectors.hive.FlinkStandaloneHiveRunner.1
                protected void before() throws Throwable {
                    FlinkStandaloneHiveRunner.this.hmsWatcher = FlinkStandaloneHiveRunner.startHMS(FlinkStandaloneHiveRunner.this.context, FlinkStandaloneHiveRunner.this.hmsPort);
                }

                protected void after() {
                    if (FlinkStandaloneHiveRunner.this.hmsWatcher != null) {
                        FlinkStandaloneHiveRunner.this.hmsWatcher.cancel(true);
                    }
                }
            });
            return classRules;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static Future<Void> startHMS(HiveServerContext hiveServerContext, int i) throws Exception {
        hiveServerContext.init();
        hiveServerContext.getHiveConf().setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + i);
        HiveConf hiveConf = hiveServerContext.getHiveConf();
        ArrayList arrayList = new ArrayList();
        arrayList.add(Joiner.on(File.separator).join(System.getProperty("java.home"), "bin", new Object[]{"java"}));
        arrayList.add("-cp");
        arrayList.add(System.getProperty("java.class.path"));
        arrayList.add(hiveCmdLineConfig(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, hiveConf.getVar(HiveConf.ConfVars.METASTOREWAREHOUSE)));
        arrayList.add(hiveCmdLineConfig(HiveConf.ConfVars.SCRATCHDIR.varname, hiveConf.getVar(HiveConf.ConfVars.SCRATCHDIR)));
        arrayList.add(hiveCmdLineConfig(HiveConf.ConfVars.LOCALSCRATCHDIR.varname, hiveConf.getVar(HiveConf.ConfVars.LOCALSCRATCHDIR)));
        arrayList.add(hiveCmdLineConfig(HiveConf.ConfVars.HIVEHISTORYFILELOC.varname, hiveConf.getVar(HiveConf.ConfVars.HIVEHISTORYFILELOC)));
        arrayList.add(hiveCmdLineConfig("hive.warehouse.subdir.inherit.perms", String.valueOf(hiveConf.getBoolean("hive.warehouse.subdir.inherit.perms", true))));
        arrayList.add(hiveCmdLineConfig("hadoop.tmp.dir", hiveConf.get("hadoop.tmp.dir")));
        arrayList.add(hiveCmdLineConfig("test.log.dir", hiveConf.get("test.log.dir")));
        arrayList.add(hiveCmdLineConfig(HiveConf.ConfVars.METASTORECONNECTURLKEY.varname, hiveConf.getVar(HiveConf.ConfVars.METASTORECONNECTURLKEY)));
        File createTempFile = File.createTempFile("derby", ".log");
        createTempFile.deleteOnExit();
        arrayList.add(hiveCmdLineConfig("derby.stream.error.file", createTempFile.getAbsolutePath()));
        if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST)) {
            arrayList.add(hiveCmdLineConfig(HiveConf.ConfVars.HIVE_IN_TEST.varname, "true"));
        }
        arrayList.add(HiveMetaStore.class.getCanonicalName());
        arrayList.add("-p");
        arrayList.add(String.valueOf(i));
        Process start = new ProcessBuilder(arrayList).start();
        try {
            Thread thread = new Thread(new LogRedirect(start.getInputStream(), LOGGER));
            Thread thread2 = new Thread(new LogRedirect(start.getErrorStream(), LOGGER));
            thread.setDaemon(true);
            thread.setName("HMS-IN-Logger");
            thread2.setDaemon(true);
            thread2.setName("HMS-ERR-Logger");
            thread.start();
            thread2.start();
            FutureTask futureTask = new FutureTask(() -> {
                try {
                    try {
                        int waitFor = start.waitFor();
                        thread.join();
                        thread2.join();
                        if (waitFor != 0) {
                            throw new RuntimeException("HMS process exited with " + waitFor);
                        }
                    } catch (InterruptedException e) {
                        LOGGER.info("Shutting down HMS");
                        if (start.isAlive()) {
                            start.destroy();
                            try {
                                start.waitFor(5L, TimeUnit.SECONDS);
                            } catch (InterruptedException e2) {
                                LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
                            }
                            start.destroyForcibly();
                        }
                    }
                } finally {
                    if (start.isAlive()) {
                        start.destroy();
                        try {
                            start.waitFor(5L, TimeUnit.SECONDS);
                        } catch (InterruptedException e3) {
                            LOGGER.info("Interrupted waiting for HMS to shut down, killing it forcibly");
                        }
                        start.destroyForcibly();
                    }
                }
            }, null);
            Thread thread3 = new Thread(futureTask);
            thread3.setName("HMS-Watcher");
            thread3.setDaemon(false);
            thread3.start();
            waitForHMSStart(i);
            return futureTask;
        } catch (Throwable th) {
            start.destroyForcibly();
            throw th;
        }
    }

    private static void waitForHMSStart(int i) throws Exception {
        long currentTimeMillis = System.currentTimeMillis() + HMS_START_TIMEOUT.toMillis();
        while (System.currentTimeMillis() < currentTimeMillis) {
            try {
                SocketChannel open = SocketChannel.open(new InetSocketAddress("localhost", i));
                Throwable th = null;
                try {
                    try {
                        LOGGER.info("HMS started at port {}", Integer.valueOf(i));
                        if (open != null) {
                            if (0 == 0) {
                                open.close();
                                return;
                            }
                            try {
                                open.close();
                                return;
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                                return;
                            }
                        }
                        return;
                    } catch (Throwable th3) {
                        th = th3;
                        throw th3;
                        break;
                    }
                } finally {
                }
            } catch (ConnectException e) {
                LOGGER.info("Waiting for HMS to start...");
                Thread.sleep(1000L);
            }
        }
        throw new TimeoutException("Timeout waiting for HMS to start");
    }

    private static String hiveCmdLineConfig(String str, String str2) {
        return String.format("-D%s=%s", str, str2);
    }
}
