package org.apache.pekko.kafka.internal;

import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.annotation.InternalApi;
import org.apache.pekko.event.LoggingAdapter;
import org.apache.pekko.kafka.OffsetResetProtectionSettings;
import org.apache.pekko.util.ccompat.package$JavaConverters$;
import scala.$less$colon$less$;
import scala.Function0;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Predef$ArrowAssoc$;
import scala.Some;
import scala.Some$;
import scala.Tuple2;
import scala.Tuple2$;
import scala.collection.IterableOnceOps;
import scala.collection.immutable.Map;
import scala.runtime.BoxesRunTime;
import scala.runtime.ScalaRunTime$;

/* compiled from: ConsumerResetProtection.scala */
@InternalApi
/* loaded from: input_file:org/apache/pekko/kafka/internal/ConsumerResetProtection.class */
public interface ConsumerResetProtection {

    /* compiled from: ConsumerResetProtection.scala */
    /* loaded from: input_file:org/apache/pekko/kafka/internal/ConsumerResetProtection$Impl.class */
    public static final class Impl implements ConsumerResetProtection {
        private final LoggingAdapter log;
        public final OffsetResetProtectionSettings org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$$resetProtection;
        private final ConsumerProgressTracking progress;

        /* compiled from: ConsumerResetProtection.scala */
        /* loaded from: input_file:org/apache/pekko/kafka/internal/ConsumerResetProtection$Impl$RecordThreshold.class */
        public class RecordThreshold {
            private final long offsetThreshold;
            private final Option<Object> timeThreshold;
            private final /* synthetic */ Impl $outer;

            public RecordThreshold(Impl impl, long j, Option<SafeOffsetAndTimestamp> option) {
                if (impl == null) {
                    throw new NullPointerException();
                }
                this.$outer = impl;
                this.offsetThreshold = j - impl.org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$$resetProtection.offsetThreshold();
                this.timeThreshold = option.flatMap((v1) -> {
                    return ConsumerResetProtection$.org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$RecordThreshold$$_$$lessinit$greater$$anonfun$1(r2, v1);
                });
            }

            public <K, V> boolean recordsExceedThreshold(RecordThreshold recordThreshold, List<ConsumerRecord<K, V>> list) {
                boolean z = false;
                if (list.size() > 0) {
                    z = recordThreshold.checkExceedsThreshold(list.get(0));
                    if (!z && list.size() > 1) {
                        z = recordThreshold.checkExceedsThreshold(list.get(list.size() - 1));
                    }
                }
                return z;
            }

            public <K, V> boolean checkExceedsThreshold(ConsumerRecord<K, V> consumerRecord) {
                return consumerRecord.offset() < this.offsetThreshold || this.timeThreshold.exists((v1) -> {
                    return ConsumerResetProtection$.org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$RecordThreshold$$_$checkExceedsThreshold$$anonfun$1(r1, v1);
                });
            }

            public String toString() {
                return new StringBuilder(29).append("max-offset: ").append(this.offsetThreshold).append(", max-timestamp: ").append(this.timeThreshold).toString();
            }

            public final /* synthetic */ Impl org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$RecordThreshold$$$outer() {
                return this.$outer;
            }
        }

        public Impl(LoggingAdapter loggingAdapter, OffsetResetProtectionSettings offsetResetProtectionSettings, ConsumerProgressTracking consumerProgressTracking) {
            this.log = loggingAdapter;
            this.org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$$resetProtection = offsetResetProtectionSettings;
            this.progress = consumerProgressTracking;
        }

        @Override // org.apache.pekko.kafka.internal.ConsumerResetProtection
        public <K, V> ConsumerRecords<K, V> protect(ActorRef actorRef, ConsumerRecords<K, V> consumerRecords) {
            return new ConsumerRecords<>(package$JavaConverters$.MODULE$.MapHasAsJava(((IterableOnceOps) package$JavaConverters$.MODULE$.SetHasAsScala(consumerRecords.partitions()).asScala().flatMap(topicPartition -> {
                return maybeProtectRecords(actorRef, topicPartition, consumerRecords).toList();
            })).toMap($less$colon$less$.MODULE$.refl())).asJava());
        }

        private <K, V> Option<Tuple2<TopicPartition, List<ConsumerRecord<K, V>>>> maybeProtectRecords(ActorRef actorRef, TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) {
            List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
            Some some = this.progress.commitRequested().get(topicPartition);
            if (some instanceof Some) {
                return protectPartition(actorRef, topicPartition, (OffsetAndMetadata) some.value(), records);
            }
            if (None$.MODULE$.equals(some)) {
                return Some$.MODULE$.apply(Tuple2$.MODULE$.apply(topicPartition, records));
            }
            throw new MatchError(some);
        }

        private <K, V> Option<Tuple2<TopicPartition, List<ConsumerRecord<K, V>>>> protectPartition(ActorRef actorRef, TopicPartition topicPartition, OffsetAndMetadata offsetAndMetadata, List<ConsumerRecord<K, V>> list) {
            RecordThreshold recordThreshold = new RecordThreshold(this, offsetAndMetadata.offset(), this.progress.receivedMessages().get(topicPartition));
            if (!recordThreshold.recordsExceedThreshold(recordThreshold, list)) {
                return Some$.MODULE$.apply(Tuple2$.MODULE$.apply(topicPartition, list));
            }
            OffsetAndMetadata offsetAndMetadata2 = (OffsetAndMetadata) this.progress.committedOffsets().apply(topicPartition);
            long offset = offsetAndMetadata.offset() - offsetAndMetadata2.offset();
            if (this.org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$$resetProtection.offsetThreshold() < Long.MAX_VALUE && offset > this.org$apache$pekko$kafka$internal$ConsumerResetProtection$Impl$$resetProtection.offsetThreshold()) {
                this.log.warning(new StringBuilder(210).append("Your last commit request ").append(offsetAndMetadata).append(" is more than the configured threshold from the last").append(new StringBuilder(30).append("committed offset (").append(offsetAndMetadata2).append(") for ").append(topicPartition).append(". See ").toString()).append("https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#setting-offset-threshold-appropriately for more info.").toString());
            }
            this.log.warning(new StringBuilder(214).append("Dropping offsets for partition ").append(topicPartition).append(" - received an offset which is less than allowed ").append(recordThreshold).append(" ").append(new StringBuilder(91).append("from the  last requested offset (threshold: ").append(recordThreshold).append("). Seeking to the latest known safe (committed ").toString()).append(new StringBuilder(28).append("or assigned) offset: ").append(offsetAndMetadata2).append(". See  ").toString()).append("https://pekko.apache.org/docs/pekko-connectors-kafka/current/errorhandling.html#unexpected-consumer-offset-reset").append("for more information.").toString());
            KafkaConsumerActor$Internal$Seek apply = KafkaConsumerActor$Internal$Seek$.MODULE$.apply((Map) Predef$.MODULE$.Map().apply(ScalaRunTime$.MODULE$.wrapRefArray(new Tuple2[]{Predef$ArrowAssoc$.MODULE$.$minus$greater$extension((TopicPartition) Predef$.MODULE$.ArrowAssoc(topicPartition), BoxesRunTime.boxToLong(offsetAndMetadata2.offset()))})));
            actorRef.$bang(apply, actorRef.$bang$default$2(apply));
            return None$.MODULE$;
        }
    }

    static <K, V> ConsumerResetProtection apply(LoggingAdapter loggingAdapter, OffsetResetProtectionSettings offsetResetProtectionSettings, Function0<ConsumerProgressTracking> function0) {
        return ConsumerResetProtection$.MODULE$.apply(loggingAdapter, offsetResetProtectionSettings, function0);
    }

    <K, V> ConsumerRecords<K, V> protect(ActorRef actorRef, ConsumerRecords<K, V> consumerRecords);
}
