package dev.responsive.kafka.internal.clients;

import dev.responsive.kafka.internal.clients.ResponsiveConsumer;
import dev.responsive.kafka.internal.clients.ResponsiveProducer;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartition;

/* loaded from: input_file:dev/responsive/kafka/internal/clients/OffsetRecorder.class */
public class OffsetRecorder {
    private final Map<RecordingKey, Long> uncommitted = new HashMap();
    private final Map<TopicPartition, Long> written = new HashMap();
    private final ProducerListener producerListener = new ProducerListener();
    private final ConsumerListener consumerListener = new ConsumerListener();
    private final List<CommitCallback> commitCallback = new LinkedList();
    private final boolean eos;
    private final String threadId;

    @FunctionalInterface
    /* loaded from: input_file:dev/responsive/kafka/internal/clients/OffsetRecorder$CommitCallback.class */
    public interface CommitCallback {
        void onCommit(String str, Map<RecordingKey, Long> map, Map<TopicPartition, Long> map2);
    }

    /* loaded from: input_file:dev/responsive/kafka/internal/clients/OffsetRecorder$ConsumerListener.class */
    private class ConsumerListener implements ResponsiveConsumer.Listener {
        private ConsumerListener() {
        }

        @Override // dev.responsive.kafka.internal.clients.ResponsiveConsumer.Listener
        public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) {
            if (OffsetRecorder.this.eos) {
                throw new IllegalStateException("consumer commit is not expected with EOS");
            }
            map.forEach((topicPartition, offsetAndMetadata) -> {
                OffsetRecorder.this.onConsumedOffsets("", topicPartition, offsetAndMetadata.offset());
            });
            OffsetRecorder.this.onCommit();
        }
    }

    /* loaded from: input_file:dev/responsive/kafka/internal/clients/OffsetRecorder$ProducerListener.class */
    private class ProducerListener implements ResponsiveProducer.Listener {
        private ProducerListener() {
        }

        @Override // dev.responsive.kafka.internal.clients.ResponsiveProducer.Listener
        public void onSendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> map, String str) {
            if (!OffsetRecorder.this.eos) {
                throw new IllegalStateException("producer commit is not expected with alos");
            }
            map.forEach((topicPartition, offsetAndMetadata) -> {
                OffsetRecorder.this.onConsumedOffsets(str, topicPartition, offsetAndMetadata.offset());
            });
        }

        @Override // dev.responsive.kafka.internal.clients.ResponsiveProducer.Listener
        public void onCommit() {
            if (!OffsetRecorder.this.eos) {
                throw new IllegalStateException("producer commit is not expected with alos");
            }
            OffsetRecorder.this.onCommit();
        }

        @Override // dev.responsive.kafka.internal.clients.ResponsiveProducer.Listener
        public void onAbort() {
            OffsetRecorder.this.onAbort();
        }

        @Override // dev.responsive.kafka.internal.clients.ResponsiveProducer.Listener
        public void onSendCompleted(RecordMetadata recordMetadata) {
            OffsetRecorder.this.onProduce(recordMetadata);
        }
    }

    /* loaded from: input_file:dev/responsive/kafka/internal/clients/OffsetRecorder$RecordingKey.class */
    public static class RecordingKey {
        private final TopicPartition partition;
        private final String consumerGroup;

        public RecordingKey(TopicPartition topicPartition, String str) {
            this.partition = topicPartition;
            this.consumerGroup = str;
        }

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            RecordingKey recordingKey = (RecordingKey) obj;
            return Objects.equals(this.partition, recordingKey.partition) && Objects.equals(this.consumerGroup, recordingKey.consumerGroup);
        }

        public int hashCode() {
            return Objects.hash(this.partition, this.consumerGroup);
        }

        public String toString() {
            return "RecordingKey{partition=" + this.partition + ", consumerGroup='" + this.consumerGroup + "'}";
        }

        public TopicPartition getPartition() {
            return this.partition;
        }

        public String getConsumerGroup() {
            return this.consumerGroup;
        }
    }

    public OffsetRecorder(boolean z, String str) {
        this.eos = z;
        this.threadId = str;
    }

    public synchronized void addCommitCallback(CommitCallback commitCallback) {
        this.commitCallback.add(commitCallback);
    }

    public ResponsiveProducer.Listener getProducerListener() {
        return this.producerListener;
    }

    public ResponsiveConsumer.Listener getConsumerListener() {
        return this.consumerListener;
    }

    private synchronized void onConsumedOffsets(String str, TopicPartition topicPartition, long j) {
        this.uncommitted.put(new RecordingKey(topicPartition, str), Long.valueOf(j));
    }

    private synchronized void onProduce(RecordMetadata recordMetadata) {
        this.written.compute(new TopicPartition(recordMetadata.topic(), recordMetadata.partition()), (topicPartition, l) -> {
            return Long.valueOf(l == null ? recordMetadata.offset() : Math.max(recordMetadata.offset(), l.longValue()));
        });
    }

    private void onCommit() {
        Map copyOf;
        Map copyOf2;
        synchronized (this) {
            copyOf = Map.copyOf(this.uncommitted);
            copyOf2 = Map.copyOf(this.written);
            this.uncommitted.clear();
            this.written.clear();
        }
        this.commitCallback.forEach(commitCallback -> {
            commitCallback.onCommit(this.threadId, copyOf, copyOf2);
        });
    }

    private synchronized void onAbort() {
        this.uncommitted.clear();
        this.written.clear();
    }
}
