package org.apache.streampipes.messaging.kafka;

import java.io.Serializable;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
import org.apache.streampipes.messaging.EventConsumer;
import org.apache.streampipes.messaging.InternalEventProcessor;
import org.apache.streampipes.messaging.kafka.config.ConsumerConfigFactory;
import org.apache.streampipes.messaging.kafka.config.KafkaConfigAppender;
import org.apache.streampipes.model.grounding.KafkaTransportProtocol;
import org.apache.streampipes.model.grounding.WildcardTopicDefinition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/streampipes/messaging/kafka/SpKafkaConsumer.class */
public class SpKafkaConsumer implements EventConsumer, Runnable, Serializable {
    private String topic;
    private InternalEventProcessor<byte[]> eventProcessor;
    private final KafkaTransportProtocol protocol;
    private volatile boolean isRunning;
    private Boolean patternTopic;
    private List<KafkaConfigAppender> appenders;
    private static final Logger LOG = LoggerFactory.getLogger(SpKafkaConsumer.class);

    public SpKafkaConsumer(KafkaTransportProtocol kafkaTransportProtocol) {
        this.patternTopic = false;
        this.appenders = new ArrayList();
        this.protocol = kafkaTransportProtocol;
    }

    public SpKafkaConsumer(KafkaTransportProtocol kafkaTransportProtocol, String str, InternalEventProcessor<byte[]> internalEventProcessor) {
        this.patternTopic = false;
        this.appenders = new ArrayList();
        this.protocol = kafkaTransportProtocol;
        this.topic = str;
        this.eventProcessor = internalEventProcessor;
        this.isRunning = true;
    }

    public SpKafkaConsumer(KafkaTransportProtocol kafkaTransportProtocol, String str, InternalEventProcessor<byte[]> internalEventProcessor, List<KafkaConfigAppender> list) {
        this(kafkaTransportProtocol, str, internalEventProcessor);
        this.appenders = list;
    }

    @Override // java.lang.Runnable
    public void run() {
        KafkaConsumer kafkaConsumer = new KafkaConsumer(makeProperties(this.protocol, this.appenders));
        if (this.patternTopic.booleanValue()) {
            this.topic = replaceWildcardWithPatternFormat(this.topic);
            kafkaConsumer.subscribe(Pattern.compile(this.topic), new ConsumerRebalanceListener() { // from class: org.apache.streampipes.messaging.kafka.SpKafkaConsumer.1
                public void onPartitionsRevoked(Collection<TopicPartition> collection) {
                }

                public void onPartitionsAssigned(Collection<TopicPartition> collection) {
                }
            });
        } else {
            kafkaConsumer.subscribe(Collections.singletonList(this.topic));
        }
        Duration of = Duration.of(100L, ChronoUnit.MILLIS);
        while (this.isRunning) {
            kafkaConsumer.poll(of).forEach(consumerRecord -> {
                this.eventProcessor.onEvent((byte[]) consumerRecord.value());
            });
        }
        LOG.info("Closing Kafka Consumer.");
        kafkaConsumer.close();
    }

    private String replaceWildcardWithPatternFormat(String str) {
        return str.replaceAll("\\.", "\\\\.").replaceAll("\\*", ".*");
    }

    private Properties makeProperties(KafkaTransportProtocol kafkaTransportProtocol, List<KafkaConfigAppender> list) {
        return new ConsumerConfigFactory(kafkaTransportProtocol).buildProperties(list);
    }

    public void connect(InternalEventProcessor<byte[]> internalEventProcessor) throws SpRuntimeException {
        LOG.info("Kafka consumer: Connecting to " + this.protocol.getTopicDefinition().getActualTopicName());
        if (this.protocol.getTopicDefinition() instanceof WildcardTopicDefinition) {
            this.patternTopic = true;
        }
        this.eventProcessor = internalEventProcessor;
        this.topic = this.protocol.getTopicDefinition().getActualTopicName();
        this.isRunning = true;
        new Thread(this).start();
    }

    public void disconnect() throws SpRuntimeException {
        LOG.info("Kafka consumer: Disconnecting from " + this.topic);
        this.isRunning = false;
    }

    public boolean isConnected() {
        return this.isRunning;
    }
}
