package org.apache.kafka.streams.processor.internals;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.internals.ConfluentConfigs;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.security.oauthbearer.CommonExtensionsValidatorCallback;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.internals.StreamsConfigUtils;
import org.apache.kafka.streams.processor.TaskId;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamsProducer.class */
public class StreamsProducer {
    private final Logger log;
    private final String logPrefix;
    private final Map<String, Object> eosV2ProducerConfigs;
    private final KafkaClientSupplier clientSupplier;
    private final StreamsConfigUtils.ProcessingMode processingMode;
    private final Time time;
    private Producer<byte[], byte[]> producer;
    private boolean transactionInFlight = false;
    private boolean transactionInitialized = false;
    private double oldProducerTotalBlockedTime = ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT;

    public StreamsProducer(StreamsConfig streamsConfig, String str, KafkaClientSupplier kafkaClientSupplier, TaskId taskId, UUID uuid, LogContext logContext, Time time) {
        Map<String, Object> producerConfigs;
        Objects.requireNonNull(streamsConfig, "config cannot be null");
        Objects.requireNonNull(str, "threadId cannot be null");
        this.clientSupplier = (KafkaClientSupplier) Objects.requireNonNull(kafkaClientSupplier, "clientSupplier cannot be null");
        this.log = ((LogContext) Objects.requireNonNull(logContext, "logContext cannot be null")).logger(getClass());
        this.logPrefix = logContext.logPrefix().trim();
        this.time = (Time) Objects.requireNonNull(time, "time");
        this.processingMode = StreamsConfigUtils.processingMode(streamsConfig);
        switch (this.processingMode) {
            case AT_LEAST_ONCE:
                producerConfigs = streamsConfig.getProducerConfigs(ClientUtils.getThreadProducerClientId(str));
                this.eosV2ProducerConfigs = null;
                break;
            case EXACTLY_ONCE_ALPHA:
                producerConfigs = streamsConfig.getProducerConfigs(ClientUtils.getTaskProducerClientId(str, (TaskId) Objects.requireNonNull(taskId, "taskId cannot be null for exactly-once alpha")));
                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + CommonExtensionsValidatorCallback.SEPARATOR + taskId);
                this.eosV2ProducerConfigs = null;
                break;
            case EXACTLY_ONCE_V2:
                producerConfigs = streamsConfig.getProducerConfigs(ClientUtils.getThreadProducerClientId(str));
                producerConfigs.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, streamsConfig.getString(StreamsConfig.APPLICATION_ID_CONFIG) + CommonExtensionsValidatorCallback.SEPARATOR + Objects.requireNonNull(uuid, "processId cannot be null for exactly-once v2") + CommonExtensionsValidatorCallback.SEPARATOR + str.split("-StreamThread-")[1]);
                this.eosV2ProducerConfigs = producerConfigs;
                break;
            default:
                throw new IllegalArgumentException("Unknown processing mode: " + this.processingMode);
        }
        this.producer = kafkaClientSupplier.getProducer(producerConfigs);
    }

    private String formatException(String str) {
        return str + " [" + this.logPrefix + "]";
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean eosEnabled() {
        return StreamsConfigUtils.eosEnabled(this.processingMode);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean transactionInFlight() {
        return this.transactionInFlight;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void initTransaction() {
        if (!eosEnabled()) {
            throw new IllegalStateException(formatException("Exactly-once is not enabled"));
        }
        if (this.transactionInitialized) {
            return;
        }
        try {
            this.producer.initTransactions();
            this.transactionInitialized = true;
        } catch (TimeoutException e) {
            this.log.warn("Timeout exception caught trying to initialize transactions. The broker is either slow or in bad state (like not having enough replicas) in responding to the request, or the connection to broker was interrupted sending the request or receiving the response. Will retry initializing the task in the next loop. Consider overwriting {} to a larger value to avoid timeout errors", ProducerConfig.MAX_BLOCK_MS_CONFIG);
            throw e;
        } catch (KafkaException e2) {
            throw new StreamsException(formatException("Error encountered trying to initialize transactions"), e2);
        }
    }

    public void resetProducer() {
        if (this.processingMode != StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2) {
            throw new IllegalStateException("Expected eos-v2 to be enabled, but the processing mode was " + this.processingMode);
        }
        this.oldProducerTotalBlockedTime += totalBlockedTime(this.producer);
        long nanoseconds = this.time.nanoseconds();
        this.producer.close();
        this.oldProducerTotalBlockedTime += this.time.nanoseconds() - nanoseconds;
        this.producer = this.clientSupplier.getProducer(this.eosV2ProducerConfigs);
        this.transactionInitialized = false;
    }

    private double getMetricValue(Map<MetricName, ? extends Metric> map, String str) {
        List list = (List) map.keySet().stream().filter(metricName -> {
            return metricName.name().equals(str);
        }).collect(Collectors.toList());
        if (list.isEmpty()) {
            return ConfluentConfigs.IP_CONNECTION_CREATION_RATE_THROTTLE_ENABLE_THRESHOLD_DEFAULT;
        }
        if (list.size() <= 1) {
            return ((Double) map.get(list.get(0)).metricValue()).doubleValue();
        }
        String format = String.format("found %d values for metric %s. total blocked time computation may be incorrect", Integer.valueOf(list.size()), str);
        this.log.error(format);
        throw new IllegalStateException(format);
    }

    private double totalBlockedTime(Producer<?, ?> producer) {
        return getMetricValue(producer.metrics(), "bufferpool-wait-time-ns-total") + getMetricValue(producer.metrics(), "flush-time-ns-total") + getMetricValue(producer.metrics(), "txn-init-time-ns-total") + getMetricValue(producer.metrics(), "txn-begin-time-ns-total") + getMetricValue(producer.metrics(), "txn-send-offsets-time-ns-total") + getMetricValue(producer.metrics(), "txn-commit-time-ns-total") + getMetricValue(producer.metrics(), "txn-abort-time-ns-total") + getMetricValue(producer.metrics(), "metadata-wait-time-ns-total");
    }

    public double totalBlockedTime() {
        return this.oldProducerTotalBlockedTime + totalBlockedTime(this.producer);
    }

    private void maybeBeginTransaction() {
        if (!eosEnabled() || this.transactionInFlight) {
            return;
        }
        try {
            this.producer.beginTransaction();
            this.transactionInFlight = true;
        } catch (InvalidProducerEpochException | ProducerFencedException e) {
            throw new TaskMigratedException(formatException("Producer got fenced trying to begin a new transaction"), e);
        } catch (KafkaException e2) {
            throw new StreamsException(formatException("Error encountered trying to begin a new transaction"), e2);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord, Callback callback) {
        maybeBeginTransaction();
        try {
            return this.producer.send(producerRecord, callback);
        } catch (KafkaException e) {
            if (isRecoverable(e)) {
                throw new TaskMigratedException(formatException("Producer got fenced trying to send a record"), e.getCause());
            }
            throw new StreamsException(formatException(String.format("Error encountered trying to send record to topic %s", producerRecord.topic())), e);
        }
    }

    private static boolean isRecoverable(KafkaException kafkaException) {
        return (kafkaException.getCause() instanceof ProducerFencedException) || (kafkaException.getCause() instanceof InvalidProducerEpochException) || (kafkaException.getCause() instanceof UnknownProducerIdException);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void commitTransaction(Map<TopicPartition, OffsetAndMetadata> map, ConsumerGroupMetadata consumerGroupMetadata) {
        if (!eosEnabled()) {
            throw new IllegalStateException(formatException("Exactly-once is not enabled"));
        }
        maybeBeginTransaction();
        try {
            this.producer.sendOffsetsToTransaction(map, this.processingMode == StreamsConfigUtils.ProcessingMode.EXACTLY_ONCE_V2 ? consumerGroupMetadata : new ConsumerGroupMetadata(consumerGroupMetadata.groupId()));
            this.producer.commitTransaction();
            this.transactionInFlight = false;
        } catch (CommitFailedException | InvalidProducerEpochException | ProducerFencedException e) {
            throw new TaskMigratedException(formatException("Producer got fenced trying to commit a transaction"), e);
        } catch (TimeoutException e2) {
            throw e2;
        } catch (KafkaException e3) {
            throw new StreamsException(formatException("Error encountered trying to commit a transaction"), e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void abortTransaction() {
        if (!eosEnabled()) {
            throw new IllegalStateException(formatException("Exactly-once is not enabled"));
        }
        if (this.transactionInFlight) {
            try {
                this.producer.abortTransaction();
            } catch (InvalidProducerEpochException | ProducerFencedException e) {
                this.log.debug("Encountered {} while aborting the transaction; this is expected and hence swallowed", e.getMessage());
            } catch (TimeoutException e2) {
                this.log.warn("Aborting transaction failed due to timeout. Will rely on broker to eventually abort the transaction after the transaction timeout passed.", e2);
            } catch (KafkaException e3) {
                throw new StreamsException(formatException("Error encounter trying to abort a transaction"), e3);
            }
            this.transactionInFlight = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public List<PartitionInfo> partitionsFor(String str) {
        return this.producer.partitionsFor(str);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Map<MetricName, ? extends Metric> metrics() {
        return this.producer.metrics();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void flush() {
        this.producer.flush();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void close() {
        this.producer.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Producer<byte[], byte[]> kafkaProducer() {
        return this.producer;
    }
}
