package org.oracle.okafka.clients.producer.internals;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.producer.internals.SenderMetricsRegistry;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.InvalidMetadataException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.oracle.okafka.clients.KafkaClient;
import org.oracle.okafka.clients.Metadata;
import org.oracle.okafka.clients.producer.internals.RecordAccumulator;
import org.oracle.okafka.common.requests.ProduceRequest;
import org.oracle.okafka.common.requests.ProduceResponse;
import org.slf4j.Logger;

/* loaded from: input_file:org/oracle/okafka/clients/producer/internals/SenderThread.class */
public class SenderThread implements Runnable {
    private final Logger log;
    private final RecordAccumulator accumulator;
    private final KafkaClient client;
    private final Metadata metadata;
    private final boolean guaranteeMessageOrder;
    private final String clientId;
    private final int maxRequestSize;
    private final int retries;
    private final short acks;
    private final Time time;
    private final int requestTimeoutMs;
    private final long retryBackoffMs;
    private volatile boolean forceClose;
    private int correlation = 0;
    private volatile boolean running = true;

    public SenderThread(LogContext logContext, String str, KafkaClient kafkaClient, Metadata metadata, RecordAccumulator recordAccumulator, boolean z, int i, short s, int i2, SenderMetricsRegistry senderMetricsRegistry, Time time, int i3, long j) {
        this.log = logContext.logger(SenderThread.class);
        this.clientId = str;
        this.accumulator = recordAccumulator;
        this.client = kafkaClient;
        this.metadata = metadata;
        this.guaranteeMessageOrder = z;
        this.maxRequestSize = i;
        this.acks = s;
        this.time = time;
        this.retries = i2;
        this.requestTimeoutMs = i3;
        this.retryBackoffMs = j;
    }

    @Override // java.lang.Runnable
    public void run() {
        this.log.debug("Starting Kafka producer I/O thread.");
        while (this.running) {
            try {
                run(this.time.milliseconds());
            } catch (Exception e) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", e);
            }
        }
        this.log.debug("Beginning shutdown of Kafka producer I/O thread, sending remaining records.");
        while (!this.forceClose && this.accumulator.hasUndrained()) {
            try {
                run(this.time.milliseconds());
            } catch (Exception e2) {
                this.log.error("Uncaught error in kafka producer I/O thread: ", e2);
            }
        }
        if (this.forceClose) {
            this.log.debug("Aborting incomplete batches due to forced shutdown");
            this.accumulator.abortIncompleteBatches();
        }
        try {
            this.client.close();
        } catch (Exception e3) {
            this.log.error("failed to close AQ producer", e3);
        }
        this.log.debug("Shutdown of Kafka producer I/O thread has completed.");
    }

    void run(long j) {
        sendProducerData(j);
        this.client.maybeUpdateMetadata(j);
    }

    private long sendProducerData(long j) {
        RecordAccumulator.ReadyCheckResult ready = this.accumulator.ready(this.metadata.fetch(), j);
        if (!ready.unknownLeaderTopics.isEmpty()) {
            Iterator<String> it = ready.unknownLeaderTopics.iterator();
            while (it.hasNext()) {
                this.metadata.add(it.next());
            }
            this.log.debug("Requesting metadata update due to unknown leader topics from the batched records: {}", ready.unknownLeaderTopics);
            this.metadata.requestUpdate();
        }
        Iterator<Node> it2 = ready.readyNodes.iterator();
        long j2 = Long.MAX_VALUE;
        while (it2.hasNext()) {
            org.oracle.okafka.common.Node node = (org.oracle.okafka.common.Node) it2.next();
            if (!this.client.ready(node, j)) {
                it2.remove();
                j2 = Math.min(j2, this.client.pollDelayMs(node, j));
            }
        }
        Map<Integer, List<ProducerBatch>> drain = this.accumulator.drain(this.metadata.fetch(), ready.readyNodes, this.maxRequestSize, j);
        if (this.guaranteeMessageOrder) {
            Iterator<List<ProducerBatch>> it3 = drain.values().iterator();
            while (it3.hasNext()) {
                Iterator<ProducerBatch> it4 = it3.next().iterator();
                while (it4.hasNext()) {
                    this.accumulator.mutePartition(it4.next().topicPartition);
                }
            }
        }
        List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeoutMs);
        if (!expiredBatches.isEmpty()) {
            this.log.trace("Expired {} batches in accumulator", Integer.valueOf(expiredBatches.size()));
        }
        for (ProducerBatch producerBatch : expiredBatches) {
            failBatch(producerBatch, -1L, -1L, new ArrayList(), new TimeoutException("Expiring " + producerBatch.recordCount + " record(s) for " + producerBatch.topicPartition + ":" + (j - producerBatch.createdMs) + " ms has passed since batch creation"));
        }
        long min = Math.min(ready.nextReadyCheckDelayMs, j2);
        if (!ready.readyNodes.isEmpty()) {
            this.log.trace("Instances with data ready to send: {}", ready.readyNodes);
            min = 0;
        }
        sendProduceRequests(drain, min);
        return min;
    }

    public void initiateClose() {
        this.accumulator.close();
        this.running = false;
    }

    public void forceClose() {
        this.forceClose = true;
        initiateClose();
    }

    private void sendProduceRequests(Map<Integer, List<ProducerBatch>> map, long j) {
        for (Map.Entry<Integer, List<ProducerBatch>> entry : map.entrySet()) {
            sendProduceRequest(this.metadata.getNodeById(entry.getKey().intValue()), entry.getValue());
        }
    }

    private void sendProduceRequest(org.oracle.okafka.common.Node node, List<ProducerBatch> list) {
        if (list.isEmpty()) {
            return;
        }
        for (final ProducerBatch producerBatch : list) {
            send(this.client.newClientRequest(node, new ProduceRequest.Builder(producerBatch.topicPartition, producerBatch.records(), (short) 1, -1), this.time.milliseconds(), true, -1, new RequestCompletionHandler() { // from class: org.oracle.okafka.clients.producer.internals.SenderThread.1
                public void onComplete(ClientResponse clientResponse) {
                    SenderThread.this.handleProduceResponse(clientResponse, producerBatch, SenderThread.this.time.milliseconds());
                }
            }));
        }
    }

    public void send(ClientRequest clientRequest) {
        completeResponse(this.client.send(clientRequest, this.time.milliseconds()));
    }

    private void completeResponse(ClientResponse clientResponse) {
        clientResponse.onComplete();
    }

    private void handleProduceResponse(ClientResponse clientResponse, ProducerBatch producerBatch, long j) {
        if (clientResponse.wasDisconnected()) {
            this.client.disconnected(this.metadata.getNodeById(Integer.parseInt(clientResponse.destination())), j);
            this.metadata.requestUpdate();
        }
        long receivedTimeMs = clientResponse.receivedTimeMs();
        completeBatch(producerBatch, ((ProduceResponse) clientResponse.responseBody()).getPartitionResponse(), clientResponse.requestHeader().correlationId(), j, receivedTimeMs + r0.throttleTimeMs());
    }

    private void completeBatch(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse, long j, long j2, long j3) {
        RuntimeException exception = partitionResponse.exception();
        if (exception != null) {
            if (canRetry(producerBatch, partitionResponse)) {
                reenqueueBatch(producerBatch, j2);
            } else {
                failBatch(producerBatch, partitionResponse, exception);
            }
            if (exception instanceof InvalidMetadataException) {
                this.metadata.requestUpdate();
            }
        } else {
            completeBatch(producerBatch, partitionResponse);
        }
        if (this.guaranteeMessageOrder) {
            this.accumulator.unmutePartition(producerBatch.topicPartition);
        }
    }

    private void completeBatch(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse) {
        if (producerBatch.done(partitionResponse.subPartitionId * 20000, partitionResponse.logAppendTime, partitionResponse.msgIds, null)) {
            this.accumulator.deallocate(producerBatch);
        }
    }

    private void failBatch(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse, RuntimeException runtimeException) {
        failBatch(producerBatch, partitionResponse.subPartitionId, partitionResponse.logAppendTime, partitionResponse.msgIds, runtimeException);
    }

    private void failBatch(ProducerBatch producerBatch, long j, long j2, List<String> list, RuntimeException runtimeException) {
        if (producerBatch.done(j, j2, list, runtimeException)) {
            this.accumulator.deallocate(producerBatch);
        }
    }

    private void reenqueueBatch(ProducerBatch producerBatch, long j) {
        this.accumulator.reenqueue(producerBatch, j);
    }

    private boolean canRetry(ProducerBatch producerBatch, ProduceResponse.PartitionResponse partitionResponse) {
        return producerBatch.attempts() < this.retries && (partitionResponse.exception instanceof RetriableException);
    }

    public void wakeup() {
    }

    public boolean isRunning() {
        return this.running;
    }
}
