package org.apache.twill.internal.appmaster;

import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Supplier;
import com.google.common.base.Throwables;
import com.google.common.collect.DiscreteDomains;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multiset;
import com.google.common.collect.Ranges;
import com.google.common.collect.Sets;
import com.google.common.io.Files;
import com.google.common.io.InputSupplier;
import com.google.common.reflect.TypeToken;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.io.Reader;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.util.Records;
import org.apache.twill.api.Command;
import org.apache.twill.api.EventHandler;
import org.apache.twill.api.LocalFile;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.ResourceSpecification;
import org.apache.twill.api.RunId;
import org.apache.twill.api.RuntimeSpecification;
import org.apache.twill.api.TwillSpecification;
import org.apache.twill.api.logging.LogEntry;
import org.apache.twill.common.Threads;
import org.apache.twill.filesystem.Location;
import org.apache.twill.internal.ContainerInfo;
import org.apache.twill.internal.DefaultTwillRunResources;
import org.apache.twill.internal.JvmOptions;
import org.apache.twill.internal.ProcessLauncher;
import org.apache.twill.internal.TwillContainerLauncher;
import org.apache.twill.internal.appmaster.AllocationSpecification;
import org.apache.twill.internal.appmaster.ExpectedContainers;
import org.apache.twill.internal.json.JvmOptionsCodec;
import org.apache.twill.internal.json.LocalFileCodec;
import org.apache.twill.internal.json.TwillSpecificationAdapter;
import org.apache.twill.internal.state.Message;
import org.apache.twill.internal.utils.Instances;
import org.apache.twill.internal.yarn.AbstractYarnTwillService;
import org.apache.twill.internal.yarn.YarnAMClient;
import org.apache.twill.internal.yarn.YarnContainerInfo;
import org.apache.twill.internal.yarn.YarnContainerStatus;
import org.apache.twill.internal.yarn.YarnUtils;
import org.apache.twill.zookeeper.ZKClient;
import org.apache.twill.zookeeper.ZKClients;
import org.apache.zookeeper.CreateMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/twill/internal/appmaster/ApplicationMasterService.class */
/**
 * Service that drives a Twill application from inside the YARN application master container.
 *
 * <p>Based on the methods in this class, it requests containers through the {@link YarnAMClient},
 * launches runnables in the containers it acquires, reacts to completed containers, and handles
 * control messages (instance-count changes, runnable restarts, broadcast messages).
 * It also acts as a {@link Supplier} of the current {@link ResourceReport}.
 */
public final class ApplicationMasterService extends AbstractYarnTwillService implements Supplier<ResourceReport> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationMasterService.class);
    // Shared Gson used to decode the instance-id sets carried by restart messages.
    private static final Gson GSON = new Gson();
    // Token kind that createCredentials() strips from the credentials it collects.
    private static final Text AMRM_TOKEN_KIND_NAME = new Text("YARN_AM_RM_TOKEN");
    private final RunId runId;
    private final ZKClient zkClient;
    // Application specification parsed from the JSON file passed to the constructor.
    private final TwillSpecification twillSpec;
    // Value returned by getLiveNodeData().
    private final ApplicationMasterLiveNodeData amLiveNode;
    private final RunningContainers runningContainers;
    // Per-runnable expected instance counts, seeded from the specification.
    private final ExpectedContainers expectedContainers;
    private final YarnAMClient amClient;
    private final JvmOptions jvmOpts;
    // Value of TWILL_RESERVED_MEMORY_MB, or 200 when unset/unparseable (see getReservedMemory()).
    private final int reservedMemory;
    private final EventHandler eventHandler;
    // Directory deleted by cleanupDir() at the end of doStop().
    private final Location applicationLocation;
    private final PlacementPolicyManager placementPolicyManager;
    // Set by triggerShutdown(); polled by the main loop in doRun().
    private volatile boolean stopped;
    // Pending allocation groups; created in doStart() and appended to when containers must be (re)requested.
    private Queue<RunnableContainerRequest> runnableContainerRequests;
    // "<runnable>-<instanceId>" markers for instances currently being restarted; checked by doRun().
    private Set<String> restartingRunnables;
    // Single-thread executor created in doStart(); runs instance-count change tasks.
    private ExecutorService instanceChangeExecutor;


    public ApplicationMasterService(RunId runId, ZKClient zKClient, File file, YarnAMClient yarnAMClient, Location location) throws Exception {
        super(zKClient, runId, location);
        this.runId = runId;
        this.twillSpec = TwillSpecificationAdapter.create().fromJson(file);
        this.zkClient = zKClient;
        this.applicationLocation = location;
        this.amClient = yarnAMClient;
        this.credentials = createCredentials();
        this.jvmOpts = loadJvmOptions();
        this.reservedMemory = getReservedMemory();
        this.placementPolicyManager = new PlacementPolicyManager(this.twillSpec.getPlacementPolicies());
        this.amLiveNode = new ApplicationMasterLiveNodeData(Integer.parseInt(System.getenv("YARN_APP_ID")), Long.parseLong(System.getenv("YARN_APP_ID_CLUSTER_TIME")), yarnAMClient.getContainerId().toString());
        this.expectedContainers = initExpectedContainers(this.twillSpec);
        this.runningContainers = initRunningContainers(yarnAMClient.getContainerId(), yarnAMClient.getHost());
        this.eventHandler = createEventHandler(this.twillSpec);
        this.restartingRunnables = new ConcurrentSkipListSet();
    }

    /**
     * Loads the JVM options from the {@code jvm.opts} file in the working directory.
     *
     * @return the decoded options, or options with no extra arguments and debugging disabled
     *     when the file does not exist
     * @throws IOException if the file exists but cannot be read or decoded
     */
    private JvmOptions loadJvmOptions() throws IOException {
        final File file = new File("jvm.opts");
        if (!file.exists()) {
            return new JvmOptions((String) null, JvmOptions.DebugOptions.NO_DEBUG);
        }
        return JvmOptionsCodec.decode(new InputSupplier<Reader>() {
            // Must be named getInput to actually implement InputSupplier; the decompiler had
            // renamed it, leaving the interface method unimplemented.
            @Override
            public Reader getInput() throws IOException {
                return new FileReader(file);
            }
        });
    }

    /**
     * Reads the reserved memory size from the {@code TWILL_RESERVED_MEMORY_MB} environment variable.
     *
     * @return the parsed value, or 200 when the variable is unset or cannot be parsed as an int
     */
    private int getReservedMemory() {
        final int fallback = 200;
        String configured = System.getenv("TWILL_RESERVED_MEMORY_MB");
        if (configured != null) {
            try {
                return Integer.parseInt(configured);
            } catch (Exception ignored) {
                // Unparseable value: fall through to the default below.
            }
        }
        return fallback;
    }

    /**
     * Instantiates the {@link EventHandler} named in the application specification.
     *
     * @param twillSpecification specification whose event handler class should be loaded
     * @return a new instance of the configured handler class
     * @throws RuntimeException (via {@link Throwables#propagate}) if the class cannot be loaded,
     *     does not implement {@link EventHandler}, or cannot be instantiated
     */
    private EventHandler createEventHandler(TwillSpecification twillSpecification) {
        try {
            Class<?> loadClass = getClass().getClassLoader().loadClass(twillSpecification.getEventHandler().getClassName());
            // Guava's Preconditions only substitutes "%s"; "{}" placeholders would be left verbatim.
            Preconditions.checkArgument(EventHandler.class.isAssignableFrom(loadClass), "Class %s does not implements %s", loadClass, EventHandler.class.getName());
            return (EventHandler) Instances.newInstance(loadClass);
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Creates the running-container tracker, registering the application master's own resources.
     *
     * @param containerId container id of the application master
     * @param str host of the application master container
     * @return a new {@link RunningContainers} bound to this service's ZooKeeper client
     * @throws Exception if {@code YARN_CONTAINER_VIRTUAL_CORES} or {@code YARN_CONTAINER_MEMORY_MB}
     *     is missing or not a valid integer
     */
    private RunningContainers initRunningContainers(ContainerId containerId, String str) throws Exception {
        String applicationId = containerId.getApplicationAttemptId().getApplicationId().toString();
        String amContainerName = containerId.toString();
        int virtualCores = Integer.parseInt(System.getenv("YARN_CONTAINER_VIRTUAL_CORES"));
        int memoryMb = Integer.parseInt(System.getenv("YARN_CONTAINER_MEMORY_MB"));
        DefaultTwillRunResources amResources =
            new DefaultTwillRunResources(0, amContainerName, virtualCores, memoryMb, str, (Integer) null, (LogEntry.Level) null);
        return new RunningContainers(applicationId, amResources, this.zkClient);
    }

    /**
     * Seeds the expected instance count of every runnable from the specification.
     *
     * @param twillSpecification application specification
     * @return expected-container bookkeeping keyed by runnable name
     */
    private ExpectedContainers initExpectedContainers(TwillSpecification twillSpecification) {
        Map<String, Integer> instancesByRunnable = Maps.newHashMap();
        for (RuntimeSpecification runnableSpec : twillSpecification.getRunnables().values()) {
            int instances = runnableSpec.getResourceSpecification().getInstances();
            instancesByRunnable.put(runnableSpec.getName(), instances);
        }
        return new ExpectedContainers(instancesByRunnable);
    }

    /**
     * Supplies the current resource report of the running containers.
     *
     * <p>This is the {@link Supplier} method the class declares it implements; the decompiler had
     * renamed it, leaving the interface method missing.
     *
     * @return the report produced by the running-container tracker
     */
    @Override
    public ResourceReport get() {
        return this.runningContainers.getResourceReport();
    }

    /**
     * Alias of {@link #get()} kept under the decompiler-generated name so that existing callers
     * of that name keep working.
     *
     * @return same value as {@link #get()}
     */
    public ResourceReport m231get() {
        return get();
    }

    /**
     * Starts the service: initialises the event handler, creates the instance-change executor,
     * creates the persistent {@code /<runId>/runnables} ZooKeeper node, registers the
     * {@code /discoverable} watcher and builds the initial container request queue.
     *
     * @throws Exception if any of the start-up steps fails
     */
    protected void doStart() throws Exception {
        String specJson = TwillSpecificationAdapter.create().toJson(this.twillSpec);
        LOG.info("Start application master with spec: " + specJson);

        BasicEventHandlerContext handlerContext = new BasicEventHandlerContext(this.twillSpec.getEventHandler());
        this.eventHandler.initialize(handlerContext);

        this.instanceChangeExecutor = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("instanceChanger"));

        String runnablesPath = "/" + this.runId.getId() + "/runnables";
        // Block until the node has been created.
        this.zkClient.create(runnablesPath, (byte[]) null, CreateMode.PERSISTENT).get();

        this.runningContainers.addWatcher("/discoverable");
        this.runnableContainerRequests = initContainerRequests();
    }

    /**
     * Stops the service: destroys the event handler, shuts down the instance-change executor,
     * stops all running containers while a background thread keeps calling
     * {@code amClient.allocate} once a second, and finally deletes the application directory.
     *
     * @throws Exception if stopping the containers fails
     */
    protected void doStop() throws Exception {
        // Clears the calling thread's interrupt flag (presumably so the steps below are not cut short).
        Thread.interrupted();
        LOG.info("Stop application master with spec: {}", TwillSpecificationAdapter.create().toJson(this.twillSpec));
        try {
            this.eventHandler.destroy();
        } catch (Throwable th) {
            LOG.warn("Exception when calling {}.destroy()", this.twillSpec.getEventHandler().getClassName(), th);
        }
        this.instanceChangeExecutor.shutdownNow();

        // Containers that were running when stop began; entries are removed as completions arrive.
        final Set<?> pendingContainerIds = Sets.newHashSet(this.runningContainers.getContainerIds());
        final YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
            public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> list) {
                // Nothing is launched while stopping.
            }

            public void completed(List<YarnContainerStatus> list) {
                // Process the batch exactly once; it used to be re-processed once per element.
                ApplicationMasterService.this.handleCompleted(list);
                for (YarnContainerStatus yarnContainerStatus : list) {
                    pendingContainerIds.remove(yarnContainerStatus.getContainerId());
                }
            }
        };

        ExecutorService stopPoller = Executors.newSingleThreadExecutor(Threads.createDaemonThreadFactory("stopPoller"));
        stopPoller.execute(new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                while (!pendingContainerIds.isEmpty()) {
                    try {
                        ApplicationMasterService.this.amClient.allocate(0.0f, allocateHandler);
                        TimeUnit.SECONDS.sleep(1L);
                    } catch (InterruptedException e) {
                        // shutdownNow() below interrupts this thread: stop polling instead of
                        // logging an error and spinning until every id has been reported.
                        Thread.currentThread().interrupt();
                        break;
                    } catch (Exception e) {
                        ApplicationMasterService.LOG.error("Got exception while getting heartbeat", e);
                    }
                }
            }
        });

        this.runningContainers.stopAll();
        stopPoller.shutdownNow();
        cleanupDir();
    }

    /**
     * Returns the live-node payload built in the constructor from the YARN application id,
     * cluster timestamp and application master container id.
     */
    protected Object getLiveNodeData() {
        return this.amLiveNode;
    }

    /**
     * Handles a message delivered to the application master.
     *
     * <p>Secure-store updates are broadcast to all containers. Set-instances and restart messages
     * are handled by their dedicated handlers. Any other message is forwarded according to its
     * scope, or ignored when the scope is neither {@code ALL_RUNNABLE} nor {@code RUNNABLE}.
     *
     * @param str id of the message; it is the value the returned future completes with
     * @param message the message to process
     * @return a future that is set to {@code str} once the message has been processed, or an
     *     already-completed future when the message is ignored
     */
    public ListenableFuture<String> onReceived(String str, Message message) {
        LOG.debug("Message received: {} {}.", str, message);
        SettableFuture<String> result = SettableFuture.create();
        Runnable completion = getMessageCompletion(str, result);

        if (handleSecureStoreUpdate(message)) {
            this.runningContainers.sendToAll(message, completion);
            return result;
        }

        // These handlers take ownership of running the completion when they accept the message.
        boolean handledInternally = handleSetInstances(message, completion)
            || handleRestartRunnablesInstances(message, completion);
        if (handledInternally) {
            return result;
        }

        Message.Scope scope = message.getScope();
        if (scope == Message.Scope.ALL_RUNNABLE) {
            this.runningContainers.sendToAll(message, completion);
            return result;
        }
        if (scope == Message.Scope.RUNNABLE) {
            this.runningContainers.sendToRunnable(message.getRunnableName(), message, completion);
            return result;
        }

        LOG.info("Message ignored. {}", message);
        return Futures.immediateFuture(str);
    }

    /**
     * Requests shutdown by raising the {@code stopped} flag, which the loop in {@code doRun()}
     * checks at the top of every iteration.
     */
    protected void triggerShutdown() {
        this.stopped = true;
    }

    /**
     * Recursively deletes the application directory. Failures are logged and never propagated.
     */
    private void cleanupDir() {
        try {
            boolean deleted = this.applicationLocation.delete(true);
            if (!deleted) {
                LOG.warn("Failed to cleanup directory {}.", this.applicationLocation);
                return;
            }
            LOG.info("Application directory deleted: {}", this.applicationLocation);
        } catch (Exception e) {
            LOG.warn("Exception while cleanup directory {}.", this.applicationLocation, e);
        }
    }

    /**
     * Main loop of the application master. Roughly once a second, until {@code stopped} is set, it:
     * <ol>
     *   <li>calls {@code allocate} on the AM client, which delivers acquired and completed containers;</li>
     *   <li>returns when nothing is pending, running or restarting;</li>
     *   <li>takes the next allocation group from the request queue and submits its container requests;</li>
     *   <li>clears the blacklist if the current request has been outstanding for more than 5 seconds;</li>
     *   <li>runs the provisioning timeout check.</li>
     * </ol>
     *
     * @throws Exception if allocation or request submission fails
     */
    protected void doRun() throws Exception {
        // Allocation group taken from the queue but not yet submitted; null when none is held.
        Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> currentRequest = null;
        // Requests submitted to YARN that are still waiting for containers.
        final Queue<ProvisionRequest> provisioning = Lists.newLinkedList();

        YarnAMClient.AllocateHandler allocateHandler = new YarnAMClient.AllocateHandler() {
            public void acquired(List<? extends ProcessLauncher<YarnContainerInfo>> list) {
                ApplicationMasterService.this.launchRunnable(list, provisioning);
            }

            public void completed(List<YarnContainerStatus> list) {
                ApplicationMasterService.this.handleCompleted(list);
            }
        };

        long requestStartTime = 0;
        boolean constraintsRelaxed = false;
        long nextTimeoutCheck = System.currentTimeMillis() + 30000;

        while (!this.stopped) {
            this.amClient.allocate(0.0f, allocateHandler);

            // Nothing left to provision, run or restart: the application is done.
            if (provisioning.isEmpty() && this.runnableContainerRequests.isEmpty() && this.runningContainers.isEmpty() && this.restartingRunnables.isEmpty()) {
                LOG.info("All containers completed. Shutting down application master.");
                return;
            }

            // Pull the next allocation group, dropping queue heads that have nothing left to give.
            while (provisioning.isEmpty() && currentRequest == null && !this.runnableContainerRequests.isEmpty()) {
                currentRequest = this.runnableContainerRequests.peek().takeRequest();
                if (currentRequest == null) {
                    this.runnableContainerRequests.poll();
                }
            }

            // Submit the container requests for the group that was just taken.
            if (provisioning.isEmpty() && currentRequest != null) {
                manageBlacklist(currentRequest);
                addContainerRequests(currentRequest.getKey().getResource(), currentRequest.getValue(), provisioning, currentRequest.getKey().getType());
                currentRequest = null;
                requestStartTime = System.currentTimeMillis();
                constraintsRelaxed = false;
            }

            // Outstanding for more than 5 seconds: drop the blacklist once for this request.
            if (!provisioning.isEmpty() && !constraintsRelaxed && System.currentTimeMillis() - requestStartTime > 5000) {
                LOG.info("Relaxing provisioning constraints for request {}", provisioning.peek().getRequestId());
                this.amClient.clearBlacklist();
                constraintsRelaxed = true;
            }

            nextTimeoutCheck = checkProvisionTimeout(nextTimeoutCheck);

            if (isRunning()) {
                TimeUnit.SECONDS.sleep(1L);
            }
        }
    }

    /**
     * Resets the YARN blacklist for the given allocation request.
     *
     * <p>The blacklist is always cleared first. When the request allocates one instance at a time
     * and the runnable's placement policy is {@code DISTRIBUTED}, the hosts (both as
     * {@code host} and {@code host:port}) of every running container of its fellow runnables
     * are then added back.
     *
     * @param entry the allocation specification and the runnables it covers
     */
    private void manageBlacklist(Map.Entry<AllocationSpecification, ? extends Collection<RuntimeSpecification>> entry) {
        this.amClient.clearBlacklist();

        AllocationSpecification allocation = entry.getKey();
        if (!allocation.getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
            return;
        }
        if (!this.placementPolicyManager.getPlacementPolicy(allocation.getRunnableName()).getType().equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED)) {
            return;
        }

        for (String fellowRunnable : this.placementPolicyManager.getFellowRunnables(entry.getKey().getRunnableName())) {
            for (ContainerInfo containerInfo : this.runningContainers.getContainerInfo(fellowRunnable)) {
                String hostName = containerInfo.getHost().getHostName();
                this.amClient.addToBlacklist(hostName);
                this.amClient.addToBlacklist(hostName + ":" + containerInfo.getPort());
            }
        }
    }

    /**
     * Processes a batch of completed containers.
     *
     * <p>Each status is logged and handed to the running-container tracker, which records in a
     * multiset how many instances of each runnable must be requested again. One single-instance
     * container request is then queued per such instance, and the request time of the affected
     * runnables is refreshed.
     *
     * @param list statuses of the containers that completed
     */
    public void handleCompleted(List<YarnContainerStatus> list) {
        Multiset<String> restartCounts = HashMultiset.create();
        for (YarnContainerStatus status : list) {
            LOG.info("Container {} completed with {}:{}.", new Object[]{status.getContainerId(), status.getState(), status.getDiagnostics()});
            this.runningContainers.handleCompleted(status, restartCounts);
        }

        for (Multiset.Entry<String> restart : restartCounts.entrySet()) {
            String runnableName = restart.getElement();
            int instances = restart.getCount();
            LOG.info("Re-request container for {} with {} instances.", runnableName, Integer.valueOf(instances));
            for (int remaining = instances; remaining > 0; remaining--) {
                this.runnableContainerRequests.add(createRunnableContainerRequest(runnableName));
            }
        }

        this.expectedContainers.updateRequestTime(restartCounts.elementSet());
    }

    /**
     * Checks whether any runnable is still short of its expected container count and, if so,
     * asks the event handler what to do.
     *
     * @param j time (epoch millis) at which the check is due; nothing happens before that time
     * @return the time of the next check: {@code j} unchanged if the check is not yet due,
     *     {@code j} plus the handler-supplied timeout if the handler returned a non-negative one,
     *     otherwise {@code j + 30000}
     */
    private long checkProvisionTimeout(long j) {
        if (System.currentTimeMillis() < j) {
            return j;
        }

        Map<String, ExpectedContainers.ExpectedCount> expectedByRunnable = this.expectedContainers.getAll();
        Map<String, Integer> runningCounts = this.runningContainers.countAll();
        Map<String, Integer> completedCounts = this.runningContainers.getCompletedContainerCount();

        List<EventHandler.TimeoutEvent> timeoutEvents = Lists.newArrayList();
        for (Map.Entry<String, ExpectedContainers.ExpectedCount> entry : expectedByRunnable.entrySet()) {
            String runnableName = entry.getKey();
            ExpectedContainers.ExpectedCount expectedCount = entry.getValue();
            int running = runningCounts.containsKey(runnableName) ? runningCounts.get(runnableName).intValue() : 0;
            int completed = completedCounts.containsKey(runnableName) ? completedCounts.get(runnableName).intValue() : 0;
            // Completed containers count towards the expectation; only the shortfall is reported.
            if (expectedCount.getCount() > running + completed) {
                timeoutEvents.add(new EventHandler.TimeoutEvent(runnableName, expectedCount.getCount(), running, expectedCount.getTimestamp()));
            }
        }

        if (!timeoutEvents.isEmpty()) {
            try {
                EventHandler.TimeoutAction action = this.eventHandler.launchTimeout(timeoutEvents);
                if (action.getTimeout() >= 0) {
                    return j + action.getTimeout();
                }
                // A negative timeout from the handler stops the application.
                stop();
            } catch (Throwable th) {
                // Pass the handler class for the "{}" placeholder; previously only the Throwable
                // was supplied, so the placeholder was left unfilled in the log output.
                LOG.warn("Exception when calling EventHandler {}. Ignore the result.", this.eventHandler.getClass().getName(), th);
            }
        }
        return j + 30000;
    }

    /**
     * Collects the credentials of the current user, minus any token of kind
     * {@code YARN_AM_RM_TOKEN}.
     *
     * @return an empty {@link Credentials} when security is disabled; otherwise the current user's
     *     credentials without AM-RM tokens (or whatever was collected before an
     *     {@link IOException}, which is logged and swallowed)
     */
    private Credentials createCredentials() {
        Credentials result = new Credentials();
        if (UserGroupInformation.isSecurityEnabled()) {
            try {
                result.addAll(UserGroupInformation.getCurrentUser().getCredentials());
                // Removal goes through the iterator, as in the original loop.
                for (Iterator<? extends Token<?>> tokens = result.getAllTokens().iterator(); tokens.hasNext(); ) {
                    Token<?> token = tokens.next();
                    if (token.getKind().equals(AMRM_TOKEN_KIND_NAME)) {
                        tokens.remove();
                    }
                }
            } catch (IOException e) {
                LOG.warn("Failed to get current user. No credentials will be provided to containers.", e);
            }
        }
        return result;
    }

    /**
     * Builds the initial queue of container requests, one entry per order in the specification.
     *
     * <p>Within an order, runnables reported as distributed by the placement policy manager get
     * one {@code ALLOCATE_ONE_INSTANCE_AT_A_TIME} allocation specification per instance; all
     * other runnables get a single allocation specification built from their capability alone.
     *
     * @return the queue of requests in specification order
     */
    private Queue<RunnableContainerRequest> initContainerRequests() {
        Queue<RunnableContainerRequest> requests = Lists.newLinkedList();
        for (TwillSpecification.Order order : this.twillSpec.getOrders()) {
            Set<String> distributedRunnables = this.placementPolicyManager.getDistributedRunnables(order.getNames());
            // Everything in the order that is not distributed.
            Set<String> defaultRunnables = Sets.newHashSet(order.getNames());
            defaultRunnables.removeAll(distributedRunnables);

            Map<AllocationSpecification, Collection<RuntimeSpecification>> allocations = Maps.newHashMap();

            for (String runnableName : distributedRunnables) {
                RuntimeSpecification runnableSpec = this.twillSpec.getRunnables().get(runnableName);
                Resource capability = createCapability(runnableSpec.getResourceSpecification());
                for (int instanceId = 0; instanceId < runnableSpec.getResourceSpecification().getInstances(); instanceId++) {
                    AllocationSpecification perInstance = new AllocationSpecification(
                        capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, runnableName, instanceId);
                    addAllocationSpecification(perInstance, allocations, runnableSpec);
                }
            }

            for (String runnableName : defaultRunnables) {
                RuntimeSpecification runnableSpec = this.twillSpec.getRunnables().get(runnableName);
                Resource capability = createCapability(runnableSpec.getResourceSpecification());
                addAllocationSpecification(new AllocationSpecification(capability), allocations, runnableSpec);
            }

            requests.add(new RunnableContainerRequest(order.getType(), allocations));
        }
        return requests;
    }

    /**
     * Records that a runnable is to be allocated under the given allocation specification,
     * creating the specification's collection on first use.
     *
     * @param allocationSpecification key under which the runnable is grouped
     * @param map grouping of runnables by allocation specification; modified in place
     * @param runtimeSpecification runnable to add
     */
    private void addAllocationSpecification(AllocationSpecification allocationSpecification, Map<AllocationSpecification, Collection<RuntimeSpecification>> map, RuntimeSpecification runtimeSpecification) {
        if (map.containsKey(allocationSpecification)) {
            map.get(allocationSpecification).add(runtimeSpecification);
            return;
        }
        Collection<RuntimeSpecification> runnables = Lists.newLinkedList();
        map.put(allocationSpecification, runnables);
        runnables.add(runtimeSpecification);
    }

    /**
     * Submits container requests for every runnable that has fewer running containers than expected.
     *
     * @param resource capability to request for each container
     * @param collection runnables to provision
     * @param queue queue that receives one {@link ProvisionRequest} per submitted request
     * @param type allocation type; {@code ALLOCATE_ONE_INSTANCE_AT_A_TIME} caps each request at
     *     a single container
     */
    private void addContainerRequests(Resource resource, Collection<RuntimeSpecification> collection, Queue<ProvisionRequest> queue, AllocationSpecification.Type type) {
        for (RuntimeSpecification runnableSpec : collection) {
            String runnableName = runnableSpec.getName();
            int missing = this.expectedContainers.getExpected(runnableName) - this.runningContainers.count(runnableName);
            if (missing <= 0) {
                continue;
            }
            if (type.equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
                missing = 1;
            }

            // Host/rack hints come from the placement policy; empty when the runnable has none.
            TwillSpecification.PlacementPolicy placementPolicy = this.placementPolicyManager.getPlacementPolicy(runnableName);
            Collection<String> hosts;
            Collection<String> racks;
            if (placementPolicy != null) {
                hosts = placementPolicy.getHosts();
                racks = placementPolicy.getRacks();
            } else {
                hosts = Sets.newHashSet();
                racks = Sets.newHashSet();
            }

            LOG.info("Request {} container with capability {} for runnable {}", new Object[]{Integer.valueOf(missing), resource, runnableName});
            String requestId = this.amClient.addContainerRequest(resource, missing)
                .addHosts(hosts)
                .addRacks(racks)
                .setPriority(0)
                .apply();
            queue.add(new ProvisionRequest(runnableSpec, requestId, missing, type));
        }
    }

    /**
     * Launches runnables in newly acquired containers.
     *
     * <p>Each container is assigned to the provision request at the head of the queue; containers
     * that arrive when the queue is empty are only logged. The head request is removed once its
     * runnable has reached the expected count or when it is a one-instance-at-a-time request.
     *
     * @param list launchers for the containers that were acquired
     * @param queue outstanding provision requests, head first
     */
    public void launchRunnable(List<? extends ProcessLauncher<YarnContainerInfo>> list, Queue<ProvisionRequest> queue) {
        for (ProcessLauncher<YarnContainerInfo> launcher : list) {
            LOG.info("Got container {}", launcher.getContainerInfo().getId());
            ProvisionRequest head = queue.peek();
            if (head == null) {
                continue;
            }

            String runnableName = head.getRuntimeSpec().getName();
            LOG.info("Starting runnable {} with {}", runnableName, launcher);
            LOG.debug("Log level for Twill runnable {} is {}", runnableName, System.getenv("TWILL_APP_LOG_LEVEL"));
            int expectedInstances = this.expectedContainers.getExpected(runnableName);

            // Environment handed to the launched container.
            Map<String, String> environment = ImmutableMap.<String, String>builder()
                .put("TWILL_APP_DIR", System.getenv("TWILL_APP_DIR"))
                .put("TWILL_FS_USER", System.getenv("TWILL_FS_USER"))
                .put("TWILL_APP_RUN_ID", this.runId.getId())
                .put("TWILL_APP_NAME", this.twillSpec.getName())
                .put("TWILL_APP_LOG_LEVEL", System.getenv("TWILL_APP_LOG_LEVEL"))
                .put("TWILL_ZK_CONNECT", this.zkClient.getConnectString())
                .put("TWILL_LOG_KAFKA_ZK", getKafkaZKConnect())
                .build();

            this.runningContainers.start(
                runnableName,
                launcher.getContainerInfo(),
                new TwillContainerLauncher(
                    this.twillSpec.getRunnables().get(runnableName),
                    launcher.prepareLaunch(environment, getLocalizeFiles(), this.credentials),
                    ZKClients.namespace(this.zkClient, getZKNamespace(runnableName)),
                    expectedInstances,
                    this.jvmOpts,
                    this.reservedMemory,
                    getSecureStoreLocation()));

            if (head.containerAcquired()) {
                this.amClient.completeContainerRequest(head.getRequestId());
            }

            if (this.expectedContainers.getExpected(runnableName) == this.runningContainers.count(runnableName)
                || queue.peek().getType().equals(AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME)) {
                queue.poll();
            }
            if (this.expectedContainers.getExpected(runnableName) == this.runningContainers.count(runnableName)) {
                LOG.info("Runnable " + runnableName + " fully provisioned with " + expectedInstances + " instances.");
            }
        }
    }

    /**
     * Reads the list of files to localize from {@code localizeFiles.json} (UTF-8) in the
     * working directory.
     *
     * @return the decoded list of {@link LocalFile}s
     * @throws RuntimeException (via {@link Throwables#propagate}) if the file cannot be read
     */
    private List<LocalFile> getLocalizeFiles() {
        Gson gson = new GsonBuilder().registerTypeAdapter(LocalFile.class, new LocalFileCodec()).create();
        // try-with-resources closes the reader on every path, including when parsing throws;
        // the previous decompiled form only closed it after a successful parse.
        try (Reader reader = Files.newReader(new File("localizeFiles.json"), Charsets.UTF_8)) {
            return gson.fromJson(reader, new TypeToken<List<LocalFile>>() {
            }.getType());
        } catch (IOException e) {
            throw Throwables.propagate(e);
        }
    }

    /**
     * Builds the ZooKeeper namespace of a runnable: {@code /<runId>/runnables/<name>}.
     *
     * @param str runnable name
     * @return the namespace path
     */
    private String getZKNamespace(String str) {
        String applicationRunId = this.runId.getId();
        return String.format("/%s/runnables/%s", applicationRunId, str);
    }

    /**
     * Builds the Kafka ZooKeeper connect string: {@code <zkConnect>/<runId>/kafka}.
     *
     * @return the connect string passed to containers as {@code TWILL_LOG_KAFKA_ZK}
     */
    private String getKafkaZKConnect() {
        String zkConnect = this.zkClient.getConnectString();
        String applicationRunId = this.runId.getId();
        return String.format("%s/%s/kafka", zkConnect, applicationRunId);
    }

    /**
     * Handles a request to change the number of instances of a runnable.
     *
     * <p>Only SYSTEM messages with RUNNABLE scope are considered, and only when the command is the
     * instances command, carries a {@code count} option and names a runnable present in the
     * specification. An unchanged count completes immediately; otherwise the change is handed to
     * the instance-change executor.
     *
     * @param message the received message
     * @param runnable completion callback to run once the request has been processed
     * @return {@code true} if the message was a set-instances request and has been taken over,
     *     {@code false} if it should be handled elsewhere
     */
    private boolean handleSetInstances(Message message, Runnable runnable) {
        boolean systemRunnableMessage =
            message.getType() == Message.Type.SYSTEM && message.getScope() == Message.Scope.RUNNABLE;
        if (!systemRunnableMessage) {
            return false;
        }

        Command command = message.getCommand();
        Map<String, String> options = command.getOptions();
        boolean instancesCommand = ProgramOptionConstants.INSTANCES.equals(command.getCommand()) && options.containsKey("count");
        if (!instancesCommand) {
            return false;
        }

        String runnableName = message.getRunnableName();
        boolean knownRunnable = runnableName != null && !runnableName.isEmpty() && this.twillSpec.getRunnables().containsKey(runnableName);
        if (!knownRunnable) {
            LOG.info("Unknown runnable {}", runnableName);
            return false;
        }

        int newCount = Integer.parseInt(options.get("count"));
        int oldCount = this.expectedContainers.getExpected(runnableName);
        LOG.info("Received change instances request for {}, from {} to {}.", new Object[]{runnableName, Integer.valueOf(oldCount), Integer.valueOf(newCount)});

        if (newCount != oldCount) {
            this.instanceChangeExecutor.execute(createSetInstanceRunnable(message, runnable, oldCount, newCount));
        } else {
            // Nothing to change: complete right away.
            runnable.run();
        }
        return true;
    }

    /**
     * Creates the task that carries out an instance-count change.
     *
     * <p>The task waits until {@code i} containers of the runnable are running, records
     * {@code i2} as the new expected count, then either stops the surplus containers or queues
     * a request for the additional ones. Whatever the outcome of that step, the message is
     * forwarded to the runnable (which also runs the completion callback) exactly once.
     *
     * @param message the set-instances message being processed
     * @param runnable completion callback
     * @param i current (old) instance count
     * @param i2 requested (new) instance count
     * @return the task to submit to the instance-change executor
     */
    private Runnable createSetInstanceRunnable(final Message message, final Runnable runnable, final int i, final int i2) {
        return new Runnable() {
            @Override // java.lang.Runnable
            public void run() {
                String runnableName = message.getRunnableName();
                ApplicationMasterService.LOG.info("Processing change instance request for {}, from {} to {}.", new Object[]{runnableName, Integer.valueOf(i), Integer.valueOf(i2)});
                try {
                    ApplicationMasterService.this.runningContainers.waitForCount(runnableName, i);
                    ApplicationMasterService.LOG.info("Confirmed {} containers running for {}.", Integer.valueOf(i), runnableName);
                    ApplicationMasterService.this.expectedContainers.setExpected(runnableName, i2);
                    try {
                        if (i2 < i) {
                            // Scale down: stop the surplus containers one by one.
                            for (int i3 = 0; i3 < i - i2; i3++) {
                                ApplicationMasterService.this.runningContainers.stopLastAndWait(runnableName);
                            }
                        } else {
                            // Scale up: queue a request for the extra instances.
                            ApplicationMasterService.this.runnableContainerRequests.add(ApplicationMasterService.this.createRunnableContainerRequest(runnableName, i2 - i));
                        }
                    } finally {
                        // A real finally block: the previous duplicated form could forward the
                        // message a second time if the first forward itself threw.
                        ApplicationMasterService.this.runningContainers.sendToRunnable(runnableName, message, runnable);
                        ApplicationMasterService.LOG.info("Change instances request completed. From {} to {}.", Integer.valueOf(i), Integer.valueOf(i2));
                    }
                } catch (InterruptedException e) {
                    // Interrupted while waiting: drop the change but still complete the message,
                    // then restore the interrupt flag for the executor thread.
                    runnable.run();
                    Thread.currentThread().interrupt();
                }
            }
        };
    }

    /**
     * Creates a container request for a single instance of the given runnable by delegating to
     * the two-argument overload with a count of 1.
     *
     * @param str runnable name
     */
    private RunnableContainerRequest createRunnableContainerRequest(String str) {
        return createRunnableContainerRequest(str, 1);
    }

    /**
     * Creates a container request for additional instances of a runnable.
     *
     * <p>For a runnable whose placement policy type is {@code DISTRIBUTED}, one
     * {@code ALLOCATE_ONE_INSTANCE_AT_A_TIME} allocation specification is created per instance
     * (numbered from 0); otherwise a single allocation specification is used.
     *
     * @param str runnable name; must appear in one of the specification's orders
     * @param i number of instances to request
     * @return the request, typed after the order the runnable belongs to
     */
    public RunnableContainerRequest createRunnableContainerRequest(final String str, int i) {
        Predicate<TwillSpecification.Order> containsRunnable = new Predicate<TwillSpecification.Order>() {
            public boolean apply(TwillSpecification.Order candidate) {
                return candidate.getNames().contains(str);
            }
        };
        TwillSpecification.Order owningOrder = Iterables.find(this.twillSpec.getOrders(), containsRunnable);

        RuntimeSpecification runnableSpec = this.twillSpec.getRunnables().get(str);
        Resource capability = createCapability(runnableSpec.getResourceSpecification());
        Map<AllocationSpecification, Collection<RuntimeSpecification>> allocations = Maps.newHashMap();

        boolean distributed = this.placementPolicyManager.getPlacementPolicyType(str).equals(TwillSpecification.PlacementPolicy.Type.DISTRIBUTED);
        if (!distributed) {
            addAllocationSpecification(new AllocationSpecification(capability), allocations, runnableSpec);
        } else {
            for (int instanceId = 0; instanceId < i; instanceId++) {
                AllocationSpecification perInstance = new AllocationSpecification(
                    capability, AllocationSpecification.Type.ALLOCATE_ONE_INSTANCE_AT_A_TIME, str, instanceId);
                addAllocationSpecification(perInstance, allocations, runnableSpec);
            }
        }
        return new RunnableContainerRequest(owningOrder.getType(), allocations);
    }

    /**
     * Creates the completion callback for a message: when run, it sets the given future to the
     * message id.
     *
     * @param str message id to complete the future with
     * @param settableFuture future returned to the sender of the message
     */
    private Runnable getMessageCompletion(final String str, final SettableFuture<String> settableFuture) {
        return new Runnable() { // from class: org.apache.twill.internal.appmaster.ApplicationMasterService.8
            @Override // java.lang.Runnable
            public void run() {
                settableFuture.set(str);
            }
        };
    }

    /**
     * Converts a Twill resource specification into a YARN {@link Resource}.
     *
     * @param resourceSpecification requested virtual cores and memory
     * @return a resource record with the memory set, and the virtual cores set when
     *     {@code YarnUtils.setVirtualCores} reports success
     */
    private Resource createCapability(ResourceSpecification resourceSpecification) {
        Resource capability = Records.newRecord(Resource.class);
        boolean coresApplied = YarnUtils.setVirtualCores(capability, resourceSpecification.getVirtualCores());
        if (!coresApplied) {
            LOG.debug("Virtual cores limit not supported.");
        }
        capability.setMemory(resourceSpecification.getMemorySize());
        return capability;
    }

    /**
     * Handles a request to restart runnable instances.
     *
     * <p>Only SYSTEM messages with RUNNABLE or RUNNABLES scope whose command is
     * {@code restartAllRunnableInstances} or {@code restartRunnablesInstances} are accepted.
     * A RUNNABLE-scoped message with a non-empty runnable name restarts every instance of that
     * runnable; otherwise each command option is read as a runnable name mapped to a JSON set of
     * instance ids to restart.
     *
     * @param message the received message
     * @param runnable completion callback, run after the restarts have been issued
     * @return {@code true} if the message was a restart request and has been processed
     */
    private boolean handleRestartRunnablesInstances(Message message, Runnable runnable) {
        LOG.debug("Check if it should process a restart runnable instances.");
        if (message.getType() != Message.Type.SYSTEM) {
            return false;
        }

        Message.Scope scope = message.getScope();
        boolean runnableScoped = scope == Message.Scope.RUNNABLE || scope == Message.Scope.RUNNABLES;
        if (!runnableScoped) {
            return false;
        }

        Command command = message.getCommand();
        boolean restartCommand = "restartAllRunnableInstances".equals(command.getCommand())
            || "restartRunnablesInstances".equals(command.getCommand());
        if (!restartCommand) {
            return false;
        }

        LOG.debug("Processing restart runnable instances message {}.", message);
        boolean restartWholeRunnable = !Strings.isNullOrEmpty(message.getRunnableName()) && message.getScope() == Message.Scope.RUNNABLE;
        if (restartWholeRunnable) {
            String runnableName = message.getRunnableName();
            LOG.debug("Start restarting all runnable {} instances.", runnableName);
            restartRunnableInstances(runnableName, null);
        } else {
            for (Map.Entry<String, String> option : command.getOptions().entrySet()) {
                String runnableName = option.getKey();
                Set<Integer> instanceIds = GSON.fromJson(option.getValue(), new TypeToken<Set<Integer>>() {
                }.getType());
                LOG.debug("Start restarting runnable {} instances {}", runnableName, instanceIds);
                restartRunnableInstances(runnableName, instanceIds);
            }
        }

        runnable.run();
        return true;
    }

    /**
     * Stops the given instances of a runnable and queues a container request to bring the same
     * number of instances back.
     *
     * <p>While instances are being stopped, a {@code "<runnable>-<instanceId>"} marker is kept in
     * {@code restartingRunnables}; the main loop consults that set before deciding that all
     * containers have completed. The markers are always cleared on exit, even if queuing the new
     * request fails.
     *
     * @param str runnable name
     * @param set instance ids to restart, or {@code null} to restart ids
     *     {@code 0 .. runningCount-1}
     */
    private void restartRunnableInstances(String str, @Nullable Set<Integer> set) {
        LOG.debug("Begin restart runnable {} instances.", str);
        int count = this.runningContainers.count(str);
        Set<Integer> instanceIds = set == null
            ? Ranges.closedOpen(0, Integer.valueOf(count)).asSet(DiscreteDomains.integers())
            : ImmutableSet.copyOf(set);
        try {
            for (Integer instanceId : instanceIds) {
                int intValue = instanceId.intValue();
                LOG.debug("Stop instance {} for runnable {}", Integer.valueOf(intValue), str);
                try {
                    this.restartingRunnables.add(str + "-" + intValue);
                    this.runningContainers.stopByIdAndWait(str, intValue);
                } catch (Exception e) {
                    // Best effort: keep going with the remaining instances, but keep the cause in the log.
                    LOG.info("Exception thrown when stopping instance {} probably already stopped.", Integer.valueOf(intValue), e);
                }
            }
            LOG.info("Restarting instances {} for runnable {}", instanceIds, str);
            this.runnableContainerRequests.add(createRunnableContainerRequest(str, instanceIds.size()));
            this.expectedContainers.updateRequestTime(Collections.singleton(str));
        } finally {
            // Never leave stale markers behind; they would block the main loop's shutdown check.
            this.restartingRunnables.clear();
        }
    }
}
