package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Sum;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.AlwaysContinueProductionExceptionHandler;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordCollectorTest.class */
public class RecordCollectorTest {
    private final LogContext logContext = new LogContext("test ");
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]));
    private final Cluster cluster = new Cluster("cluster", Collections.singletonList(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    private final ByteArraySerializer byteArraySerializer = new ByteArraySerializer();
    private final StringSerializer stringSerializer = new StringSerializer();
    private final StreamPartitioner<String, Object> streamPartitioner = new StreamPartitioner<String, Object>() { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.1
        public Integer partition(String str, String str2, Object obj, int i) {
            return Integer.valueOf(Integer.parseInt(str2) % i);
        }
    };

    @Test
    public void testSpecificPartition() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("RecordCollectorTest-TestSpecificPartition", new LogContext("RecordCollectorTest-TestSpecificPartition "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer));
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        recordCollectorImpl.send("topic1", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", recordHeaders, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", recordHeaders, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", recordHeaders, 2, (Long) null, this.stringSerializer, this.stringSerializer);
        Map offsets = recordCollectorImpl.offsets();
        Assert.assertEquals(2L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals(1L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals(0L, offsets.get(new TopicPartition("topic1", 2)));
        recordCollectorImpl.send("topic1", "999", "0", (Headers) null, 0, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", (Headers) null, 1, (Long) null, this.stringSerializer, this.stringSerializer);
        recordCollectorImpl.send("topic1", "999", "0", recordHeaders, 2, (Long) null, this.stringSerializer, this.stringSerializer);
        Assert.assertEquals(3L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals(2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals(1L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test
    public void testStreamPartitioner() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("RecordCollectorTest-TestStreamPartitioner", new LogContext("RecordCollectorTest-TestStreamPartitioner "), new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer));
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("key", "value".getBytes())});
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "9", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "27", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "81", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "243", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "28", "0", recordHeaders, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "82", "0", recordHeaders, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "244", "0", recordHeaders, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "245", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        Map offsets = recordCollectorImpl.offsets();
        Assert.assertEquals(4L, offsets.get(new TopicPartition("topic1", 0)));
        Assert.assertEquals(2L, offsets.get(new TopicPartition("topic1", 1)));
        Assert.assertEquals(0L, offsets.get(new TopicPartition("topic1", 2)));
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionOnAnyExceptionButProducerFencedException() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.2
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                throw new KafkaException();
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void shouldThrowStreamsExceptionOnSubsequentCallIfASendFailsWithDefaultExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.3
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        try {
            recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
            Assert.fail("Should have thrown StreamsException");
        } catch (StreamsException e) {
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnSubsequentCallIfASendFailsWithContinueExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.4
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void shouldRecordSkippedMetricAndLogWarningIfSendFailsWithContinueExceptionHandler() {
        Metrics metrics = new Metrics();
        Sensor sensor = metrics.sensor("skipped-records");
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister();
        MetricName metricName = new MetricName("name", "group", "description", Collections.EMPTY_MAP);
        sensor.add(metricName, new Sum());
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new AlwaysContinueProductionExceptionHandler(), sensor);
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.5
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        Assert.assertEquals(Double.valueOf(1.0d), ((KafkaMetric) metrics.metrics().get(metricName)).metricValue());
        Assert.assertTrue(createAndRegister.getMessages().contains("test Error sending records (key=[3] value=[0] timestamp=[null]) to topic=[topic1] and partition=[0]; The exception handler chose to CONTINUE processing in spite of this error."));
        LogCaptureAppender.unregister(createAndRegister);
    }

    @Test
    public void shouldThrowStreamsExceptionOnFlushIfASendFailedWithDefaultExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.6
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        try {
            recordCollectorImpl.flush();
            Assert.fail("Should have thrown StreamsException");
        } catch (StreamsException e) {
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnFlushIfASendFailedWithContinueExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.7
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.flush();
    }

    @Test
    public void shouldThrowStreamsExceptionOnCloseIfASendFailedWithDefaultExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.8
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        try {
            recordCollectorImpl.close();
            Assert.fail("Should have thrown StreamsException");
        } catch (StreamsException e) {
        }
    }

    @Test
    public void shouldNotThrowStreamsExceptionOnCloseIfASendFailedWithContinueExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.9
            public synchronized Future<RecordMetadata> send(ProducerRecord producerRecord, Callback callback) {
                callback.onCompletion((RecordMetadata) null, new Exception());
                return null;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
        recordCollectorImpl.close();
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowIfTopicIsUnknownWithDefaultExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.10
            public List<PartitionInfo> partitionsFor(String str) {
                return Collections.EMPTY_LIST;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
    }

    @Test(expected = StreamsException.class)
    public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
        RecordCollectorImpl recordCollectorImpl = new RecordCollectorImpl("test", this.logContext, new AlwaysContinueProductionExceptionHandler(), new Metrics().sensor("skipped-records"));
        recordCollectorImpl.init(new MockProducer(this.cluster, true, new DefaultPartitioner(), this.byteArraySerializer, this.byteArraySerializer) { // from class: org.apache.kafka.streams.processor.internals.RecordCollectorTest.11
            public List<PartitionInfo> partitionsFor(String str) {
                return Collections.EMPTY_LIST;
            }
        });
        recordCollectorImpl.send("topic1", "3", "0", (Headers) null, (Long) null, this.stringSerializer, this.stringSerializer, this.streamPartitioner);
    }

    @Test
    public void testShouldNotThrowNPEOnCloseIfProducerIsNotInitialized() {
        new RecordCollectorImpl("NoNPE", this.logContext, new DefaultProductionExceptionHandler(), new Metrics().sensor("skipped-records")).close();
    }
}
