package org.springframework.kafka.listener;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiPredicate;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.springframework.core.log.LogAccessor;
import org.springframework.kafka.KafkaException;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.util.ObjectUtils;
import org.springframework.util.backoff.FixedBackOff;

/* loaded from: input_file:BOOT-INF/lib/spring-kafka-2.5.14.RELEASE.jar:org/springframework/kafka/listener/SeekUtils.class */
public final class SeekUtils {
    public static final int DEFAULT_MAX_FAILURES = 10;
    public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff(0, 9);
    private static final LoggingCommitCallback LOGGING_COMMIT_CALLBACK = new LoggingCommitCallback();

    private SeekUtils() {
    }

    public static boolean doSeeks(List<ConsumerRecord<?, ?>> list, Consumer<?, ?> consumer, Exception exc, boolean z, BiPredicate<ConsumerRecord<?, ?>, Exception> biPredicate, LogAccessor logAccessor) {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        AtomicBoolean atomicBoolean2 = new AtomicBoolean();
        list.forEach(consumerRecord -> {
            if (z && atomicBoolean.get()) {
                try {
                    atomicBoolean2.set(biPredicate.test(consumerRecord, exc));
                } catch (Exception e) {
                    logAccessor.error(e, "Failed to determine if this record (" + ListenerUtils.recordToString(consumerRecord) + ") should be recovererd, including in seeks");
                    atomicBoolean2.set(false);
                }
                if (atomicBoolean2.get()) {
                    logAccessor.debug(() -> {
                        return "Skipping seek of: " + ListenerUtils.recordToString(consumerRecord);
                    });
                }
            }
            if (!z || !atomicBoolean.get() || !atomicBoolean2.get()) {
                linkedHashMap.computeIfAbsent(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), topicPartition -> {
                    return Long.valueOf(consumerRecord.offset());
                });
            }
            atomicBoolean.set(false);
        });
        seekPartitions(consumer, linkedHashMap, logAccessor);
        return atomicBoolean2.get();
    }

    public static void seekPartitions(Consumer<?, ?> consumer, Map<TopicPartition, Long> map, LogAccessor logAccessor) {
        map.forEach((topicPartition, l) -> {
            try {
                logAccessor.trace(() -> {
                    return "Seeking: " + topicPartition + " to: " + l;
                });
                consumer.seek(topicPartition, l.longValue());
            } catch (Exception e) {
                logAccessor.error(e, () -> {
                    return "Failed to seek " + topicPartition + " to " + l;
                });
            }
        });
    }

    public static void seekOrRecover(Exception exc, List<ConsumerRecord<?, ?>> list, Consumer<?, ?> consumer, MessageListenerContainer messageListenerContainer, boolean z, BiPredicate<ConsumerRecord<?, ?>, Exception> biPredicate, LogAccessor logAccessor, KafkaException.Level level) {
        if (ObjectUtils.isEmpty(list)) {
            if (!(exc instanceof SerializationException)) {
                throw new IllegalStateException("This error handler cannot process '" + exc.getClass().getName() + "'s; no record information is available", exc);
            }
            throw new IllegalStateException("This error handler cannot process 'SerializationException's directly; please consider configuring an 'ErrorHandlingDeserializer' in the value and/or key deserializer", exc);
        }
        if (!doSeeks(list, consumer, exc, true, biPredicate, logAccessor)) {
            throw new KafkaException("Seek to current after exception", level, exc);
        }
        if (z) {
            if (!messageListenerContainer.getContainerProperties().getAckMode().equals(ContainerProperties.AckMode.MANUAL_IMMEDIATE)) {
                logAccessor.debug(() -> {
                    return "'commitRecovered' ignored, container AckMode must be MANUAL_IMMEDIATE, not " + messageListenerContainer.getContainerProperties().getAckMode();
                });
                return;
            }
            ConsumerRecord<?, ?> consumerRecord = list.get(0);
            Map<TopicPartition, OffsetAndMetadata> singletonMap = Collections.singletonMap(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), new OffsetAndMetadata(consumerRecord.offset() + 1));
            if (messageListenerContainer.getContainerProperties().isSyncCommits()) {
                consumer.commitSync(singletonMap, messageListenerContainer.getContainerProperties().getSyncCommitTimeout());
                return;
            }
            OffsetCommitCallback commitCallback = messageListenerContainer.getContainerProperties().getCommitCallback();
            if (commitCallback == null) {
                commitCallback = LOGGING_COMMIT_CALLBACK;
            }
            consumer.commitAsync(singletonMap, commitCallback);
        }
    }
}
