package org.apache.twill.internal.appmaster;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.AbstractIdleService;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.Service;
import java.io.File;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.twill.api.RunId;
import org.apache.twill.internal.ServiceMain;
import org.apache.twill.internal.TwillRuntimeSpecification;
import org.apache.twill.internal.json.TwillRuntimeSpecificationAdapter;
import org.apache.twill.internal.kafka.EmbeddedKafkaServer;
import org.apache.twill.internal.logging.Loggings;
import org.apache.twill.internal.utils.Networks;
import org.apache.twill.internal.yarn.VersionDetectYarnAMClientFactory;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.zookeeper.NodeChildren;
import org.apache.twill.zookeeper.OperationFuture;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKOperations;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/internal/appmaster/ApplicationMasterMain.class */
public final class ApplicationMasterMain extends ServiceMain {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterMain.class);
    private final String kafkaZKConnect;

    /* loaded from: input_file:org/apache/twill/internal/appmaster/ApplicationMasterMain$AppMasterTwillZKPathService.class */
    /**
     * ZK path service used by the application master. On shutdown, after the parent's own shutdown logic, it
     * removes the {@code /instances} node, every child of {@code /discoverable}, the {@code /discoverable} node
     * itself and finally the {@code /} path, all through the supplied {@link ZKClient}.
     *
     * <p>Cleanup stops quietly as soon as a node turns out to be non-empty. A node that does not exist (never
     * created, or already removed) is treated as successfully deleted.
     */
    private static final class AppMasterTwillZKPathService extends ServiceMain.TwillZKPathService {
        private static final Logger LOG = LoggerFactory.getLogger(AppMasterTwillZKPathService.class);

        /** Upper bound, in seconds, for each blocking ZooKeeper call made during shutdown. */
        private static final long TIMEOUT_SECONDS = 5L;

        private final ZKClient zkClient;

        AppMasterTwillZKPathService(ZKClient zKClient, RunId runId) {
            super(zKClient, runId);
            this.zkClient = zKClient;
        }

        /**
         * Runs the parent shutdown and then removes the ZK nodes listed in the class description.
         *
         * @throws Exception if a ZooKeeper operation fails for a reason other than the node being non-empty or
         *     missing, or if an operation does not complete within {@link #TIMEOUT_SECONDS} seconds
         */
        @Override // org.apache.twill.internal.ServiceMain.TwillZKPathService
        public void shutDown() throws Exception {
            super.shutDown();

            // A non-empty /instances node ends the cleanup here.
            if (!delete("/instances")) {
                return;
            }

            // Fire all child deletions first, then wait for them together.
            List<String> children = getChildren("/discoverable");
            List<OperationFuture<?>> deleteFutures = new ArrayList<OperationFuture<?>>();
            for (String child : children) {
                String path = "/discoverable/" + child;
                LOG.info("Removing ZK path: {}{}", this.zkClient.getConnectString(), path);
                deleteFutures.add(this.zkClient.delete(path));
            }
            Futures.successfulAsList(deleteFutures).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);

            // successfulAsList hides individual failures, so inspect every future on its own.
            for (OperationFuture<?> future : deleteFutures) {
                try {
                    future.get();
                } catch (ExecutionException e) {
                    Throwable cause = e.getCause();
                    if (cause instanceof KeeperException.NotEmptyException) {
                        // The child still has content; leave the rest of the tree alone.
                        return;
                    }
                    if (cause instanceof KeeperException.NoNodeException) {
                        // The child vanished between listing and deletion; nothing left to remove.
                        continue;
                    }
                    throw e;
                }
            }

            if (delete("/discoverable")) {
                delete("/");
            }
        }

        /**
         * Lists the children of the given path.
         *
         * @param path ZK path to list
         * @return the child names, or an empty list when the path does not exist
         * @throws Exception if the lookup fails for any other reason or times out
         */
        private List<String> getChildren(String path) throws Exception {
            try {
                NodeChildren nodeChildren = this.zkClient.getChildren(path).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
                return nodeChildren.getChildren();
            } catch (ExecutionException e) {
                if (e.getCause() instanceof KeeperException.NoNodeException) {
                    return new ArrayList<String>();
                }
                throw e;
            }
        }

        /**
         * Deletes a single ZK path, waiting up to {@link #TIMEOUT_SECONDS} seconds.
         *
         * @param str ZK path to delete
         * @return {@code true} if the node was deleted or did not exist; {@code false} if it was not empty
         * @throws Exception if the deletion fails for any other reason or times out
         */
        private boolean delete(String str) throws Exception {
            try {
                LOG.info("Removing ZK path: {}{}", this.zkClient.getConnectString(), str);
                this.zkClient.delete(str).get(TIMEOUT_SECONDS, TimeUnit.SECONDS);
                return true;
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (cause instanceof KeeperException.NotEmptyException) {
                    return false;
                }
                if (cause instanceof KeeperException.NoNodeException) {
                    // Already gone is as good as deleted.
                    return true;
                }
                throw e;
            }
        }
    }

    /* loaded from: input_file:org/apache/twill/internal/appmaster/ApplicationMasterMain$ApplicationKafkaService.class */
    private static final class ApplicationKafkaService extends AbstractIdleService {
        private static final Logger LOG = LoggerFactory.getLogger(ApplicationKafkaService.class);
        private final ZKClient zkClient;
        private final String kafkaZKPath;
        private final EmbeddedKafkaServer kafkaServer;

        private ApplicationKafkaService(ZKClient zKClient, RunId runId) {
            this.zkClient = zKClient;
            this.kafkaZKPath = "/" + runId.getId() + "/kafka";
            this.kafkaServer = new EmbeddedKafkaServer(generateKafkaConfig(zKClient.getConnectString() + this.kafkaZKPath));
        }

        protected void startUp() throws Exception {
            ZKOperations.ignoreError(this.zkClient.create(this.kafkaZKPath, (byte[]) null, CreateMode.PERSISTENT), KeeperException.NodeExistsException.class, this.kafkaZKPath).get();
            this.kafkaServer.startAndWait();
        }

        protected void shutDown() throws Exception {
            Loggings.forceFlush();
            try {
                try {
                    TimeUnit.SECONDS.sleep(2L);
                    this.kafkaServer.stopAndWait();
                } catch (InterruptedException e) {
                    LOG.info("Kafka shutdown delay interrupted", e);
                    this.kafkaServer.stopAndWait();
                }
            } catch (Throwable th) {
                this.kafkaServer.stopAndWait();
                throw th;
            }
        }

        private Properties generateKafkaConfig(String str) {
            int randomPort = Networks.getRandomPort();
            Preconditions.checkState(randomPort > 0, "Failed to get random port.");
            Properties properties = new Properties();
            properties.setProperty("log.dir", new File("kafka-logs").getAbsolutePath());
            properties.setProperty("port", Integer.toString(randomPort));
            properties.setProperty("broker.id", "1");
            properties.setProperty("socket.send.buffer.bytes", "1048576");
            properties.setProperty("socket.receive.buffer.bytes", "1048576");
            properties.setProperty("socket.request.max.bytes", "104857600");
            properties.setProperty("num.partitions", "1");
            properties.setProperty("log.retention.hours", "24");
            properties.setProperty("log.flush.interval.messages", "10000");
            properties.setProperty("log.flush.interval.ms", "1000");
            properties.setProperty("log.segment.bytes", "536870912");
            properties.setProperty("zookeeper.connect", str);
            properties.setProperty("zookeeper.connection.timeout.ms", "3000");
            properties.setProperty("default.replication.factor", "1");
            return properties;
        }
    }

    /* loaded from: input_file:org/apache/twill/internal/appmaster/ApplicationMasterMain$YarnAMClientService.class */
    private static final class YarnAMClientService extends AbstractIdleService {
        private final YarnAMClient yarnAMClient;
        private final TrackerService trackerService;

        private YarnAMClientService(YarnAMClient yarnAMClient, TrackerService trackerService) {
            this.yarnAMClient = yarnAMClient;
            this.trackerService = trackerService;
        }

        protected void startUp() throws Exception {
            this.trackerService.setHost(this.yarnAMClient.getHost());
            this.trackerService.startAndWait();
            this.yarnAMClient.setTracker(this.trackerService.getBindAddress(), this.trackerService.getUrl());
            try {
                this.yarnAMClient.startAndWait();
            } catch (Exception e) {
                this.trackerService.stopAndWait();
                throw e;
            }
        }

        protected void shutDown() throws Exception {
            try {
                this.yarnAMClient.stopAndWait();
                this.trackerService.stopAndWait();
            } catch (Throwable th) {
                this.trackerService.stopAndWait();
                throw th;
            }
        }
    }

    /**
     * Creates the main instance.
     *
     * @param str the value later returned by {@link #getKafkaZKConnect()}
     */
    private ApplicationMasterMain(String str) {
        kafkaZKConnect = str;
    }

    /**
     * Process entry point. Reads the runtime specification from {@code twillSpec.json} in the working
     * directory, builds the application master service together with its prerequisite services, and hands
     * them to {@code doMain}.
     *
     * <p>The Kafka service is added as a prerequisite unless the system property {@code twill.disable.kafka}
     * parses to {@code true}.
     *
     * @param strArr command line arguments; not read by this method
     * @throws Exception if any step of the setup or {@code doMain} fails
     */
    public static void main(String[] strArr) throws Exception {
        TwillRuntimeSpecification runtimeSpec = TwillRuntimeSpecificationAdapter.create().fromJson(new File("twillSpec.json"));
        String zkConnect = runtimeSpec.getZkConnectStr();
        RunId runId = runtimeSpec.getTwillAppRunId();
        Service zkClientService = createZKClient(zkConnect, runtimeSpec.getTwillAppName());

        YarnConfiguration yarnConf = new YarnConfiguration(new HdfsConfiguration(new Configuration()));
        setRMSchedulerAddress(yarnConf, runtimeSpec.getRmSchedulerAddr());
        YarnAMClient amClient = new VersionDetectYarnAMClientFactory(yarnConf).create();

        Service appMasterService = new ApplicationMasterService(
            runId,
            zkClientService,
            runtimeSpec,
            amClient,
            createAppLocation(yarnConf, runtimeSpec.getFsUser(), runtimeSpec.getTwillAppDir()));

        // Prerequisites are listed in the order: YARN client + tracker, ZK client, ZK path service, Kafka.
        List<Service> prerequisites = new ArrayList<Service>();
        prerequisites.add(new YarnAMClientService(amClient, new TrackerService(appMasterService)));
        prerequisites.add(zkClientService);
        prerequisites.add(new AppMasterTwillZKPathService(zkClientService, runId));

        boolean kafkaDisabled = Boolean.parseBoolean(System.getProperty("twill.disable.kafka"));
        if (!kafkaDisabled) {
            prerequisites.add(new ApplicationKafkaService(zkClientService, runId));
        } else {
            LOG.info("Log collection through kafka disabled");
        }

        String kafkaZKConnect = String.format("%s/%s/kafka", zkConnect, runId.getId());
        new ApplicationMasterMain(kafkaZKConnect)
            .doMain(appMasterService, prerequisites.toArray(new Service[prerequisites.size()]));
    }

    /**
     * Sets {@code yarn.resourcemanager.scheduler.address} on the configuration to the given address, but only
     * when the property has no recorded source or its last recorded source is {@code yarn-default.xml}.
     *
     * @param configuration configuration to update
     * @param str scheduler address to apply; {@code null} leaves the configuration untouched
     */
    private static void setRMSchedulerAddress(Configuration configuration, String str) {
        if (str == null) {
            return;
        }
        String[] sources = configuration.getPropertySources("yarn.resourcemanager.scheduler.address");
        boolean setOutsideDefaults = sources != null
            && sources.length != 0
            && !"yarn-default.xml".equals(sources[sources.length - 1]);
        if (setOutsideDefaults) {
            // A source other than yarn-default.xml supplied the value last; keep it.
            return;
        }
        configuration.set("yarn.resourcemanager.scheduler.address", str);
    }

    /**
     * Returns the canonical host name of the local host, or the literal {@code "unknown"} when the local
     * host cannot be resolved.
     */
    @Override // org.apache.twill.internal.ServiceMain
    protected String getHostname() {
        String hostname;
        try {
            hostname = InetAddress.getLocalHost().getCanonicalHostName();
        } catch (UnknownHostException ignored) {
            // Resolution failure is not fatal here; fall back to a placeholder name.
            hostname = "unknown";
        }
        return hostname;
    }

    /**
     * Returns the Kafka ZooKeeper connect string that was passed to the constructor.
     */
    @Override // org.apache.twill.internal.ServiceMain
    protected String getKafkaZKConnect() {
        return kafkaZKConnect;
    }

    /**
     * Returns the value of the {@code TWILL_RUNNABLE_NAME} environment variable, or {@code null} when it is
     * not set.
     */
    @Override // org.apache.twill.internal.ServiceMain
    protected String getRunnableName() {
        String runnableName = System.getenv("TWILL_RUNNABLE_NAME");
        return runnableName;
    }
}
