package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.flow.FlowSpecification;
import co.cask.cdap.api.flow.FlowletDefinition;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsCollector;
import co.cask.cdap.app.program.Program;
import co.cask.cdap.app.queue.QueueSpecification;
import co.cask.cdap.app.queue.QueueSpecificationGenerator;
import co.cask.cdap.app.runtime.AbstractProgramRuntimeService;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramResourceReporter;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.queue.QueueName;
import co.cask.cdap.data2.transaction.queue.QueueAdmin;
import co.cask.cdap.data2.transaction.stream.StreamAdmin;
import co.cask.cdap.internal.app.program.ProgramTypeMetricTag;
import co.cask.cdap.internal.app.queue.SimpleQueueSpecificationGenerator;
import co.cask.cdap.internal.app.runtime.AbstractResourceReporter;
import co.cask.cdap.internal.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.internal.app.runtime.flow.FlowUtils;
import co.cask.cdap.internal.app.runtime.service.SimpleRuntimeInfo;
import co.cask.cdap.internal.app.runtime.spark.metrics.SparkMetricsSink;
import co.cask.cdap.proto.Containers;
import co.cask.cdap.proto.DistributedProgramLiveInfo;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NotRunningProgramLiveInfo;
import co.cask.cdap.proto.ProgramLiveInfo;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.RunRecord;
import co.cask.tephra.TransactionExecutorFactory;
import com.google.common.base.Charsets;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Multimap;
import com.google.common.collect.Table;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonParseException;
import com.google.inject.Inject;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.internal.RunIds;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService.class */
public final class DistributedProgramRuntimeService extends AbstractProgramRuntimeService {
    /** Logger used by this service and by the nested {@link ClusterResourceReporter}. */
    private static final Logger LOG = LoggerFactory.getLogger(DistributedProgramRuntimeService.class);
    /**
     * Matches a Twill application name consisting of four dot-separated groups of non-whitespace
     * characters. Code in this class reads group 1 as the program type, group 2 as the namespace,
     * group 3 as the application and group 4 as the program id.
     */
    private static final Pattern APP_NAME_PATTERN = Pattern.compile("^(\\S+)\\.(\\S+)\\.(\\S+)\\.(\\S+)$");
    /** Source of the live Twill applications and their controllers. */
    private final TwillRunner twillRunner;
    /** Used to load programs and to read run records. */
    private final Store store;
    /** Handed to the flowlet instance updater created for FLOW programs. */
    private final QueueAdmin queueAdmin;
    /** Handed to the flowlet instance updater created for FLOW programs. */
    private final StreamAdmin streamAdmin;
    /** Handed to the flowlet instance updater created for FLOW programs. */
    private final TransactionExecutorFactory txExecutorFactory;
    /** Reporter started in {@link #startUp()} and stopped in {@link #shutDown()}. */
    private final ProgramResourceReporter resourceReporter;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: co.cask.cdap.internal.app.runtime.distributed.DistributedProgramRuntimeService$2, reason: invalid class name */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService$2.class */
    /*
     * Decompiler rendering of a compiler-generated lookup table for a switch over ProgramType.
     * The array is indexed by ProgramType ordinal and holds the case number (1-6) that
     * createController switches on; ordinals that are never assigned keep the int default 0,
     * which matches no case there.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$co$cask$cdap$proto$ProgramType = new int[ProgramType.values().length];

        static {
            // Each assignment is isolated in its own try so that a constant that cannot be
            // resolved (NoSuchFieldError) only leaves its own entry at 0.
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.FLOW.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.MAPREDUCE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.WORKFLOW.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.WEBAPP.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.SERVICE.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            try {
                $SwitchMap$co$cask$cdap$proto$ProgramType[ProgramType.WORKER.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService$ClusterResourceReporter.class */
    public class ClusterResourceReporter extends AbstractResourceReporter {
        /** Path appended to a resource manager address to form the cluster-metrics URL. */
        private static final String RM_CLUSTER_METRICS_PATH = "/ws/v1/cluster/metrics";
        /** Directory named by the "hbase.rootdir" setting; its entries are filtered by {@link #namespacedFilter}. */
        private final Path hbasePath;
        /** Path built from {@link #namespace}; its content summary seeds the storage totals. */
        private final Path namedspacedPath;
        /** Filter that keeps paths whose name starts with {@link #namespace}. */
        private final PathFilter namespacedFilter;
        /** Resource manager metrics URLs resolved in the constructor; empty when resolution failed. */
        private final List<URL> rmUrls;
        /** Value of the "hdfs.namespace" setting. */
        private final String namespace;
        /** File system handle; the constructor sets it to null when {@code FileSystem.get} throws. */
        private FileSystem hdfs;

        /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService$ClusterResourceReporter$NamespacedPathFilter.class */
        private class NamespacedPathFilter implements PathFilter {
            private NamespacedPathFilter() {
            }

            public boolean accept(Path path) {
                return path.getName().startsWith(ClusterResourceReporter.this.namespace);
            }
        }

        /**
         * Creates the reporter and resolves, once, everything it reads from configuration: the
         * file system handle, the namespace and HBase root paths, and the resource manager
         * metrics URLs.
         *
         * <p>Failure to obtain the file system or to build the resource manager URLs is logged and
         * tolerated; the corresponding field is left as null or as an empty list respectively.
         *
         * @param metricsCollectionService supplies the collector passed to the superclass
         * @param configuration Hadoop configuration ("hbase.rootdir" and the YARN settings are read)
         * @param cConfiguration CDAP configuration ("hdfs.namespace" is read)
         */
        public ClusterResourceReporter(MetricsCollectionService metricsCollectionService, Configuration configuration, CConfiguration cConfiguration) {
            super(metricsCollectionService.getCollector(ImmutableMap.of()));
            try {
                this.hdfs = FileSystem.get(configuration);
            } catch (IOException e) {
                // Log the cause as well; the message alone does not say why HDFS is unreachable.
                DistributedProgramRuntimeService.LOG.error("unable to get hdfs, cluster storage metrics will be unavailable", e);
                this.hdfs = null;
            }
            this.namespace = cConfiguration.get("hdfs.namespace");
            this.namedspacedPath = new Path(this.namespace);
            this.hbasePath = new Path(configuration.get("hbase.rootdir"));
            this.namespacedFilter = new NamespacedPathFilter();
            // Start from an empty list so a malformed address simply disables memory metrics.
            List<URL> resolvedRmUrls = Collections.emptyList();
            try {
                resolvedRmUrls = getResourceManagerURLs(configuration);
            } catch (MalformedURLException e) {
                DistributedProgramRuntimeService.LOG.error("webapp address for the resourcemanager is malformed. Cluster memory metrics will not be collected.", e);
            }
            DistributedProgramRuntimeService.LOG.trace("RM urls determined... {}", resolvedRmUrls);
            this.rmUrls = resolvedRmUrls;
        }

        /**
         * Builds the list of resource manager cluster-metrics URLs.
         *
         * <p>When "yarn.resourcemanager.ha.enabled" is true, one URL is built per id listed in
         * "yarn.resourcemanager.ha.rm-ids"; otherwise a single URL is built without an id.
         *
         * @param configuration Hadoop configuration to read the YARN settings from
         * @return the URLs, possibly empty when HA is enabled but no ids are configured
         * @throws MalformedURLException if a configured address does not form a valid URL
         */
        private List<URL> getResourceManagerURLs(Configuration configuration) throws MalformedURLException {
            List<URL> urls = Lists.newArrayList();
            if (!configuration.getBoolean("yarn.resourcemanager.ha.enabled", false)) {
                DistributedProgramRuntimeService.LOG.trace("HA RM is not enabled, determining webapp url...");
                urls.add(getResourceURL(configuration, null));
                return urls;
            }
            DistributedProgramRuntimeService.LOG.trace("HA RM is enabled, determining webapp urls...");
            String[] rmIds = configuration.getStrings("yarn.resourcemanager.ha.rm-ids");
            if (rmIds == null) {
                // getStrings yields null for an unset property; iterating it would throw.
                DistributedProgramRuntimeService.LOG.warn("HA RM is enabled but yarn.resourcemanager.ha.rm-ids is not set. Cluster memory metrics will not be collected.");
                return urls;
            }
            for (String rmId : rmIds) {
                urls.add(getResourceURL(configuration, rmId));
            }
            return urls;
        }

        /**
         * Builds the cluster-metrics URL of one resource manager.
         *
         * <p>The address is read from "yarn.resourcemanager.webapp.address", suffixed with
         * {@code "." + str} when an id is given. If that property is unset, the address falls
         * back to the value of "yarn.resourcemanager.hostname" (same suffix rule) followed by
         * port 8088.
         *
         * @param configuration Hadoop configuration to read the address from
         * @param str resource manager id, or null when no per-id suffix applies
         * @return URL of the form {@code http://<address>/ws/v1/cluster/metrics}
         * @throws MalformedURLException if the assembled string is not a valid URL
         */
        private URL getResourceURL(Configuration configuration, String str) throws MalformedURLException {
            String idSuffix = str != null ? "." + str : "";
            String address = configuration.get("yarn.resourcemanager.webapp.address" + idSuffix);
            if (address == null) {
                address = configuration.get("yarn.resourcemanager.hostname" + idSuffix) + ":" + 8088;
            }
            String metricsUrl = "http://" + address + RM_CLUSTER_METRICS_PATH;
            DistributedProgramRuntimeService.LOG.trace("Adding {} as a rm address.", metricsUrl);
            return new URL(metricsUrl);
        }

        /**
         * Reports application-master resources for every live Twill run whose application name
         * can be mapped to a metric context, then cluster storage, then cluster memory.
         *
         * <p>Cluster memory is requested from each resource manager URL in turn until one
         * succeeds; a warning is logged when none does.
         */
        @Override // co.cask.cdap.app.runtime.ProgramResourceReporter
        public void reportResources() {
            for (TwillRunner.LiveInfo liveInfo : DistributedProgramRuntimeService.this.twillRunner.lookupLive()) {
                Map<String, String> metricContext = getMetricContext(liveInfo);
                if (metricContext == null) {
                    continue;
                }
                for (TwillController twillController : liveInfo.getControllers()) {
                    ResourceReport resourceReport = twillController.getResourceReport();
                    if (resourceReport == null) {
                        continue;
                    }
                    // Explicit type arguments keep the builder typed as String/String rather than Object/Object.
                    Map<String, String> runContext = ImmutableMap.<String, String>builder()
                        .putAll(metricContext)
                        .put("run", twillController.getRunId().getId())
                        .build();
                    TwillRunResources appMaster = resourceReport.getAppMasterResources();
                    sendMetrics(runContext, 1, appMaster.getMemoryMB(), appMaster.getVirtualCores());
                }
            }
            reportClusterStorage();
            for (URL rmUrl : this.rmUrls) {
                if (reportClusterMemory(rmUrl)) {
                    // The first resource manager that answers is enough.
                    return;
                }
            }
            DistributedProgramRuntimeService.LOG.warn("unable to get resource manager metrics, cluster memory metrics will be unavailable");
        }

        /**
         * Fetches the cluster metrics JSON from one resource manager URL and publishes the
         * "totalMB" and "availableMB" values of its "clusterMetrics" object as gauges.
         *
         * @param url cluster-metrics URL of a resource manager
         * @return true when both gauges were published; false when the response was empty, was
         *         not parseable JSON, or any other exception occurred
         */
        private boolean reportClusterMemory(URL url) {
            InputStreamReader reader = null;
            HttpURLConnection connection = null;
            DistributedProgramRuntimeService.LOG.trace("getting cluster memory from url {}", url);
            try {
                connection = (HttpURLConnection) url.openConnection();
                connection.setRequestMethod("GET");
                reader = new InputStreamReader(connection.getInputStream(), Charsets.UTF_8);
                JsonObject response = new Gson().fromJson(reader, JsonObject.class);
                if (response == null) {
                    return false;
                }
                JsonObject clusterMetrics = response.getAsJsonObject("clusterMetrics");
                long totalMb = clusterMetrics.get("totalMB").getAsLong();
                long availableMb = clusterMetrics.get("availableMB").getAsLong();
                MetricsCollector collector = getCollector();
                DistributedProgramRuntimeService.LOG.trace("resource manager, total memory = {} available = {}", totalMb, availableMb);
                collector.gauge("resources.total.memory", totalMb);
                collector.gauge("resources.available.memory", availableMb);
                return true;
            } catch (JsonParseException e) {
                // Deliberately quiet: the caller moves on to the next URL.
                // NOTE(review): presumably a non-JSON reply from a resource manager that is not active - confirm.
                return false;
            } catch (Exception e) {
                // Name the URL so the failing resource manager can be identified from the log.
                DistributedProgramRuntimeService.LOG.error("Exception getting cluster memory from " + url, e);
                return false;
            } finally {
                // Single cleanup path for every exit, including errors that propagate.
                if (reader != null) {
                    try {
                        reader.close();
                    } catch (IOException e) {
                        DistributedProgramRuntimeService.LOG.error("Exception closing reader", e);
                    }
                }
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }

        /**
         * Publishes storage gauges: total and remaining file system capacity, plus the space,
         * file count and directory count consumed under the namespace path and under every
         * entry of the HBase root whose name starts with the namespace.
         *
         * <p>Byte values are reported in units of 1024 * 1024 bytes. Does nothing when no file
         * system handle is available; I/O failures are logged as warnings.
         */
        private void reportClusterStorage() {
            if (this.hdfs == null) {
                // The constructor leaves hdfs null (and logs an error) when it cannot obtain a FileSystem.
                return;
            }
            try {
                ContentSummary namespaceSummary = this.hdfs.getContentSummary(this.namedspacedPath);
                long usedBytes = namespaceSummary.getSpaceConsumed();
                long usedFiles = namespaceSummary.getFileCount();
                long usedDirectories = namespaceSummary.getDirectoryCount();
                for (FileStatus hbaseEntry : this.hdfs.listStatus(this.hbasePath, this.namespacedFilter)) {
                    ContentSummary entrySummary = this.hdfs.getContentSummary(hbaseEntry.getPath());
                    usedBytes += entrySummary.getSpaceConsumed();
                    usedFiles += entrySummary.getFileCount();
                    usedDirectories += entrySummary.getDirectoryCount();
                }
                FsStatus fsStatus = this.hdfs.getStatus();
                long capacityBytes = fsStatus.getCapacity();
                long remainingBytes = fsStatus.getRemaining();
                MetricsCollector collector = getCollector();
                DistributedProgramRuntimeService.LOG.trace("total cluster storage = {} total used = {}", capacityBytes, usedBytes);
                collector.gauge("resources.total.storage", (capacityBytes / 1024) / 1024);
                collector.gauge("resources.available.storage", (remainingBytes / 1024) / 1024);
                collector.gauge("resources.used.storage", (usedBytes / 1024) / 1024);
                collector.gauge("resources.used.files", usedFiles);
                collector.gauge("resources.used.directories", usedDirectories);
            } catch (IOException e) {
                DistributedProgramRuntimeService.LOG.warn("Exception getting hdfs metrics", e);
            }
        }

        /**
         * Derives the metric context of a live Twill application from its application name.
         *
         * @param liveInfo live Twill application
         * @return the metric context, or null when the name does not match the expected pattern
         *         or its first group is not a known program type
         */
        private Map<String, String> getMetricContext(TwillRunner.LiveInfo liveInfo) {
            Matcher nameMatcher = DistributedProgramRuntimeService.APP_NAME_PATTERN.matcher(liveInfo.getApplicationName());
            if (!nameMatcher.matches()) {
                return null;
            }
            ProgramType programType = DistributedProgramRuntimeService.this.getType(nameMatcher.group(1));
            if (programType == null) {
                return null;
            }
            Id.Program programId = Id.Program.from(nameMatcher.group(2), nameMatcher.group(3), programType, nameMatcher.group(4));
            return DistributedProgramRuntimeService.getMetricsContext(programType, programId);
        }
    }

    @Inject
    DistributedProgramRuntimeService(ProgramRunnerFactory programRunnerFactory, TwillRunner twillRunner, Store store, QueueAdmin queueAdmin, StreamAdmin streamAdmin, MetricsCollectionService metricsCollectionService, Configuration configuration, CConfiguration cConfiguration, TransactionExecutorFactory transactionExecutorFactory) {
        super(programRunnerFactory);
        this.twillRunner = twillRunner;
        this.store = store;
        this.queueAdmin = queueAdmin;
        this.streamAdmin = streamAdmin;
        this.txExecutorFactory = transactionExecutorFactory;
        this.resourceReporter = new ClusterResourceReporter(metricsCollectionService, configuration, cConfiguration);
    }

    /**
     * Wraps a controller in a runtime info that also carries its Twill run id.
     *
     * @param programController controller of the program
     * @param program the program the controller belongs to
     * @return the runtime info, or null when the controller is not an
     *         {@code AbstractTwillProgramController}
     */
    @Override // co.cask.cdap.app.runtime.AbstractProgramRuntimeService
    protected ProgramRuntimeService.RuntimeInfo createRuntimeInfo(ProgramController programController, Program program) {
        if (!(programController instanceof AbstractTwillProgramController)) {
            return null;
        }
        AbstractTwillProgramController twillProgramController = (AbstractTwillProgramController) programController;
        return new SimpleRuntimeInfo(programController, program, twillProgramController.getTwillRunId());
    }

    /**
     * Tells whether any runtime info already tracked by this service has the given Twill run id.
     *
     * @param runId Twill run id to look for
     * @return true when a tracked runtime info carries that run id
     */
    private synchronized boolean isTwillRunIdCached(RunId runId) {
        for (ProgramRuntimeService.RuntimeInfo trackedInfo : getRuntimeInfos()) {
            if (runId.equals(trackedInfo.getTwillRunId())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the runtime info of a program run, first among the runs tracked by the
     * superclass and then among the live Twill applications.
     *
     * <p>For a live application whose name maps to the requested program, the run record is
     * read from the store to obtain the Twill run id; the controller with that id is wrapped
     * in a runtime info, which is registered through {@code updateRuntimeInfo} before being
     * returned.
     *
     * @param program id of the program
     * @param runId run id of the program run
     * @return the runtime info, or null when the run is unknown, has no Twill run id recorded,
     *         has no live controller, or its runtime info could not be created
     */
    @Override // co.cask.cdap.app.runtime.AbstractProgramRuntimeService, co.cask.cdap.app.runtime.ProgramRuntimeService
    public synchronized ProgramRuntimeService.RuntimeInfo lookup(Id.Program program, RunId runId) {
        ProgramRuntimeService.RuntimeInfo tracked = super.lookup(program, runId);
        if (tracked != null) {
            return tracked;
        }
        for (TwillRunner.LiveInfo liveInfo : this.twillRunner.lookupLive()) {
            Matcher matcher = APP_NAME_PATTERN.matcher(liveInfo.getApplicationName());
            if (!matcher.matches()) {
                continue;
            }
            ProgramType liveType = getType(matcher.group(1));
            if (liveType == null) {
                // Unknown program type prefix: skip it, as the other name-parsing call sites do.
                continue;
            }
            Id.Program liveProgram = Id.Program.from(matcher.group(2), matcher.group(3), liveType, matcher.group(4));
            if (!liveProgram.equals(program)) {
                continue;
            }
            RunRecord run = this.store.getRun(program, runId.getId());
            if (run == null) {
                return null;
            }
            if (run.getTwillRunId() == null) {
                LOG.warn("Twill RunId does not exist for the program {}, runId {}", program, runId.getId());
                return null;
            }
            RunId twillRunId = RunIds.fromString(run.getTwillRunId());
            for (TwillController twillController : liveInfo.getControllers()) {
                if (!twillController.getRunId().equals(twillRunId)) {
                    continue;
                }
                ProgramRuntimeService.RuntimeInfo runtimeInfo = createRuntimeInfo(program.getType(), program, twillController, runId);
                if (runtimeInfo != null) {
                    updateRuntimeInfo(program.getType(), runId, runtimeInfo);
                } else {
                    LOG.warn("Unable to find program for runId {}", runId);
                }
                return runtimeInfo;
            }
        }
        return null;
    }

    /**
     * Lists the runtime infos of all runs of the given program type: those tracked by the
     * superclass plus live Twill runs that are not tracked yet.
     *
     * <p>Untracked live controllers are collected per program and Twill run id. RUNNING run
     * records whose Twill run id is among them are then read from the store, and for each one
     * a runtime info is created, added to the result and registered through
     * {@code updateRuntimeInfo}.
     *
     * @param programType type of the programs to list
     * @return immutable map from program run id to runtime info
     */
    @Override // co.cask.cdap.app.runtime.AbstractProgramRuntimeService, co.cask.cdap.app.runtime.ProgramRuntimeService
    public synchronized Map<RunId, ProgramRuntimeService.RuntimeInfo> list(ProgramType programType) {
        Map<RunId, ProgramRuntimeService.RuntimeInfo> result = Maps.newHashMap();
        result.putAll(super.list(programType));

        // Rows: program id, columns: Twill run id, values: the live controller of that run.
        Table<Id.Program, RunId, TwillController> untracked = HashBasedTable.create();
        for (TwillRunner.LiveInfo liveInfo : this.twillRunner.lookupLive()) {
            Matcher matcher = APP_NAME_PATTERN.matcher(liveInfo.getApplicationName());
            if (!matcher.matches() || !programType.equals(getType(matcher.group(1)))) {
                continue;
            }
            for (TwillController twillController : liveInfo.getControllers()) {
                RunId twillRunId = twillController.getRunId();
                if (!isTwillRunIdCached(twillRunId)) {
                    untracked.put(Id.Program.from(matcher.group(2), matcher.group(3), programType, matcher.group(4)), twillRunId, twillController);
                }
            }
        }
        if (untracked.isEmpty()) {
            return ImmutableMap.copyOf(result);
        }

        final Set<RunId> untrackedTwillRunIds = untracked.columnKeySet();
        Predicate<RunRecord> hasUntrackedTwillRunId = new Predicate<RunRecord>() {
            @Override
            public boolean apply(RunRecord record) {
                return record.getTwillRunId() != null && untrackedTwillRunIds.contains(RunIds.fromString(record.getTwillRunId()));
            }
        };
        for (RunRecord runRecord : this.store.getRuns(ProgramRunStatus.RUNNING, hasUntrackedTwillRunId)) {
            RunId twillRunId = RunIds.fromString(runRecord.getTwillRunId());
            RunId programRunId = co.cask.cdap.common.app.RunIds.fromString(runRecord.getPid());
            Map<Id.Program, TwillController> owners = untracked.column(twillRunId);
            if (owners.isEmpty()) {
                // Only possible if the store returned a record the predicate would have rejected.
                continue;
            }
            Map.Entry<Id.Program, TwillController> owner = owners.entrySet().iterator().next();
            ProgramRuntimeService.RuntimeInfo runtimeInfo = createRuntimeInfo(programType, owner.getKey(), owner.getValue(), programRunId);
            if (runtimeInfo != null) {
                result.put(programRunId, runtimeInfo);
                updateRuntimeInfo(programType, programRunId, runtimeInfo);
            } else {
                LOG.warn("Unable to find program {} {}", programType, owner.getKey());
            }
        }
        return ImmutableMap.copyOf(result);
    }

    /**
     * Loads a program from the store and wraps a freshly created controller for its Twill run
     * in a runtime info.
     *
     * @param programType type of the program
     * @param program id of the program to load
     * @param twillController controller of the live Twill run
     * @param runId program run id handed to the created controller
     * @return the runtime info, or null when no controller could be created or any exception
     *         was thrown (the exception is logged)
     */
    private ProgramRuntimeService.RuntimeInfo createRuntimeInfo(ProgramType programType, Id.Program program, TwillController twillController, RunId runId) {
        try {
            Program loaded = this.store.loadProgram(program, programType);
            Preconditions.checkNotNull(loaded, "Program not found");
            ProgramController controller = createController(loaded, twillController, runId);
            return controller != null
                ? new SimpleRuntimeInfo(controller, program, twillController.getRunId())
                : null;
        } catch (Exception e) {
            LOG.error("Got exception: ", e);
            return null;
        }
    }

    /**
     * Creates the Twill program controller matching the program's type and starts its listener.
     *
     * @param program the loaded program
     * @param twillController controller of the live Twill run
     * @param runId program run id handed to the created controller
     * @return the result of {@code startListen()} on the new controller, or null for a program
     *         type that has no controller here
     */
    private ProgramController createController(Program program, TwillController twillController, RunId runId) {
        String id = program.getId().getId();
        AbstractTwillProgramController controller;
        // Switch on the enum itself rather than on a synthetic ordinal table.
        switch (program.getType()) {
            case FLOW: {
                FlowSpecification flowSpec = program.getApplicationSpecification().getFlows().get(id);
                DistributedFlowletInstanceUpdater instanceUpdater = new DistributedFlowletInstanceUpdater(
                    program, twillController, this.queueAdmin, this.streamAdmin, getFlowletQueues(program, flowSpec), this.txExecutorFactory);
                controller = new FlowTwillProgramController(id, twillController, instanceUpdater, runId);
                break;
            }
            case MAPREDUCE:
                controller = new MapReduceTwillProgramController(id, twillController, runId);
                break;
            case WORKFLOW:
                controller = new WorkflowTwillProgramController(id, twillController, runId);
                break;
            case WEBAPP:
                controller = new WebappTwillProgramController(id, twillController, runId);
                break;
            case SERVICE:
                controller = new ServiceTwillProgramController(id, twillController, new DistributedServiceRunnableInstanceUpdater(program, twillController), runId);
                break;
            case WORKER:
                controller = new WorkerTwillProgramController(id, twillController, runId);
                break;
            default:
                // No controller implementation for the remaining program types.
                return null;
        }
        return controller.startListen();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resolves a program type from its name, ignoring case.
     *
     * @param str program type name in any letter case
     * @return the matching {@link ProgramType}, or null when no constant has that name
     */
    public ProgramType getType(String str) {
        try {
            // Locale.ROOT keeps the upper-casing independent of the JVM default locale
            // (under a Turkish locale "i" would otherwise become a dotted capital I).
            return ProgramType.valueOf(str.toUpperCase(java.util.Locale.ROOT));
        } catch (IllegalArgumentException e) {
            return null;
        }
    }

    /**
     * Collects, for every flowlet of a flow, the names of the queues found in that flowlet's
     * column of the generated queue specification table.
     *
     * @param program program supplying the namespace and application ids
     * @param flowSpecification specification of the flow
     * @return multimap from flowlet name to queue names
     */
    private Multimap<String, QueueName> getFlowletQueues(Program program, FlowSpecification flowSpecification) {
        Id.Application applicationId = Id.Application.from(program.getNamespaceId(), program.getApplicationId());
        Table<QueueSpecificationGenerator.Node, String, Set<QueueSpecification>> queueSpecs =
            new SimpleQueueSpecificationGenerator(applicationId).create(flowSpecification);

        ImmutableSetMultimap.Builder<String, QueueName> queuesByFlowlet = ImmutableSetMultimap.builder();
        for (String flowletName : flowSpecification.getFlowlets().keySet()) {
            // The column holds one set of specifications per source node; flatten them all.
            for (QueueSpecification queueSpec : Iterables.concat(queueSpecs.column(flowletName).values())) {
                queuesByFlowlet.put(flowletName, queueSpec.getQueueName());
            }
        }
        return queuesByFlowlet.build();
    }

    /**
     * Describes the live state of a program from the resource report of its Twill application.
     *
     * <p>The Twill application is looked up by the name
     * {@code <type>.<namespace>.<application>.<program>} with the type in lower case. Only the
     * first controller found is used; a warning is logged when there is more than one.
     *
     * @param program id of the program
     * @param programType type of the program
     * @return live info listing the application's containers and services, or a
     *         {@link NotRunningProgramLiveInfo} when there is no controller or no resource report
     */
    @Override // co.cask.cdap.app.runtime.ProgramRuntimeService
    public ProgramLiveInfo getLiveInfo(Id.Program program, ProgramType programType) {
        // Locale.ROOT keeps the lower-casing independent of the JVM default locale
        // (under a Turkish locale "I" would otherwise become a dotless i).
        String typeName = programType.name().toLowerCase(java.util.Locale.ROOT);
        String twillAppName = String.format("%s.%s.%s.%s", typeName, program.getNamespaceId(), program.getApplicationId(), program.getId());

        Iterator<TwillController> controllers = this.twillRunner.lookup(twillAppName).iterator();
        if (!controllers.hasNext()) {
            return new NotRunningProgramLiveInfo(program, programType);
        }
        TwillController twillController = controllers.next();
        if (controllers.hasNext()) {
            LOG.warn("Expected at most one live instance of Twill app {} but found at least two.", twillAppName);
        }
        ResourceReport resourceReport = twillController.getResourceReport();
        if (resourceReport == null) {
            return new NotRunningProgramLiveInfo(program, programType);
        }

        DistributedProgramLiveInfo liveInfo = new DistributedProgramLiveInfo(program, programType, resourceReport.getApplicationId());
        // Flow containers are reported as FLOWLET; every other type maps to the container type of the same name.
        Containers.ContainerType containerType = ProgramType.FLOW.equals(programType)
            ? Containers.ContainerType.FLOWLET
            : Containers.ContainerType.valueOf(programType.name());
        for (Map.Entry<String, Collection<TwillRunResources>> runnable : resourceReport.getResources().entrySet()) {
            for (TwillRunResources resources : runnable.getValue()) {
                liveInfo.addContainer(new Containers.ContainerInfo(containerType, runnable.getKey(), resources.getInstanceId(), resources.getContainerId(), resources.getHost(), resources.getMemoryMB(), resources.getVirtualCores(), resources.getDebugPort()));
            }
        }
        liveInfo.addServices(resourceReport.getServices());
        return liveInfo;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Builds the metric context of a program: its namespace under "ns", its application under
     * "app", and its id under the tag name of its program type.
     *
     * @param programType type of the program, used to pick the tag name
     * @param program id of the program
     * @return immutable three-entry context map
     */
    public static Map<String, String> getMetricsContext(ProgramType programType, Id.Program program) {
        String namespaceId = program.getNamespaceId();
        String applicationId = program.getApplicationId();
        String programTypeTag = ProgramTypeMetricTag.getTagName(programType);
        String programId = program.getId();
        return ImmutableMap.of("ns", namespaceId, "app", applicationId, programTypeTag, programId);
    }

    /**
     * Starts the cluster resource reporter and logs that the service has started.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // co.cask.cdap.app.runtime.AbstractProgramRuntimeService
    protected void startUp() throws Exception {
        this.resourceReporter.start();
        LOG.debug("started distributed program runtime service");
    }

    /**
     * Stops the cluster resource reporter.
     *
     * @throws Exception declared by the overridden method
     */
    @Override // co.cask.cdap.app.runtime.AbstractProgramRuntimeService
    protected void shutDown() throws Exception {
        this.resourceReporter.stop();
    }
}
