package org.apache.streampipes.manager.monitoring.pipeline;

import com.fasterxml.jackson.core.JsonProcessingException;
import java.io.IOException;
import java.util.List;
import java.util.Locale;
import org.apache.http.client.fluent.Request;
import org.apache.streampipes.commons.constants.InstanceIdExtractor;
import org.apache.streampipes.commons.prometheus.pipelines.PipelineFlowStats;
import org.apache.streampipes.manager.execution.ExtensionServiceExecutions;
import org.apache.streampipes.model.client.user.Principal;
import org.apache.streampipes.model.connect.adapter.AdapterDescription;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.monitoring.SpEndpointMonitoringInfo;
import org.apache.streampipes.resource.management.SpResourceManager;
import org.apache.streampipes.serializers.json.JacksonSerializer;
import org.apache.streampipes.svcdiscovery.SpServiceDiscovery;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTags;
import org.apache.streampipes.svcdiscovery.api.model.DefaultSpServiceTypes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/streampipes-pipeline-management-0.93.0.jar:org/apache/streampipes/manager/monitoring/pipeline/ExtensionsServiceLogExecutor.class */
/**
 * {@link Runnable} that fetches monitoring information from the active extensions service
 * endpoints and then recomputes the aggregated pipeline flow statistics.
 *
 * <p>Each call to {@link #run()} first hands the fetched monitoring data to
 * {@link ExtensionsLogProvider} and afterwards rebuilds the shared {@link PipelineFlowStats}
 * from the metrics currently held by that provider.
 */
public class ExtensionsServiceLogExecutor implements Runnable {
    /** Path appended to an extensions service endpoint to reach its monitoring resource. */
    private static final String LOG_PATH = "/monitoring";
    private static final Logger LOG = LoggerFactory.getLogger(ExtensionsServiceLogExecutor.class);
    private static final PipelineFlowStats pipelineFlowStats = new PipelineFlowStats();

    // Lower-cased simple class names that are compared against the name extracted from a metrics key.
    // Computed once with Locale.ROOT so that the comparison does not depend on the JVM default
    // locale (e.g. the Turkish dotless "i" would otherwise break the match for "...Invocation").
    private static final String ADAPTER_TYPE = lowerCaseSimpleName(AdapterDescription.class);
    private static final String PROCESSOR_TYPE = lowerCaseSimpleName(DataProcessorInvocation.class);
    private static final String SINK_TYPE = lowerCaseSimpleName(DataSinkInvocation.class);

    /** Fetches fresh monitoring data and then rebuilds the pipeline flow statistics. */
    @Override
    public void run() {
        triggerUpdate();
        updatePipelineFlow();
    }

    /**
     * Requests the monitoring resource of every active extensions endpoint and passes the parsed
     * response to {@link ExtensionsLogProvider}.
     *
     * <p>An {@link IOException} (which includes JSON parsing failures) for one endpoint is logged
     * and does not prevent the remaining endpoints from being processed.
     */
    public void triggerUpdate() {
        getActiveExtensionsEndpoints().forEach(endpoint -> {
            try {
                String responseBody = makeRequest(endpoint).execute().returnContent().asString();
                ExtensionsLogProvider.INSTANCE.addMonitoringInfos(parseLogResponse(responseBody));
            } catch (IOException e) {
                LOG.info("Could not fetch log info from endpoint {}", endpoint);
                // Keep the cause available for troubleshooting without flooding the info log.
                LOG.debug("Cause of failed log fetch from endpoint {}", endpoint, e);
            }
        });
    }

    /**
     * Clears the shared flow statistics and re-accumulates them from all metrics entries known to
     * {@link ExtensionsLogProvider}, dispatching on the element type derived from each entry key.
     */
    private void updatePipelineFlow() {
        pipelineFlowStats.clear();
        ExtensionsLogProvider.INSTANCE.getAllMetricsInfos().forEach((elementId, metricsEntry) -> {
            String elementType = InstanceIdExtractor.getSimpleName(elementId);
            if (ADAPTER_TYPE.equals(elementType)) {
                // Adapters: outgoing messages count as data received by the system.
                pipelineFlowStats.increaseReceivedTotalData(metricsEntry.getMessagesOut().getCounter());
            } else if (PROCESSOR_TYPE.equals(elementType)) {
                // Processors: sum every input counter, plus the single output counter.
                metricsEntry.getMessagesIn().forEach((inputId, inputCounter) ->
                        pipelineFlowStats.increaseElementInputTotalData(inputCounter.getCounter()));
                pipelineFlowStats.increaseElementOutputTotalData(metricsEntry.getMessagesOut().getCounter());
            } else if (SINK_TYPE.equals(elementType)) {
                // Sinks: incoming messages count as data processed by a pipeline.
                metricsEntry.getMessagesIn().forEach((inputId, inputCounter) ->
                        pipelineFlowStats.increasePipelineProcessedData(inputCounter.getCounter()));
            }
        });
        // NOTE(review): presumably publishes the accumulated values to the metrics backend - confirm.
        pipelineFlowStats.metrics();
    }

    /** Builds the GET request for the monitoring resource of the given endpoint base URL. */
    private Request makeRequest(String str) {
        return ExtensionServiceExecutions.extServiceGetRequest(makeLogUrl(str));
    }

    /** Returns the service admin principal obtained from the user resource manager. */
    private Principal getServiceAdmin() {
        return new SpResourceManager().manageUsers().getServiceAdmin();
    }

    /**
     * Looks up the endpoints of extensions services tagged as pipeline element or connect worker
     * via service discovery.
     */
    private List<String> getActiveExtensionsEndpoints() {
        List<String> requiredTags = List.of(
                DefaultSpServiceTags.PE.asString(),
                DefaultSpServiceTags.CONNECT_WORKER.asString());
        return SpServiceDiscovery.getServiceDiscovery()
                .getServiceEndpoints(DefaultSpServiceTypes.EXT, true, requiredTags);
    }

    /** Appends {@link #LOG_PATH} to the given endpoint base URL. */
    private String makeLogUrl(String str) {
        return str + LOG_PATH;
    }

    /**
     * Deserializes the JSON body returned by a monitoring resource.
     *
     * @param str JSON document to parse
     * @return the deserialized monitoring info
     * @throws JsonProcessingException if the body cannot be mapped to {@link SpEndpointMonitoringInfo}
     */
    private SpEndpointMonitoringInfo parseLogResponse(String str) throws JsonProcessingException {
        return JacksonSerializer.getObjectMapper().readValue(str, SpEndpointMonitoringInfo.class);
    }

    /** Returns the simple name of {@code clazz} lower-cased independently of the default locale. */
    private static String lowerCaseSimpleName(Class<?> clazz) {
        return clazz.getSimpleName().toLowerCase(Locale.ROOT);
    }
}
