package co.cask.cdap.internal.app.runtime.monitor;

import co.cask.cdap.api.Transactional;
import co.cask.cdap.api.Transactionals;
import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.api.messaging.MessagingContext;
import co.cask.cdap.api.retry.RetryableException;
import co.cask.cdap.app.runtime.ProgramStateWriter;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.service.AbstractRetryableScheduledService;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.data2.dataset2.DatasetFramework;
import co.cask.cdap.internal.app.runtime.ProgramOptionConstants;
import co.cask.cdap.internal.app.runtime.distributed.remote.RemoteProcessController;
import co.cask.cdap.internal.app.runtime.distributed.remote.RemoteRuntimeDataset;
import co.cask.cdap.internal.app.store.AppMetadataStore;
import co.cask.cdap.internal.profile.ProfileMetricService;
import co.cask.cdap.logging.remote.RemoteExecutionLogProcessor;
import co.cask.cdap.messaging.data.MessageId;
import co.cask.cdap.proto.Notification;
import co.cask.cdap.proto.ProgramRunStatus;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.ProgramRunId;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/internal/app/runtime/monitor/RuntimeMonitor.class */
public class RuntimeMonitor extends AbstractRetryableScheduledService {
    private static final Logger LOG = LoggerFactory.getLogger(RuntimeMonitor.class);
    private static final Gson GSON = new Gson();
    private final RuntimeMonitorClient monitorClient;
    private final int limit;
    private final ProgramRunId programRunId;
    private final CConfiguration cConf;
    private final Map<String, String> requestKeyToLocalTopic;
    private final long pollTimeMillis;
    private final long gracefulShutdownMillis;
    private final Deque<MonitorMessage> lastProgramStateMessages;
    private final DatasetFramework datasetFramework;
    private final Transactional transactional;
    private final MessagingContext messagingContext;
    private final ScheduledExecutorService scheduledExecutorService;
    private final RemoteExecutionLogProcessor logProcessor;
    private final ProfileMetricService metricScheduledService;
    private final RemoteProcessController remoteProcessController;
    private final ProgramStateWriter programStateWriter;
    private Map<String, MonitorConsumeRequest> topicsToRequest;
    private long programFinishTime;

    public RuntimeMonitor(ProgramRunId programRunId, CConfiguration cConfiguration, RuntimeMonitorClient runtimeMonitorClient, DatasetFramework datasetFramework, Transactional transactional, MessagingContext messagingContext, ScheduledExecutorService scheduledExecutorService, RemoteExecutionLogProcessor remoteExecutionLogProcessor, ProfileMetricService profileMetricService, RemoteProcessController remoteProcessController, ProgramStateWriter programStateWriter) {
        super(RetryStrategies.fromConfiguration(cConfiguration, "system.runtime.monitor."));
        this.programRunId = programRunId;
        this.cConf = cConfiguration;
        this.monitorClient = runtimeMonitorClient;
        this.limit = cConfiguration.getInt("app.program.runtime.monitor.batch.size");
        this.pollTimeMillis = cConfiguration.getLong("app.program.runtime.monitor.polltime.ms");
        this.gracefulShutdownMillis = cConfiguration.getLong("app.program.runtime.monitor.graceful.shutdown.ms");
        this.topicsToRequest = new HashMap();
        this.datasetFramework = datasetFramework;
        this.messagingContext = messagingContext;
        this.transactional = transactional;
        this.scheduledExecutorService = scheduledExecutorService;
        this.logProcessor = remoteExecutionLogProcessor;
        this.programFinishTime = -1L;
        this.lastProgramStateMessages = new LinkedList();
        this.requestKeyToLocalTopic = createTopicConfigs(cConfiguration);
        this.metricScheduledService = profileMetricService;
        this.remoteProcessController = remoteProcessController;
        this.programStateWriter = programStateWriter;
    }

    protected ScheduledExecutorService executor() {
        return this.scheduledExecutorService;
    }

    protected void doStartUp() {
        LOG.debug("Start monitoring program run {}", this.programRunId);
        try {
            this.metricScheduledService.startAndWait();
        } catch (Exception e) {
            LOG.warn("Failed to start metrics service for program run {}, node minute metrics will not be updated.", this.programRunId);
        }
    }

    protected void doShutdown() {
        try {
            this.metricScheduledService.stopAndWait();
        } catch (Exception e) {
            LOG.warn("Failed to stop metrics service for program run {}", this.programRunId);
        }
        LOG.debug("Stopped monitoring program run {}", this.programRunId);
    }

    public void requestStop() throws Exception {
        this.monitorClient.requestStop();
    }

    protected boolean shouldRetry(Exception exc) {
        if (exc instanceof ServiceUnavailableException) {
            return true;
        }
        LOG.warn("Exception raised when fetching monitoring data from program run {}.", this.programRunId, exc);
        return true;
    }

    protected String getServiceName() {
        return "runtime-monitor-" + this.programRunId.getRun();
    }

    protected long runTask() throws Exception {
        if (this.topicsToRequest.isEmpty()) {
            this.topicsToRequest = initTopics(this.requestKeyToLocalTopic.keySet());
        }
        try {
            Map<String, Deque<MonitorMessage>> fetchMessages = this.monitorClient.fetchMessages(this.topicsToRequest);
            updateProgramFinishTime(fetchMessages);
            long longValue = ((Long) Transactionals.execute(this.transactional, datasetContext -> {
                return Long.valueOf(processResponse(fetchMessages, AppMetadataStore.create(this.cConf, datasetContext, this.datasetFramework)));
            })).longValue();
            fetchMessages.forEach(this::updateTopicToRequest);
            if (this.programFinishTime > 0) {
                long currentTimeMillis = System.currentTimeMillis();
                if ((longValue < 0 && currentTimeMillis - (this.gracefulShutdownMillis >> 1) > this.programFinishTime) || currentTimeMillis - this.gracefulShutdownMillis > this.programFinishTime) {
                    triggerRuntimeShutdown();
                    stop();
                }
            }
            return this.pollTimeMillis;
        } catch (ServiceUnavailableException | IOException e) {
            if (this.remoteProcessController.isRunning()) {
                return this.pollTimeMillis;
            }
            this.programStateWriter.error(this.programRunId, new IllegalStateException("Program runtime terminated abnormally. Please inspect logs for root cause.", e));
            clearStates();
            stop();
            return 0L;
        }
    }

    private void updateProgramFinishTime(Map<String, Deque<MonitorMessage>> map) {
        if (map.containsKey("program.status.event.topic")) {
            Deque<MonitorMessage> deque = map.get("program.status.event.topic");
            if (this.programFinishTime < 0) {
                this.programFinishTime = findProgramFinishTime(deque);
            }
            if (this.programFinishTime > 0) {
                this.lastProgramStateMessages.addAll(deque);
                updateTopicToRequest("program.status.event.topic", deque);
                map.remove("program.status.event.topic");
            }
        }
    }

    private Map<String, MonitorConsumeRequest> initTopics(Collection<String> collection) {
        HashMap hashMap = new HashMap();
        Transactionals.execute(this.transactional, datasetContext -> {
            AppMetadataStore create = AppMetadataStore.create(this.cConf, datasetContext, this.datasetFramework);
            Iterator it = collection.iterator();
            while (it.hasNext()) {
                String str = (String) it.next();
                hashMap.put(str, new MonitorConsumeRequest(create.retrieveSubscriberState(str, this.programRunId.getRun()), this.limit));
            }
        });
        return hashMap;
    }

    private long processResponse(Map<String, Deque<MonitorMessage>> map, AppMetadataStore appMetadataStore) throws Exception {
        long j = -1;
        for (Map.Entry<String, Deque<MonitorMessage>> entry : map.entrySet()) {
            String key = entry.getKey();
            Deque<MonitorMessage> value = entry.getValue();
            if (!value.isEmpty()) {
                j = Math.max(publish(key, this.requestKeyToLocalTopic.get(key), value, appMetadataStore), j);
            }
        }
        return j;
    }

    private long publish(String str, String str2, Deque<MonitorMessage> deque, AppMetadataStore appMetadataStore) throws Exception {
        if (str2.startsWith(this.cConf.get("log.tms.topic.prefix"))) {
            this.logProcessor.process(deque.stream().map((v0) -> {
                return v0.getMessage();
            }).iterator());
        } else {
            this.messagingContext.getMessagePublisher().publish(NamespaceId.SYSTEM.getNamespace(), str2, deque.stream().map((v0) -> {
                return v0.getMessage();
            }).iterator());
        }
        MonitorMessage last = deque.getLast();
        appMetadataStore.persistSubscriberState(str, this.programRunId.getRun(), last.getMessageId());
        return getMessagePublishTime(last);
    }

    private void updateTopicToRequest(String str, Deque<MonitorMessage> deque) {
        if (deque.isEmpty()) {
            return;
        }
        this.topicsToRequest.put(str, new MonitorConsumeRequest(deque.getLast().getMessageId(), this.limit));
    }

    private void triggerRuntimeShutdown() {
        LOG.debug("Program run {} completed at {}, shutting down remote runtime.", this.programRunId, Long.valueOf(this.programFinishTime));
        if (!this.lastProgramStateMessages.isEmpty()) {
            String str = "program.status.event.topic";
            String str2 = this.requestKeyToLocalTopic.get("program.status.event.topic");
            Retries.runWithRetries(() -> {
                Transactionals.execute(this.transactional, datasetContext -> {
                    publish(str, str2, this.lastProgramStateMessages, AppMetadataStore.create(this.cConf, datasetContext, this.datasetFramework));
                });
            }, getRetryStrategy());
        }
        try {
            this.monitorClient.requestShutdown();
        } catch (Exception e) {
            try {
                this.remoteProcessController.kill();
            } catch (Exception e2) {
                LOG.warn("Failed to terminate remote process for program run {}", this.programRunId, e2);
            }
        }
        clearStates();
    }

    private void clearStates() {
        try {
            Retries.runWithRetries(() -> {
                Transactionals.execute(this.transactional, datasetContext -> {
                    AppMetadataStore create = AppMetadataStore.create(this.cConf, datasetContext, this.datasetFramework);
                    RemoteRuntimeDataset create2 = RemoteRuntimeDataset.create(datasetContext, this.datasetFramework);
                    Iterator<String> it = this.requestKeyToLocalTopic.keySet().iterator();
                    while (it.hasNext()) {
                        create.deleteSubscriberState(it.next(), this.programRunId.getRun());
                    }
                    create2.delete(this.programRunId);
                }, RetryableException.class);
            }, getRetryStrategy());
        } catch (Exception e) {
            LOG.warn("Exception raised when clearing runtime monitor states", e);
        }
    }

    private long findProgramFinishTime(Deque<MonitorMessage> deque) {
        for (MonitorMessage monitorMessage : deque) {
            Notification notification = (Notification) GSON.fromJson(new String(monitorMessage.getMessage(), StandardCharsets.UTF_8), Notification.class);
            if (notification.getNotificationType() == Notification.Type.PROGRAM_STATUS) {
                Map properties = notification.getProperties();
                String str = (String) properties.get(ProgramOptionConstants.PROGRAM_RUN_ID);
                String str2 = (String) properties.get(ProgramOptionConstants.PROGRAM_STATUS);
                if (str != null && str2 != null && ((ProgramRunId) GSON.fromJson(str, ProgramRunId.class)).equals(this.programRunId) && (str2.equals(ProgramRunStatus.COMPLETED.name()) || str2.equals(ProgramRunStatus.FAILED.name()) || str2.equals(ProgramRunStatus.KILLED.name()))) {
                    try {
                        return Long.parseLong((String) properties.get(ProgramOptionConstants.END_TIME));
                    } catch (Exception e) {
                        return getMessagePublishTime(monitorMessage);
                    }
                }
            }
        }
        return -1L;
    }

    private long getMessagePublishTime(MonitorMessage monitorMessage) {
        return new MessageId(Bytes.fromHexString(monitorMessage.getMessageId())).getPublishTimestamp();
    }

    private static Map<String, String> createTopicConfigs(CConfiguration cConfiguration) {
        return (Map) cConfiguration.getTrimmedStringCollection("app.program.runtime.monitor.topics.configs").stream().flatMap(str -> {
            int lastIndexOf = str.lastIndexOf(58);
            if (lastIndexOf < 0) {
                return Stream.of(Maps.immutableEntry(str, cConfiguration.get(str)));
            }
            try {
                int parseInt = Integer.parseInt(str.substring(lastIndexOf + 1));
                if (parseInt <= 0) {
                    throw new IllegalArgumentException("Total topic number must be positive for system topic config '" + str + "'.");
                }
                String substring = str.substring(0, lastIndexOf);
                return IntStream.range(0, parseInt).mapToObj(i -> {
                    return Maps.immutableEntry(substring + ":" + i, cConfiguration.get(substring) + i);
                });
            } catch (NumberFormatException e) {
                throw new IllegalArgumentException("Total topic number must be a positive number for system topic config'" + str + "'.", e);
            }
        }).collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, (v0) -> {
            return v0.getValue();
        }));
    }
}
