package org.apache.pekko.kafka.internal;

import java.time.Duration;
import java.util.List;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.$less$colon$less$;
import scala.MatchError;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Tuple2$;
import scala.collection.IterableOnce;
import scala.collection.IterableOnceOps;
import scala.collection.IterableOps;
import scala.collection.immutable.Map;
import scala.collection.immutable.Seq;
import scala.collection.immutable.Set;
import scala.package$;
import scala.runtime.BoxesRunTime;
import scala.runtime.Nothing$;
import scala.runtime.ScalaRunTime$;

/* compiled from: ConsumerProgressTracking.scala */
/**
 * In-memory implementation of {@link ConsumerProgressTracking}.
 *
 * <p>Keeps four pieces of per-partition state in immutable Scala collections that are replaced
 * (never mutated in place) on every update:
 * <ul>
 *   <li>the offsets for which a commit has been requested,</li>
 *   <li>the offsets that have been committed,</li>
 *   <li>the offset and timestamp of the last record received per partition,</li>
 *   <li>the set of currently assigned partitions.</li>
 * </ul>
 *
 * <p>Listeners registered through {@link #addProgressTrackingCallback} are notified after this
 * tracker has updated its own state in {@link #revoke} and {@link #assignedPositions}.
 *
 * <p>The fields are plain (non-volatile) and this class performs no synchronization of its own.
 */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/ConsumerProgressTrackerImpl.class */
public final class ConsumerProgressTrackerImpl implements ConsumerProgressTracking {
    /** Listeners that are forwarded every {@code revoke} / {@code assignedPositions} call. */
    private Seq<ConsumerAssignmentTrackingListener> assignedOffsetsCallbacks = package$.MODULE$.Seq().apply(ScalaRunTime$.MODULE$.genericWrapArray(new Nothing$[0]));
    /** Offsets for which a commit has been requested, keyed by partition. */
    private Map<TopicPartition, OffsetAndMetadata> commitRequestedOffsetsImpl = Predef$.MODULE$.Map().empty();
    /** Offset and timestamp of the last record received, keyed by partition. */
    private Map<TopicPartition, SafeOffsetAndTimestamp> receivedMessagesImpl = Predef$.MODULE$.Map().empty();
    /** Offsets reported as committed, keyed by partition. */
    private Map<TopicPartition, OffsetAndMetadata> committedOffsetsImpl = Predef$.MODULE$.Map().empty();
    /** Partitions currently considered assigned; used to filter incoming records. */
    private Set<TopicPartition> assignedPartitions = Predef$.MODULE$.Set().empty();

    /**
     * Returns the current map of offsets for which a commit has been requested.
     *
     * @return the commit-requested offsets, keyed by partition
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public Map<TopicPartition, OffsetAndMetadata> commitRequested() {
        return this.commitRequestedOffsetsImpl;
    }

    /**
     * Returns the offset and timestamp of the last record received for each tracked partition.
     *
     * @return the last received position, keyed by partition
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public Map<TopicPartition, SafeOffsetAndTimestamp> receivedMessages() {
        return this.receivedMessagesImpl;
    }

    /**
     * Returns the current map of committed offsets.
     *
     * @return the committed offsets, keyed by partition
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public Map<TopicPartition, OffsetAndMetadata> committedOffsets() {
        return this.committedOffsetsImpl;
    }

    /**
     * Appends a listener that will be forwarded subsequent {@link #revoke} and
     * {@link #assignedPositions} calls.
     *
     * @param consumerAssignmentTrackingListener the listener to append
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public void addProgressTrackingCallback(ConsumerAssignmentTrackingListener consumerAssignmentTrackingListener) {
        this.assignedOffsetsCallbacks = (Seq) this.assignedOffsetsCallbacks.$colon$plus(consumerAssignmentTrackingListener);
    }

    /**
     * Records the last received record of every partition in the batch that is currently assigned.
     *
     * <p>Partitions present in {@code consumerRecords} but not in the assigned set are ignored.
     * For each remaining partition the offset and timestamp of the final element of its record
     * list are stored, replacing any earlier entry for that partition.
     *
     * @param consumerRecords the batch of records to account for
     * @param <K> record key type
     * @param <V> record value type
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public <K, V> void received(ConsumerRecords<K, V> consumerRecords) {
        this.receivedMessagesImpl = this.receivedMessagesImpl.$plus$plus((IterableOnce) ((IterableOps) ((IterableOps) package$JavaConverters$.MODULE$.SetHasAsScala(consumerRecords.partitions()).asScala().intersect(this.assignedPartitions).map(topicPartition -> {
            // Pair each assigned partition with its list of records from this batch.
            return Tuple2$.MODULE$.apply(topicPartition, consumerRecords.records(topicPartition));
        })).map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TopicPartition topicPartition2 = (TopicPartition) tuple2._1();
            List list = (List) tuple2._2();
            // Only the last record matters. NOTE(review): assumes the list is non-empty for every
            // partition returned by partitions() - confirm against the Kafka client contract.
            return Tuple2$.MODULE$.apply(topicPartition2, list.get(list.size() - 1));
        })).map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            TopicPartition topicPartition2 = (TopicPartition) tuple22._1();
            ConsumerRecord consumerRecord = (ConsumerRecord) tuple22._2();
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition2), SafeOffsetAndTimestamp$.MODULE$.apply(consumerRecord.offset(), consumerRecord.timestamp()));
        }));
    }

    /**
     * Merges the given offsets into the commit-requested map, overwriting entries for the same
     * partition.
     *
     * @param map offsets for which a commit has been requested
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public void commitRequested(Map<TopicPartition, OffsetAndMetadata> map) {
        this.commitRequestedOffsetsImpl = commitRequested().$plus$plus(map);
    }

    /**
     * Merges the given offsets into the committed map, overwriting entries for the same partition.
     *
     * @param map committed offsets as a Java map; it is converted to an immutable Scala map first
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public void committed(java.util.Map<TopicPartition, OffsetAndMetadata> map) {
        this.committedOffsetsImpl = committedOffsets().$plus$plus(package$JavaConverters$.MODULE$.MapHasAsScala(map).asScala().toMap($less$colon$less$.MODULE$.refl()));
    }

    /**
     * Drops all tracked state for the given partitions and then forwards the revocation to every
     * registered listener.
     *
     * @param set the partitions being revoked
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerAssignmentTrackingListener
    public void revoke(Set<TopicPartition> set) {
        this.commitRequestedOffsetsImpl = commitRequested().$minus$minus(set);
        this.committedOffsetsImpl = committedOffsets().$minus$minus(set);
        this.receivedMessagesImpl = receivedMessages().$minus$minus(set);
        this.assignedPartitions = this.assignedPartitions.$minus$minus(set);
        this.assignedOffsetsCallbacks.foreach(consumerAssignmentTrackingListener -> {
            consumerAssignmentTrackingListener.revoke(set);
        });
    }

    /**
     * Registers newly assigned partitions and seeds offset tracking for them.
     *
     * <p>The partitions in {@code set} are added to the assigned set. For every entry of
     * {@code map}, an already tracked commit-requested / committed offset is kept; only when no
     * entry exists is one created from the supplied position. Finally the call is forwarded to
     * every registered listener with the same arguments.
     *
     * @param set the partitions that were assigned
     * @param map the position (boxed {@code long}) for each assigned partition
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerAssignmentTrackingListener
    public void assignedPositions(Set<TopicPartition> set, Map<TopicPartition, Object> map) {
        this.assignedPartitions = this.assignedPartitions.$plus$plus(set);
        this.commitRequestedOffsetsImpl = this.commitRequestedOffsetsImpl.$plus$plus(map.map(tuple2 -> {
            if (tuple2 == null) {
                throw new MatchError(tuple2);
            }
            TopicPartition topicPartition = (TopicPartition) tuple2._1();
            long assignedOffset = BoxesRunTime.unboxToLong(tuple2._2());
            // Keep an existing entry; fall back to the assigned position only when none is tracked.
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition), commitRequested().getOrElse(topicPartition, () -> {
                return assignedPositions$$anonfun$1$$anonfun$1(assignedOffset);
            }));
        }));
        this.committedOffsetsImpl = committedOffsets().$plus$plus(map.map(tuple22 -> {
            if (tuple22 == null) {
                throw new MatchError(tuple22);
            }
            TopicPartition topicPartition = (TopicPartition) tuple22._1();
            long assignedOffset = BoxesRunTime.unboxToLong(tuple22._2());
            // Same rule for committed offsets: existing entry wins, otherwise seed from the position.
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition), committedOffsets().getOrElse(topicPartition, () -> {
                return assignedPositions$$anonfun$2$$anonfun$1(assignedOffset);
            }));
        }));
        this.assignedOffsetsCallbacks.foreach(consumerAssignmentTrackingListener -> {
            consumerAssignmentTrackingListener.assignedPositions(set, map);
        });
    }

    /**
     * Queries the consumer for the current position of each given partition and delegates to
     * {@link #assignedPositions(Set, Map)} with the result.
     *
     * <p>This method itself only calls {@code consumer.position(partition, duration)}; it does not
     * issue a seek.
     *
     * @param set the partitions that were assigned
     * @param consumer the consumer used to look up each partition's position
     * @param duration the timeout passed to each position lookup
     */
    @Override // org.apache.pekko.kafka.internal.ConsumerProgressTracking
    public void assignedPositionsAndSeek(Set<TopicPartition> set, Consumer<?, ?> consumer, Duration duration) {
        assignedPositions(set, ((IterableOnceOps) set.map(topicPartition -> {
            return Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(consumer.position(topicPartition, duration)));
        })).toMap($less$colon$less$.MODULE$.refl()));
    }

    /** Default supplier body for the commit-requested map: wraps the offset with no metadata. */
    private static final OffsetAndMetadata assignedPositions$$anonfun$1$$anonfun$1(long j) {
        return new OffsetAndMetadata(j);
    }

    /** Default supplier body for the committed map: wraps the offset with no metadata. */
    private static final OffsetAndMetadata assignedPositions$$anonfun$2$$anonfun$1(long j) {
        return new OffsetAndMetadata(j);
    }
}
