package org.apache.streampipes.manager.monitoring.pipeline;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.time.temporal.TemporalUnit;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.streampipes.config.backend.BackendConfig;
import org.apache.streampipes.model.SpDataStream;
import org.apache.streampipes.model.base.InvocableStreamPipesEntity;
import org.apache.streampipes.model.graph.DataProcessorInvocation;
import org.apache.streampipes.model.graph.DataSinkInvocation;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.TransportProtocol;
import org.apache.streampipes.model.monitoring.ConsumedMessagesInfo;
import org.apache.streampipes.model.monitoring.PipelineElementMonitoringInfo;
import org.apache.streampipes.model.monitoring.ProducedMessagesInfo;
import org.apache.streampipes.model.pipeline.Pipeline;
import org.apache.streampipes.sdk.helpers.Tuple2;

/* loaded from: input_file:BOOT-INF/lib/streampipes-pipeline-management-0.69.0.jar:org/apache/streampipes/manager/monitoring/pipeline/TopicInfoCollector.class */
/**
 * Collects per-element message statistics (produced, consumed, lag) for a pipeline by
 * querying Kafka topic offsets and consumer group offsets through an {@link AdminClient}.
 *
 * <p>The admin client is created when the collector is constructed and is closed at the end
 * of {@link #makeMonitoringInfo()}, so an instance is intended for a single collection run.
 *
 * <p>NOTE(review): the output- and input-topic helpers cast the transport protocol to
 * {@link KafkaTransportProtocol} without checking; a pipeline element grounded on another
 * protocol would fail with a {@link ClassCastException}. Confirm whether such pipelines can
 * reach this class.
 */
public class TopicInfoCollector {
    private final Pipeline pipeline;
    private final AdminClient kafkaAdminClient = kafkaAdminClient();

    /** Topic name -> summed partition offsets looked up for "one second ago". */
    private final Map<String, Long> latestTopicOffsets = new HashMap<>();

    /** Topic name -> summed partition offsets looked up for the pipeline start timestamp. */
    private final Map<String, Long> topicOffsetAtPipelineStart = new HashMap<>();

    /** Consumer group id -> summed committed offsets of that group. */
    private final Map<String, Long> currentConsumerGroupOffsets = new HashMap<>();

    private final List<PipelineElementMonitoringInfo> monitoringInfo = new ArrayList<>();

    /**
     * Creates a collector for the given pipeline and opens a Kafka admin client.
     *
     * @param pipeline the pipeline whose streams, processors and sinks are inspected
     */
    public TopicInfoCollector(Pipeline pipeline) {
        this.pipeline = pipeline;
    }

    /**
     * Fills the three offset maps for every Kafka transport protocol found on the input
     * streams of the pipeline's processors and sinks. Non-Kafka protocols are skipped.
     *
     * <p>A failed lookup for one protocol is reported and the remaining protocols are still
     * processed; an interrupt restores the thread's interrupt flag and stops collecting.
     */
    private void makeTopicInfo() {
        long oneSecondAgo = Instant.now().minus(1L, ChronoUnit.SECONDS).toEpochMilli();

        List<TransportProtocol> protocols = new ArrayList<>();
        for (DataProcessorInvocation processor : this.pipeline.getSepas()) {
            protocols.addAll(extractProtocols(processor));
        }
        for (DataSinkInvocation sink : this.pipeline.getActions()) {
            protocols.addAll(extractProtocols(sink));
        }

        for (TransportProtocol protocol : protocols) {
            if (!(protocol instanceof KafkaTransportProtocol)) {
                continue;
            }
            KafkaTransportProtocol kafkaProtocol = (KafkaTransportProtocol) protocol;
            try {
                // Resolve all three values first so that a failure leaves the maps untouched
                // for this protocol instead of storing a partial result.
                Tuple2<String, Long> latest =
                        makeTopicOffsetInfo(kafkaProtocol, OffsetSpec.forTimestamp(oneSecondAgo));
                Tuple2<String, Long> atStart =
                        makeTopicOffsetInfo(kafkaProtocol, OffsetSpec.forTimestamp(this.pipeline.getStartedAt()));
                Tuple2<String, Long> groupOffset = makeTopicInfo(kafkaProtocol);

                this.latestTopicOffsets.put(latest.a, latest.b);
                this.topicOffsetAtPipelineStart.put(atStart.a, atStart.b);
                this.currentConsumerGroupOffsets.put(groupOffset.a, groupOffset.b);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller; further blocking calls would only
                // fail again, so stop collecting here.
                Thread.currentThread().interrupt();
                e.printStackTrace();
                return;
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Collects the offsets from Kafka and builds one monitoring entry per data stream,
     * data processor and data sink of the pipeline (in that order). The admin client is
     * closed afterwards, also when collection fails with an exception.
     *
     * @return the list of monitoring entries accumulated by this collector
     */
    public List<PipelineElementMonitoringInfo> makeMonitoringInfo() {
        try {
            makeTopicInfo();
            for (SpDataStream stream : this.pipeline.getStreams()) {
                this.monitoringInfo.add(makeStreamMonitoringInfo(stream));
            }
            for (DataProcessorInvocation processor : this.pipeline.getSepas()) {
                this.monitoringInfo.add(makeProcessorMonitoringInfo(processor));
            }
            for (DataSinkInvocation sink : this.pipeline.getActions()) {
                this.monitoringInfo.add(makeSinkMonitoringInfo(sink));
            }
        } finally {
            // Always release the client's network resources, even on failure.
            this.kafkaAdminClient.close();
        }
        return this.monitoringInfo;
    }

    /** Builds the entry for a data stream: produced-messages info only. */
    private PipelineElementMonitoringInfo makeStreamMonitoringInfo(SpDataStream spDataStream) {
        PipelineElementMonitoringInfo info =
                prepare(spDataStream.getElementId(), spDataStream.getName(), false, true);
        KafkaTransportProtocol outputProtocol =
                (KafkaTransportProtocol) spDataStream.getEventGrounding().getTransportProtocol();
        info.setProducedMessagesInfo(makeOutputTopicInfoForPipelineElement(outputProtocol));
        return info;
    }

    /** Builds the entry for a data processor: produced- and consumed-messages info. */
    private PipelineElementMonitoringInfo makeProcessorMonitoringInfo(DataProcessorInvocation dataProcessorInvocation) {
        PipelineElementMonitoringInfo info =
                prepare(dataProcessorInvocation.getElementId(), dataProcessorInvocation.getName(), true, true);
        KafkaTransportProtocol outputProtocol = (KafkaTransportProtocol) dataProcessorInvocation
                .getOutputStream().getEventGrounding().getTransportProtocol();
        ProducedMessagesInfo produced = makeOutputTopicInfoForPipelineElement(outputProtocol);
        List<ConsumedMessagesInfo> consumed =
                makeInputTopicInfoForPipelineElement(dataProcessorInvocation.getInputStreams());
        info.setProducedMessagesInfo(produced);
        info.setConsumedMessagesInfos(consumed);
        return info;
    }

    /** Builds the entry for a data sink: consumed-messages info only. */
    private PipelineElementMonitoringInfo makeSinkMonitoringInfo(DataSinkInvocation dataSinkInvocation) {
        PipelineElementMonitoringInfo info =
                prepare(dataSinkInvocation.getElementId(), dataSinkInvocation.getName(), true, false);
        info.setConsumedMessagesInfos(makeInputTopicInfoForPipelineElement(dataSinkInvocation.getInputStreams()));
        return info;
    }

    /**
     * Computes, for each input stream, the messages written to its topic since pipeline
     * start, the messages consumed by the stream's consumer group since pipeline start and
     * the resulting lag (never negative).
     *
     * <p>Offsets that were not recorded (for example because the Kafka lookup failed) are
     * treated as {@code 0} instead of raising a {@link NullPointerException}.
     *
     * @param list the input streams of a pipeline element
     * @return one entry per input stream, in stream order
     */
    private List<ConsumedMessagesInfo> makeInputTopicInfoForPipelineElement(List<SpDataStream> list) {
        List<ConsumedMessagesInfo> consumedInfos = new ArrayList<>();
        for (SpDataStream stream : list) {
            KafkaTransportProtocol protocol =
                    (KafkaTransportProtocol) stream.getEventGrounding().getTransportProtocol();
            String topic = getTopic(protocol);
            String groupId = protocol.getGroupId();

            long offsetAtStart = this.topicOffsetAtPipelineStart.getOrDefault(topic, 0L);
            long consumedSinceStart = getCurrentConsumerGroupOffset(groupId) - offsetAtStart;
            long totalSinceStart = this.latestTopicOffsets.getOrDefault(topic, 0L) - offsetAtStart;
            long lag = Math.max(0L, totalSinceStart - consumedSinceStart);

            ConsumedMessagesInfo consumedInfo = new ConsumedMessagesInfo(topic, groupId);
            consumedInfo.setTotalMessagesSincePipelineStart(totalSinceStart);
            consumedInfo.setConsumedMessagesSincePipelineStart(consumedSinceStart);
            consumedInfo.setLag(lag);
            consumedInfos.add(consumedInfo);
        }
        return consumedInfos;
    }

    /** Returns the recorded summed offset of the consumer group, or {@code 0} if unknown. */
    private long getCurrentConsumerGroupOffset(String str) {
        return this.currentConsumerGroupOffsets.getOrDefault(str, 0L);
    }

    /**
     * Builds the produced-messages info for the topic of the given protocol from the
     * recorded offsets; unknown topics count as {@code 0}.
     */
    private ProducedMessagesInfo makeOutputTopicInfoForPipelineElement(KafkaTransportProtocol kafkaTransportProtocol) {
        String topic = getTopic(kafkaTransportProtocol);
        ProducedMessagesInfo producedInfo = new ProducedMessagesInfo(topic);
        producedInfo.setTotalProducedMessages(this.latestTopicOffsets.getOrDefault(topic, 0L));
        producedInfo.setTotalProducedMessagesSincePipelineStart(
                producedInfo.getTotalProducedMessages() - this.topicOffsetAtPipelineStart.getOrDefault(topic, 0L));
        return producedInfo;
    }

    /** Returns the actual topic name of the protocol's topic definition. */
    private String getTopic(KafkaTransportProtocol kafkaTransportProtocol) {
        return kafkaTransportProtocol.getTopicDefinition().getActualTopicName();
    }

    /**
     * Creates a monitoring entry with name, id and the two "info exists" flags set.
     *
     * @param str  the pipeline element id
     * @param str2 the pipeline element name
     * @param z    whether consumed-messages info exists for the element
     * @param z2   whether produced-messages info exists for the element
     */
    private PipelineElementMonitoringInfo prepare(String str, String str2, boolean z, boolean z2) {
        PipelineElementMonitoringInfo info = new PipelineElementMonitoringInfo();
        info.setPipelineElementName(str2);
        info.setPipelineElementId(str);
        info.setConsumedMessageInfoExists(z);
        info.setProducedMessageInfoExists(z2);
        return info;
    }

    /** Creates the Kafka admin client used for all offset lookups. */
    private AdminClient kafkaAdminClient() {
        return KafkaAdminClient.create(makeProperties());
    }

    /** Builds the client configuration: broker URL plus a random client id. */
    private Properties makeProperties() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", getBrokerUrl());
        properties.put("client.id", UUID.randomUUID().toString());
        return properties;
    }

    /**
     * Returns {@code localhost:9094} when the {@code SP_DEBUG} environment variable (with
     * spaces removed) equals {@code true}; otherwise the Kafka URL from the backend config.
     */
    private String getBrokerUrl() {
        String debugFlag = System.getenv("SP_DEBUG");
        if (debugFlag != null) {
            debugFlag = debugFlag.replaceAll(StringUtils.SPACE, "");
        }
        return "true".equals(debugFlag) ? "localhost:9094" : BackendConfig.INSTANCE.getKafkaUrl();
    }

    /**
     * Looks up offsets matching {@code offsetSpec} for every partition on which the
     * protocol's consumer group has an entry, and sums them.
     *
     * @return a tuple of (actual topic name, summed offsets)
     * @throws ExecutionException   if a Kafka request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private Tuple2<String, Long> makeTopicOffsetInfo(KafkaTransportProtocol kafkaTransportProtocol, OffsetSpec offsetSpec) throws ExecutionException, InterruptedException {
        Map<TopicPartition, OffsetAndMetadata> committed = this.kafkaAdminClient
                .listConsumerGroupOffsets(kafkaTransportProtocol.getGroupId())
                .partitionsToOffsetAndMetadata()
                .get();

        // Only the partitions (keys) are needed; each one is queried with the same spec.
        Map<TopicPartition, OffsetSpec> request = new HashMap<>();
        for (TopicPartition partition : committed.keySet()) {
            request.put(partition, offsetSpec);
        }
        return new Tuple2<>(getTopic(kafkaTransportProtocol), Long.valueOf(sumOffsets(request)));
    }

    /**
     * Executes the given offset request and sums the returned offsets.
     *
     * <p>A timestamp-based lookup is expected to yield a negative offset for a partition
     * that holds no record at or after the timestamp. Adding that value would skew the sum,
     * so such partitions are resolved again with {@link OffsetSpec#latest()}: the position
     * the next record would get is the correct answer for "first offset at or after t".
     */
    private long sumOffsets(Map<TopicPartition, OffsetSpec> request) throws ExecutionException, InterruptedException {
        if (request.isEmpty()) {
            return 0L;
        }
        long[] total = {0L};
        Map<TopicPartition, OffsetSpec> unresolved = new HashMap<>();
        this.kafkaAdminClient.listOffsets(request).all().get().forEach((partition, info) -> {
            if (info.offset() >= 0L) {
                total[0] += info.offset();
            } else {
                unresolved.put(partition, OffsetSpec.latest());
            }
        });
        if (!unresolved.isEmpty()) {
            this.kafkaAdminClient.listOffsets(unresolved).all().get().values()
                    .forEach(info -> total[0] += Math.max(0L, info.offset()));
        }
        return total[0];
    }

    /**
     * Sums the committed offsets of the protocol's consumer group over all partitions.
     * Partitions without a committed offset (reported as {@code null}) are skipped.
     *
     * @return a tuple of (consumer group id, summed committed offsets)
     * @throws ExecutionException   if the Kafka request fails
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    private Tuple2<String, Long> makeTopicInfo(KafkaTransportProtocol kafkaTransportProtocol) throws ExecutionException, InterruptedException {
        String groupId = kafkaTransportProtocol.getGroupId();
        long committedSum = this.kafkaAdminClient
                .listConsumerGroupOffsets(groupId)
                .partitionsToOffsetAndMetadata()
                .get()
                .values()
                .stream()
                .filter(offsetAndMetadata -> offsetAndMetadata != null)
                .mapToLong(OffsetAndMetadata::offset)
                .sum();
        return new Tuple2<>(groupId, Long.valueOf(committedSum));
    }

    /**
     * Returns the transport protocols of all input streams of the given pipeline element.
     *
     * @param invocableStreamPipesEntity a data processor or data sink invocation
     * @return the protocols in input-stream order
     */
    public List<TransportProtocol> extractProtocols(InvocableStreamPipesEntity invocableStreamPipesEntity) {
        return invocableStreamPipesEntity.getInputStreams().stream()
                .map(stream -> stream.getEventGrounding().getTransportProtocol())
                .collect(Collectors.toList());
    }
}
