package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.ProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskMigratedException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.test.MockClientSupplier;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsEqual;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordCollectorTest.class */
public class RecordCollectorTest {
    private final LogContext logContext = new LogContext("test ");
    private final TaskId taskId = new TaskId(0, 0);
    private final ProductionExceptionHandler productionExceptionHandler = new DefaultProductionExceptionHandler();
    private final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(new Metrics());
    private final StreamsConfig config = new StreamsConfig(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "appId"), Utils.mkEntry("bootstrap.servers", "dummy:1234")}));
    private final StreamsConfig eosConfig = new StreamsConfig(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "appId"), Utils.mkEntry("bootstrap.servers", "dummy:1234"), Utils.mkEntry("processing.guarantee", "exactly_once_v2")}));
    private final String topic = "topic";
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), Arrays.asList(new PartitionInfo("topic", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic", 2, Node.noNode(), new Node[0], new Node[0])), Collections.emptySet(), Collections.emptySet());
    private final StringSerializer stringSerializer = new StringSerializer();
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final UUID processId = UUID.randomUUID();
    private final StreamPartitioner<String, Object> streamPartitioner = (str, str2, obj, i) -> {
        return Integer.valueOf(Integer.parseInt(str2) % i);
    };
    private MockProducer<byte[], byte[]> mockProducer;
    private StreamsProducer streamsProducer;
    private RecordCollectorImpl collector;

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordCollectorTest$CustomStringSerializer.class */
    private static class CustomStringSerializer extends StringSerializer {
        private boolean isKey;

        private CustomStringSerializer() {
        }

        public void configure(Map<String, ?> map, boolean z) {
            this.isKey = z;
            super.configure(map, z);
        }

        public byte[] serialize(String str, Headers headers, String str2) {
            if (this.isKey) {
                headers.add(new RecordHeader("key", "key".getBytes()));
            } else {
                headers.add(new RecordHeader("value", "value".getBytes()));
            }
            return serialize(str, str2);
        }
    }

    @Before
    public void setup() {
        MockClientSupplier mockClientSupplier = new MockClientSupplier();
        mockClientSupplier.setCluster(this.cluster);
        this.streamsProducer = new StreamsProducer(this.config, this.processId + "-StreamThread-1", mockClientSupplier, (TaskId) null, this.processId, this.logContext, Time.SYSTEM);
        this.mockProducer = mockClientSupplier.producers.get(0);
        this.collector = new RecordCollectorImpl(this.logContext, this.taskId, this.streamsProducer, this.productionExceptionHandler, this.streamsMetrics);
    }

    @After
    public void cleanup() {
        this.collector.closeClean();
    }

    @Test
    public void shouldSendToSpecificPartition() {
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        this.collector.send("topic", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", recordHeaders, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", recordHeaders, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", recordHeaders, 2, (Long) null, this.stringSerializer, this.stringSerializer);
        Map offsets = this.collector.offsets();
        Assert.assertEquals(2L, ((Long) offsets.get(new TopicPartition("topic", 0))).longValue());
        Assert.assertEquals(1L, ((Long) offsets.get(new TopicPartition("topic", 1))).longValue());
        Assert.assertEquals(0L, ((Long) offsets.get(new TopicPartition("topic", 2))).longValue());
        Assert.assertEquals(6L, this.mockProducer.history().size());
        this.collector.send("topic", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", (Headers) null, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", recordHeaders, 2, (Long) null, this.stringSerializer, this.stringSerializer);
        Map offsets2 = this.collector.offsets();
        Assert.assertEquals(3L, ((Long) offsets2.get(new TopicPartition("topic", 0))).longValue());
        Assert.assertEquals(2L, ((Long) offsets2.get(new TopicPartition("topic", 1))).longValue());
        Assert.assertEquals(1L, ((Long) offsets2.get(new TopicPartition("topic", 2))).longValue());
        Assert.assertEquals(9L, this.mockProducer.history().size());
    }

    @Test
    public void shouldSendWithPartitioner() {
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        this.collector.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "9", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "27", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "81", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "243", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "28", "0", recordHeaders, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "82", "0", recordHeaders, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "244", "0", recordHeaders, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        this.collector.send("topic", "245", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        Map offsets = this.collector.offsets();
        Assert.assertEquals(4L, ((Long) offsets.get(new TopicPartition("topic", 0))).longValue());
        Assert.assertEquals(2L, ((Long) offsets.get(new TopicPartition("topic", 1))).longValue());
        Assert.assertEquals(0L, ((Long) offsets.get(new TopicPartition("topic", 2))).longValue());
        Assert.assertEquals(9L, this.mockProducer.history().size());
        TopicPartition topicPartition = new TopicPartition("topic", 0);
        Assert.assertThrows(UnsupportedOperationException.class, () -> {
        });
    }

    @Test
    public void shouldSendWithNoPartition() {
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        this.collector.send("topic", "3", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "9", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "27", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "81", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "243", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "28", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "82", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "244", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "245", "0", recordHeaders, (Integer) null, (Long) null, this.stringSerializer, this.stringSerializer);
        Map offsets = this.collector.offsets();
        Assert.assertEquals(3L, ((Long) offsets.get(new TopicPartition("topic", 0))).longValue());
        Assert.assertEquals(2L, ((Long) offsets.get(new TopicPartition("topic", 1))).longValue());
        Assert.assertEquals(1L, ((Long) offsets.get(new TopicPartition("topic", 2))).longValue());
        Assert.assertEquals(9L, this.mockProducer.history().size());
    }

    @Test
    public void shouldUpdateOffsetsUponCompletion() {
        Map offsets = this.collector.offsets();
        this.collector.send("topic", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", (Headers) null, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        this.collector.send("topic", "999", "0", (Headers) null, 2, (Long) null, this.stringSerializer, this.stringSerializer);
        Assert.assertEquals(Collections.emptyMap(), offsets);
        this.collector.flush();
        Map offsets2 = this.collector.offsets();
        Assert.assertEquals(0L, offsets2.get(new TopicPartition("topic", 0)));
        Assert.assertEquals(0L, offsets2.get(new TopicPartition("topic", 1)));
        Assert.assertEquals(0L, offsets2.get(new TopicPartition("topic", 2)));
    }

    @Test
    public void shouldPassThroughRecordHeaderToSerializer() {
        CustomStringSerializer customStringSerializer = new CustomStringSerializer();
        CustomStringSerializer customStringSerializer2 = new CustomStringSerializer();
        customStringSerializer.configure(Collections.emptyMap(), true);
        this.collector.send("topic", "3", "0", new RecordHeaders(), (Long) null, customStringSerializer, customStringSerializer2, this.streamPartitioner);
        Iterator it = this.mockProducer.history().iterator();
        while (it.hasNext()) {
            Headers headers = ((ProducerRecord) it.next()).headers();
            Assert.assertEquals(2L, headers.toArray().length);
            Assert.assertEquals(new RecordHeader("key", "key".getBytes()), headers.lastHeader("key"));
            Assert.assertEquals(new RecordHeader("value", "value".getBytes()), headers.lastHeader("value"));
        }
    }

    @Test
    public void shouldForwardFlushToStreamsProducer() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(Boolean.valueOf(streamsProducer.eosEnabled())).andReturn(false);
        streamsProducer.flush();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{streamsProducer});
        new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics).flush();
        EasyMock.verify(new Object[]{streamsProducer});
    }

    @Test
    public void shouldForwardFlushToStreamsProducerEosEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(Boolean.valueOf(streamsProducer.eosEnabled())).andReturn(true);
        streamsProducer.flush();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{streamsProducer});
        new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics).flush();
        EasyMock.verify(new Object[]{streamsProducer});
    }

    @Test
    public void shouldNotAbortTxOnCloseCleanIfEosEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(Boolean.valueOf(streamsProducer.eosEnabled())).andReturn(true);
        EasyMock.replay(new Object[]{streamsProducer});
        new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics).closeClean();
        EasyMock.verify(new Object[]{streamsProducer});
    }

    @Test
    public void shouldAbortTxOnCloseDirtyIfEosEnabled() {
        StreamsProducer streamsProducer = (StreamsProducer) EasyMock.mock(StreamsProducer.class);
        EasyMock.expect(Boolean.valueOf(streamsProducer.eosEnabled())).andReturn(true);
        streamsProducer.abortTransaction();
        EasyMock.replay(new Object[]{streamsProducer});
        new RecordCollectorImpl(this.logContext, this.taskId, streamsProducer, this.productionExceptionHandler, this.streamsMetrics).closeDirty();
        EasyMock.verify(new Object[]{streamsProducer});
    }

    @Test
    public void shouldThrowInformativeStreamsExceptionOnKeyClassCastException() {
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", "key", "value", new RecordHeaders(), 0, 0L, new LongSerializer(), new StringSerializer());
        });
        MatcherAssert.assertThat(assertThrows.getCause(), IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`)."));
    }

    @Test
    public void shouldThrowInformativeStreamsExceptionOnKeyAndNullValueClassCastException() {
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", "key", (Object) null, new RecordHeaders(), 0, 0L, new LongSerializer(), new StringSerializer());
        });
        MatcherAssert.assertThat(assertThrows.getCause(), IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.LongSerializer / value: org.apache.kafka.common.serialization.StringSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: unknown because value is null). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`)."));
    }

    @Test
    public void shouldThrowInformativeStreamsExceptionOnValueClassCastException() {
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", "key", "value", new RecordHeaders(), 0, 0L, new StringSerializer(), new LongSerializer());
        });
        MatcherAssert.assertThat(assertThrows.getCause(), IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: java.lang.String / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`)."));
    }

    @Test
    public void shouldThrowInformativeStreamsExceptionOnValueAndNullKeyClassCastException() {
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            this.collector.send("topic", (Object) null, "value", new RecordHeaders(), 0, 0L, new StringSerializer(), new LongSerializer());
        });
        MatcherAssert.assertThat(assertThrows.getCause(), IsInstanceOf.instanceOf(ClassCastException.class));
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("ClassCastException while producing data to topic topic. A serializer (key: org.apache.kafka.common.serialization.StringSerializer / value: org.apache.kafka.common.serialization.LongSerializer) is not compatible to the actual key or value type (key type: unknown because key is null / value type: java.lang.String). Change the default Serdes in StreamConfig or provide correct Serdes via method parameters (for example if using the DSL, `#to(String topic, Produced<K, V> produced)` with `Produced.keySerde(WindowedSerdes.timeWindowedSerdeFrom(String.class))`)."));
    }

    @Test
    public void shouldThrowInformativeStreamsExceptionOnKafkaExceptionFromStreamPartitioner() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamProducerOnPartitionsFor(new KafkaException("Kaboom!")), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.initialize();
        MatcherAssert.assertThat(Assert.assertThrows(StreamsException.class, () -> {
            recordCollectorImpl.send("topic", "0", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        }).getMessage(), IsEqual.equalTo("Could not determine the number of partitions for topic 'topic' for task " + this.taskId + " due to org.apache.kafka.common.KafkaException: Kaboom!"));
    }

    @Test
    public void shouldForwardTimeoutExceptionFromStreamPartitionerWithoutWrappingIt() {
        shouldForwardExceptionWithoutWrappingIt(new TimeoutException("Kaboom!"));
    }

    @Test
    public void shouldForwardRuntimeExceptionFromStreamPartitionerWithoutWrappingIt() {
        shouldForwardExceptionWithoutWrappingIt(new RuntimeException("Kaboom!"));
    }

    private <E extends RuntimeException> void shouldForwardExceptionWithoutWrappingIt(E e) {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamProducerOnPartitionsFor(e), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.initialize();
        MatcherAssert.assertThat(((RuntimeException) Assert.assertThrows(e.getClass(), () -> {
            recordCollectorImpl.send("topic", "0", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        })).getMessage(), IsEqual.equalTo("Kaboom!"));
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenProducerFencedInCallback() {
        testThrowTaskMigratedExceptionOnSubsequentSend(new ProducerFencedException("KABOOM!"));
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentSendWhenInvalidEpochInCallback() {
        testThrowTaskMigratedExceptionOnSubsequentSend(new InvalidProducerEpochException("KABOOM!"));
    }

    private void testThrowTaskMigratedExceptionOnSubsequentSend(RuntimeException runtimeException) {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(runtimeException), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.initialize();
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        Assert.assertEquals(runtimeException, Assert.assertThrows(TaskMigratedException.class, () -> {
            recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        }).getCause());
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenProducerFencedInCallback() {
        testThrowTaskMigratedExceptionOnSubsequentFlush(new ProducerFencedException("KABOOM!"));
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentFlushWhenInvalidEpochInCallback() {
        testThrowTaskMigratedExceptionOnSubsequentFlush(new InvalidProducerEpochException("KABOOM!"));
    }

    private void testThrowTaskMigratedExceptionOnSubsequentFlush(RuntimeException runtimeException) {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(runtimeException), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.initialize();
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.getClass();
        Assert.assertEquals(runtimeException, Assert.assertThrows(TaskMigratedException.class, recordCollectorImpl::flush).getCause());
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenProducerFencedInCallback() {
        testThrowTaskMigratedExceptionOnSubsequentClose(new ProducerFencedException("KABOOM!"));
    }

    @Test
    public void shouldThrowTaskMigratedExceptionOnSubsequentCloseWhenInvalidEpochInCallback() {
        testThrowTaskMigratedExceptionOnSubsequentClose(new InvalidProducerEpochException("KABOOM!"));
    }

    private void testThrowTaskMigratedExceptionOnSubsequentClose(RuntimeException runtimeException) {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(runtimeException), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.initialize();
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.getClass();
        Assert.assertEquals(runtimeException, Assert.assertThrows(TaskMigratedException.class, recordCollectorImpl::closeClean).getCause());
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentSendIfASendFailsWithDefaultExceptionHandler() {
        KafkaException kafkaException = new KafkaException("KABOOM!");
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(kafkaException), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        });
        Assert.assertEquals(kafkaException, assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent."));
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentFlushIfASendFailsWithDefaultExceptionHandler() {
        KafkaException kafkaException = new KafkaException("KABOOM!");
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(kafkaException), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.getClass();
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, recordCollectorImpl::flush);
        Assert.assertEquals(kafkaException, assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent."));
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCloseIfASendFailsWithDefaultExceptionHandler() {
        KafkaException kafkaException = new KafkaException("KABOOM!");
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(kafkaException), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.getClass();
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, recordCollectorImpl::closeClean);
        Assert.assertEquals(kafkaException, assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.KafkaException: KABOOM!\nException handler choose to FAIL the processing, no more records would be sent."));
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentSendIfFatalEvenWithContinueExceptionHandler() {
        AuthenticationException authenticationException = new AuthenticationException("KABOOM!");
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(authenticationException), new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics);
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, () -> {
            recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        });
        Assert.assertEquals(authenticationException, assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error."));
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentFlushIfFatalEvenWithContinueExceptionHandler() {
        AuthenticationException authenticationException = new AuthenticationException("KABOOM!");
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(authenticationException), new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics);
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.getClass();
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, recordCollectorImpl::flush);
        Assert.assertEquals(authenticationException, assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error."));
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCloseIfFatalEvenWithContinueExceptionHandler() {
        AuthenticationException authenticationException = new AuthenticationException("KABOOM!");
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(authenticationException), new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics);
        recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.getClass();
        StreamsException assertThrows = Assert.assertThrows(StreamsException.class, recordCollectorImpl::closeClean);
        Assert.assertEquals(authenticationException, assertThrows.getCause());
        MatcherAssert.assertThat(assertThrows.getMessage(), IsEqual.equalTo("Error encountered sending record to topic topic for task 0_0 due to:\norg.apache.kafka.common.errors.AuthenticationException: KABOOM!\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error."));
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, getExceptionalStreamsProducerOnSend(new Exception()), new AlwaysContinueProductionExceptionHandler(), this.streamsMetrics);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister(RecordCollectorImpl.class);
        Throwable th = null;
        try {
            try {
                recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
                recordCollectorImpl.flush();
                List<String> messages = createAndRegister.getMessages();
                StringBuilder sb = new StringBuilder("Messages received:");
                Iterator<String> it = messages.iterator();
                while (it.hasNext()) {
                    sb.append("\n - ").append(it.next());
                }
                Assert.assertTrue(sb.toString(), messages.get(messages.size() - 1).endsWith("Exception handler choose to CONTINUE processing in spite of this error but written offsets would not be recorded."));
                if (createAndRegister != null) {
                    if (0 != 0) {
                        try {
                            createAndRegister.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAndRegister.close();
                    }
                }
                Assert.assertEquals(Double.valueOf(1.0d), ((Metric) this.streamsMetrics.metrics().get(new MetricName("dropped-records-total", "stream-task-metrics", "The total number of dropped records", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("thread-id", Thread.currentThread().getName()), Utils.mkEntry("task-id", this.taskId.toString())})))).metricValue());
                recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
                recordCollectorImpl.flush();
                recordCollectorImpl.closeClean();
            } finally {
            }
        } catch (Throwable th3) {
            if (createAndRegister != null) {
                if (th != null) {
                    try {
                        createAndRegister.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAndRegister.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldNotAbortTxnOnEOSCloseDirtyIfNothingSent() {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        new RecordCollectorImpl(this.logContext, this.taskId, new StreamsProducer(this.eosConfig, "-StreamThread-1", new MockClientSupplier() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.1
            @Override // org.apache.kafka.test.MockClientSupplier
            public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, new DefaultPartitioner(), RecordCollectorTest.this.byteArraySerializer, RecordCollectorTest.this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.1.1
                    public void abortTransaction() {
                        atomicBoolean.set(true);
                    }
                };
            }
        }, this.taskId, this.processId, this.logContext, Time.SYSTEM), this.productionExceptionHandler, this.streamsMetrics).closeDirty();
        Assert.assertFalse(atomicBoolean.get());
    }

    @Test
    public void shouldThrowIfTopicIsUnknownOnSendWithPartitioner() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl(this.logContext, this.taskId, new StreamsProducer(this.config, this.processId + "-StreamThread-1", new MockClientSupplier() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.2
            @Override // org.apache.kafka.test.MockClientSupplier
            public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, new DefaultPartitioner(), RecordCollectorTest.this.byteArraySerializer, RecordCollectorTest.this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.2.1
                    public List<PartitionInfo> partitionsFor(String str) {
                        return Collections.emptyList();
                    }
                };
            }
        }, (TaskId) null, (UUID) null, this.logContext, Time.SYSTEM), this.productionExceptionHandler, this.streamsMetrics);
        recordCollectorImpl.initialize();
        MatcherAssert.assertThat(Assert.assertThrows(StreamsException.class, () -> {
            recordCollectorImpl.send("topic", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        }).getMessage(), IsEqual.equalTo("Could not get partition information for topic topic for task 0_0. This can happen if the topic does not exist."));
    }

    @Test
    public void shouldNotCloseInternalProducerForEOS() {
        new RecordCollectorImpl(this.logContext, this.taskId, new StreamsProducer(this.eosConfig, this.processId + "-StreamThread-1", new MockClientSupplier() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.3
            @Override // org.apache.kafka.test.MockClientSupplier
            public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
                return RecordCollectorTest.this.mockProducer;
            }
        }, this.taskId, this.processId, this.logContext, Time.SYSTEM), this.productionExceptionHandler, this.streamsMetrics).closeClean();
        this.streamsProducer.flush();
    }

    @Test
    public void shouldNotCloseInternalProducerForNonEOS() {
        this.collector.closeClean();
        this.streamsProducer.flush();
    }

    private StreamsProducer getExceptionalStreamsProducerOnSend(final Exception exc) {
        return new StreamsProducer(this.config, this.processId + "-StreamThread-1", new MockClientSupplier() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.4
            @Override // org.apache.kafka.test.MockClientSupplier
            public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, new DefaultPartitioner(), RecordCollectorTest.this.byteArraySerializer, RecordCollectorTest.this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.4.1
                    public synchronized Future<RecordMetadata> send(ProducerRecord<byte[], byte[]> producerRecord, Callback callback) {
                        callback.onCompletion((RecordMetadata) null, exc);
                        return null;
                    }
                };
            }
        }, (TaskId) null, (UUID) null, this.logContext, Time.SYSTEM);
    }

    private StreamsProducer getExceptionalStreamProducerOnPartitionsFor(final RuntimeException runtimeException) {
        return new StreamsProducer(this.config, this.processId + "-StreamThread-1", new MockClientSupplier() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.5
            @Override // org.apache.kafka.test.MockClientSupplier
            public Producer<byte[], byte[]> getProducer(Map<String, Object> map) {
                return new MockProducer<byte[], byte[]>(RecordCollectorTest.this.cluster, true, new DefaultPartitioner(), RecordCollectorTest.this.byteArraySerializer, RecordCollectorTest.this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.5.1
                    public synchronized List<PartitionInfo> partitionsFor(String str) {
                        throw runtimeException;
                    }
                };
            }
        }, (TaskId) null, (UUID) null, this.logContext, Time.SYSTEM);
    }
}
