package org.apache.camel.component.kafka;

import java.util.Collection;
import java.util.Collections;
import org.apache.camel.spi.StateRepository;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/DefaultKafkaManualAsyncCommit.class */
public class DefaultKafkaManualAsyncCommit extends DefaultKafkaManualCommit implements KafkaAsyncManualCommit {
    private static final Logger LOG = LoggerFactory.getLogger(DefaultKafkaManualAsyncCommit.class);
    private final Collection<KafkaAsyncManualCommit> asyncCommits;

    public DefaultKafkaManualAsyncCommit(org.apache.kafka.clients.consumer.KafkaConsumer kafkaConsumer, String str, String str2, StateRepository<String, String> stateRepository, TopicPartition topicPartition, long j, long j2, Collection<KafkaAsyncManualCommit> collection) {
        super(kafkaConsumer, str, str2, stateRepository, topicPartition, j, j2);
        this.asyncCommits = collection;
        LOG.debug("Using commit timeout of {}", Long.valueOf(j2));
    }

    @Override // org.apache.camel.component.kafka.KafkaManualCommit
    public void commit() {
        this.asyncCommits.add(this);
    }

    @Override // org.apache.camel.component.kafka.KafkaAsyncManualCommit
    public void processAsyncCommit() {
        commitAsyncOffset(getOffsetRepository(), getPartition(), getRecordOffset());
    }

    protected void commitAsyncOffset(StateRepository<String, String> stateRepository, TopicPartition topicPartition, long j) {
        if (j != -1) {
            if (stateRepository != null) {
                stateRepository.setState(serializeOffsetKey(topicPartition), serializeOffsetValue(j));
            } else {
                LOG.debug("CommitAsync {} from topic {} with offset: {}", new Object[]{getThreadId(), getTopicName(), Long.valueOf(j)});
                getConsumer().commitAsync(Collections.singletonMap(topicPartition, new OffsetAndMetadata(j + 1)), (map, exc) -> {
                    if (exc != null) {
                        LOG.error("Error during async commit for {} from topic {} with offset {}: ", new Object[]{getThreadId(), getTopicName(), Long.valueOf(j), exc});
                    } else {
                        LOG.debug("CommitAsync done for {} from topic {} with offset: {}", new Object[]{getThreadId(), getTopicName(), Long.valueOf(j)});
                    }
                });
            }
        }
    }
}
