/*
 * Decompiled with CFR 0.152.
 */
package io.openraven.magpie.core.fifos;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import io.openraven.magpie.api.MagpieEnvelope;
import io.openraven.magpie.core.config.ConfigException;
import io.openraven.magpie.core.fifos.FifoDequeue;
import io.openraven.magpie.core.fifos.FifoException;
import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaDequeue
implements FifoDequeue {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaDequeue.class);
    private static final ObjectMapper MAPPER = new ObjectMapper().enable(SerializationFeature.INDENT_OUTPUT).configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false).findAndRegisterModules();
    private static final long POLL_TIMEOUT = 100L;
    private static final Map<String, Object> DEFAULT_PROPERTIES = Map.of("key.deserializer", StringDeserializer.class.getName(), "value.deserializer", StringDeserializer.class.getName(), "max.poll.records", 1);
    private final Consumer<String, String> consumer;

    public KafkaDequeue(Map<String, Object> properties) {
        Object t = properties.remove("topic");
        if (Objects.isNull(t)) {
            throw new ConfigException("Kafka 'topic' value must be set under properties");
        }
        HashMap<String, Object> props = new HashMap<String, Object>();
        props.putAll(DEFAULT_PROPERTIES);
        props.putAll(properties);
        this.consumer = new KafkaConsumer(props);
        this.consumer.subscribe(List.of(t.toString()));
    }

    @Override
    public Optional<MagpieEnvelope> poll() throws FifoException {
        Iterator iterator;
        ConsumerRecords records = this.consumer.poll(Duration.ofMillis(100L));
        if (!records.isEmpty() && (iterator = records.iterator()).hasNext()) {
            ConsumerRecord r = (ConsumerRecord)iterator.next();
            try {
                return Optional.of((MagpieEnvelope)MAPPER.readValue((String)r.value(), MagpieEnvelope.class));
            }
            catch (JsonProcessingException ex) {
                throw new FifoException("Couldn't deserialize envelope", ex);
            }
        }
        return Optional.empty();
    }
}

