package org.apache.camel.component.kafka.producer.support;

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.component.kafka.KafkaConfiguration;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/camel/component/kafka/producer/support/KafkaProducerCallBack.class */
public class KafkaProducerCallBack implements Callback {
    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerCallBack.class);
    private final Object body;
    private final AsyncCallback callback;
    private final AtomicInteger count;
    private final List<RecordMetadata> recordMetadatas;
    private final ExecutorService workerPool;

    public KafkaProducerCallBack(Object obj, ExecutorService executorService, KafkaConfiguration kafkaConfiguration) {
        this(obj, null, executorService, kafkaConfiguration);
    }

    public KafkaProducerCallBack(Object obj, AsyncCallback asyncCallback, ExecutorService executorService, KafkaConfiguration kafkaConfiguration) {
        this.count = new AtomicInteger(1);
        this.recordMetadatas = new ArrayList();
        this.body = obj;
        this.callback = asyncCallback;
        this.workerPool = (ExecutorService) Objects.requireNonNull(executorService, "A worker pool must be provided");
        if (kafkaConfiguration.isRecordMetadata()) {
            if (obj instanceof Exchange) {
                ((Exchange) obj).getMessage().setHeader(KafkaConstants.KAFKA_RECORDMETA, this.recordMetadatas);
            }
            if (obj instanceof Message) {
                ((Message) obj).setHeader(KafkaConstants.KAFKA_RECORDMETA, this.recordMetadatas);
            }
        }
    }

    public void increment() {
        this.count.incrementAndGet();
    }

    public boolean allSent() {
        if (this.count.decrementAndGet() != 0) {
            return false;
        }
        LOG.trace("All messages sent, continue routing.");
        if (this.callback == null) {
            return true;
        }
        this.callback.done(true);
        return true;
    }

    public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
        if (exc != null) {
            if (this.body instanceof Exchange) {
                ((Exchange) this.body).setException(exc);
            }
            if ((this.body instanceof Message) && ((Message) this.body).getExchange() != null) {
                ((Message) this.body).getExchange().setException(exc);
            }
        }
        this.recordMetadatas.add(recordMetadata);
        if (this.count.decrementAndGet() == 0) {
            this.workerPool.submit(new Runnable() { // from class: org.apache.camel.component.kafka.producer.support.KafkaProducerCallBack.1
                @Override // java.lang.Runnable
                public void run() {
                    KafkaProducerCallBack.LOG.trace("All messages sent, continue routing.");
                    if (KafkaProducerCallBack.this.callback != null) {
                        KafkaProducerCallBack.this.callback.done(false);
                    }
                }
            });
        }
    }
}
