package org.apache.kafka.clients.producer.internals;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.RequestCompletionHandler;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.RecordAccumulator;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.MetricNameTemplate;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ClusterAuthorizationException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.NetworkException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.errors.TransactionAbortedException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
import org.apache.kafka.common.message.EndTxnResponseData;
import org.apache.kafka.common.message.InitProducerIdResponseData;
import org.apache.kafka.common.message.ProduceRequestData;
import org.apache.kafka.common.message.ProduceResponseData;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.network.KafkaChannelTest;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionRatioEstimator;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MutableRecordBatch;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.AddPartitionsToTxnResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.EndTxnRequest;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
import org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdRequest;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.ProduceRequest;
import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TransactionResult;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.ProducerIdAndEpoch;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.DelayedReceive;
import org.apache.kafka.test.MockSelector;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.mockito.AdditionalMatchers;
import org.mockito.ArgumentMatchers;
import org.mockito.InOrder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest.class */
public class SenderTest {
    // Maximum request size handed to the Sender constructors in the tests below.
    private static final int MAX_REQUEST_SIZE = 1048576;
    // NOTE(review): the visible tests pass a literal (short) -1 to Sender; presumably that is this constant inlined — confirm.
    private static final short ACKS_ALL = -1;
    private static final String CLIENT_ID = "clientId";
    // Tolerance used when comparing double-valued metric readings (see testQuotaMetrics).
    private static final double EPS = 1.0E-4d;
    private static final int MAX_BLOCK_TIMEOUT = 1000;
    // Request timeout and retry backoff handed to the Sender constructors in the tests below.
    private static final int REQUEST_TIMEOUT = 5000;
    private static final long RETRY_BACKOFF_MS = 50;
    // Passed to the RecordAccumulator built in testNodeLatencyStats.
    private static final int DELIVERY_TIMEOUT_MS = 1500;
    // Passed to ProducerMetadata below; testMetadataTopicExpiry sleeps for this long before expecting topic expiry.
    private static final long TOPIC_IDLE_MS = 60000;
    // Partitions 0 and 1 of topic "test", used as produce targets throughout the tests.
    private TopicPartition tp0 = new TopicPartition("test", 0);
    private TopicPartition tp1 = new TopicPartition("test", 1);
    // Mock clock shared by the metadata, the mock client and the objects the tests build.
    private MockTime time = new MockTime();
    private int batchSize = 16384;
    // Declaration order matters from here on: metadata reads this.time, and client reads both this.time and this.metadata.
    private ProducerMetadata metadata = new ProducerMetadata(0, Long.MAX_VALUE, TOPIC_IDLE_MS, new LogContext(), new ClusterResourceListeners(), this.time);
    private MockClient client = new MockClient((Time) this.time, (Metadata) this.metadata);
    private ApiVersions apiVersions = new ApiVersions();
    // The next four fields start out null; presumably setupWithTransactionState (outside this view) assigns them — confirm.
    private Metrics metrics = null;
    private RecordAccumulator accumulator = null;
    private Sender sender = null;
    private SenderMetricsRegistry senderMetricsRegistry = null;
    private final LogContext logContext = new LogContext();

    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest$AssertEndTxnRequestMatcher.class */
    /**
     * Request matcher that accepts only {@link EndTxnRequest}s. For every accepted request it
     * asserts that the request's result is the very same {@link TransactionResult} instance that
     * was supplied at construction time, and records that a match happened.
     */
    class AssertEndTxnRequestMatcher implements MockClient.RequestMatcher {
        private TransactionResult requiredResult;
        private boolean matched = false;

        AssertEndTxnRequestMatcher(TransactionResult expectedResult) {
            requiredResult = expectedResult;
        }

        /**
         * @param request the request offered by the mock client
         * @return {@code true} if {@code request} is an {@link EndTxnRequest}, {@code false} otherwise
         */
        @Override // org.apache.kafka.clients.MockClient.RequestMatcher
        public boolean matches(AbstractRequest request) {
            boolean isEndTxn = request instanceof EndTxnRequest;
            if (isEndTxn) {
                EndTxnRequest endTxnRequest = (EndTxnRequest) request;
                // Fails the test immediately when the transaction result is not the expected one.
                Assertions.assertSame(requiredResult, endTxnRequest.result());
                matched = true;
            }
            return isEndTxn;
        }
    }

    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest$MatchingBufferPool.class */
    /**
     * {@link BufferPool} that tracks, by identity, every buffer it has handed out and rejects
     * deallocation of any buffer it is not currently tracking.
     */
    private class MatchingBufferPool extends BufferPool {
        IdentityHashMap<ByteBuffer, Boolean> allocatedBuffers;

        /** All arguments are forwarded unchanged to the {@link BufferPool} constructor. */
        MatchingBufferPool(long memory, int poolableSize, Metrics metrics, Time time, String metricGrpName) {
            super(memory, poolableSize, metrics, time, metricGrpName);
            allocatedBuffers = new IdentityHashMap<>();
        }

        /** Delegates to the parent pool and remembers the returned buffer. */
        public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {
            ByteBuffer buffer = super.allocate(size, maxTimeToBlockMs);
            allocatedBuffers.put(buffer, Boolean.TRUE);
            return buffer;
        }

        /**
         * Stops tracking the buffer and returns it to the parent pool.
         *
         * @throws IllegalStateException if the buffer is not currently tracked
         */
        public void deallocate(ByteBuffer buffer, int size) {
            // Tracked buffers always map to Boolean.TRUE, so a null result means "was not tracked".
            Boolean wasTracked = allocatedBuffers.remove(buffer);
            if (wasTracked == null) {
                throw new IllegalStateException("Deallocating a buffer that is not allocated");
            }
            super.deallocate(buffer, size);
        }

        /** @return {@code true} when no tracked buffer is still outstanding */
        public boolean allMatch() {
            return allocatedBuffers.size() == 0;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/clients/producer/internals/SenderTest$OffsetAndError.class */
    public static class OffsetAndError {
        final long offset;
        final Errors error;
        final List<ProduceResponseData.BatchIndexAndErrorMessage> recordErrors;

        OffsetAndError(long j, Errors errors, List<ProduceResponseData.BatchIndexAndErrorMessage> list) {
            this.offset = j;
            this.error = errors;
            this.recordErrors = list;
        }

        OffsetAndError(long j, Errors errors) {
            this(j, errors, Collections.emptyList());
        }
    }

    /**
     * Runs before each test and delegates to {@code setupWithTransactionState} with a
     * {@code null} argument, i.e. without a transaction manager.
     */
    @BeforeEach
    public void setup() {
        setupWithTransactionState(null);
    }

    /**
     * Runs after each test and closes whichever {@link Metrics} instance the {@code metrics}
     * field currently refers to.
     */
    @AfterEach
    public void tearDown() {
        metrics.close();
    }

    /**
     * Flattens the topic/partition structure of a produce request into a single map.
     *
     * @param produceRequest the request whose topic data is read
     * @return an unmodifiable map from each topic-partition in the request to its records
     * @throws ClassCastException if a partition's records are not {@link MemoryRecords}
     */
    private static Map<TopicPartition, MemoryRecords> partitionRecords(ProduceRequest produceRequest) {
        // Parameterised map instead of a raw HashMap, so the declared return type is actually enforced.
        Map<TopicPartition, MemoryRecords> recordsByPartition = new HashMap<>();
        produceRequest.data().topicData().forEach(topicData ->
            topicData.partitionData().forEach(partitionData -> {
                TopicPartition partition = new TopicPartition(topicData.name(), partitionData.index());
                // Explicit cast: fail here, at insertion, rather than later at a caller's implicit cast.
                recordsByPartition.put(partition, (MemoryRecords) partitionData.records());
            }));
        return Collections.unmodifiableMap(recordsByPartition);
    }

    /**
     * Appends one record to partition 0, drives the sender, and checks the in-flight request and
     * batch counts before and after a successful produce response. Finally verifies that the
     * returned future is done and reports offset 0.
     */
    @Test
    public void testSimple() throws Exception {
        final long expectedOffset = 0L;
        FutureRecordMetadata future = appendToAccumulator(tp0, 0L, "key", "value");

        // Two sender passes are run before the produce request is expected to be in flight.
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount(), "We should have a single produce request in flight.");
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
        Assertions.assertTrue(client.hasInFlightRequests());

        // Answer the request successfully and let the sender process the response.
        client.respond(produceResponse(tp0, expectedOffset, Errors.NONE, 0));
        sender.runOnce();
        Assertions.assertEquals(0, client.inFlightRequestCount(), "All requests completed.");
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());
        Assertions.assertFalse(client.hasInFlightRequests());

        sender.runOnce();
        Assertions.assertTrue(future.isDone(), "Request should be completed");
        RecordMetadata recordMetadata = (RecordMetadata) future.get();
        Assertions.assertEquals(expectedOffset, recordMetadata.offset());
    }

    /**
     * Appends a record while the node's API versions are the defaults, then restricts the node to
     * produce versions 0-2. The prepared response is matched only by a version-2 produce request
     * whose records for partition 0 are non-empty and have magic value 1.
     */
    @Test
    public void testMessageFormatDownConversion() throws Exception {
        apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create());
        FutureRecordMetadata future = appendToAccumulator(tp0, 0L, "key", "value");

        // Restrict the produce API to versions 0..2 after the record has been appended.
        apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));

        client.prepareResponse(request -> {
            ProduceRequest produceRequest = (ProduceRequest) request;
            if (produceRequest.version() != 2) {
                return false;
            }
            MemoryRecords records = partitionRecords(produceRequest).get(tp0);
            if (records == null) {
                return false;
            }
            return records.sizeInBytes() > 0 && records.hasMatchingMagic((byte) 1);
        }, (AbstractResponse) produceResponse(tp0, 0L, Errors.NONE, 0));

        sender.runOnce();
        sender.runOnce();
        Assertions.assertTrue(future.isDone(), "Request should be completed");
        RecordMetadata recordMetadata = (RecordMetadata) future.get();
        Assertions.assertEquals(0L, recordMetadata.offset());
    }

    /**
     * Appends one record to partition 0 under the default API versions and one to partition 1
     * while the node is restricted to produce versions 0-2, then restores the defaults. The
     * prepared response is matched only by a version-2 produce request that carries non-empty
     * records with magic value 1 for exactly two partitions. Both futures must then be done.
     */
    @Test
    public void testDownConversionForMismatchedMagicValues() throws Exception {
        this.apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create());
        FutureRecordMetadata firstFuture = appendToAccumulator(this.tp0, 0L, "key", "value");

        this.apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create(ApiKeys.PRODUCE.id, (short) 0, (short) 2));
        FutureRecordMetadata secondFuture = appendToAccumulator(this.tp1, 0L, "key", "value");

        this.apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create());

        // The same partition response object is reused for both partitions.
        ProduceResponse.PartitionResponse partitionResponse = new ProduceResponse.PartitionResponse(Errors.NONE, 0L, -1L, 100L);
        // Parameterised map instead of a raw HashMap, so the ProduceResponse constructor call is type-checked.
        Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new HashMap<>();
        responses.put(this.tp0, partitionResponse);
        responses.put(this.tp1, partitionResponse);

        this.client.prepareResponse(request -> {
            ProduceRequest produceRequest = (ProduceRequest) request;
            if (produceRequest.version() != 2) {
                return false;
            }
            Map<TopicPartition, MemoryRecords> recordsByPartition = partitionRecords(produceRequest);
            if (recordsByPartition.size() != 2) {
                return false;
            }
            for (MemoryRecords records : recordsByPartition.values()) {
                if (records == null || records.sizeInBytes() == 0 || !records.hasMatchingMagic((byte) 1)) {
                    return false;
                }
            }
            return true;
        }, new ProduceResponse(responses, 0));

        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertTrue(firstFuture.isDone(), "Request should be completed");
        Assertions.assertTrue(secondFuture.isDone(), "Request should be completed");
    }

    /**
     * Drives a real {@link NetworkClient} over a {@link MockSelector}, feeds it an API-versions
     * response followed by three produce responses carrying throttle times 100, 200 and 300, and
     * asserts the produce-throttle-time average (250) and maximum (400) metrics.
     *
     * <p>The expected values are consistent with the four samples 400, 100, 200 and 300 —
     * presumably the 400 passed to {@code defaultApiVersionsResponse} is a throttle time that is
     * also recorded by the sensor (confirm against {@code ApiVersionsResponse}).
     *
     * <p>The network client is closed in a {@code finally} block so it is released even when an
     * assertion fails.
     */
    @Test
    public void testQuotaMetrics() {
        MockSelector selector = new MockSelector(this.time);
        Sensor throttleTimeSensor = Sender.throttleTimeSensor(this.senderMetricsRegistry);
        Node node = (Node) TestUtils.singletonCluster("test", 1).nodes().get(0);
        NetworkClient networkClient = new NetworkClient(selector, this.metadata, "mock", Integer.MAX_VALUE, 1000L, 1000L, 65536, 65536, MAX_BLOCK_TIMEOUT, 10000L, 127000L, this.time, true, new ApiVersions(), throttleTimeSensor, this.logContext);
        try {
            // Queue the API-versions response, then poll until the client reports the node as ready.
            selector.delayedReceive(new DelayedReceive(node.idString(), new NetworkReceive(node.idString(), RequestTestUtils.serializeResponseWithHeader(ApiVersionsResponse.defaultApiVersionsResponse(400, ApiMessageType.ListenerType.ZK_BROKER), ApiKeys.API_VERSIONS.latestVersion(), 0))));
            while (!networkClient.ready(node, this.time.milliseconds())) {
                networkClient.poll(1L, this.time.milliseconds());
                // Advance the mock clock past whatever throttle delay the client reports.
                this.time.sleep(networkClient.throttleDelayMs(node, this.time.milliseconds()));
            }
            selector.clear();

            for (int i = 1; i <= 3; i++) {
                ProduceRequestData requestData = new ProduceRequestData().setTopicData(new ProduceRequestData.TopicProduceDataCollection()).setAcks((short) 1).setTimeoutMs(MAX_BLOCK_TIMEOUT);
                ClientRequest request = networkClient.newClientRequest(node.idString(), ProduceRequest.forCurrentMagic(requestData), this.time.milliseconds(), true);
                networkClient.send(request, this.time.milliseconds());
                networkClient.poll(1L, this.time.milliseconds());
                // Response i carries offset i and throttle time 100 * i.
                selector.completeReceive(new NetworkReceive(node.idString(), RequestTestUtils.serializeResponseWithHeader(produceResponse(this.tp0, i, Errors.NONE, 100 * i), ApiKeys.PRODUCE.latestVersion(), request.correlationId())));
                networkClient.poll(1L, this.time.milliseconds());
                this.time.sleep(networkClient.throttleDelayMs(node, this.time.milliseconds()));
                selector.clear();
            }

            // Typed view of the registry instead of a raw Map plus casts on every lookup.
            Map<MetricName, KafkaMetric> allMetrics = this.metrics.metrics();
            KafkaMetric avgMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeAvg);
            KafkaMetric maxMetric = allMetrics.get(this.senderMetricsRegistry.produceThrottleTimeMax);
            Assertions.assertEquals(250.0d, ((Double) avgMetric.metricValue()).doubleValue(), EPS);
            Assertions.assertEquals(400.0d, ((Double) maxMetric.metricValue()).doubleValue(), EPS);
        } finally {
            networkClient.close();
        }
    }

    /**
     * Replaces the shared {@link Metrics} with one tagged {@code client-id=clientA}, runs one
     * produce round trip through a fresh {@link Sender}, and checks that the metric names then
     * registered (excluding the {@code kafka-metrics-count} group) equal the templates reported
     * by the {@link SenderMetricsRegistry}.
     */
    @Test
    public void testSenderMetricsTemplates() throws Exception {
        // Swap the shared metrics instance; tearDown() closes the replacement.
        this.metrics.close();
        this.metrics = new Metrics(new MetricConfig().tags(Collections.singletonMap("client-id", "clientA")));
        SenderMetricsRegistry registry = new SenderMetricsRegistry(this.metrics);
        Sender templateSender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, registry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, (TransactionManager) null, this.apiVersions);

        // One complete produce round trip.
        appendToAccumulator(this.tp0, 0L, "key", "value");
        templateSender.runOnce();
        templateSender.runOnce();
        this.client.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0));
        templateSender.runOnce();

        // Creating the throttle sensor is done for its side effect on the registry; the result is not needed.
        Sender.throttleTimeSensor(registry);

        // Parameterised sets instead of raw HashSets; declared as HashSet because java.util.Set is not imported here.
        HashSet<MetricNameTemplate> registeredTemplates = new HashSet<>();
        for (MetricName metricName : this.metrics.metrics().keySet()) {
            if (!metricName.group().equals("kafka-metrics-count")) {
                registeredTemplates.add(new MetricNameTemplate(metricName.name(), metricName.group(), "", metricName.tags().keySet()));
            }
        }
        HashSet<MetricNameTemplate> declaredTemplates = new HashSet<>(registry.allTemplates());
        TestUtils.checkEquals(registeredTemplates, declaredTemplates, "metrics", "templates");
    }

    /**
     * Exercises retry handling after disconnects. First a request is disconnected once and must
     * be resent and completed successfully; then a second record is disconnected
     * {@code maxRetries + 1} times and its future must fail with {@link NetworkException}.
     *
     * <p>The dedicated {@link Metrics} instance is managed by try-with-resources so it is closed
     * exactly once on every exit path.
     */
    @Test
    public void testRetries() throws Exception {
        // NOTE(review): passed as the eighth Sender constructor argument, presumably the retry limit — confirm.
        final int maxRetries = 1;
        try (Metrics retryMetrics = new Metrics()) {
            Sender retrySender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, maxRetries, new SenderMetricsRegistry(retryMetrics), this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, (TransactionManager) null, this.apiVersions);

            // Part 1: a single disconnect followed by a successful resend.
            FutureRecordMetadata retriedFuture = appendToAccumulator(this.tp0, 0L, "key", "value");
            retrySender.runOnce();
            retrySender.runOnce();
            String nodeId = this.client.requests().peek().destination();
            Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertEquals(1, retrySender.inFlightBatches(this.tp0).size());
            Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be true");

            this.client.disconnect(nodeId);
            Assertions.assertEquals(0, this.client.inFlightRequestCount());
            Assertions.assertFalse(this.client.hasInFlightRequests());
            Assertions.assertFalse(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be false");
            // The batch is still tracked by the sender until it processes the disconnect.
            Assertions.assertEquals(1, retrySender.inFlightBatches(this.tp0).size());

            retrySender.runOnce();
            retrySender.runOnce();
            retrySender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertEquals(1, retrySender.inFlightBatches(this.tp0).size());

            this.client.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0));
            retrySender.runOnce();
            Assertions.assertTrue(retriedFuture.isDone(), "Request should have retried and completed");
            Assertions.assertEquals(0L, ((RecordMetadata) retriedFuture.get()).offset());
            Assertions.assertEquals(0, retrySender.inFlightBatches(this.tp0).size());

            // Part 2: disconnect more often than the retry limit allows.
            FutureRecordMetadata failedFuture = appendToAccumulator(this.tp0, 0L, "key", "value");
            retrySender.runOnce();
            Assertions.assertEquals(1, retrySender.inFlightBatches(this.tp0).size());
            for (int attempt = 0; attempt < maxRetries + 1; attempt++) {
                this.client.disconnect(this.client.requests().peek().destination());
                retrySender.runOnce();
                Assertions.assertEquals(0, retrySender.inFlightBatches(this.tp0).size());
                retrySender.runOnce();
                retrySender.runOnce();
                // Only after the first disconnect is the batch expected back in flight.
                Assertions.assertEquals(attempt > 0 ? 0 : 1, retrySender.inFlightBatches(this.tp0).size());
            }
            retrySender.runOnce();
            assertFutureFailure(failedFuture, NetworkException.class);
            Assertions.assertEquals(0, retrySender.inFlightBatches(this.tp0).size());
        }
    }

    /**
     * Builds a {@link Sender} whose fifth constructor argument is {@code true} (the other tests
     * pass {@code false}; presumably this is the guarantee-message-order flag — confirm). It then
     * checks that a second record appended to the same partition, while the first request is
     * still in flight, does not raise the in-flight request or batch count above one.
     *
     * <p>The dedicated {@link Metrics} instance is managed by try-with-resources so it is closed
     * exactly once on every exit path.
     */
    @Test
    public void testSendInOrder() throws Exception {
        try (Metrics orderMetrics = new Metrics()) {
            Sender orderedSender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, ACKS_ALL, 1, new SenderMetricsRegistry(orderMetrics), this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, (TransactionManager) null, this.apiVersions);
            this.client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap("test", 2)));

            // Send the first record to partition 1 of topic "test".
            TopicPartition partition = new TopicPartition("test", 1);
            appendToAccumulator(partition, 0L, "key1", "value1");
            orderedSender.runOnce();
            orderedSender.runOnce();

            String nodeId = this.client.requests().peek().destination();
            Assertions.assertEquals(ApiKeys.PRODUCE, this.client.requests().peek().requestBuilder().apiKey());
            Node node = new Node(Integer.parseInt(nodeId), "localhost", 0);
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertTrue(this.client.isReady(node, this.time.milliseconds()), "Client ready status should be true");
            Assertions.assertEquals(1, orderedSender.inFlightBatches(partition).size());

            // Advance the clock, append a second record and queue another metadata update.
            this.time.sleep(900L);
            appendToAccumulator(partition, 0L, "key2", "value2");
            this.client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
            Assertions.assertEquals(1, orderedSender.inFlightBatches(partition).size());

            // The second record must not produce an additional in-flight request.
            orderedSender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertEquals(1, orderedSender.inFlightBatches(partition).size());
        }
    }

    /**
     * Verifies that a completion callback may append to the accumulator while batches are being
     * expired. Ten records are appended with a callback that, on {@link TimeoutException},
     * counts the invocation and appends a replacement record. After the clock is advanced and
     * the sender runs once, the test expects ten callback invocations, no unexpected exception,
     * and a single queued batch holding ten records.
     */
    @Test
    public void testAppendInExpiryCallback() throws InterruptedException {
        final int messagesPerBatch = 10;
        final AtomicInteger expiryCallbackCount = new AtomicInteger(0);
        // Parameterised instead of a raw AtomicReference.
        final AtomicReference<Exception> unexpectedException = new AtomicReference<>();
        final byte[] key = "key".getBytes();
        final byte[] value = "value".getBytes();
        final Cluster cluster = TestUtils.singletonCluster();
        RecordAccumulator.AppendCallbacks callbacks = new RecordAccumulator.AppendCallbacks() { // from class: org.apache.kafka.clients.producer.internals.SenderTest.1
            public void setPartition(int partition) {
            }

            public void onCompletion(RecordMetadata recordMetadata, Exception exception) {
                if (exception instanceof TimeoutException) {
                    expiryCallbackCount.incrementAndGet();
                    try {
                        // Re-append from inside the expiry callback; this is the behaviour under test.
                        SenderTest.this.accumulator.append(SenderTest.this.tp1.topic(), SenderTest.this.tp1.partition(), 0L, key, value, Record.EMPTY_HEADERS, (RecordAccumulator.AppendCallbacks) null, 1000L, false, SenderTest.this.time.milliseconds(), cluster);
                    } catch (InterruptedException e) {
                        throw new RuntimeException("Unexpected interruption", e);
                    }
                } else if (exception != null) {
                    // Remember only the first non-timeout failure.
                    unexpectedException.compareAndSet(null, exception);
                }
            }
        };

        long appendTimeMs = this.time.milliseconds();
        for (int i = 0; i < messagesPerBatch; i++) {
            this.accumulator.append(this.tp1.topic(), this.tp1.partition(), 0L, key, value, (Header[]) null, callbacks, 1000L, false, appendTimeMs, cluster);
        }

        // Advance the mock clock by 10 seconds before draining and running the sender.
        this.time.sleep(10000L);
        Node node = (Node) this.metadata.fetch().nodes().get(0);
        this.sender.addToInflightBatches(this.accumulator.drain(this.metadata.fetch(), Collections.singleton(node), Integer.MAX_VALUE, this.time.milliseconds()));
        this.client.disconnect(node.idString());
        this.client.backoff(node, 100L);
        this.sender.runOnce();

        Assertions.assertEquals(messagesPerBatch, expiryCallbackCount.get(), "Callbacks not invoked for expiry");
        Assertions.assertNull(unexpectedException.get(), "Unexpected exception");
        Assertions.assertNotNull(this.accumulator.getDeque(this.tp1));
        Assertions.assertEquals(1, this.accumulator.getDeque(this.tp1).size());
        Assertions.assertEquals(messagesPerBatch, ((ProducerBatch) this.accumulator.getDeque(this.tp1).peekFirst()).recordCount);
    }

    /**
     * Checks that a topic is present in the producer metadata while it is in use, is dropped
     * after {@code TOPIC_IDLE_MS} has elapsed and the metadata is refreshed, and is added again
     * when another record is produced to it.
     */
    @Test
    public void testMetadataTopicExpiry() throws Exception {
        // Round 1: produce one record and complete it.
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        FutureRecordMetadata firstFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertTrue(metadata.containsTopic(tp0.topic()), "Topic not added to metadata");

        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        sender.runOnce();
        client.respond(produceResponse(tp0, 0L, Errors.NONE, 0));
        sender.runOnce();
        Assertions.assertEquals(0, client.inFlightRequestCount(), "Request completed.");
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        sender.runOnce();
        Assertions.assertTrue(firstFuture.isDone(), "Request should be completed");
        Assertions.assertTrue(metadata.containsTopic(tp0.topic()), "Topic not retained in metadata list");

        // Let the idle period pass; the next metadata update should drop the unused topic.
        time.sleep(TOPIC_IDLE_MS);
        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        Assertions.assertFalse(metadata.containsTopic(tp0.topic()), "Unused topic has not been expired");

        // Round 2: producing again brings the topic back.
        FutureRecordMetadata secondFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertTrue(metadata.containsTopic(tp0.topic()), "Topic not added to metadata");

        client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
        sender.runOnce();
        client.respond(produceResponse(tp0, 1, Errors.NONE, 0));
        sender.runOnce();
        Assertions.assertEquals(0, client.inFlightRequestCount(), "Request completed.");
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        sender.runOnce();
        Assertions.assertTrue(secondFuture.isDone(), "Request should be completed");
    }

    /**
     * Tracks how the accumulator's per-node latency stats ({@code readyTimeMs} and
     * {@code drainTimeMs}) for node 0 evolve. After a first send both equal the send time. While
     * the node is throttled for 100 ms, {@code drainTimeMs} stays put whereas {@code readyTimeMs}
     * advances once new data is pending. After the throttle has elapsed a second request is sent
     * and both stats equal the current time.
     *
     * <p>The dedicated {@link Metrics} instance is managed by try-with-resources. This replaces
     * decompiler residue whose dead branches would have dereferenced a {@code Throwable} that is
     * always {@code null}.
     */
    @Test
    public void testNodeLatencyStats() throws Exception {
        try (Metrics latencyMetrics = new Metrics()) {
            // Replace the shared accumulator with one bound to this test's metrics.
            this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.NONE, 0, 0L, DELIVERY_TIMEOUT_MS, new RecordAccumulator.PartitionerConfig(false, 42L), latencyMetrics, "producer-metrics", this.time, this.apiVersions, (TransactionManager) null, new BufferPool(1048576L, this.batchSize, latencyMetrics, this.time, "producer-internal-metrics"));
            Sender latencySender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL, 1, new SenderMetricsRegistry(latencyMetrics), this.time, REQUEST_TIMEOUT, 1000L, (TransactionManager) null, new ApiVersions());

            // First send: ready and drain times both equal the current time.
            long firstSendMs = this.time.milliseconds();
            appendToAccumulator(this.tp0, 0L, "key", "value");
            latencySender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            RecordAccumulator.NodeLatencyStats stats = this.accumulator.getNodeLatencyStats(0);
            Assertions.assertEquals(firstSendMs, stats.drainTimeMs);
            Assertions.assertEquals(firstSendMs, stats.readyTimeMs);

            // Throttle node 0 for 100 ms; with nothing pending, neither stat moves.
            this.client.throttle(this.metadata.fetch().nodeById(0), 100L);
            this.time.sleep(10L);
            latencySender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            Assertions.assertEquals(firstSendMs, stats.drainTimeMs);
            Assertions.assertEquals(firstSendMs, stats.readyTimeMs);

            // New data while throttled: ready time advances, drain time does not.
            long secondAppendMs = this.time.milliseconds();
            appendToAccumulator(this.tp0, 0L, "key", "value");
            latencySender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            Assertions.assertEquals(firstSendMs, stats.drainTimeMs);
            Assertions.assertEquals(secondAppendMs, stats.readyTimeMs);

            // Still throttled 10 ms later: ready time keeps tracking the clock.
            this.time.sleep(10L);
            long stillThrottledMs = this.time.milliseconds();
            latencySender.runOnce();
            Assertions.assertEquals(1, this.client.inFlightRequestCount(), "We should have a single produce request in flight.");
            Assertions.assertEquals(firstSendMs, stats.drainTimeMs);
            Assertions.assertEquals(stillThrottledMs, stats.readyTimeMs);

            // After another 100 ms the second request goes out and both stats catch up.
            this.time.sleep(100L);
            long secondSendMs = this.time.milliseconds();
            latencySender.runOnce();
            Assertions.assertEquals(2, this.client.inFlightRequestCount(), "We should have 2 produce requests in flight.");
            Assertions.assertEquals(secondSendMs, stats.drainTimeMs);
            Assertions.assertEquals(secondSendMs, stats.readyTimeMs);
        }
    }

    /**
     * Re-runs the fixture setup with a transaction manager, completes an InitProducerId exchange
     * for producer id 343434, and checks that the manager then reports that id with epoch 0.
     */
    @Test
    public void testInitProducerIdRequest() {
        final long expectedProducerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);

        prepareAndReceiveInitProducerId(expectedProducerId, Errors.NONE);

        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(expectedProducerId, txnManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals((short) 0, txnManager.producerIdAndEpoch().epoch);
    }

    /**
     * Initialises transactions with a freshly built {@link TransactionManager} on the client
     * created by {@code createMockClientWithMaxFlightOneMetadataPending()}. It answers the
     * metadata request, prepares FindCoordinator and InitProducerId responses, and waits until
     * the manager holds the expected producer id and epoch.
     */
    @Test
    public void testInitProducerIdWithMaxInFlightOne() throws Exception {
        final String transactionalId = "testInitProducerIdWithPendingMetadataRequest";
        createMockClientWithMaxFlightOneMetadataPending();
        TransactionManager txnManager = new TransactionManager(new LogContext(), transactionalId, 60000, 100L, new ApiVersions());
        setupWithTransactionState(txnManager, false, null, false);
        ProducerIdAndEpoch expected = new ProducerIdAndEpoch(123456L, (short) 0);

        txnManager.initializeTransactions();
        sender.runOnce();

        // Answer the metadata request first, then queue the coordinator lookup and the producer-id response.
        client.respond(RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap()));
        prepareFindCoordinatorResponse(Errors.NONE, transactionalId);
        prepareInitProducerResponse(Errors.NONE, expected.producerId, expected.epoch);
        waitForProducerId(txnManager, expected);
    }

    /**
     * Same scenario as the transactional variant, but with the transaction manager from
     * {@code createTransactionManager()}: a metadata update is delivered, the sender is
     * run twice, and the InitProducerId response must yield the expected id and epoch.
     */
    @Test
    public void testIdempotentInitProducerIdWithMaxInFlightOne() throws Exception {
        createMockClientWithMaxFlightOneMetadataPending();
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager, false, null, false);
        ProducerIdAndEpoch expected = new ProducerIdAndEpoch(123456L, (short) 0);

        // Deliver the metadata update first, then give the sender two passes.
        client.respond(RequestTestUtils.metadataUpdateWith(1, Collections.emptyMap()));
        sender.runOnce();
        sender.runOnce();

        client.respond(initProducerIdResponse(expected.producerId, expected.epoch, Errors.NONE));
        waitForProducerId(txnManager, expected);
    }

    /**
     * Checks that a producer id is still obtained when the first node is first delayed
     * from becoming ready and later throttled.
     */
    @Test
    public void testNodeNotReady() {
        final String txnId = "testNodeNotReady";
        final long holdOffMs = 5020L;

        time = new MockTime(10L);
        client = new MockClient((Time) time, (Metadata) metadata);
        TransactionManager txnManager =
            new TransactionManager(new LogContext(), txnId, 60000, 100L, new ApiVersions());
        setupWithTransactionState(txnManager, false, null, true);
        ProducerIdAndEpoch expected = new ProducerIdAndEpoch(123456L, (short) 0);
        txnManager.initializeTransactions();
        sender.runOnce();

        // Delay readiness of the first node, then let the coordinator lookup proceed.
        Node firstNode = (Node) metadata.fetch().nodes().get(0);
        client.delayReady(firstNode, holdOffMs);
        prepareFindCoordinatorResponse(Errors.NONE, txnId);
        sender.runOnce();
        sender.runOnce();
        Assertions.assertNotNull(
            txnManager.coordinator(FindCoordinatorRequest.CoordinatorType.TRANSACTION),
            "Coordinator not found");

        // Throttle the same node and queue the responses needed to finish initialization.
        client.throttle(firstNode, holdOffMs);
        // NOTE(review): the literal below equals the assertion message above, whereas the
        // earlier call passes the transactional id; looks like a copy/paste slip. Confirm
        // against prepareFindCoordinatorResponse before changing it.
        prepareFindCoordinatorResponse(Errors.NONE, "Coordinator not found");
        prepareInitProducerResponse(Errors.NONE, expected.producerId, expected.epoch);
        waitForProducerId(txnManager, expected);
    }

    /**
     * Verifies that a CLUSTER_AUTHORIZATION_FAILED answer to InitProducerId leaves the
     * transaction manager without a producer id, records a
     * {@link ClusterAuthorizationException} as its last error, and makes sends fail
     * with that exception type.
     */
    @Test
    public void testClusterAuthorizationExceptionInInitProducerIdRequest() throws Exception {
        final long producerId = 343434L;
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);

        prepareAndReceiveInitProducerId(producerId, Errors.CLUSTER_AUTHORIZATION_FAILED);

        Assertions.assertFalse(txnManager.hasProducerId());
        Assertions.assertTrue(txnManager.hasError());
        Assertions.assertTrue(ClusterAuthorizationException.class.isInstance(txnManager.lastError()));
        assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * Sends a single record without idempotence, answers the produce request with
     * TOPIC_AUTHORIZATION_FAILED and checks that the record future completes
     * exceptionally with a {@link TopicAuthorizationException} cause.
     *
     * <p>The failure is checked with {@code assertThrows} so the test fails when
     * {@code get()} unexpectedly returns normally; a plain try/catch would pass silently.
     */
    @Test
    public void testCanRetryWithoutIdempotence() throws Exception {
        FutureRecordMetadata future = appendToAccumulator(this.tp0, 0L, "key", "value");
        this.sender.runOnce();
        this.sender.runOnce();

        String destinationId = this.client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertTrue(this.client.hasInFlightRequests());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
        Assertions.assertTrue(this.client.isReady(destination, this.time.milliseconds()), "Client ready status should be true");
        Assertions.assertFalse(future.isDone());

        // The matcher also asserts that the outgoing request carries no idempotent records.
        this.client.respond(abstractRequest -> {
            Assertions.assertFalse(RequestTestUtils.hasIdempotentRecords((ProduceRequest) abstractRequest));
            return true;
        }, (AbstractResponse) produceResponse(this.tp0, -1L, Errors.TOPIC_AUTHORIZATION_FAILED, 0));
        this.sender.runOnce();

        Assertions.assertTrue(future.isDone());
        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, () -> future.get());
        Assertions.assertTrue(failure.getCause() instanceof TopicAuthorizationException);
    }

    /**
     * Sends two idempotent batches to the same partition back to back and checks the
     * sequence number, last acknowledged sequence and future completion after each of
     * the two successful responses.
     */
    @Test
    public void testIdempotenceWithMultipleInflights() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch.
        FutureRecordMetadata firstFuture = appendToAccumulator(tp0);
        sender.runOnce();
        String destinationId = client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Second batch, sent while the first is still in flight.
        FutureRecordMetadata secondFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());
        Assertions.assertTrue(client.isReady(destination, time.milliseconds()));

        // Successful response for sequence 0.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(firstFuture.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) firstFuture.get()).offset());
        Assertions.assertFalse(secondFuture.isDone());

        // Successful response for sequence 1.
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());
        Assertions.assertTrue(secondFuture.isDone());
        Assertions.assertEquals(1L, ((RecordMetadata) secondFuture.get()).offset());
    }

    /**
     * Puts three idempotent batches in flight, fails all of them with retriable-looking
     * errors (LEADER_NOT_AVAILABLE, then two OUT_OF_ORDER_SEQUENCE_NUMBER) and checks
     * that they, plus a fourth batch appended meanwhile, are subsequently acknowledged
     * one at a time in sequence order 0, 1, 2, 3.
     */
    @Test
    public void testIdempotenceWithMultipleInflightsRetriedInOrder() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Batch 1.
        FutureRecordMetadata future1 = appendToAccumulator(tp0);
        sender.runOnce();
        String destinationId = client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Batches 2 and 3.
        FutureRecordMetadata future2 = appendToAccumulator(tp0);
        sender.runOnce();
        FutureRecordMetadata future3 = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(3, client.inFlightRequestCount());
        Assertions.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(future1.isDone());
        Assertions.assertFalse(future2.isDone());
        Assertions.assertFalse(future3.isDone());
        Assertions.assertTrue(client.isReady(destination, time.milliseconds()));

        // Fail sequence 0, then append a fourth record while retries are pending.
        sendIdempotentProducerResponse(0, tp0, Errors.LEADER_NOT_AVAILABLE, -1L);
        sender.runOnce();
        FutureRecordMetadata future4 = appendToAccumulator(tp0);
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Fail sequences 1 and 2.
        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        sender.runOnce();
        sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // A further pass must not put more than one request in flight.
        sender.runOnce();
        Assertions.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Sequence 0 succeeds.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(future1.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) future1.get()).offset());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // Sequence 1 is sent and succeeds.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(future2.isDone());
        Assertions.assertEquals(1L, ((RecordMetadata) future2.get()).offset());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // Sequence 2 is sent and succeeds; one request is in flight again afterwards.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
        sendIdempotentProducerResponse(2, tp0, Errors.NONE, 2L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(2), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(future3.isDone());
        Assertions.assertEquals(2L, ((RecordMetadata) future3.get()).offset());
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        // Sequence 3 (the record appended during the retries) succeeds.
        sendIdempotentProducerResponse(3, tp0, Errors.NONE, 3L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(3), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(future4.isDone());
        Assertions.assertEquals(3L, ((RecordMetadata) future4.get()).offset());
    }

    /**
     * Puts two idempotent batches in flight, fails the first one with MESSAGE_TOO_LARGE
     * and the second with OUT_OF_ORDER_SEQUENCE_NUMBER, then checks that the second
     * batch is re-sent and acknowledged as sequence 0 at offset 0.
     *
     * <p>The closing completion check targets the second future: the first one has
     * already been asserted as failed, so re-checking it would not guard the
     * {@code get()} call that follows.
     */
    @Test
    public void testIdempotenceWithMultipleInflightsWhereFirstFailsFatallyAndSequenceOfFutureBatchesIsAdjusted() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // First batch.
        FutureRecordMetadata firstFuture = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String destinationId = this.client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));

        // Second batch.
        FutureRecordMetadata secondFuture = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());
        Assertions.assertTrue(this.client.isReady(destination, this.time.milliseconds()));

        // The first batch fails with a non-retried error.
        sendIdempotentProducerResponse(0, this.tp0, Errors.MESSAGE_TOO_LARGE, -1L);
        this.sender.runOnce();
        assertFutureFailure(firstFuture, RecordTooLargeException.class);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));

        // The second batch is rejected as out of order.
        sendIdempotentProducerResponse(1, this.tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());

        // It is re-sent on the next pass.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));

        // The re-sent batch is acknowledged as sequence 0.
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertTrue(secondFuture.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) secondFuture.get()).offset());
    }

    /**
     * After a two-record batch is acknowledged, a following batch is rejected with
     * OUT_OF_ORDER_SEQUENCE_NUMBER; the test then expects producer epoch 1, next
     * sequence number 1 and first in-flight sequence 0 for the partition.
     */
    @Test
    public void testEpochBumpOnOutOfOrderSequenceForNextBatch() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Two records are appended before the first send.
        FutureRecordMetadata firstFuture = appendToAccumulator(tp0);
        appendToAccumulator(tp0);
        sender.runOnce();
        String destinationId = client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Acknowledge the first request.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
        sender.runOnce();

        // Third record.
        FutureRecordMetadata nextFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(firstFuture.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) firstFuture.get()).offset());
        Assertions.assertFalse(nextFuture.isDone());
        Assertions.assertTrue(client.isReady(destination, time.milliseconds()));

        // Reject the third record's request as out of order and run the sender twice.
        sendIdempotentProducerResponse(2, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L);
        sender.runOnce();
        sender.runOnce();

        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(1, txnManager.sequenceNumber(tp0).intValue());
        Assertions.assertEquals(0, txnManager.firstInFlightSequence(tp0));
    }

    /**
     * Walks two partitions through an epoch bump triggered by an
     * OUT_OF_ORDER_SEQUENCE_NUMBER response on tp0 while tp1 has nothing in flight.
     * tp0 is expected to move to epoch 1 right away; tp1 is expected to be reported as
     * stale until its next record is sent, at which point it also moves to epoch 1.
     */
    @Test
    public void testEpochBumpOnOutOfOrderSequenceForNextBatchWhenThereIsNoBatchInFlight() throws Exception {
        final long producerId = 343434L;
        final short oldEpoch = 0;
        final short newEpoch = 1;

        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertEquals(producerId, txnManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(0, txnManager.producerIdAndEpoch().epoch);

        // tp0: one record sent and acknowledged under the old epoch.
        appendToAccumulator(tp0);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, oldEpoch, 1L, OptionalInt.empty());
        sendIdempotentProducerResponse(0, 0, tp0, Errors.NONE, 0L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, oldEpoch, 1L, OptionalInt.of(0));

        // tp1: one record sent and acknowledged under the old epoch.
        appendToAccumulator(tp1);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 1L, OptionalInt.empty());
        sendIdempotentProducerResponse(0, 0, tp1, Errors.NONE, 0L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 1L, OptionalInt.of(0));

        // tp0: second record is rejected as out of order.
        appendToAccumulator(tp0);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, oldEpoch, 2L, OptionalInt.of(0));
        sendIdempotentProducerResponse(0, 1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L, -1L);
        sender.runOnce();
        sender.runOnce();

        // tp0 has moved to the new epoch; tp1 still carries the old one and is stale.
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        assertPartitionState(txnManager, tp0, producerId, newEpoch, 1L, OptionalInt.empty());
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 1L, OptionalInt.of(0));
        Assertions.assertTrue(txnManager.hasStaleProducerIdAndEpoch(tp1));

        // tp0: the re-sent record is acknowledged under the new epoch.
        sendIdempotentProducerResponse(1, 0, tp0, Errors.NONE, 1L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, newEpoch, 1L, OptionalInt.of(0));

        // tp1: the next record switches the partition to the new epoch.
        appendToAccumulator(tp1);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, newEpoch, 1L, OptionalInt.empty());
        Assertions.assertFalse(txnManager.hasStaleProducerIdAndEpoch(tp1));
        sendIdempotentProducerResponse(1, 0, tp1, Errors.NONE, 1L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, newEpoch, 1L, OptionalInt.of(0));
    }

    /**
     * Variant of the epoch-bump scenario in which tp1 still has a batch in flight when
     * tp0 triggers the bump. The in-flight tp1 batch is answered twice with
     * NOT_LEADER_OR_FOLLOWER; tp1 is expected to stay on the old epoch (and be reported
     * as stale) until a new record is sent, after which both partitions continue under
     * epoch 1.
     */
    @Test
    public void testEpochBumpOnOutOfOrderSequenceForNextBatchWhenBatchInFlightFails() throws Exception {
        final long producerId = 343434L;
        final short oldEpoch = 0;
        final short newEpoch = 1;

        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager, false, null, true, 1, 0);
        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
        Assertions.assertEquals(producerId, txnManager.producerIdAndEpoch().producerId);
        Assertions.assertEquals(0, txnManager.producerIdAndEpoch().epoch);

        // tp0: first record sent and acknowledged.
        appendToAccumulator(tp0);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, oldEpoch, 1L, OptionalInt.empty());
        sendIdempotentProducerResponse(0, 0, tp0, Errors.NONE, 0L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, oldEpoch, 1L, OptionalInt.of(0));

        // tp1: first record sent and acknowledged.
        appendToAccumulator(tp1);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 1L, OptionalInt.empty());
        sendIdempotentProducerResponse(0, 0, tp1, Errors.NONE, 0L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 1L, OptionalInt.of(0));

        // Second record on each partition; both remain unacknowledged.
        appendToAccumulator(tp0);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, oldEpoch, 2L, OptionalInt.of(0));
        appendToAccumulator(tp1);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 2L, OptionalInt.of(0));

        // tp0: second record is rejected as out of order.
        sendIdempotentProducerResponse(0, 1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, -1L, -1L);
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        assertPartitionState(txnManager, tp0, producerId, newEpoch, 1L, OptionalInt.empty());
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 2L, OptionalInt.of(0));
        Assertions.assertTrue(txnManager.hasStaleProducerIdAndEpoch(tp1));

        // tp1: the in-flight batch fails; the partition state is expected to be unchanged.
        sendIdempotentProducerResponse(0, 1, tp1, Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 2L, OptionalInt.of(0));
        Assertions.assertTrue(txnManager.hasStaleProducerIdAndEpoch(tp1));

        // tp0: the re-sent record is acknowledged under the new epoch.
        sendIdempotentProducerResponse(1, 0, tp0, Errors.NONE, 1L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, newEpoch, 1L, OptionalInt.of(0));

        // tp1: the batch fails a second time, again without changing the partition state.
        sendIdempotentProducerResponse(0, 1, tp1, Errors.NOT_LEADER_OR_FOLLOWER, -1L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, oldEpoch, 2L, OptionalInt.of(0));
        Assertions.assertTrue(txnManager.hasStaleProducerIdAndEpoch(tp1));

        // tp1: a new record moves the partition to the new epoch.
        appendToAccumulator(tp1);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, newEpoch, 1L, OptionalInt.empty());
        Assertions.assertFalse(txnManager.hasStaleProducerIdAndEpoch(tp1));
        sendIdempotentProducerResponse(1, 0, tp1, Errors.NONE, 0L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp1, producerId, newEpoch, 1L, OptionalInt.of(0));

        // tp0: one more record under the new epoch.
        appendToAccumulator(tp0);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, newEpoch, 2L, OptionalInt.of(0));
        sendIdempotentProducerResponse(1, 1, tp0, Errors.NONE, 0L, -1L);
        sender.runOnce();
        assertPartitionState(txnManager, tp0, producerId, newEpoch, 2L, OptionalInt.of(1));
    }

    /**
     * Asserts the per-partition idempotence state tracked by the transaction manager.
     *
     * @param transactionManager manager under inspection
     * @param topicPartition partition whose state is checked
     * @param expectedProducerId expected producer id for the partition
     * @param expectedEpoch expected producer epoch for the partition
     * @param expectedSequence expected value of {@code sequenceNumber(topicPartition)}
     * @param expectedLastAcked expected value of {@code lastAckedSequence(topicPartition)}
     */
    private void assertPartitionState(TransactionManager transactionManager, TopicPartition topicPartition, long expectedProducerId, short expectedEpoch, long expectedSequence, OptionalInt expectedLastAcked) {
        ProducerIdAndEpoch partitionIdAndEpoch = transactionManager.producerIdAndEpoch(topicPartition);
        Assertions.assertEquals(expectedProducerId, partitionIdAndEpoch.producerId, "Producer Id:");
        Assertions.assertEquals(expectedEpoch, partitionIdAndEpoch.epoch, "Producer Epoch:");

        long actualSequence = transactionManager.sequenceNumber(topicPartition).longValue();
        Assertions.assertEquals(expectedSequence, actualSequence, "Seq Number:");

        OptionalInt actualLastAcked = transactionManager.lastAckedSequence(topicPartition);
        Assertions.assertEquals(expectedLastAcked, actualLastAcked, "Last Acked Seq Number:");
    }

    /**
     * Answers two in-flight idempotent requests in reverse order, both with errors
     * (OUT_OF_ORDER_SEQUENCE_NUMBER for the second, NOT_LEADER_OR_FOLLOWER for the
     * first), and checks that the batches are re-queued by base sequence and then
     * acknowledged as sequence 0 followed by sequence 1.
     */
    @Test
    public void testCorrectHandlingOfOutOfOrderResponses() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch.
        FutureRecordMetadata firstFuture = appendToAccumulator(tp0);
        sender.runOnce();
        String destinationId = client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Second batch.
        FutureRecordMetadata secondFuture = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());
        Assertions.assertTrue(client.isReady(destination, time.milliseconds()));

        // Remember the head request, then fail the second queued request first.
        ClientRequest firstClientRequest = client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest) client.requests().toArray()[1];
        client.respondToRequest(secondClientRequest, produceResponse(tp0, -1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, ACKS_ALL));
        sender.runOnce();
        Deque<ProducerBatch> queuedBatches = accumulator.getDeque(tp0);
        Assertions.assertEquals(1, queuedBatches.size());
        Assertions.assertEquals(1, queuedBatches.peekFirst().baseSequence());
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Now fail the head request; both batches are queued, lowest base sequence first.
        client.respondToRequest(firstClientRequest, produceResponse(tp0, -1L, Errors.NOT_LEADER_OR_FOLLOWER, ACKS_ALL));
        sender.runOnce();
        Assertions.assertEquals(2, queuedBatches.size());
        Assertions.assertEquals(0, queuedBatches.peekFirst().baseSequence());
        Assertions.assertEquals(1, queuedBatches.peekLast().baseSequence());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());

        // Two sender passes still leave exactly one request in flight.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Sequence 0 succeeds.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 0L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(0, client.inFlightRequestCount());
        Assertions.assertTrue(firstFuture.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) firstFuture.get()).offset());

        // Sequence 1 is sent and succeeds.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
        sender.runOnce();
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertTrue(secondFuture.isDone());
        Assertions.assertEquals(1L, ((RecordMetadata) secondFuture.get()).offset());
    }

    /**
     * Answers two in-flight idempotent requests in reverse order: the second succeeds
     * at offset 1, then the first fails with REQUEST_TIMED_OUT. The first batch is
     * expected to be re-queued with base sequence 0, re-sent and completed at offset 0,
     * while the last acknowledged sequence stays at 1 throughout.
     */
    @Test
    public void testCorrectHandlingOfOutOfOrderResponsesWhenSecondSucceeds() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // First batch.
        FutureRecordMetadata firstFuture = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        String destinationId = this.client.requests().peek().destination();
        Node destination = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // Second batch.
        FutureRecordMetadata secondFuture = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2, this.client.inFlightRequestCount());
        Assertions.assertFalse(firstFuture.isDone());
        Assertions.assertFalse(secondFuture.isDone());
        Assertions.assertTrue(this.client.isReady(destination, this.time.milliseconds()));

        // Remember the head request, then let the second queued request succeed first.
        ClientRequest firstClientRequest = this.client.requests().peek();
        ClientRequest secondClientRequest = (ClientRequest) this.client.requests().toArray()[1];
        this.client.respondToRequest(secondClientRequest, produceResponse(this.tp0, 1L, Errors.NONE, 1));
        this.sender.runOnce();
        Assertions.assertTrue(secondFuture.isDone());
        Assertions.assertEquals(1L, ((RecordMetadata) secondFuture.get()).offset());
        Assertions.assertFalse(firstFuture.isDone());
        Deque<ProducerBatch> queuedBatches = this.accumulator.getDeque(this.tp0);
        Assertions.assertEquals(0, queuedBatches.size());
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));

        // The head request times out and its batch is queued again.
        this.client.respondToRequest(firstClientRequest, produceResponse(this.tp0, -1L, Errors.REQUEST_TIMED_OUT, ACKS_ALL));
        this.sender.runOnce();
        Assertions.assertEquals(1, queuedBatches.size());
        Assertions.assertEquals(0, queuedBatches.peekFirst().baseSequence());
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());

        // The batch is re-sent on the next pass (the in-flight count is asserted once;
        // a verbatim duplicate of this assertion was dropped).
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));

        // The re-sent batch succeeds; the last acknowledged sequence remains 1.
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 0L);
        this.sender.runOnce();
        Assertions.assertEquals(0, queuedBatches.size());
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertFalse(this.client.hasInFlightRequests());
        Assertions.assertTrue(firstFuture.isDone());
        Assertions.assertEquals(0L, ((RecordMetadata) firstFuture.get()).offset());
    }

    /**
     * Queues a single record for {@code tp0} and, before the sender has run with it, advances the
     * mock clock by 10 seconds while the node is disconnected and backing off. After one sender
     * pass the record's future must fail with {@code TimeoutException} and the transaction
     * manager must report no unresolved sequence for the partition.
     */
    @Test
    public void testExpiryOfUnsentBatchesShouldNotCauseUnresolvedSequences() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        FutureRecordMetadata pendingSend = appendToAccumulator(tp0, 0L, "key", "value");

        // Make the node unreachable and let time pass before the sender gets a chance to run.
        Node broker = (Node) metadata.fetch().nodes().get(0);
        time.sleep(10000L);
        client.disconnect(broker.idString());
        client.backoff(broker, 10L);
        sender.runOnce();

        assertFutureFailure(pendingSend, TimeoutException.class);
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(tp0));
    }

    /**
     * Sends two batches to {@code tp0}, lets the first one time out and then expire, and verifies
     * that once the second batch is acknowledged the unresolved-sequence flag is cleared on the
     * following sender pass and a third, queued batch is sent.
     */
    @Test
    public void testExpiryOfFirstBatchShouldNotCauseUnresolvedSequencesIfFutureBatchesSucceed() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager, false, null);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Two sends, one second apart, so both are in flight at once.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        time.sleep(1000L);
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals(2, sender.inFlightBatches(tp0).size());

        // The first request comes back with REQUEST_TIMED_OUT.
        sendIdempotentProducerResponse(0, tp0, Errors.REQUEST_TIMED_OUT, -1L);
        sender.runOnce();
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());

        // Disconnect the node and advance the clock; the first send must now fail with a timeout.
        Node broker = (Node) metadata.fetch().nodes().get(0);
        time.sleep(600L);
        client.disconnect(broker.idString());
        client.backoff(broker, 10L);
        sender.runOnce();
        assertFutureFailure(firstSend, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(tp0));
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // Queue a third record, then let the second batch succeed at offset 1.
        FutureRecordMetadata thirdSend = appendToAccumulator(tp0);
        time.sleep(20L);
        Assertions.assertFalse(secondSend.isDone());
        sender.runOnce();
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1L);
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
        sender.runOnce();
        Assertions.assertTrue(secondSend.isDone());
        Assertions.assertEquals(1L, ((RecordMetadata) secondSend.get()).offset());
        Assertions.assertEquals(0, sender.inFlightBatches(tp0).size());

        // The third batch is still queued without a sequence while the partition is unresolved.
        Deque queuedBatches = accumulator.getDeque(tp0);
        Assertions.assertEquals(1, queuedBatches.size());
        Assertions.assertFalse(((ProducerBatch) queuedBatches.peekFirst()).hasSequence());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(tp0));

        // One more pass resolves the sequence and puts the third batch in flight.
        sender.runOnce();
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(tp0));
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0, queuedBatches.size());
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertFalse(thirdSend.isDone());
        Assertions.assertEquals(1, sender.inFlightBatches(tp0).size());
    }

    /**
     * Sends two batches to {@code tp0}, expires the first after a NOT_LEADER_OR_FOLLOWER
     * response, then answers the second with OUT_OF_ORDER_SEQUENCE_NUMBER. Verifies that the
     * producer epoch ends up at 1, the next sequence for the partition is 1 and no unresolved
     * sequence remains.
     */
    @Test
    public void testExpiryOfFirstBatchShouldCauseEpochBumpIfFutureBatchesFail() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Two sends, one second apart, both in flight.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        time.sleep(1000L);
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2, client.inFlightRequestCount());

        // The first request fails with NOT_LEADER_OR_FOLLOWER.
        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L);
        sender.runOnce();

        // Disconnect the node and advance the clock; the first send must fail with a timeout.
        Node broker = (Node) metadata.fetch().nodes().get(0);
        time.sleep(1000L);
        client.disconnect(broker.idString());
        client.backoff(broker, 10L);
        sender.runOnce();
        assertFutureFailure(firstSend, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(tp0));

        // Queue one more record, then fail the second batch with an out-of-order sequence error.
        appendToAccumulator(tp0);
        time.sleep(20L);
        Assertions.assertFalse(secondSend.isDone());
        sender.runOnce();
        sendIdempotentProducerResponse(1, tp0, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 1L);
        sender.runOnce();
        Assertions.assertEquals(2, accumulator.getDeque(tp0).size());

        // After one more pass the epoch has been bumped and the sequence state is resolved.
        sender.runOnce();
        Assertions.assertEquals((short) 1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(tp0));
    }

    /**
     * Runs a transactional producer (transaction begun, {@code tp0} added) through the expiry of
     * its first in-flight batch and verifies that the resulting unresolved sequence puts the
     * transaction manager into an abortable error state.
     */
    @Test
    public void testUnresolvedSequencesAreNotFatal() throws Exception {
        ProducerIdAndEpoch initialIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        apiVersions.update(KafkaChannelTest.CHANNEL_ID, NodeApiVersions.create(ApiKeys.INIT_PRODUCER_ID.id, (short) 0, (short) 3));
        TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100L, apiVersions);
        setupWithTransactionState(txnManager);
        doInitTransactions(txnManager, initialIdAndEpoch);

        // Open a transaction and register tp0 with it.
        txnManager.beginTransaction();
        txnManager.maybeAddPartition(tp0);
        AddPartitionsToTxnResponse addPartitionsOk = new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE));
        client.prepareResponse(addPartitionsOk);
        sender.runOnce();

        // Two sends, one second apart, both in flight.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        time.sleep(1000L);
        appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2, client.inFlightRequestCount());

        // The first request fails with NOT_LEADER_OR_FOLLOWER.
        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L);
        sender.runOnce();

        // Disconnect the node and advance the clock; the first send must fail with a timeout.
        Node broker = (Node) metadata.fetch().nodes().get(0);
        time.sleep(1000L);
        client.disconnect(broker.idString());
        client.backoff(broker, 10L);
        sender.runOnce();
        assertFutureFailure(firstSend, TimeoutException.class);
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(tp0));

        // The error surfaced on the next pass must be abortable.
        sender.runOnce();
        Assertions.assertTrue(txnManager.hasAbortableError());
    }

    /**
     * Sends a single batch to {@code tp0}, fails it with NOT_LEADER_OR_FOLLOWER and then expires
     * it. Verifies that the partition is left with an unresolved sequence and that a further
     * sender pass raises the epoch to 1 and clears the flag.
     */
    @Test
    public void testExpiryOfAllSentBatchesShouldCauseUnresolvedSequences() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // Send one record and have the broker reject it with NOT_LEADER_OR_FOLLOWER.
        FutureRecordMetadata onlySend = appendToAccumulator(tp0, 0L, "key", "value");
        sender.runOnce();
        sendIdempotentProducerResponse(0, tp0, Errors.NOT_LEADER_OR_FOLLOWER, -1L);
        sender.runOnce();
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());

        // Disconnect the node and advance the clock; the send must fail with a timeout.
        Node broker = (Node) metadata.fetch().nodes().get(0);
        time.sleep(TestUtils.DEFAULT_MAX_WAIT_MS);
        client.disconnect(broker.idString());
        client.backoff(broker, 10L);
        sender.runOnce();
        assertFutureFailure(onlySend, TimeoutException.class);

        // Nothing is queued or in flight, the producer id is unchanged, but the sequence is unresolved.
        Assertions.assertTrue(txnManager.hasUnresolvedSequence(tp0));
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(0, accumulator.getDeque(tp0).size());
        Assertions.assertEquals(343434L, txnManager.producerIdAndEpoch().producerId);

        // The next pass bumps the epoch and resolves the sequence.
        sender.runOnce();
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertFalse(txnManager.hasUnresolvedSequence(tp0));
    }

    /**
     * Starts with the epoch at {@code Short.MAX_VALUE}, fails one produce request with
     * NOT_LEADER_OR_FOLLOWER for {@code tp1} and OUT_OF_ORDER_SEQUENCE_NUMBER for {@code tp0},
     * and verifies that after a new producer id (343435) is received the {@code tp1} record is
     * still delivered at offset 10 with the next sequence at 1.
     */
    @Test
    public void testResetOfProducerStateShouldAllowQueuedBatchesToDrain() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        // A dedicated Sender instance built on the shared client, metadata and accumulator.
        Sender drainingSender = new Sender(
                logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10,
                new SenderMetricsRegistry(new Metrics()), time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
                txnManager, apiVersions);

        appendToAccumulator(tp0);
        FutureRecordMetadata tp1Send = appendToAccumulator(tp1);
        drainingSender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // Insertion order matters here: tp1 first, then tp0.
        LinkedHashMap partitionErrors = new LinkedHashMap();
        partitionErrors.put(tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        partitionErrors.put(tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        client.respond(produceResponse(partitionErrors));
        drainingSender.runOnce();

        // A fresh producer id arrives.
        prepareAndReceiveInitProducerId(343435L, Errors.NONE);
        drainingSender.runOnce();
        Assertions.assertEquals(343435L, txnManager.producerIdAndEpoch().producerId);
        Assertions.assertFalse(tp1Send.isDone());

        // The tp1 batch is now acknowledged at offset 10.
        client.respond(produceResponse(tp1, 10L, Errors.NONE, ACKS_ALL));
        drainingSender.runOnce();
        Assertions.assertTrue(tp1Send.isDone());
        Assertions.assertEquals(10L, ((RecordMetadata) tp1Send.get()).offset());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp1).longValue());
    }

    /**
     * Fails a produce request with NOT_LEADER_OR_FOLLOWER for {@code tp1} and
     * OUT_OF_ORDER_SEQUENCE_NUMBER for {@code tp0}, initiates a graceful close, and then waits
     * up to 5 seconds for the accumulator to report no undrained batches while repeatedly
     * preparing an init-producer-id response and running the sender.
     */
    @Test
    public void testCloseWithProducerIdReset() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        // A dedicated Sender instance built on the shared client, metadata and accumulator.
        Sender closingSender = new Sender(
                logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10,
                new SenderMetricsRegistry(new Metrics()), time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
                txnManager, apiVersions);

        appendToAccumulator(tp0);
        appendToAccumulator(tp1);
        closingSender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // Insertion order matters here: tp1 first, then tp0.
        LinkedHashMap partitionErrors = new LinkedHashMap();
        partitionErrors.put(tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        partitionErrors.put(tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        client.respond(produceResponse(partitionErrors));

        closingSender.initiateClose();
        closingSender.runOnce();

        // Each poll of the condition supplies a new init-producer-id response and runs the sender once.
        TestUtils.waitForCondition(() -> {
            prepareInitProducerResponse(Errors.NONE, 343435L, (short) 1);
            closingSender.runOnce();
            return !accumulator.hasUndrained();
        }, 5000L, "Failed to drain batches");
    }

    /**
     * Fails a produce request with NOT_LEADER_OR_FOLLOWER for {@code tp1} and
     * OUT_OF_ORDER_SEQUENCE_NUMBER for {@code tp0}, force-closes the sender and runs it to
     * completion. Verifies that the accumulator has no undrained batches left and that the
     * {@code tp1} future has completed.
     */
    @Test
    public void testForceCloseWithProducerIdReset() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(1L, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        // A dedicated Sender instance built on the shared client, metadata and accumulator.
        Sender closingSender = new Sender(
                logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10,
                new SenderMetricsRegistry(new Metrics()), time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
                txnManager, apiVersions);

        appendToAccumulator(tp0);
        FutureRecordMetadata tp1Send = appendToAccumulator(tp1);
        closingSender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // Insertion order matters here: tp1 first, then tp0.
        LinkedHashMap partitionErrors = new LinkedHashMap();
        partitionErrors.put(tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        partitionErrors.put(tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        client.respond(produceResponse(partitionErrors));
        closingSender.runOnce();

        // Force the close, then let the sender's main loop run until it exits.
        closingSender.forceClose();
        closingSender.runOnce();
        closingSender.run();

        Assertions.assertFalse(accumulator.hasUndrained(), "Pending batches are not aborted.");
        Assertions.assertTrue(tp1Send.isDone());
    }

    /**
     * Fails a produce request with NOT_LEADER_OR_FOLLOWER for {@code tp1} and
     * OUT_OF_ORDER_SEQUENCE_NUMBER for {@code tp0}, checks that the epoch moves to 1, then
     * fails the {@code tp1} batch once more before letting it succeed. Verifies that the
     * {@code tp1} future completes and its next sequence is 1.
     */
    @Test
    public void testBatchesDrainedWithOldProducerIdShouldSucceedOnSubsequentRetry() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        // A dedicated Sender instance built on the shared client, metadata and accumulator.
        Sender retryingSender = new Sender(
                logContext, client, metadata, accumulator, true, MAX_REQUEST_SIZE, (short) -1, 10,
                new SenderMetricsRegistry(new Metrics()), time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
                txnManager, apiVersions);

        FutureRecordMetadata tp0Send = appendToAccumulator(tp0);
        FutureRecordMetadata tp1Send = appendToAccumulator(tp1);
        retryingSender.runOnce();
        retryingSender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());

        // Insertion order matters here: tp1 first, then tp0.
        LinkedHashMap partitionErrors = new LinkedHashMap();
        partitionErrors.put(tp1, new OffsetAndError(-1L, Errors.NOT_LEADER_OR_FOLLOWER));
        partitionErrors.put(tp0, new OffsetAndError(-1L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER));
        client.respond(produceResponse(partitionErrors));
        retryingSender.runOnce();
        Assertions.assertFalse(tp0Send.isDone());

        // The following pass leaves the producer on epoch 1.
        retryingSender.runOnce();
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertFalse(tp1Send.isDone());

        // tp1 fails once more with NOT_LEADER_OR_FOLLOWER ...
        client.respond(produceResponse(tp1, 0L, Errors.NOT_LEADER_OR_FOLLOWER, ACKS_ALL));
        retryingSender.runOnce();
        Assertions.assertFalse(tp1Send.isDone());

        // ... and succeeds on the next attempt.
        retryingSender.runOnce();
        client.respond(produceResponse(tp1, 0L, Errors.NONE, ACKS_ALL));
        retryingSender.runOnce();
        Assertions.assertTrue(tp1Send.isDone());
        Assertions.assertEquals(1, txnManager.sequenceNumber(tp1).intValue());
    }

    /**
     * Puts two batches for {@code tp0} in flight, acknowledges the second one first (offset 1000)
     * and then answers the first with DUPLICATE_SEQUENCE_NUMBER. Verifies that the last acked
     * sequence and offset stay at 1 and 1000, and that the first record's metadata carries no
     * offset (-1).
     */
    @Test
    public void testCorrectHandlingOfDuplicateSequenceError() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch goes out; rebuild the destination node from the request's destination id.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        String destinationId = client.requests().peek().destination();
        Node broker = new Node(Integer.parseInt(destinationId), "localhost", 0);
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));

        // Second batch goes out while the first is still pending.
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(firstSend.isDone());
        Assertions.assertFalse(secondSend.isDone());
        Assertions.assertTrue(client.isReady(broker, time.milliseconds()));

        // Respond to the second request before the first.
        ClientRequest firstRequest = client.requests().peek();
        ClientRequest secondRequest = (ClientRequest) client.requests().toArray()[1];
        client.respondToRequest(secondRequest, produceResponse(tp0, 1000L, Errors.NONE, 0));
        sender.runOnce();
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));

        // The first request is reported as a duplicate; acked state must not move.
        client.respondToRequest(firstRequest, produceResponse(tp0, -1L, Errors.DUPLICATE_SEQUENCE_NUMBER, 0));
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));
        Assertions.assertFalse(client.hasInFlightRequests());

        // The first future completes normally, but without a known offset.
        RecordMetadata firstMetadata = (RecordMetadata) firstSend.get();
        Assertions.assertFalse(firstMetadata.hasOffset());
        Assertions.assertEquals(-1L, firstMetadata.offset());
    }

    /**
     * Transactional variant: after one acknowledged batch at offset 1000, a later batch is
     * answered with UNKNOWN_PRODUCER_ID. Verifies that the acked sequence is cleared, the next
     * sequence drops back to 2, and the retried batch is then acknowledged with the last record
     * at offset 1012.
     *
     * NOTE(review): the trailing argument of the five-argument
     * {@code sendIdempotentProducerResponse} is presumably the log start offset; confirm against
     * the helper.
     */
    @Test
    public void testTransactionalUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
        TransactionManager txnManager = new TransactionManager(logContext, "testUnresolvedSeq", 60000, 100L, apiVersions);
        setupWithTransactionState(txnManager);
        doInitTransactions(txnManager, new ProducerIdAndEpoch(343434L, (short) 0));
        Assertions.assertTrue(txnManager.hasProducerId());

        // Open a transaction and register tp0 with it.
        txnManager.beginTransaction();
        txnManager.maybeAddPartition(tp0);
        AddPartitionsToTxnResponse addPartitionsOk = new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp0, Errors.NONE));
        client.prepareResponse(addPartitionsOk);
        sender.runOnce();
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch: sent and acknowledged at offset 1000.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
        sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertEquals(1000L, ((RecordMetadata) firstSend.get()).offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));

        // Two more records are appended and sent together; only the last future is tracked.
        appendToAccumulator(tp0);
        FutureRecordMetadata lastSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(lastSend.isDone());

        // The broker answers with UNKNOWN_PRODUCER_ID.
        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(lastSend.isDone());
        Assertions.assertFalse(client.hasInFlightRequests());

        // The batch is resent with base sequence 0 and succeeds.
        sender.runOnce();
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(lastSend.isDone());
        Assertions.assertEquals(1012L, ((RecordMetadata) lastSend.get()).offset());
        Assertions.assertEquals(OptionalLong.of(1012L), txnManager.lastAckedOffset(tp0));
    }

    /**
     * Idempotent variant: after one acknowledged batch at offset 1000, a later batch is answered
     * with UNKNOWN_PRODUCER_ID. Verifies that the acked sequence is cleared, the epoch becomes 1,
     * the batch is back in flight, and it is then acknowledged with the last record at offset
     * 1012.
     *
     * NOTE(review): the trailing argument of the five-argument
     * {@code sendIdempotentProducerResponse} is presumably the log start offset; confirm against
     * the helper.
     */
    @Test
    public void testIdempotentUnknownProducerHandlingWhenRetentionLimitReached() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch: sent and acknowledged at offset 1000.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
        sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertEquals(1000L, ((RecordMetadata) firstSend.get()).offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));

        // Two more records are appended and sent together; only the last future is tracked.
        appendToAccumulator(tp0);
        FutureRecordMetadata lastSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(lastSend.isDone());

        // The broker answers with UNKNOWN_PRODUCER_ID; two passes handle the error and resend.
        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(lastSend.isDone());
        Assertions.assertTrue(client.hasInFlightRequests());
        Assertions.assertEquals((short) 1, txnManager.producerIdAndEpoch().epoch);

        // The resent batch (base sequence 0) succeeds.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(lastSend.isDone());
        Assertions.assertEquals(1012L, ((RecordMetadata) lastSend.get()).offset());
        Assertions.assertEquals(OptionalLong.of(1012L), txnManager.lastAckedOffset(tp0));
    }

    /**
     * After one acknowledged batch at offset 1000, a second batch is answered with
     * UNKNOWN_PRODUCER_ID and -1 as the trailing response argument. Verifies that the acked
     * sequence and next sequence are left untouched, and that the batch is retried with the same
     * sequence (1) and acknowledged at offset 1011.
     *
     * NOTE(review): the trailing argument of the five-argument
     * {@code sendIdempotentProducerResponse} is presumably the log start offset; confirm against
     * the helper.
     */
    @Test
    public void testUnknownProducerErrorShouldBeRetriedWhenLogStartOffsetIsUnknown() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch: sent and acknowledged at offset 1000.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
        sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertEquals(1000L, ((RecordMetadata) firstSend.get()).offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));

        // Second batch goes out.
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(secondSend.isDone());

        // UNKNOWN_PRODUCER_ID with -1 in the trailing position: sequence state must be unchanged.
        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, -1L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(secondSend.isDone());
        Assertions.assertFalse(client.hasInFlightRequests());

        // The batch is resent with sequence 1 and succeeds.
        sender.runOnce();
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1011L, 1010L);
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.of(1), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(secondSend.isDone());
        Assertions.assertEquals(1011L, ((RecordMetadata) secondSend.get()).offset());
        Assertions.assertEquals(OptionalLong.of(1011L), txnManager.lastAckedOffset(tp0));
    }

    /**
     * After one acknowledged batch at offset 1000, two further batches are in flight and both
     * are answered with UNKNOWN_PRODUCER_ID. Verifies that the epoch becomes 1, the batches are
     * retried one after the other starting from sequence 0, and they are acknowledged at offsets
     * 1011 and 1012 respectively.
     *
     * NOTE(review): the trailing argument of the five-argument
     * {@code sendIdempotentProducerResponse} is presumably the log start offset; confirm against
     * the helper.
     */
    @Test
    public void testUnknownProducerErrorShouldBeRetriedForFutureBatchesWhenFirstFails() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(tp0).longValue());

        // First batch: sent and acknowledged at offset 1000.
        FutureRecordMetadata firstSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1000L, 10L);
        sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertEquals(1000L, ((RecordMetadata) firstSend.get()).offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(tp0));

        // Second and third batches are sent in separate passes, so both end up in flight.
        FutureRecordMetadata secondSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        FutureRecordMetadata thirdSend = appendToAccumulator(tp0);
        sender.runOnce();
        Assertions.assertEquals(3L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertFalse(secondSend.isDone());
        Assertions.assertFalse(thirdSend.isDone());
        Assertions.assertEquals(2, client.inFlightRequestCount());

        // The second batch is answered with UNKNOWN_PRODUCER_ID; two passes handle it and resend.
        sendIdempotentProducerResponse(1, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        sender.runOnce();
        sender.runOnce();
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());
        Assertions.assertFalse(secondSend.isDone());
        Assertions.assertFalse(thirdSend.isDone());
        Assertions.assertEquals(2, client.inFlightRequestCount());
        Assertions.assertEquals((short) 1, txnManager.producerIdAndEpoch().epoch);

        // The third batch (originally sequence 2) gets the same error.
        sendIdempotentProducerResponse(2, tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 1010L);
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(2L, txnManager.sequenceNumber(tp0).longValue());

        // The retried second batch now carries sequence 0 and succeeds at offset 1011.
        sendIdempotentProducerResponse(0, tp0, Errors.NONE, 1011L, 1010L);
        sender.runOnce();
        Assertions.assertTrue(secondSend.isDone());
        Assertions.assertFalse(thirdSend.isDone());
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(tp0));
        Assertions.assertEquals(1011L, ((RecordMetadata) secondSend.get()).offset());
        Assertions.assertEquals(OptionalLong.of(1011L), txnManager.lastAckedOffset(tp0));

        // The retried third batch follows with sequence 1 and succeeds at offset 1012.
        sender.runOnce();
        Assertions.assertEquals(1, client.inFlightRequestCount());
        sendIdempotentProducerResponse(1, tp0, Errors.NONE, 1012L, 1010L);
        sender.runOnce();
        Assertions.assertFalse(client.hasInFlightRequests());
        Assertions.assertTrue(thirdSend.isDone());
        Assertions.assertEquals(1012L, ((RecordMetadata) thirdSend.get()).offset());
        Assertions.assertEquals(OptionalLong.of(1012L), txnManager.lastAckedOffset(tp0));
    }

    /**
     * Exercises the idempotent producer when the second batch is answered with
     * {@code UNKNOWN_PRODUCER_ID} after the first batch was acknowledged at offset 1000.
     *
     * <p>The assertions pin the state observed once the error has been processed: the producer epoch
     * is 1, the last acknowledged sequence for {@code tp0} has been cleared, and the future of the
     * second record is still pending.
     *
     * <p>NOTE(review): the trailing {@code 10L} passed to {@code sendIdempotentProducerResponse} is
     * presumably the broker's log start offset (below the last acked offset, hence "log was not
     * truncated") - confirm against {@code produceResponse}.
     */
    @Test
    public void testShouldRaiseOutOfOrderSequenceExceptionToUserIfLogWasNotTruncated() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());
        Assertions.assertEquals(0L, txnManager.sequenceNumber(this.tp0).longValue());

        // First record: sent with sequence 0 and acknowledged at offset 1000.
        FutureRecordMetadata firstSend = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        sendIdempotentProducerResponse(0, this.tp0, Errors.NONE, 1000L, 10L);
        this.sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertEquals(1000L, ((RecordMetadata) firstSend.get()).offset());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(OptionalLong.of(1000L), txnManager.lastAckedOffset(this.tp0));

        // Second record: sent with sequence 1, still unacknowledged.
        FutureRecordMetadata secondSend = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(2L, txnManager.sequenceNumber(this.tp0).longValue());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(secondSend.isDone());

        // The second batch is rejected with UNKNOWN_PRODUCER_ID; two sender passes process the error.
        sendIdempotentProducerResponse(1, this.tp0, Errors.UNKNOWN_PRODUCER_ID, -1L, 10L);
        this.sender.runOnce();
        this.sender.runOnce();
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
        Assertions.assertEquals(OptionalInt.empty(), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertFalse(secondSend.isDone());
    }

    /**
     * Convenience overload that forwards to the five-argument variant with {@code -1L} as the last
     * argument.
     *
     * @param i              expected base sequence of the produced batch (forwarded unchanged)
     * @param topicPartition partition the response is built for
     * @param errors         error code to place in the response
     * @param j              forwarded unchanged; presumably the response offset - confirm against
     *                       {@code produceResponse}
     */
    void sendIdempotentProducerResponse(int i, TopicPartition topicPartition, Errors errors, long j) {
        sendIdempotentProducerResponse(i, topicPartition, errors, j, -1L);
    }

    /**
     * Convenience overload that forwards to the six-argument variant, passing {@code ACKS_ALL} as the
     * expected-epoch argument. Because the six-argument variant only checks the epoch when that
     * argument is strictly greater than {@code ACKS_ALL}, this overload never verifies the epoch.
     *
     * @param i              expected base sequence of the produced batch
     * @param topicPartition partition the response is built for
     * @param errors         error code to place in the response
     * @param j              forwarded unchanged; presumably the response offset - confirm against
     *                       {@code produceResponse}
     * @param j2             forwarded unchanged; presumably the log start offset - confirm against
     *                       {@code produceResponse}
     */
    void sendIdempotentProducerResponse(int i, TopicPartition topicPartition, Errors errors, long j, long j2) {
        // NOTE(review): ACKS_ALL doubles as a "no expected epoch" sentinel here; looks like a
        // constant that merely shares the sentinel's numeric value - confirm.
        sendIdempotentProducerResponse(ACKS_ALL, i, topicPartition, errors, j, j2);
    }

    /**
     * Hands the mock client a produce response for {@code topicPartition} together with a request
     * matcher that validates the produce request it is matched against.
     *
     * <p>The matcher asserts that the request carries idempotent records, that exactly one batch was
     * produced to {@code topicPartition}, that the batch's base sequence equals {@code i2}, and - only
     * when {@code i} is greater than {@code ACKS_ALL} - that its producer epoch equals {@code i}.
     *
     * @param i              expected producer epoch; ignored unless greater than {@code ACKS_ALL}
     * @param i2             expected base sequence of the single batch
     * @param topicPartition partition whose records are inspected and for which the response is built
     * @param errors         error code passed to {@code produceResponse}
     * @param j              second argument to {@code produceResponse}; presumably the offset - confirm
     * @param j2             fifth argument to {@code produceResponse}; presumably the log start
     *                       offset - confirm
     */
    void sendIdempotentProducerResponse(int i, int i2, TopicPartition topicPartition, Errors errors, long j, long j2) {
        this.client.respond(request -> {
            ProduceRequest produce = (ProduceRequest) request;
            Assertions.assertTrue(RequestTestUtils.hasIdempotentRecords(produce));

            Iterator<? extends RecordBatch> batches =
                partitionRecords(produce).get(topicPartition).batches().iterator();
            RecordBatch onlyBatch = batches.next();
            // Exactly one batch is expected for the partition.
            Assertions.assertFalse(batches.hasNext());

            boolean epochExpected = i > ACKS_ALL;
            if (epochExpected) {
                Assertions.assertEquals((short) i, onlyBatch.producerEpoch());
            }
            Assertions.assertEquals(i2, onlyBatch.baseSequence());
            return true;
        }, (AbstractResponse) produceResponse(topicPartition, j, errors, 0, j2, null));
    }

    /**
     * A produce response carrying {@code CLUSTER_AUTHORIZATION_FAILED} must fail the pending future
     * with {@link ClusterAuthorizationException} and leave the transaction manager in a fatal error
     * state. The test finishes by delegating to {@code assertSendFailure} with the same exception
     * class.
     */
    @Test
    public void testClusterAuthorizationExceptionInProduceRequest() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        // Only an idempotent produce request may consume the prepared error response.
        this.client.prepareResponse(
            request -> request instanceof ProduceRequest
                && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            (AbstractResponse) produceResponse(this.tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        this.sender.runOnce();

        assertFutureFailure(pendingSend, ClusterAuthorizationException.class);
        Assertions.assertTrue(txnManager.hasFatalError());
        assertSendFailure(ClusterAuthorizationException.class);
    }

    /**
     * With two records sent to different partitions, a {@code CLUSTER_AUTHORIZATION_FAILED} response
     * for {@code tp0} must put the transaction manager into a fatal error state and fail the first
     * future; after one more sender pass the second future must fail with the same exception type.
     * A later successful response for {@code tp1} is then delivered and processed by a final sender
     * pass with no further assertions.
     */
    @Test
    public void testCancelInFlightRequestAfterFatalError() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata tp0Send = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        FutureRecordMetadata tp1Send = appendToAccumulator(this.tp1);
        this.sender.runOnce();

        // Fail the tp0 request with a fatal authorization error.
        this.client.respond(
            request -> request instanceof ProduceRequest
                && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            (AbstractResponse) produceResponse(this.tp0, -1L, Errors.CLUSTER_AUTHORIZATION_FAILED, 0));
        this.sender.runOnce();
        Assertions.assertTrue(txnManager.hasFatalError());
        assertFutureFailure(tp0Send, ClusterAuthorizationException.class);

        // One more pass is needed before the second future is failed as well.
        this.sender.runOnce();
        assertFutureFailure(tp1Send, ClusterAuthorizationException.class);

        // A late success for tp1 is still delivered and handled by the sender.
        this.client.respond(
            lateRequest -> lateRequest instanceof ProduceRequest
                && RequestTestUtils.hasIdempotentRecords((ProduceRequest) lateRequest),
            (AbstractResponse) produceResponse(this.tp1, 0L, Errors.NONE, 0));
        this.sender.runOnce();
    }

    /**
     * A produce response carrying {@code UNSUPPORTED_FOR_MESSAGE_FORMAT} must fail the pending future
     * with {@link UnsupportedForMessageFormatException} while leaving the transaction manager without
     * any error.
     */
    @Test
    public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        this.client.prepareResponse(
            request -> request instanceof ProduceRequest
                && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request),
            (AbstractResponse) produceResponse(this.tp0, -1L, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
        this.sender.runOnce();

        assertFutureFailure(pendingSend, UnsupportedForMessageFormatException.class);
        // Unlike the authorization failure case, this error is not recorded on the manager.
        Assertions.assertFalse(txnManager.hasError());
    }

    /**
     * An unsupported-version response to an idempotent produce request must fail the pending future
     * with {@link UnsupportedVersionException} and leave the transaction manager in a fatal error
     * state. The test finishes by delegating to {@code assertSendFailure} with the same exception
     * class.
     */
    @Test
    public void testUnsupportedVersionInProduceRequest() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        this.client.prepareUnsupportedVersionResponse(
            request -> request instanceof ProduceRequest
                && RequestTestUtils.hasIdempotentRecords((ProduceRequest) request));
        this.sender.runOnce();

        assertFutureFailure(pendingSend, UnsupportedVersionException.class);
        Assertions.assertTrue(txnManager.hasFatalError());
        assertSendFailure(UnsupportedVersionException.class);
    }

    /**
     * Sends a single record through a dedicated {@code Sender} and checks the sequence bookkeeping:
     * the outgoing batch must carry base sequence 0, producer id 343434 and epoch 0, and once the
     * response is processed the last acked sequence is 0 and the next sequence number is 1.
     */
    @Test
    public void testSequenceNumberIncrement() throws InterruptedException {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        Sender localSender = new Sender(
            this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE,
            (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, REQUEST_TIMEOUT,
            RETRY_BACKOFF_MS, txnManager, this.apiVersions);

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        this.client.prepareResponse(request -> {
            boolean isProduce = request instanceof ProduceRequest;
            if (!isProduce) {
                return false;
            }
            Iterator<? extends RecordBatch> batches =
                partitionRecords((ProduceRequest) request).get(this.tp0).batches().iterator();
            Assertions.assertTrue(batches.hasNext());
            RecordBatch onlyBatch = batches.next();
            Assertions.assertFalse(batches.hasNext());
            Assertions.assertEquals(0, onlyBatch.baseSequence());
            Assertions.assertEquals(343434L, onlyBatch.producerId());
            Assertions.assertEquals(0, onlyBatch.producerEpoch());
            return true;
        }, (AbstractResponse) produceResponse(this.tp0, 0L, Errors.NONE, 0));

        // Three passes are run before the future is expected to be complete.
        localSender.runOnce();
        localSender.runOnce();
        localSender.runOnce();

        Assertions.assertTrue(pendingSend.isDone());
        Assertions.assertEquals(OptionalInt.of(0), txnManager.lastAckedSequence(this.tp0));
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).intValue());
    }

    /**
     * Starts with producer id 343434 at epoch {@code Short.MAX_VALUE}, disconnects the node while one
     * request is in flight, then supplies producer id 343435. The test expects exactly one request
     * to be in flight again afterwards, the record's future to still be pending, and the next
     * sequence number for {@code tp0} to be 1.
     */
    @Test
    public void testRetryWhenProducerIdChanges() throws InterruptedException {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Short.MAX_VALUE, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        Sender localSender = new Sender(
            this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE,
            (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, REQUEST_TIMEOUT,
            RETRY_BACKOFF_MS, txnManager, this.apiVersions);

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        localSender.runOnce();
        localSender.runOnce();

        // The request destination is the node id rendered as a string.
        String nodeId = this.client.requests().peek().destination();
        Node targetNode = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertTrue(this.client.isReady(targetNode, this.time.milliseconds()), "Client ready status should be true");

        this.client.disconnect(nodeId);
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertFalse(this.client.isReady(targetNode, this.time.milliseconds()), "Client ready status should be false");

        localSender.runOnce();
        localSender.runOnce();
        prepareAndReceiveInitProducerId(343435L, Errors.NONE);
        localSender.runOnce();

        Assertions.assertEquals(1, this.client.inFlightRequestCount(), "Expected requests to be retried after pid change");
        Assertions.assertFalse(pendingSend.isDone());
        Assertions.assertEquals(1L, txnManager.sequenceNumber(this.tp0).intValue());
    }

    /**
     * After an {@code OUT_OF_ORDER_SEQUENCE_NUMBER} response, the record's future must still be
     * pending, one batch must still be in flight for {@code tp0}, and the producer epoch must be 1.
     */
    @Test
    public void testBumpEpochWhenOutOfOrderSequenceReceived() throws InterruptedException {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        Sender localSender = new Sender(
            this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE,
            (short) -1, 10, new SenderMetricsRegistry(new Metrics()), this.time, REQUEST_TIMEOUT,
            RETRY_BACKOFF_MS, txnManager, this.apiVersions);

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        localSender.runOnce();
        localSender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, localSender.inFlightBatches(this.tp0).size());

        this.client.respond(produceResponse(this.tp0, 0L, Errors.OUT_OF_ORDER_SEQUENCE_NUMBER, 0));
        localSender.runOnce();
        localSender.runOnce();

        // The batch is neither completed nor dropped, and the epoch has moved to 1.
        Assertions.assertFalse(pendingSend.isDone());
        Assertions.assertEquals(1, localSender.inFlightBatches(this.tp0).size());
        Assertions.assertEquals(1, txnManager.producerIdAndEpoch().epoch);
    }

    /**
     * Runs the shared split-batch scenario ({@code testSplitBatchAndSend}) with a transaction manager
     * obtained from {@code createTransactionManager()} once it has received producer id 123456.
     */
    @Test
    public void testIdempotentSplitBatchAndSend() throws Exception {
        TransactionManager txnManager = createTransactionManager();
        setupWithTransactionState(txnManager);
        prepareAndReceiveInitProducerId(123456L, Errors.NONE);
        Assertions.assertTrue(txnManager.hasProducerId());

        TopicPartition splitPartition = new TopicPartition("testSplitBatchAndSend", 1);
        ProducerIdAndEpoch expectedIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        testSplitBatchAndSend(txnManager, expectedIdAndEpoch, splitPartition);
    }

    /**
     * Runs the shared split-batch scenario ({@code testSplitBatchAndSend}) with a transaction manager
     * constructed with a transactional id, after initializing transactions, beginning a transaction
     * and adding the target partition to it.
     */
    @Test
    public void testTransactionalSplitBatchAndSend() throws Exception {
        TransactionManager txnManager =
            new TransactionManager(this.logContext, "testSplitBatchAndSend", 60000, 100L, this.apiVersions);
        setupWithTransactionState(txnManager);

        ProducerIdAndEpoch expectedIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        doInitTransactions(txnManager, expectedIdAndEpoch);

        TopicPartition splitPartition = new TopicPartition("testSplitBatchAndSend", 1);
        txnManager.beginTransaction();
        txnManager.maybeAddPartition(splitPartition);
        // A successful AddPartitionsToTxn response is prepared before the sender pass below.
        this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(splitPartition, Errors.NONE)));
        this.sender.runOnce();

        testSplitBatchAndSend(txnManager, expectedIdAndEpoch, splitPartition);
    }

    /**
     * Shared scenario for the idempotent and transactional split-batch tests.
     *
     * <p>Two records, each with a value of {@code batchSize / 2} bytes, are appended for
     * {@code topicPartition} with GZIP compression and sent in one request. The broker answers
     * {@code MESSAGE_TOO_LARGE}; the test then expects two follow-up produce requests, matched with
     * base sequences 0 and 1, to complete the two futures at offsets 0 and 1. Throughout, the next
     * sequence number must stay at 2, and at the end the accumulator deque must be empty and the
     * batch split rate metric must be positive.
     *
     * <p>The locally created {@link Metrics} instance is closed via try-with-resources whether the
     * scenario passes or fails.
     *
     * @param transactionManager transaction manager handed to the accumulator and sender under test
     * @param producerIdAndEpoch producer id and epoch passed to {@code produceRequestMatcher}
     * @param topicPartition     partition the records are appended to and responses are built for
     * @throws Exception if any step of the scenario throws
     */
    private void testSplitBatchAndSend(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch, TopicPartition topicPartition) throws Exception {
        String topic = topicPartition.topic();
        // Seed the compression ratio estimate before any batch is created.
        CompressionRatioEstimator.setEstimation(topic, CompressionType.GZIP, 0.2f);

        try (Metrics localMetrics = new Metrics()) {
            // Note: the buffer pool uses the test's field-level metrics, not the local instance.
            this.accumulator = new RecordAccumulator(this.logContext, this.batchSize, CompressionType.GZIP, 0, 0L, 3000, localMetrics, "producer-metrics", this.time, new ApiVersions(), transactionManager, new BufferPool(1048576L, this.batchSize, this.metrics, this.time, "producer-internal-metrics"));
            SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(localMetrics);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, true, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, REQUEST_TIMEOUT, 1000L, transactionManager, new ApiVersions());
            this.client.prepareMetadataUpdate(RequestTestUtils.metadataUpdateWith(2, Collections.singletonMap(topic, 2)));

            long nowMs = this.time.milliseconds();
            Cluster cluster = TestUtils.singletonCluster();
            FutureRecordMetadata firstFuture = this.accumulator.append(topicPartition.topic(), topicPartition.partition(), 0L, "key1".getBytes(), new byte[this.batchSize / 2], (Header[]) null, (RecordAccumulator.AppendCallbacks) null, 1000L, false, nowMs, cluster).future;
            FutureRecordMetadata secondFuture = this.accumulator.append(topicPartition.topic(), topicPartition.partition(), 0L, "key2".getBytes(), new byte[this.batchSize / 2], (Header[]) null, (RecordAccumulator.AppendCallbacks) null, 1000L, false, nowMs, cluster).future;

            sender.runOnce();
            sender.runOnce();
            Assertions.assertEquals(2L, transactionManager.sequenceNumber(topicPartition).longValue(), "The next sequence should be 2");
            assertSingleInFlightProduceRequestToReadyNode();

            // The combined batch is rejected as too large.
            Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new HashMap<>();
            responses.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.MESSAGE_TOO_LARGE));
            this.client.respond(new ProduceResponse(responses));
            sender.runOnce();
            Assertions.assertEquals(2L, transactionManager.sequenceNumber(topicPartition).longValue(), "The next sequence should be 2");
            Assertions.assertEquals(CompressionType.GZIP.rate - 0.005f, CompressionRatioEstimator.estimation(topic, CompressionType.GZIP), 0.01d);

            sender.runOnce();
            Assertions.assertEquals(2L, transactionManager.sequenceNumber(topicPartition).longValue(), "The next sequence number should be 2");
            Assertions.assertFalse(firstFuture.isDone(), "The future shouldn't have been done.");
            Assertions.assertFalse(secondFuture.isDone(), "The future shouldn't have been done.");
            assertSingleInFlightProduceRequestToReadyNode();

            // First follow-up request: base sequence 0, acknowledged at offset 0.
            responses.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
            this.client.respond(produceRequestMatcher(topicPartition, producerIdAndEpoch, 0, transactionManager.isTransactional()), (AbstractResponse) new ProduceResponse(responses));
            sender.runOnce();
            Assertions.assertTrue(firstFuture.isDone(), "The future should have been done.");
            Assertions.assertEquals(2L, transactionManager.sequenceNumber(topicPartition).longValue(), "The next sequence number should still be 2");
            Assertions.assertEquals(OptionalInt.of(0), transactionManager.lastAckedSequence(topicPartition), "The last ack'd sequence number should be 0");
            Assertions.assertFalse(secondFuture.isDone(), "The future shouldn't have been done.");
            Assertions.assertEquals(0L, ((RecordMetadata) firstFuture.get()).offset(), "Offset of the first message should be 0");

            sender.runOnce();
            assertSingleInFlightProduceRequestToReadyNode();

            // Second follow-up request: base sequence 1, acknowledged at offset 1.
            responses.put(topicPartition, new ProduceResponse.PartitionResponse(Errors.NONE, 1L, 0L, 0L));
            this.client.respond(produceRequestMatcher(topicPartition, producerIdAndEpoch, 1, transactionManager.isTransactional()), (AbstractResponse) new ProduceResponse(responses));
            sender.runOnce();
            Assertions.assertTrue(secondFuture.isDone(), "The future should have been done.");
            Assertions.assertEquals(2L, transactionManager.sequenceNumber(topicPartition).longValue(), "The next sequence number should be 2");
            Assertions.assertEquals(OptionalInt.of(1), transactionManager.lastAckedSequence(topicPartition), "The last ack'd sequence number should be 1");
            Assertions.assertEquals(1L, ((RecordMetadata) secondFuture.get()).offset(), "Offset of the first message should be 1");
            Assertions.assertTrue(this.accumulator.getDeque(topicPartition).isEmpty(), "There should be no batch in the accumulator");

            KafkaMetric splitRate = (KafkaMetric) localMetrics.metrics().get(senderMetricsRegistry.batchSplitRate);
            Assertions.assertTrue(((Double) splitRate.metricValue()).doubleValue() > 0.0d, "There should be a split");
        }
    }

    /** Asserts that exactly one PRODUCE request is in flight and that its destination node is ready. */
    private void assertSingleInFlightProduceRequestToReadyNode() {
        String nodeId = this.client.requests().peek().destination();
        Assertions.assertEquals(ApiKeys.PRODUCE, this.client.requests().peek().requestBuilder().apiKey());
        Node targetNode = new Node(Integer.parseInt(nodeId), "localhost", 0);
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertTrue(this.client.isReady(targetNode, this.time.milliseconds()), "Client ready status should be true");
    }

    /**
     * Uses a {@code MatchingBufferPool} to check buffer accounting around an in-flight batch that
     * completes after the mock clock is advanced by 5000 ms: {@code allMatch()} is false while the
     * batch is in flight and true after the following sender passes, with no requests or batches left
     * in flight at the end.
     */
    @Test
    public void testNoDoubleDeallocation() throws Exception {
        MatchingBufferPool trackingPool =
            new MatchingBufferPool(1048576L, this.batchSize, this.metrics, this.time, "producer-custom-metrics");
        setupWithTransactionState(null, false, trackingPool);

        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        this.time.sleep(5000L);
        // The buffer is still held by the in-flight batch.
        Assertions.assertFalse(trackingPool.allMatch());

        this.sender.runOnce();
        Assertions.assertTrue(pendingSend.isDone());
        Assertions.assertTrue(trackingPool.allMatch(), "The batch should have been de-allocated");
        Assertions.assertTrue(trackingPool.allMatch());

        // A further pass must not disturb the pool's accounting.
        this.sender.runOnce();
        Assertions.assertTrue(trackingPool.allMatch(), "The batch should have been de-allocated");
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * An in-flight batch must be failed with a {@link TimeoutException} when the mock clock is
     * advanced by 1500 ms before the sender processes the (successful) response that was already
     * handed to the client.
     *
     * <p>The failure is checked with {@code Assertions.assertThrows}, consistent with
     * {@code testExpiredBatchesInMultiplePartitions}.
     *
     * @throws InterruptedException declared for signature compatibility
     */
    @Test
    public void testInflightBatchesExpireOnDeliveryTimeout() throws InterruptedException {
        setupWithTransactionState(null, true, null);
        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size(), "Expect one in-flight batch in accumulator");

        Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new HashMap<>();
        responses.put(this.tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
        this.client.respond(new ProduceResponse(responses));

        // Advance the clock before the sender gets a chance to process the response.
        this.time.sleep(1500L);
        this.sender.runOnce();
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size(), "Expect zero in-flight batch in accumulator");

        ExecutionException failure = Assertions.assertThrows(
            ExecutionException.class, pendingSend::get, "The expired batch should throw a TimeoutException");
        Assertions.assertTrue(failure.getCause() instanceof TimeoutException);
    }

    /**
     * Checks how per-record errors in an {@code INVALID_RECORD} produce response reach the futures of
     * the five records in the batch:
     * <ul>
     *   <li>records 0 and 2 are listed with explicit messages ("0" and "2") and must fail with an
     *       {@link InvalidRecordException} carrying that message;</li>
     *   <li>record 3 is listed without a message and must fail with an {@link InvalidRecordException}
     *       carrying {@code Errors.INVALID_RECORD.message()};</li>
     *   <li>the remaining records (1 and 4) must fail with a plain {@link KafkaException}.</li>
     * </ul>
     *
     * <p>The message for record 0 is written as the literal {@code "0"}: the assertion compares it to
     * the record index rendered as a string, so it must not depend on a constant owned by an
     * unrelated test class.
     */
    @Test
    public void testRecordErrorPropagatedToApplication() throws InterruptedException {
        setup();
        Map<Integer, FutureRecordMetadata> futuresByIndex = new HashMap<>(5);
        for (int index = 0; index < 5; index++) {
            futuresByIndex.put(index, appendToAccumulator(this.tp0));
        }
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        OffsetAndError batchError = new OffsetAndError(-1L, Errors.INVALID_RECORD, Arrays.asList(
            new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(0).setBatchIndexErrorMessage("0"),
            new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(2).setBatchIndexErrorMessage("2"),
            new ProduceResponseData.BatchIndexAndErrorMessage().setBatchIndex(3)));
        this.client.respond(produceResponse(Collections.singletonMap(this.tp0, batchError)));
        this.sender.runOnce();

        for (Map.Entry<Integer, FutureRecordMetadata> entry : futuresByIndex.entrySet()) {
            int index = entry.getKey();
            FutureRecordMetadata future = entry.getValue();
            Assertions.assertTrue(future.isDone());
            KafkaException failure = TestUtils.assertFutureThrows(future, KafkaException.class);
            if (index == 0 || index == 2) {
                // Explicit per-record message supplied by the broker.
                Assertions.assertTrue(failure instanceof InvalidRecordException);
                Assertions.assertEquals(Integer.toString(index), failure.getMessage());
            } else if (index == 3) {
                // Listed without a message: falls back to the error's default message.
                Assertions.assertTrue(failure instanceof InvalidRecordException);
                Assertions.assertEquals(Errors.INVALID_RECORD.message(), failure.getMessage());
            } else {
                // Not listed at all: must be exactly KafkaException, not a subclass.
                Assertions.assertEquals(KafkaException.class, failure.getClass());
            }
        }
    }

    /**
     * With the setup flag passed as {@code true}, a second record appended while the first batch is
     * still in flight must not be sent until the first one is resolved: the in-flight counts stay at
     * one, drop to zero once the response is processed, and return to one on the next sender pass.
     */
    @Test
    public void testWhenFirstBatchExpireNoSendSecondBatchIfGuaranteeOrder() throws InterruptedException {
        setupWithTransactionState(null, true, null);
        long halfWaitMs = 1500 / 2;

        appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        this.time.sleep(halfWaitMs);
        appendToAccumulator(this.tp0);
        this.sender.runOnce();
        // The second batch is held back while the first is still in flight.
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        this.time.sleep(halfWaitMs);
        this.client.respond(produceResponse(this.tp0, 0L, Errors.NONE, 0, 0L, null));
        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // Only now does the second batch go out.
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * After the mock clock is advanced by 1500 ms, a {@code NOT_LEADER_OR_FOLLOWER} response must
     * complete the record's future and must not lead to a resend: nothing is in flight after the
     * response is processed, nor after two further sender passes.
     */
    @Test
    public void testExpiredBatchDoesNotRetry() throws Exception {
        setupWithTransactionState(null, false, null);
        FutureRecordMetadata pendingSend = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        this.time.sleep(1500L);
        this.client.respond(produceResponse(this.tp0, -1L, Errors.NOT_LEADER_OR_FOLLOWER, ACKS_ALL));
        this.sender.runOnce();
        Assertions.assertTrue(pendingSend.isDone());
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        // Two additional passes: still nothing is (re)sent.
        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * A {@code MESSAGE_TOO_LARGE} response that is processed only after the mock clock has advanced
     * by 1500 ms must complete both record futures and must not produce new in-flight requests or
     * batches, neither immediately nor after a further sender pass.
     */
    @Test
    public void testExpiredBatchDoesNotSplitOnMessageTooLargeError() throws Exception {
        FutureRecordMetadata firstSend = appendToAccumulator(this.tp0);
        FutureRecordMetadata secondSend = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());

        // The response is handed over first; the clock moves before the sender processes it.
        this.client.respond(produceResponse(this.tp0, -1L, Errors.MESSAGE_TOO_LARGE, ACKS_ALL));
        this.time.sleep(1500L);
        this.sender.runOnce();
        Assertions.assertTrue(firstSend.isDone());
        Assertions.assertTrue(secondSend.isDone());
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        this.sender.runOnce();
        Assertions.assertEquals(0, this.client.inFlightRequestCount());
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());
    }

    /**
     * Wraps the mock client in a Mockito spy and verifies, in order, the calls made across three
     * sender passes (the clock is moved past the accumulator's delivery timeout before the third):
     * {@code ready}, {@code newClientRequest} and {@code send} at least once each, followed by
     * {@code poll} with a timeout of 0, then {@code poll} with a timeout equal to the delivery
     * timeout, then {@code poll} with a timeout of at least 1.
     */
    @Test
    public void testResetNextBatchExpiry() throws Exception {
        this.client = (MockClient) Mockito.spy(new MockClient((Time) this.time, (Metadata) this.metadata));
        setupWithTransactionState(null);
        appendToAccumulator(this.tp0, 0L, "key", "value");

        this.sender.runOnce();
        this.sender.runOnce();
        // Jump just past the delivery timeout before the final pass.
        this.time.setCurrentTimeMs(this.time.milliseconds() + this.accumulator.getDeliveryTimeoutMs() + 1);
        this.sender.runOnce();

        InOrder callOrder = Mockito.inOrder(this.client);
        callOrder.verify(this.client, Mockito.atLeastOnce())
            .ready((Node) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        callOrder.verify(this.client, Mockito.atLeastOnce())
            .newClientRequest(ArgumentMatchers.anyString(), (AbstractRequest.Builder) ArgumentMatchers.any(), ArgumentMatchers.anyLong(), ArgumentMatchers.anyBoolean(), ArgumentMatchers.anyInt(), (RequestCompletionHandler) ArgumentMatchers.any());
        callOrder.verify(this.client, Mockito.atLeastOnce())
            .send((ClientRequest) ArgumentMatchers.any(), ArgumentMatchers.anyLong());
        callOrder.verify(this.client)
            .poll(ArgumentMatchers.eq(0L), ArgumentMatchers.anyLong());
        callOrder.verify(this.client)
            .poll(ArgumentMatchers.eq(this.accumulator.getDeliveryTimeoutMs()), ArgumentMatchers.anyLong());
        callOrder.verify(this.client)
            .poll(AdditionalMatchers.geq(1L), ArgumentMatchers.anyLong());
    }

    /**
     * Records appended to two partitions ({@code tp0} and {@code tp1}) must both be failed with a
     * {@link TimeoutException} when the mock clock is advanced by 1500 ms before the sender processes
     * the response, which only covers {@code tp0}.
     *
     * @throws Exception if setup or any sender pass throws
     */
    @Test
    public void testExpiredBatchesInMultiplePartitions() throws Exception {
        setupWithTransactionState(null, true, null);
        FutureRecordMetadata tp0Send = appendToAccumulator(this.tp0, this.time.milliseconds(), "k1", "v1");
        FutureRecordMetadata tp1Send = appendToAccumulator(this.tp1, this.time.milliseconds(), "k2", "v2");
        this.sender.runOnce();
        Assertions.assertEquals(1, this.client.inFlightRequestCount());
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size(), "Expect one in-flight batch in accumulator");

        Map<TopicPartition, ProduceResponse.PartitionResponse> responses = new HashMap<>();
        responses.put(this.tp0, new ProduceResponse.PartitionResponse(Errors.NONE, 0L, 0L, 0L));
        this.client.respond(new ProduceResponse(responses));

        // Advance the clock before the sender gets a chance to process the response.
        this.time.sleep(1500L);
        this.sender.runOnce();
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size(), "Expect zero in-flight batch in accumulator");

        ExecutionException tp0Failure = Assertions.assertThrows(ExecutionException.class, tp0Send::get);
        Assertions.assertTrue(tp0Failure.getCause() instanceof TimeoutException);
        ExecutionException tp1Failure = Assertions.assertThrows(ExecutionException.class, tp1Send::get);
        Assertions.assertTrue(tp1Failure.getCause() instanceof TimeoutException);
    }

    /**
     * Begins a transaction, adds a partition, initiates a graceful close of a locally built
     * {@link Sender} and then begins a commit. After {@code sender.run()} returns, the prepared
     * {@code EndTxn} matcher for {@code TransactionResult.COMMIT} must have matched a request.
     *
     * <p>The {@code Metrics} instance is released in a {@code finally} block so it is closed
     * exactly once whether the test body succeeds or throws.
     */
    @Test
    public void testTransactionalRequestsSentOnShutdown() {
        Metrics metrics = new Metrics();
        SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
        try {
            TransactionManager transactionManager = new TransactionManager(this.logContext, "testTransactionalRequestsSentOnShutdown", 6000, 100L, this.apiVersions);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, this.apiVersions);
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
            TopicPartition topicPartition = new TopicPartition("testTransactionalRequestsSentOnShutdown", 1);

            setupWithTransactionState(transactionManager);
            doInitTransactions(transactionManager, producerIdAndEpoch);

            transactionManager.beginTransaction();
            transactionManager.maybeAddPartition(topicPartition);
            this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, Errors.NONE)));
            sender.runOnce();
            sender.initiateClose();

            transactionManager.beginCommit();
            AssertEndTxnRequestMatcher endTxnMatcher = new AssertEndTxnRequestMatcher(TransactionResult.COMMIT);
            EndTxnResponse endTxnResponse = new EndTxnResponse(
                new EndTxnResponseData().setErrorCode(Errors.NONE.code()).setThrottleTimeMs(0));
            this.client.prepareResponse(endTxnMatcher, endTxnResponse);

            sender.run();
            Assertions.assertTrue(endTxnMatcher.matched, "Response didn't match in test");
        } finally {
            metrics.close();
        }
    }

    /**
     * Uses a sender whose accumulator is configured with the value {@code 50} (passed through
     * {@code setupWithTransactionState(transactionManager, lingerMs)}; presumably the linger time
     * in ms — confirm against {@code RecordAccumulator}).
     *
     * <p>First transaction: two records are appended and one sender pass leaves nothing in flight;
     * once {@code beginCommit()} is called the sender is driven until a request is in flight, the
     * produce and {@code EndTxn} responses are delivered and the commit result must be successful.
     *
     * <p>Second transaction: after sleeping {@code lingerMs - 1} the records must still be
     * undrained with nothing in flight; after one more millisecond the sender must send them.
     *
     * <p>The {@code Metrics} instance is managed by try-with-resources, replacing the hand-expanded
     * close logic that contained unreachable branches dereferencing a null {@code Throwable}.
     */
    @Test
    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
        int lingerMs = 50;
        try (Metrics metrics = new Metrics()) {
            SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
            TransactionManager transactionManager = new TransactionManager(this.logContext, "txnId", 6000, 100L, this.apiVersions);
            setupWithTransactionState(transactionManager, lingerMs);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, this.apiVersions);
            doInitTransactions(transactionManager, new ProducerIdAndEpoch(123456L, (short) 0));

            // First transaction: nothing is sent until the commit begins.
            transactionManager.beginTransaction();
            addPartitionToTxn(sender, transactionManager, this.tp0);
            appendToAccumulator(this.tp0);
            appendToAccumulator(this.tp0);
            sender.runOnce();
            Assertions.assertFalse(this.client.hasInFlightRequests());

            TransactionalRequestResult commitResult = transactionManager.beginCommit();
            ProducerTestUtils.runUntil(sender, this.client::hasInFlightRequests);
            respondToProduce(this.tp0, Errors.NONE, 1L);
            ProducerTestUtils.runUntil(sender, transactionManager::hasInFlightRequest);
            respondToEndTxn(Errors.NONE);
            ProducerTestUtils.runUntil(sender, transactionManager::isReady);
            Assertions.assertTrue(commitResult.isSuccessful());
            commitResult.await();

            // Second transaction: records stay queued until the configured time has fully elapsed.
            transactionManager.beginTransaction();
            addPartitionToTxn(sender, transactionManager, this.tp0);
            appendToAccumulator(this.tp0);
            appendToAccumulator(this.tp0);
            this.time.sleep(lingerMs - 1);
            sender.runOnce();
            Assertions.assertFalse(this.client.hasInFlightRequests());
            Assertions.assertTrue(this.accumulator.hasUndrained());

            this.time.sleep(1L);
            ProducerTestUtils.runUntil(sender, this.client::hasInFlightRequests);
            Assertions.assertFalse(this.accumulator.hasUndrained());
        }
    }

    /**
     * Sends one record, appends a second, then begins a commit while produce requests are still
     * outstanding. While two requests are recorded on the client, the transaction manager must be
     * completing, must have no transactional request in flight, and must still report in-flight
     * batches for {@code tp0}. Once both produce responses are delivered the sender is driven
     * until the {@code EndTxn} request is in flight and, after its response, until the manager is
     * ready again.
     *
     * <p>The {@code Metrics} instance is managed by try-with-resources, replacing the hand-expanded
     * close logic that contained unreachable branches dereferencing a null {@code Throwable}.
     */
    @Test
    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
        try (Metrics metrics = new Metrics()) {
            SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
            TransactionManager transactionManager = new TransactionManager(this.logContext, "txnId", 6000, 100L, this.apiVersions);
            setupWithTransactionState(transactionManager);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, this.apiVersions);
            doInitTransactions(transactionManager, new ProducerIdAndEpoch(123456L, (short) 0));

            transactionManager.beginTransaction();
            addPartitionToTxn(sender, transactionManager, this.tp0);

            // First record: drive the sender until exactly one request is recorded.
            appendToAccumulator(this.tp0);
            ProducerTestUtils.runUntil(sender, () -> this.client.requests().size() == 1);
            Assertions.assertFalse(this.accumulator.hasUndrained());
            Assertions.assertTrue(this.client.hasInFlightRequests());
            Assertions.assertTrue(transactionManager.hasInflightBatches(this.tp0));

            // Second record plus commit: the commit must wait behind the outstanding batches.
            appendToAccumulator(this.tp0);
            transactionManager.beginCommit();
            ProducerTestUtils.runUntil(sender, () -> this.client.requests().size() == 2);
            Assertions.assertTrue(transactionManager.isCompleting());
            Assertions.assertFalse(transactionManager.hasInFlightRequest());
            Assertions.assertTrue(transactionManager.hasInflightBatches(this.tp0));

            respondToProduce(this.tp0, Errors.NONE, 0L);
            respondToProduce(this.tp0, Errors.NONE, 1L);
            ProducerTestUtils.runUntil(sender, transactionManager::hasInFlightRequest);

            respondToEndTxn(Errors.NONE);
            ProducerTestUtils.runUntil(sender, transactionManager::isReady);
        }
    }

    /**
     * Registers {@code topicPartition} with the transaction manager, queues an error-free
     * {@code AddPartitionsToTxnResponse} on the mock client and runs {@code sender} until the
     * manager reports the partition as added. Asserts that no transactional request is left in
     * flight afterwards.
     */
    private void addPartitionToTxn(Sender sender, TransactionManager transactionManager, TopicPartition topicPartition) {
        transactionManager.maybeAddPartition(topicPartition);
        AddPartitionsToTxnResponse addPartitionsResponse =
            new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, Errors.NONE));
        this.client.prepareResponse(addPartitionsResponse);
        ProducerTestUtils.runUntil(sender, () -> transactionManager.isPartitionAdded(topicPartition));
        Assertions.assertFalse(transactionManager.hasInFlightRequest());
    }

    /**
     * Answers a pending {@code ProduceRequest} on the mock client with a response for
     * {@code topicPartition} carrying base offset {@code j}, the given error and throttle time 0.
     */
    private void respondToProduce(TopicPartition topicPartition, Errors errors, long j) {
        ProduceResponse response = produceResponse(topicPartition, j, errors, 0);
        this.client.respond(body -> body instanceof ProduceRequest, response);
    }

    /**
     * Answers a pending {@code EndTxnRequest} on the mock client with a response carrying the
     * given error code and a throttle time of 0.
     */
    private void respondToEndTxn(Errors errors) {
        EndTxnResponseData responseData = new EndTxnResponseData()
            .setErrorCode(errors.code())
            .setThrottleTimeMs(0);
        this.client.respond(body -> body instanceof EndTxnRequest, new EndTxnResponse(responseData));
    }

    /**
     * Begins a transaction, adds a partition and initiates a graceful close of a locally built
     * {@link Sender} without committing. After {@code sender.run()} returns, the prepared
     * {@code EndTxn} matcher for {@code TransactionResult.ABORT} must have matched a request.
     *
     * <p>The {@code Metrics} instance is released in a {@code finally} block so it is closed
     * exactly once whether the test body succeeds or throws.
     */
    @Test
    public void testIncompleteTransactionAbortOnShutdown() {
        Metrics metrics = new Metrics();
        SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
        try {
            TransactionManager transactionManager = new TransactionManager(this.logContext, "testIncompleteTransactionAbortOnShutdown", 6000, 100L, this.apiVersions);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, this.apiVersions);
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
            TopicPartition topicPartition = new TopicPartition("testIncompleteTransactionAbortOnShutdown", 1);

            setupWithTransactionState(transactionManager);
            doInitTransactions(transactionManager, producerIdAndEpoch);

            transactionManager.beginTransaction();
            transactionManager.maybeAddPartition(topicPartition);
            this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, Errors.NONE)));
            sender.runOnce();
            sender.initiateClose();

            AssertEndTxnRequestMatcher endTxnMatcher = new AssertEndTxnRequestMatcher(TransactionResult.ABORT);
            EndTxnResponse endTxnResponse = new EndTxnResponse(
                new EndTxnResponseData().setErrorCode(Errors.NONE.code()).setThrottleTimeMs(0));
            this.client.prepareResponse(endTxnMatcher, endTxnResponse);

            sender.run();
            Assertions.assertTrue(endTxnMatcher.matched, "Response didn't match in test");
        } finally {
            metrics.close();
        }
    }

    /**
     * Begins a commit and then force-closes a locally built {@link Sender}. After
     * {@code sender.run()} returns, awaiting the commit result must throw a {@link KafkaException}.
     *
     * <p>The {@code Metrics} instance is released in a {@code finally} block so it is closed
     * exactly once whether the test body succeeds or throws.
     */
    @Timeout(10)
    @Test
    public void testForceShutdownWithIncompleteTransaction() {
        Metrics metrics = new Metrics();
        SenderMetricsRegistry senderMetricsRegistry = new SenderMetricsRegistry(metrics);
        try {
            TransactionManager transactionManager = new TransactionManager(this.logContext, "testForceShutdownWithIncompleteTransaction", 6000, 100L, this.apiVersions);
            Sender sender = new Sender(this.logContext, this.client, this.metadata, this.accumulator, false, MAX_REQUEST_SIZE, (short) -1, 1, senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, transactionManager, this.apiVersions);
            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
            TopicPartition topicPartition = new TopicPartition("testForceShutdownWithIncompleteTransaction", 1);

            setupWithTransactionState(transactionManager);
            doInitTransactions(transactionManager, producerIdAndEpoch);

            transactionManager.beginTransaction();
            transactionManager.maybeAddPartition(topicPartition);
            this.client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(topicPartition, Errors.NONE)));
            sender.runOnce();

            TransactionalRequestResult commitResult = transactionManager.beginCommit();
            sender.forceClose();
            sender.run();

            Assertions.assertThrows(KafkaException.class, commitResult::await, "The test expected to throw a KafkaException for forcefully closing the sender");
        } finally {
            metrics.close();
        }
    }

    /**
     * Appends a record inside an open transaction, begins an abort and runs the sender once; the
     * record's future must then fail with a {@link TransactionAbortedException}.
     */
    @Test
    public void testTransactionAbortedExceptionOnAbortWithoutError() throws InterruptedException, ExecutionException {
        TransactionManager txnManager = new TransactionManager(this.logContext, "testTransactionAbortedExceptionOnAbortWithoutError", 60000, 100L, this.apiVersions);
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);

        setupWithTransactionState(txnManager, false, null);
        doInitTransactions(txnManager, idAndEpoch);

        txnManager.beginTransaction();
        txnManager.maybeAddPartition(this.tp0);
        AddPartitionsToTxnResponse addPartitionsResponse =
            new AddPartitionsToTxnResponse(0, Collections.singletonMap(this.tp0, Errors.NONE));
        this.client.prepareResponse(addPartitionsResponse);
        this.sender.runOnce();

        FutureRecordMetadata recordFuture = appendToAccumulator(this.tp0, this.time.milliseconds(), "key", "value");
        txnManager.beginAbort();
        this.sender.runOnce();

        TestUtils.assertFutureThrows(recordFuture, TransactionAbortedException.class);
    }

    /**
     * Initializes transactions against a spied {@link MockClient} and verifies that exactly two
     * {@code poll} calls used {@code RETRY_BACKOFF_MS} as their first argument.
     */
    @Test
    public void testDoNotPollWhenNoRequestSent() {
        this.client = Mockito.spy(new MockClient(this.time, this.metadata));

        TransactionManager txnManager = new TransactionManager(this.logContext, "testDoNotPollWhenNoRequestSent", 6000, 100L, this.apiVersions);
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
        setupWithTransactionState(txnManager);
        doInitTransactions(txnManager, idAndEpoch);

        Mockito.verify(this.client, Mockito.times(2))
            .poll(ArgumentMatchers.eq(RETRY_BACKOFF_MS), ArgumentMatchers.anyLong());
    }

    /**
     * Sends a batch of two records inside a transaction, answers it with
     * {@code MESSAGE_TOO_LARGE}, then answers the follow-up request successfully and checks that
     * no batch remains in flight for {@code tp0}. A final sender pass after a 2000 ms clock jump
     * must complete without throwing.
     */
    @Test
    public void testTooLargeBatchesAreSafelyRemoved() throws InterruptedException {
        TransactionManager txnManager = new TransactionManager(this.logContext, "testSplitBatchAndSend", 60000, 100L, this.apiVersions);
        ProducerIdAndEpoch idAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);

        setupWithTransactionState(txnManager, false, null);
        doInitTransactions(txnManager, idAndEpoch);

        txnManager.beginTransaction();
        txnManager.maybeAddPartition(this.tp0);
        AddPartitionsToTxnResponse addPartitionsResponse =
            new AddPartitionsToTxnResponse(0, Collections.singletonMap(this.tp0, Errors.NONE));
        this.client.prepareResponse(addPartitionsResponse);
        this.sender.runOnce();

        appendToAccumulator(this.tp0, this.time.milliseconds(), "key1", "value1");
        appendToAccumulator(this.tp0, this.time.milliseconds(), "key2", "value2");
        this.sender.runOnce();
        Assertions.assertEquals(1, this.sender.inFlightBatches(this.tp0).size());

        // Reject the batch as too large, then give the sender two passes before the next response.
        ProduceResponse tooLarge = produceResponse(this.tp0, -1L, Errors.MESSAGE_TOO_LARGE, ACKS_ALL);
        this.client.respond(tooLarge);
        this.sender.runOnce();
        this.sender.runOnce();

        ProduceResponse success = produceResponse(this.tp0, 0L, Errors.NONE, 0);
        this.client.respond(success);
        this.sender.runOnce();
        Assertions.assertEquals(0, this.sender.inFlightBatches(this.tp0).size());

        this.time.sleep(2000L);
        this.sender.runOnce();
    }

    /**
     * An {@code INVALID_REQUEST} produce response built without a custom message must surface
     * {@code Errors.INVALID_REQUEST.message()} as the failure message.
     */
    @Test
    public void testDefaultErrorMessage() throws Exception {
        ProduceResponse response = produceResponse(this.tp0, 0L, Errors.INVALID_REQUEST, 0);
        String expectedMessage = Errors.INVALID_REQUEST.message();
        verifyErrorMessage(response, expectedMessage);
    }

    /**
     * An {@code INVALID_REQUEST} produce response built with a custom message must surface that
     * message as the failure message.
     */
    @Test
    public void testCustomErrorMessage() throws Exception {
        String customMessage = "testCustomErrorMessage";
        ProduceResponse response = produceResponse(this.tp0, 0L, Errors.INVALID_REQUEST, 0, -1L, customMessage);
        verifyErrorMessage(response, customMessage);
    }

    /**
     * Answers {@code InitProducerId} twice with {@code COORDINATOR_LOAD_IN_PROGRESS} and once
     * successfully, checking the mock clock after each round: no time passes on the first
     * attempt, and each subsequent attempt advances the clock by exactly {@code RETRY_BACKOFF_MS}.
     */
    @Test
    public void testSenderShouldRetryWithBackoffOnRetriableError() {
        setupWithTransactionState(createTransactionManager());

        long startMs = this.time.milliseconds();
        prepareAndReceiveInitProducerId(343434L, (short) -1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
        long afterFirstMs = this.time.milliseconds();
        Assertions.assertEquals(startMs, afterFirstMs);

        prepareAndReceiveInitProducerId(343434L, (short) -1, Errors.COORDINATOR_LOAD_IN_PROGRESS);
        long afterSecondMs = this.time.milliseconds();
        Assertions.assertEquals(RETRY_BACKOFF_MS, afterSecondMs - afterFirstMs);

        prepareAndReceiveInitProducerId(343434L, Errors.NONE);
        long afterThirdMs = this.time.milliseconds();
        Assertions.assertEquals(RETRY_BACKOFF_MS, afterThirdMs - afterSecondMs);
    }

    /**
     * Appends a record to {@code tp0}, runs the sender twice, delivers {@code produceResponse},
     * runs the sender twice more and then checks that the record's future fails with an
     * {@link ExecutionException} whose cause is exactly an {@link InvalidRequestException}
     * carrying the message {@code str}.
     *
     * <p>The assertion invokes {@code future.get()}; an empty executable could never throw and
     * would make {@code assertThrows} fail unconditionally.
     *
     * @param produceResponse the response handed to the mock client for the in-flight request
     * @param str             the message expected on the failure cause
     */
    private void verifyErrorMessage(ProduceResponse produceResponse, String str) throws Exception {
        FutureRecordMetadata future = appendToAccumulator(this.tp0, 0L, "key", "value");
        this.sender.runOnce();
        this.sender.runOnce();
        this.client.respond(produceResponse);
        this.sender.runOnce();
        this.sender.runOnce();

        ExecutionException failure = Assertions.assertThrows(ExecutionException.class, future::get);
        Assertions.assertEquals(InvalidRequestException.class, failure.getCause().getClass());
        Assertions.assertEquals(str, failure.getCause().getMessage());
    }

    /**
     * Builds a matcher that accepts only a {@code ProduceRequest} holding exactly one batch for
     * {@code topicPartition} whose base offset is 0 and whose base sequence, producer id,
     * producer epoch and transactional flag equal {@code i}, the values in
     * {@code producerIdAndEpoch}, and {@code z} respectively.
     */
    private MockClient.RequestMatcher produceRequestMatcher(TopicPartition topicPartition, ProducerIdAndEpoch producerIdAndEpoch, int i, boolean z) {
        return body -> {
            if (!(body instanceof ProduceRequest)) {
                return false;
            }
            MemoryRecords records = partitionRecords((ProduceRequest) body).get(topicPartition);
            if (records == null) {
                return false;
            }
            List<?> batches = TestUtils.toList(records.batches());
            if (batches.size() != 1) {
                return false;
            }
            MutableRecordBatch batch = (MutableRecordBatch) batches.get(0);
            return batch.baseOffset() == 0
                && batch.baseSequence() == i
                && batch.producerId() == producerIdAndEpoch.producerId
                && batch.producerEpoch() == producerIdAndEpoch.epoch
                && batch.isTransactional() == z;
        };
    }

    /**
     * Appends a {@code "key"}/{@code "value"} record for {@code topicPartition}, timestamped with
     * the current mock time, and returns its future.
     */
    private FutureRecordMetadata appendToAccumulator(TopicPartition topicPartition) throws InterruptedException {
        long timestamp = this.time.milliseconds();
        return appendToAccumulator(topicPartition, timestamp, "key", "value");
    }

    /**
     * Appends a record with key {@code str} and value {@code str2} (both converted with
     * {@code String.getBytes()}) and timestamp {@code j} to the accumulator for
     * {@code topicPartition}, and returns the {@code future} field of the append result.
     */
    private FutureRecordMetadata appendToAccumulator(TopicPartition topicPartition, long j, String str, String str2) throws InterruptedException {
        byte[] keyBytes = str.getBytes();
        byte[] valueBytes = str2.getBytes();
        return this.accumulator.append(
            topicPartition.topic(),
            topicPartition.partition(),
            j,
            keyBytes,
            valueBytes,
            Record.EMPTY_HEADERS,
            (RecordAccumulator.AppendCallbacks) null,
            1000L,
            false,
            this.time.milliseconds(),
            TestUtils.singletonCluster()).future;
    }

    /**
     * Creates a single-partition {@code ProduceResponse}. The partition response is built from
     * {@code errors}, {@code j}, the constant {@code -1L}, {@code j2}, an empty list and
     * {@code str}, in that constructor order; {@code i} is passed as the response's second
     * constructor argument.
     */
    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i, long j2, String str) {
        ProduceResponse.PartitionResponse partitionResponse =
            new ProduceResponse.PartitionResponse(errors, j, -1L, j2, Collections.emptyList(), str);
        return new ProduceResponse(Collections.singletonMap(topicPartition, partitionResponse), i);
    }

    /**
     * Creates a {@code ProduceResponse} from per-partition outcomes. Partitions are grouped under
     * one topic entry per topic name (created on first use), and each partition entry carries the
     * partition index plus the offset, error code and record errors from its
     * {@code OffsetAndError}.
     */
    private ProduceResponse produceResponse(Map<TopicPartition, OffsetAndError> map) {
        ProduceResponseData data = new ProduceResponseData();
        map.forEach((partition, outcome) -> {
            String topic = partition.topic();
            ProduceResponseData.TopicProduceResponse topicResponse = data.responses().find(topic);
            if (topicResponse == null) {
                topicResponse = new ProduceResponseData.TopicProduceResponse().setName(topic);
                data.responses().add(topicResponse);
            }
            ProduceResponseData.PartitionProduceResponse partitionResponse =
                new ProduceResponseData.PartitionProduceResponse()
                    .setIndex(partition.partition())
                    .setBaseOffset(outcome.offset)
                    .setErrorCode(outcome.error.code())
                    .setRecordErrors(outcome.recordErrors);
            topicResponse.partitionResponses().add(partitionResponse);
        });
        return new ProduceResponse(data);
    }

    /**
     * Convenience overload of the six-argument {@code produceResponse} that supplies {@code -1L}
     * for the fifth argument and {@code null} for the error message.
     */
    private ProduceResponse produceResponse(TopicPartition topicPartition, long j, Errors errors, int i) {
        final String noErrorMessage = null;
        return produceResponse(topicPartition, j, errors, i, -1L, noErrorMessage);
    }

    /**
     * Creates a {@code TransactionManager} with a fresh {@code LogContext} and
     * {@code ApiVersions}, a {@code null} transactional id, {@code 0} as the third constructor
     * argument and {@code RETRY_BACKOFF_MS} as the fourth.
     */
    private TransactionManager createTransactionManager() {
        LogContext freshLogContext = new LogContext();
        return new TransactionManager(freshLogContext, (String) null, 0, RETRY_BACKOFF_MS, new ApiVersions());
    }

    /**
     * Rebuilds the test fixtures for {@code transactionManager} using the defaults of the
     * six-argument overload: flag {@code false}, a default buffer pool, metadata update enabled,
     * {@code Integer.MAX_VALUE} and {@code 0}.
     */
    private void setupWithTransactionState(TransactionManager transactionManager) {
        setupWithTransactionState(
            transactionManager,
            false,
            null,
            true,
            Integer.MAX_VALUE,
            0);
    }

    /**
     * Rebuilds the test fixtures for {@code transactionManager}, forwarding {@code i} as the last
     * argument of the six-argument overload and using its defaults for everything else.
     */
    private void setupWithTransactionState(TransactionManager transactionManager, int i) {
        setupWithTransactionState(
            transactionManager,
            false,
            null,
            true,
            Integer.MAX_VALUE,
            i);
    }

    /**
     * Rebuilds the test fixtures with the given flag {@code z} and buffer pool, keeping metadata
     * update enabled and passing {@code Integer.MAX_VALUE} and {@code 0} for the remaining
     * arguments of the six-argument overload.
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean z, BufferPool bufferPool) {
        setupWithTransactionState(
            transactionManager,
            z,
            bufferPool,
            true,
            Integer.MAX_VALUE,
            0);
    }

    /**
     * Rebuilds the test fixtures with the given flag {@code z}, buffer pool and metadata-update
     * switch {@code z2}, passing {@code Integer.MAX_VALUE} and {@code 0} for the remaining
     * arguments of the six-argument overload.
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean z, BufferPool bufferPool, boolean z2) {
        setupWithTransactionState(
            transactionManager,
            z,
            bufferPool,
            z2,
            Integer.MAX_VALUE,
            0);
    }

    /**
     * Replaces the {@code metrics}, {@code accumulator}, {@code senderMetricsRegistry} and
     * {@code sender} fields with fresh instances wired to {@code transactionManager}, registers
     * the topic {@code "test"} with the metadata and optionally pushes a metadata update.
     *
     * @param transactionManager passed to both the accumulator and the sender; may be null
     * @param z          fifth argument of the {@code Sender} constructor
     *                   (presumably the message-ordering guarantee — confirm)
     * @param bufferPool pool for the accumulator; when null a 1 MiB pool sized by
     *                   {@code batchSize} is created
     * @param z2         when true, the mock client receives a metadata update for topic
     *                   {@code "test"}
     * @param i          eighth argument of the {@code Sender} constructor
     *                   (presumably the retry count — confirm)
     * @param i2         fourth argument of the {@code RecordAccumulator} constructor
     *                   (presumably the linger time in ms — confirm)
     */
    private void setupWithTransactionState(TransactionManager transactionManager, boolean z, BufferPool bufferPool, boolean z2, int i, int i2) {
        MetricConfig metricConfig = new MetricConfig().tags(Collections.singletonMap("client-id", CLIENT_ID));
        this.metrics = new Metrics(metricConfig, this.time);

        BufferPool pool;
        if (bufferPool == null) {
            pool = new BufferPool(1048576L, this.batchSize, this.metrics, this.time, "producer-metrics");
        } else {
            pool = bufferPool;
        }
        this.accumulator = new RecordAccumulator(
            this.logContext, this.batchSize, CompressionType.NONE, i2, 0L, DELIVERY_TIMEOUT_MS,
            this.metrics, "producer-metrics", this.time, this.apiVersions, transactionManager, pool);

        this.senderMetricsRegistry = new SenderMetricsRegistry(this.metrics);
        this.sender = new Sender(
            this.logContext, this.client, this.metadata, this.accumulator, z, MAX_REQUEST_SIZE,
            (short) -1, i, this.senderMetricsRegistry, this.time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS,
            transactionManager, this.apiVersions);

        this.metadata.add("test", this.time.milliseconds());
        if (!z2) {
            return;
        }
        this.client.updateMetadata(RequestTestUtils.metadataUpdateWith(1, Collections.singletonMap("test", 2)));
    }

    /**
     * Appends a record to {@code tp0}, runs the sender once and asserts that the record's future
     * is already done and fails with an {@link ExecutionException} whose cause is assignable to
     * {@code cls}.
     */
    private void assertSendFailure(Class<? extends RuntimeException> cls) throws Exception {
        FutureRecordMetadata recordFuture = appendToAccumulator(this.tp0);
        this.sender.runOnce();
        Assertions.assertTrue(recordFuture.isDone());
        try {
            recordFuture.get();
            Assertions.fail("Future should have raised " + cls.getSimpleName());
        } catch (ExecutionException failure) {
            Class<?> causeType = failure.getCause().getClass();
            Assertions.assertTrue(cls.isAssignableFrom(causeType));
        }
    }

    /**
     * Overload of the three-argument {@code prepareAndReceiveInitProducerId} that requests
     * producer epoch {@code 0}.
     */
    private void prepareAndReceiveInitProducerId(long j, Errors errors) {
        final short initialEpoch = (short) 0;
        prepareAndReceiveInitProducerId(j, initialEpoch, errors);
    }

    /**
     * Queues an {@code InitProducerId} response on the mock client, matched only by an
     * {@code InitProducerIdRequest} whose transactional id is null, and runs the sender once.
     * When {@code errors} is anything other than {@code Errors.NONE} the epoch {@code s} is
     * replaced by {@code ACKS_ALL} (presumably reused here for its numeric value — confirm).
     */
    private void prepareAndReceiveInitProducerId(long j, short s, Errors errors) {
        short epoch = s;
        if (errors != Errors.NONE) {
            epoch = ACKS_ALL;
        }
        InitProducerIdResponse response = initProducerIdResponse(j, epoch, errors);
        this.client.prepareResponse(
            body -> body instanceof InitProducerIdRequest
                && ((InitProducerIdRequest) body).data().transactionalId() == null,
            response);
        this.sender.runOnce();
    }

    /**
     * Creates an {@code InitProducerIdResponse} carrying the given error code, producer epoch
     * {@code s}, producer id {@code j} and a throttle time of 0.
     */
    private InitProducerIdResponse initProducerIdResponse(long j, short s, Errors errors) {
        InitProducerIdResponseData responseData = new InitProducerIdResponseData()
            .setErrorCode(errors.code())
            .setProducerEpoch(s)
            .setProducerId(j)
            .setThrottleTimeMs(0);
        return new InitProducerIdResponse(responseData);
    }

    /**
     * Starts transaction initialization on {@code transactionManager}, queues an error-free
     * find-coordinator response followed (after two sender passes) by an error-free
     * {@code InitProducerId} response for {@code producerIdAndEpoch}, and runs the shared
     * {@code sender} once more. Asserts the manager has a producer id and awaits the result.
     */
    private void doInitTransactions(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        TransactionalRequestResult initResult = transactionManager.initializeTransactions();

        prepareFindCoordinatorResponse(Errors.NONE, transactionManager.transactionalId());
        this.sender.runOnce();
        this.sender.runOnce();

        long producerId = producerIdAndEpoch.producerId;
        short epoch = producerIdAndEpoch.epoch;
        prepareInitProducerResponse(Errors.NONE, producerId, epoch);
        this.sender.runOnce();

        Assertions.assertTrue(transactionManager.hasProducerId());
        initResult.await();
    }

    /**
     * Queues a find-coordinator response on the mock client for key {@code str} with the given
     * error, naming the first node of the current metadata snapshot as coordinator.
     */
    private void prepareFindCoordinatorResponse(Errors errors, String str) {
        Node coordinator = (Node) this.metadata.fetch().nodes().get(0);
        this.client.prepareResponse(FindCoordinatorResponse.prepareResponse(errors, str, coordinator));
    }

    /**
     * Queues an {@code InitProducerId} response with producer id {@code j}, epoch {@code s} and
     * the given error on the mock client, without a request matcher.
     */
    private void prepareInitProducerResponse(Errors errors, long j, short s) {
        InitProducerIdResponse response = initProducerIdResponse(j, s, errors);
        this.client.prepareResponse(response);
    }

    /**
     * Asserts that {@code future} is already done and that {@code get()} fails with an
     * {@link ExecutionException} whose cause is assignable to {@code cls}. A future that
     * completes normally fails the test.
     */
    private void assertFutureFailure(Future<?> future, Class<? extends Exception> cls) throws InterruptedException {
        Assertions.assertTrue(future.isDone());
        try {
            future.get();
        } catch (ExecutionException failure) {
            Class<?> causeType = failure.getCause().getClass();
            Assertions.assertTrue(cls.isAssignableFrom(causeType), "Unexpected cause " + causeType.getName());
            return;
        }
        // Reached only when get() returned normally.
        Assertions.fail("Future should have raised " + cls.getName());
    }

    /**
     * Replaces {@code client} with a {@link MockClient} subclass that stops offering a
     * least-loaded node once a request is in flight, then sends a metadata request and keeps
     * polling until the client no longer reports any least-loaded node.
     *
     * <p>The subclass refreshes its {@code canSendMore} flag on every {@code poll}: it is true
     * only while fewer than one request is in flight. {@code leastLoadedNode} returns the first
     * ready node while the flag is set and {@code null} otherwise.
     */
    private void createMockClientWithMaxFlightOneMetadataPending() {
        this.client = new MockClient(this.time, this.metadata) {
            volatile boolean canSendMore = true;

            @Override
            public Node leastLoadedNode(long now) {
                for (Node candidate : SenderTest.this.metadata.fetch().nodes()) {
                    if (isReady(candidate, now) && this.canSendMore) {
                        return candidate;
                    }
                }
                return null;
            }

            @Override
            public List<ClientResponse> poll(long timeoutMs, long now) {
                // Recompute before delegating so the flag reflects the state seen by this poll.
                this.canSendMore = inFlightRequestCount() < 1;
                return super.poll(timeoutMs, now);
            }
        };

        // No diamond here: the builder is instantiated as a plain (non-generic) class.
        AbstractRequest.Builder<?> builder = new MetadataRequest.Builder(Collections.emptyList(), false);
        Node node = this.metadata.fetch().nodes().get(0);
        ClientRequest metadataRequest = this.client.newClientRequest(node.idString(), builder, this.time.milliseconds(), true);

        // Poll until the node is ready to accept the request.
        while (!this.client.ready(node, this.time.milliseconds())) {
            this.client.poll(0L, this.time.milliseconds());
        }
        this.client.send(metadataRequest, this.time.milliseconds());

        // Poll until the overridden leastLoadedNode stops returning a node.
        while (this.client.leastLoadedNode(this.time.milliseconds()) != null) {
            this.client.poll(0L, this.time.milliseconds());
        }
    }

    /**
     * Runs the shared sender up to five times, stopping early once {@code transactionManager}
     * has a producer id, then asserts that the id is present and equals
     * {@code producerIdAndEpoch}.
     */
    private void waitForProducerId(TransactionManager transactionManager, ProducerIdAndEpoch producerIdAndEpoch) {
        int attempts = 0;
        while (attempts < 5 && !transactionManager.hasProducerId()) {
            this.sender.runOnce();
            attempts++;
        }
        Assertions.assertTrue(transactionManager.hasProducerId());
        Assertions.assertEquals(producerIdAndEpoch, transactionManager.producerIdAndEpoch());
    }
}
