package co.cask.cdap.internal.app.runtime.distributed;

import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.api.metrics.MetricsContext;
import co.cask.cdap.app.metrics.MapReduceMetrics;
import co.cask.cdap.app.program.ProgramDescriptor;
import co.cask.cdap.app.runtime.AbstractProgramRuntimeService;
import co.cask.cdap.app.runtime.ProgramController;
import co.cask.cdap.app.runtime.ProgramResourceReporter;
import co.cask.cdap.app.runtime.ProgramRunner;
import co.cask.cdap.app.runtime.ProgramRunnerFactory;
import co.cask.cdap.app.runtime.ProgramRuntimeService;
import co.cask.cdap.app.runtime.distributed.DistributedProgramControllerFactory;
import co.cask.cdap.app.store.Store;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.io.Locations;
import co.cask.cdap.common.security.Impersonator;
import co.cask.cdap.internal.app.program.ProgramTypeMetricTag;
import co.cask.cdap.internal.app.runtime.AbstractResourceReporter;
import co.cask.cdap.internal.app.runtime.artifact.ArtifactDetail;
import co.cask.cdap.internal.app.runtime.artifact.ArtifactRepository;
import co.cask.cdap.internal.app.runtime.service.SimpleRuntimeInfo;
import co.cask.cdap.internal.app.store.RunRecordMeta;
import co.cask.cdap.proto.Containers;
import co.cask.cdap.proto.DistributedProgramLiveInfo;
import co.cask.cdap.proto.Id;
import co.cask.cdap.proto.NotRunningProgramLiveInfo;
import co.cask.cdap.proto.ProgramLiveInfo;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.ProgramType;
import co.cask.cdap.proto.id.ArtifactId;
import co.cask.cdap.proto.id.ProgramId;
import com.google.common.base.Predicate;
import com.google.common.base.Throwables;
import com.google.common.collect.HashBasedTable;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.inject.Inject;
import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import javax.annotation.Nullable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.twill.api.ResourceReport;
import org.apache.twill.api.RunId;
import org.apache.twill.api.TwillController;
import org.apache.twill.api.TwillRunResources;
import org.apache.twill.api.TwillRunner;
import org.apache.twill.internal.RunIds;
import org.apache.twill.internal.yarn.YarnUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService.class */
public final class DistributedProgramRuntimeService extends AbstractProgramRuntimeService {
    /** Logger for this service; the nested resource reporter logs through it as well. */
    private static final Logger LOG = LoggerFactory.getLogger(DistributedProgramRuntimeService.class);
    /** Twill runner queried for live applications and their controllers. */
    private final TwillRunner twillRunner;
    /** Factory for program runners; used when building controllers for runs discovered through Twill. */
    private final ProgramRunnerFactory programRunnerFactory;
    /** Store used to load programs and to read run records. */
    private final Store store;
    /** Resource reporter that is started and stopped together with this service. */
    private final ProgramResourceReporter resourceReporter;
    /** Used to perform the artifact copy as the artifact's parent entity. */
    private final Impersonator impersonator;

    /* loaded from: input_file:co/cask/cdap/internal/app/runtime/distributed/DistributedProgramRuntimeService$ClusterResourceReporter.class */
    private class ClusterResourceReporter extends AbstractResourceReporter {
        /** YARN client used to fetch node reports; initialized in the constructor, started in startUp, stopped in shutDown. */
        private final YarnClient yarnClient;

        /**
         * Creates the reporter with an empty base metrics context and a YARN client that is
         * initialized (but not started) from the given Hadoop configuration.
         *
         * @param metricsCollectionService service that supplies the metrics context handed to the superclass
         * @param configuration Hadoop configuration used to initialize the YARN client
         */
        ClusterResourceReporter(MetricsCollectionService metricsCollectionService, Configuration configuration) {
            super(metricsCollectionService.getContext(ImmutableMap.<String, String>of()));
            this.yarnClient = YarnClient.createYarnClient();
            this.yarnClient.init(configuration);
        }

        /**
         * Runs the superclass start-up and then starts the YARN client.
         *
         * @throws Exception if the superclass start-up or the YARN client start fails
         */
        @Override
        protected void startUp() throws Exception {
            super.startUp();
            this.yarnClient.start();
        }

        /**
         * Stops the YARN client and then runs the superclass shutdown.
         *
         * <p>The superclass shutdown is placed in a {@code finally} block so that it still runs
         * when stopping the YARN client throws.
         *
         * @throws Exception if stopping the client or the superclass shutdown fails
         */
        /* JADX INFO: Access modifiers changed from: protected */
        @Override // co.cask.cdap.internal.app.runtime.AbstractResourceReporter
        public void shutDown() throws Exception {
            try {
                this.yarnClient.stop();
            } finally {
                super.shutDown();
            }
        }

        /**
         * Reports application-master resources for every live Twill controller whose application
         * name maps to a program, then reports cluster-wide YARN resources.
         *
         * <p>Live applications whose name cannot be mapped to a program, and controllers that have
         * no resource report yet, are skipped.
         */
        @Override // co.cask.cdap.app.runtime.ProgramResourceReporter
        public void reportResources() {
            for (TwillRunner.LiveInfo liveInfo : DistributedProgramRuntimeService.this.twillRunner.lookupLive()) {
                Map<String, String> metricContext = getMetricContext(liveInfo);
                if (metricContext == null) {
                    continue;
                }
                for (TwillController controller : liveInfo.getControllers()) {
                    ResourceReport report = controller.getResourceReport();
                    if (report == null) {
                        continue;
                    }
                    // The explicit <String, String> witness is required: without it the builder
                    // chain is inferred as <Object, Object> and does not match sendMetrics.
                    Map<String, String> runContext = ImmutableMap.<String, String>builder()
                        .putAll(metricContext)
                        .put("run", controller.getRunId().getId())
                        .build();
                    // Read the app-master resources once so memory and vcores come from the same object.
                    TwillRunResources appMaster = report.getAppMasterResources();
                    sendMetrics(runContext, 1, appMaster.getMemoryMB(), appMaster.getVirtualCores());
                }
            }
            reportYarnResources();
        }

        /**
         * Sums memory and virtual cores (capability and used) over all YARN nodes in the RUNNING
         * state and publishes total, used and available gauges for each.
         *
         * <p>Any exception raised while gathering or publishing is logged as a warning and not rethrown.
         */
        private void reportYarnResources() {
            try {
                long totalMemory = 0L;
                long totalVCores = 0L;
                long usedMemory = 0L;
                long usedVCores = 0L;

                NodeState[] runningOnly = {NodeState.RUNNING};
                for (NodeReport node : this.yarnClient.getNodeReports(runningOnly)) {
                    Resource nodeCapability = node.getCapability();
                    Resource nodeUsed = node.getUsed();
                    totalMemory += nodeCapability.getMemory();
                    totalVCores += YarnUtils.getVirtualCores(nodeCapability);
                    usedMemory += nodeUsed.getMemory();
                    usedVCores += YarnUtils.getVirtualCores(nodeUsed);
                }

                MetricsContext metrics = getCollector();

                DistributedProgramRuntimeService.LOG.trace("YARN Cluster memory total={}MB, used={}MB", totalMemory, usedMemory);
                metrics.gauge("resources.total.memory", totalMemory);
                // NOTE(review): the used-memory metric name comes from a MapReduceMetrics constant — confirm that is the intended name here.
                metrics.gauge(MapReduceMetrics.METRIC_USED_MEMORY, usedMemory);
                metrics.gauge("resources.available.memory", totalMemory - usedMemory);

                DistributedProgramRuntimeService.LOG.trace("YARN Cluster vcores total={}, used={}", totalVCores, usedVCores);
                metrics.gauge("resources.total.vcores", totalVCores);
                metrics.gauge("resources.used.vcores", usedVCores);
                metrics.gauge("resources.available.vcores", totalVCores - usedVCores);
            } catch (Exception ex) {
                DistributedProgramRuntimeService.LOG.warn("Failed to gather YARN NodeReports", ex);
            }
        }

        /**
         * Builds the metrics context for a live Twill application.
         *
         * @param liveInfo live Twill application information
         * @return the metrics context for the program encoded in the application name, or
         *         {@code null} when the name does not resolve to a program
         */
        private Map<String, String> getMetricContext(TwillRunner.LiveInfo liveInfo) {
            ProgramId programId = TwillAppNames.fromTwillAppName(liveInfo.getApplicationName(), false);
            return programId == null
                ? null
                : DistributedProgramRuntimeService.getMetricsContext(programId.getType(), programId);
        }
    }

    /**
     * Injection constructor. Stores the collaborators and creates the cluster resource reporter
     * from the metrics service and the Hadoop configuration.
     */
    @Inject
    DistributedProgramRuntimeService(ProgramRunnerFactory programRunnerFactory,
                                     TwillRunner twillRunner,
                                     Store store,
                                     MetricsCollectionService metricsCollectionService,
                                     Configuration configuration,
                                     CConfiguration cConfiguration,
                                     ArtifactRepository artifactRepository,
                                     Impersonator impersonator) {
        super(cConfiguration, programRunnerFactory, artifactRepository);
        this.twillRunner = twillRunner;
        this.programRunnerFactory = programRunnerFactory;
        this.store = store;
        this.impersonator = impersonator;
        this.resourceReporter = new ClusterResourceReporter(metricsCollectionService, configuration);
    }

    /**
     * Wraps a program controller in a runtime info carrying its Twill run id.
     *
     * @param programController controller of the program run
     * @param programId id of the program
     * @return the runtime info, or {@code null} when the controller is not an
     *         {@link AbstractTwillProgramController}
     */
    @Override
    @Nullable
    protected ProgramRuntimeService.RuntimeInfo createRuntimeInfo(ProgramController programController, ProgramId programId) {
        if (!(programController instanceof AbstractTwillProgramController)) {
            return null;
        }
        AbstractTwillProgramController twillProgramController = (AbstractTwillProgramController) programController;
        return new SimpleRuntimeInfo(programController, programId, twillProgramController.getTwillRunId());
    }

    /**
     * Links or copies the artifact's location to the given local file, running the operation
     * through the impersonator for the artifact's parent entity.
     *
     * @param artifactId id of the artifact; its parent is the entity passed to the impersonator
     * @param artifactDetail detail whose descriptor location is the source
     * @param file destination file
     * @throws IOException if the operation fails with an {@link IOException}; other failures are
     *         propagated through {@link Throwables#propagate(Throwable)}
     */
    @Override
    protected void copyArtifact(ArtifactId artifactId, final ArtifactDetail artifactDetail, final File file) throws IOException {
        Callable<Void> linkOrCopyTask = new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                Locations.linkOrCopy(artifactDetail.getDescriptor().getLocation(), file);
                return null;
            }
        };
        try {
            this.impersonator.doAs(artifactId.getParent(), linkOrCopyTask);
        } catch (Exception ex) {
            // Rethrow IOException (and unchecked throwables) as-is; wrap anything else.
            Throwables.propagateIfPossible(ex, IOException.class);
            throw Throwables.propagate(ex);
        }
    }

    /**
     * Tells whether any runtime info returned by {@code getRuntimeInfos()} carries the given Twill run id.
     *
     * @param runId Twill run id to look for
     * @return {@code true} if a runtime info with that Twill run id exists
     */
    private synchronized boolean isTwillRunIdCached(RunId runId) {
        for (ProgramRuntimeService.RuntimeInfo cachedInfo : getRuntimeInfos()) {
            if (runId.equals(cachedInfo.getTwillRunId())) {
                return true;
            }
        }
        return false;
    }

    /**
     * Looks up the runtime info of a program run.
     *
     * <p>The superclass lookup is tried first. On a miss, live Twill applications are scanned for
     * the one whose name maps to the given program; the run record from the store supplies the
     * Twill run id, which is matched against that application's controllers. A runtime info
     * created this way is registered through {@code updateRuntimeInfo} before being returned.
     *
     * @param program program id
     * @param runId run id of the program run
     * @return the runtime info, or {@code null} if the run is unknown, has no Twill run id, has no
     *         matching live controller, or the runtime info could not be created
     */
    @Override
    public synchronized ProgramRuntimeService.RuntimeInfo lookup(Id.Program program, RunId runId) {
        ProgramRuntimeService.RuntimeInfo cached = super.lookup(program, runId);
        if (cached != null) {
            return cached;
        }

        for (TwillRunner.LiveInfo liveInfo : this.twillRunner.lookupLive()) {
            ProgramId liveProgramId = TwillAppNames.fromTwillAppName(liveInfo.getApplicationName(), false);
            if (liveProgramId == null || !liveProgramId.toId().equals(program)) {
                continue;
            }

            RunRecordMeta runRecord = this.store.getRun(program, runId.getId());
            if (runRecord == null) {
                return null;
            }
            String twillRunIdString = runRecord.getTwillRunId();
            if (twillRunIdString == null) {
                LOG.warn("Twill RunId does not exist for the program {}, runId {}", program, runId.getId());
                return null;
            }

            RunId twillRunId = RunIds.fromString(twillRunIdString);
            for (TwillController controller : liveInfo.getControllers()) {
                if (!controller.getRunId().equals(twillRunId)) {
                    continue;
                }
                ProgramRuntimeService.RuntimeInfo discovered = createRuntimeInfo(program, controller, runId);
                if (discovered == null) {
                    LOG.warn("Unable to find program for runId {}", runId);
                } else {
                    updateRuntimeInfo(program.getType(), runId, discovered);
                }
                return discovered;
            }
        }
        return null;
    }

    /**
     * Lists runtime infos of the given program type.
     *
     * <p>Starts from the superclass result, then adds runs that are live in Twill but whose Twill
     * run id is not yet cached. For those, RUNNING run records with a matching Twill run id are
     * read from the store to recover the CDAP run id; each runtime info created this way is also
     * registered through {@code updateRuntimeInfo}.
     *
     * @param programType type of programs to list
     * @return an immutable map from run id to runtime info
     */
    @Override
    public synchronized Map<RunId, ProgramRuntimeService.RuntimeInfo> list(ProgramType programType) {
        Map<RunId, ProgramRuntimeService.RuntimeInfo> result = Maps.newHashMap();
        result.putAll(super.list(programType));

        // (program, twill run id) -> controller, for live runs that are not cached yet.
        HashBasedTable<Id.Program, RunId, TwillController> uncached = HashBasedTable.create();
        for (TwillRunner.LiveInfo liveInfo : this.twillRunner.lookupLive()) {
            ProgramId liveProgramId = TwillAppNames.fromTwillAppName(liveInfo.getApplicationName(), false);
            if (liveProgramId == null || !programType.equals(liveProgramId.getType())) {
                continue;
            }
            for (TwillController controller : liveInfo.getControllers()) {
                RunId twillRunId = controller.getRunId();
                if (!isTwillRunIdCached(twillRunId)) {
                    uncached.put(liveProgramId.toId(), twillRunId, controller);
                }
            }
        }
        if (uncached.isEmpty()) {
            return ImmutableMap.copyOf(result);
        }

        final Set<RunId> uncachedTwillRunIds = uncached.columnKeySet();
        Predicate<RunRecordMeta> hasUncachedTwillRunId = new Predicate<RunRecordMeta>() {
            @Override
            public boolean apply(RunRecordMeta record) {
                return record.getTwillRunId() != null
                    && uncachedTwillRunIds.contains(RunIds.fromString(record.getTwillRunId()));
            }
        };

        for (RunRecordMeta record : this.store.getRuns(ProgramRunStatus.RUNNING, hasUncachedTwillRunId)) {
            String twillRunIdString = record.getTwillRunId();
            if (twillRunIdString == null) {
                LOG.warn("No twill runId for in run record {}.", record);
                continue;
            }
            RunId twillRunId = RunIds.fromString(twillRunIdString);
            // The CDAP run id is taken from the run record's pid.
            RunId runId = co.cask.cdap.common.app.RunIds.fromString(record.getPid());
            // Take the first (program, controller) pair recorded for this Twill run id.
            Map<Id.Program, TwillController> controllersByProgram = uncached.columnMap().get(twillRunId);
            Map.Entry<Id.Program, TwillController> entry = controllersByProgram.entrySet().iterator().next();

            ProgramRuntimeService.RuntimeInfo discovered = createRuntimeInfo(entry.getKey(), entry.getValue(), runId);
            if (discovered != null) {
                result.put(runId, discovered);
                updateRuntimeInfo(programType, runId, discovered);
            } else {
                LOG.warn("Unable to find program {} {}", programType, entry.getKey());
            }
        }
        return ImmutableMap.copyOf(result);
    }

    /**
     * Creates a runtime info for a run discovered through Twill by loading the program from the
     * store and building a controller around the Twill controller.
     *
     * <p>This is best-effort: any failure is logged with its cause and reported as {@code null}
     * rather than propagated.
     *
     * @param program program id
     * @param twillController Twill controller of the live run
     * @param runId run id of the program run
     * @return the runtime info, or {@code null} if no controller could be created or an error occurred
     */
    @Nullable
    private ProgramRuntimeService.RuntimeInfo createRuntimeInfo(Id.Program program, TwillController twillController, RunId runId) {
        try {
            ProgramController createController = createController(this.store.loadProgram(program), twillController, runId);
            if (createController == null) {
                return null;
            }
            return new SimpleRuntimeInfo(createController, program.toEntityId(), twillController.getRunId());
        } catch (Exception e) {
            // Keep returning null for callers, but do not lose the reason for the failure.
            LOG.warn("Failed to create runtime info for program {} with run id {}", program, runId, e);
            return null;
        }
    }

    /**
     * Creates a program controller for a live Twill run using the program runner registered for
     * the program's type.
     *
     * @param programDescriptor descriptor of the program
     * @param twillController Twill controller of the live run
     * @param runId run id of the program run
     * @return the controller, or {@code null} if the runner is not a
     *         {@link DistributedProgramControllerFactory} or an {@link IllegalArgumentException}
     *         was thrown while creating it
     */
    @Nullable
    private ProgramController createController(ProgramDescriptor programDescriptor, TwillController twillController, RunId runId) {
        ProgramId programId = programDescriptor.getProgramId();
        try {
            ProgramRunner create = this.programRunnerFactory.create(programId.getType());
            if (create instanceof DistributedProgramControllerFactory) {
                return ((DistributedProgramControllerFactory) create).createProgramController(twillController, programDescriptor, runId);
            }
            ResourceReport resourceReport = twillController.getResourceReport();
            LOG.error("Unable to create ProgramController for program {} for twill application {}. It is likely caused by invalid CDAP program runtime extension.", programId, resourceReport == null ? "'unknown twill application'" : resourceReport.getApplicationId());
            return null;
        } catch (IllegalArgumentException e) {
            // Pass the exception as the trailing argument so the cause and stack trace are logged.
            LOG.error("Unsupported program type {} for program {}. It is likely caused by incorrect CDAP installation or upgrade to incompatible CDAP version", programId.getType(), programId, e);
            return null;
        }
    }

    /**
     * Returns live information about a program based on its Twill application.
     *
     * <p>Only the first controller returned by the Twill lookup is used; a warning is logged if
     * more than one is found.
     *
     * @param program program id
     * @return a {@link DistributedProgramLiveInfo} listing the containers and services from the
     *         controller's resource report, or a {@link NotRunningProgramLiveInfo} when there is
     *         no live controller or no resource report
     */
    @Override
    public ProgramLiveInfo getLiveInfo(Id.Program program) {
        String twillAppName = TwillAppNames.toTwillAppName(program.toEntityId());
        Iterator<TwillController> controllers = this.twillRunner.lookup(twillAppName).iterator();
        if (!controllers.hasNext()) {
            return new NotRunningProgramLiveInfo(program);
        }

        TwillController controller = controllers.next();
        if (controllers.hasNext()) {
            LOG.warn("Expected at most one live instance of Twill app {} but found at least two.", twillAppName);
        }

        ResourceReport report = controller.getResourceReport();
        if (report == null) {
            return new NotRunningProgramLiveInfo(program);
        }

        DistributedProgramLiveInfo liveInfo = new DistributedProgramLiveInfo(program, report.getApplicationId());
        // Flows use the FLOWLET container type; every other program type maps by enum name.
        Containers.ContainerType containerType = ProgramType.FLOW.equals(program.getType())
            ? Containers.ContainerType.FLOWLET
            : Containers.ContainerType.valueOf(program.getType().name());

        for (Map.Entry<String, Collection<TwillRunResources>> entry : report.getResources().entrySet()) {
            String runnableName = entry.getKey();
            for (TwillRunResources resources : entry.getValue()) {
                liveInfo.addContainer(new Containers.ContainerInfo(
                    containerType,
                    runnableName,
                    Integer.valueOf(resources.getInstanceId()),
                    resources.getContainerId(),
                    resources.getHost(),
                    Integer.valueOf(resources.getMemoryMB()),
                    Integer.valueOf(resources.getVirtualCores()),
                    resources.getDebugPort()));
            }
        }
        liveInfo.addServices(report.getServices());
        return liveInfo;
    }

    /**
     * Builds the metrics context of a program: its namespace under {@code "ns"}, its application
     * under {@code "app"}, and its program name under the tag name of the given program type.
     *
     * @param programType type used to pick the tag name for the program entry
     * @param programId id supplying namespace, application and program name
     * @return an immutable map with the three context entries
     */
    /* JADX INFO: Access modifiers changed from: private */
    public static Map<String, String> getMetricsContext(ProgramType programType, ProgramId programId) {
        String namespace = programId.getNamespace();
        String application = programId.getApplication();
        String programTag = ProgramTypeMetricTag.getTagName(programType);
        String programName = programId.getProgram();
        return ImmutableMap.of("ns", namespace, "app", application, programTag, programName);
    }

    /**
     * Starts the resource reporter and logs that this service has started.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void startUp() throws Exception {
        resourceReporter.start();
        LOG.debug("started distributed program runtime service");
    }

    /**
     * Stops the resource reporter.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void shutDown() throws Exception {
        resourceReporter.stop();
    }
}
