/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.offsets;

import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriter;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriterConfig;
import io.confluent.connect.replicator.offsets.ConsumerTimestampsWriterDefaults;
import io.confluent.connect.replicator.offsets.GroupTopicPartition;
import io.confluent.connect.replicator.offsets.TimestampAndDelta;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ConsumerTimestampsInterceptor<K, V>
implements ConsumerInterceptor<K, V>,
AutoCloseable {
    private static final Logger log = LoggerFactory.getLogger(ConsumerTimestampsInterceptor.class);
    private final ConsumerTimestampsWriter writer;
    private String groupId;
    private int cacheSize;
    private final Map<TopicPartition, Map<Long, Long>> offsetTimestamps = new HashMap<TopicPartition, Map<Long, Long>>();
    private Map<TopicPartition, Long> previousTimestamps = new HashMap<TopicPartition, Long>();

    public ConsumerTimestampsInterceptor() {
        this.writer = new ConsumerTimestampsWriter();
    }

    protected ConsumerTimestampsInterceptor(ConsumerTimestampsWriter writer) {
        this.writer = writer;
    }

    public void configure(Map<String, ?> configs) {
        this.groupId = (String)configs.get("group.id");
        if (this.groupId == null) {
            throw new KafkaException("Missing value for group.id in ConsumerTimestampsInterceptor");
        }
        this.writer.configure(configs);
        ConsumerTimestampsWriterConfig writerConfig = ConsumerTimestampsWriterConfig.getConfig(configs);
        this.cacheSize = writerConfig.getInt("timestamps.producer.max.per.partition");
    }

    protected Map<TopicPartition, Map<Long, Long>> offsetTimestamps() {
        return this.offsetTimestamps;
    }

    public ConsumerRecords<K, V> onConsume(ConsumerRecords<K, V> records) {
        for (TopicPartition topicPartition : records.partitions()) {
            if (!this.isMatchingTopic(topicPartition.topic())) continue;
            Map<Long, Long> timestampsPerPartition = this.offsetTimestamps.get(topicPartition);
            if (timestampsPerPartition == null) {
                timestampsPerPartition = this.buildLinkedHashMap(this.cacheSize);
                this.offsetTimestamps.put(topicPartition, timestampsPerPartition);
            }
            for (ConsumerRecord record : records.records(topicPartition)) {
                log.trace("Consumed group {}, topic {}, partition {}, offset {}, timestamp {}", new Object[]{this.groupId, topicPartition.topic(), topicPartition.partition(), record.offset(), record.timestamp()});
                if (record.timestamp() == -1L) continue;
                timestampsPerPartition.put(record.offset(), record.timestamp());
            }
        }
        return records;
    }

    public void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {
        HashMap<TopicPartition, Long> currentTimestamps = new HashMap<TopicPartition, Long>();
        log.debug("Writing timestamps for committed offsets...");
        for (Map.Entry<TopicPartition, OffsetAndMetadata> offsetAndMetadataEntry : offsets.entrySet()) {
            Map.Entry<Long, Long> offsetTimestampEntry;
            TopicPartition topicPartition = offsetAndMetadataEntry.getKey();
            if (!this.isMatchingTopic(topicPartition.topic())) continue;
            long offset = offsetAndMetadataEntry.getValue().offset();
            Map<Long, Long> timestampsPerPartition = this.offsetTimestamps.get(topicPartition);
            if (timestampsPerPartition == null) {
                log.warn("Could not find recent messages for group {}, topic {}, partition {}, offset {}", new Object[]{this.groupId, topicPartition.topic(), topicPartition.partition(), offset});
                continue;
            }
            long messageOffset = offset - 1L;
            Long offsetTimestamp = timestampsPerPartition.get(messageOffset);
            if (offsetTimestamp == null) {
                log.warn("Could not find timestamp for group {}, topic {}, partition {}, offset {}", new Object[]{this.groupId, topicPartition.topic(), topicPartition.partition(), messageOffset});
                continue;
            }
            log.debug("Found timestamp {} for group {}, topic {}, partition {}, offset {}", new Object[]{offsetTimestamp, this.groupId, topicPartition.topic(), topicPartition.partition(), messageOffset});
            short matchingTimestamps = 0;
            Iterator<Map.Entry<Long, Long>> it = timestampsPerPartition.entrySet().iterator();
            while (it.hasNext() && (offsetTimestampEntry = it.next()).getKey() != messageOffset) {
                if (offsetTimestampEntry.getValue().longValue() == offsetTimestamp.longValue()) {
                    matchingTimestamps = (short)(matchingTimestamps + 1);
                }
                it.remove();
            }
            if (!offsetTimestamp.equals(this.previousTimestamps.get(topicPartition))) {
                GroupTopicPartition groupTopicPartition = new GroupTopicPartition(this.groupId, topicPartition);
                this.writer.send(groupTopicPartition, new TimestampAndDelta(offsetTimestamp, matchingTimestamps));
            }
            currentTimestamps.put(topicPartition, offsetTimestamp);
        }
        this.previousTimestamps = currentTimestamps;
    }

    private boolean isMatchingTopic(String topic) {
        if (ConsumerTimestampsWriterDefaults.isInternal(topic) || this.writer.blacklistTopics().contains(topic)) {
            return false;
        }
        if (this.writer.whitelistTopics() == null) {
            if (this.writer.topicPattern() == null) {
                return true;
            }
            return this.matchesTopicPattern(topic);
        }
        return this.writer.whitelistTopics().contains(topic) || this.matchesTopicPattern(topic);
    }

    private boolean matchesTopicPattern(String topic) {
        return this.writer.topicPattern() != null && this.writer.topicPattern().matcher(topic).matches();
    }

    private <K1, V1> Map<K1, V1> buildLinkedHashMap(final int cacheSize) {
        return new LinkedHashMap<K1, V1>(){

            @Override
            protected boolean removeEldestEntry(Map.Entry<K1, V1> eldest) {
                boolean doRemove;
                boolean bl = doRemove = this.size() > cacheSize;
                if (doRemove) {
                    log.warn("Discarding timestamp records due to exceeding the configured maximum number of timestamp records being processed per partition.  Increase 'timestamps.producer.max.per.partition' to avoid losing timestamp records and their corresponding translated offsets.");
                }
                return doRemove;
            }
        };
    }

    @Override
    public void close() {
        this.writer.close();
    }
}

