package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.DefaultProductionExceptionHandler;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordQueueTest.class */
/**
 * Unit tests for {@link RecordQueue}: timestamp tracking across add/poll/clear,
 * handling of records whose key or value cannot be deserialized, and handling of
 * records carrying a negative timestamp.
 */
public class RecordQueueTest {
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
    private final String[] topics = {"topic"};
    private final Sensor skippedRecordsSensor = new Metrics().sensor("skipped-records");

    // NOTE(review): the explicit casts below are kept exactly as found; they look like
    // they pin specific constructor overloads -- confirm before removing them.
    final InternalMockProcessorContext context = new InternalMockProcessorContext(
            (StateSerdes<?, ?>) StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class),
            (RecordCollector) new RecordCollectorImpl(
                    (Producer) null,
                    (String) null,
                    new LogContext("record-queue-test "),
                    new DefaultProductionExceptionHandler(),
                    this.skippedRecordsSensor));

    private final MockSourceNode mockSourceNodeWithMetrics =
            new MockSourceNode(this.topics, this.intDeserializer, this.intDeserializer);

    /** Queue under test whose deserialization handler is {@link LogAndFailExceptionHandler}. */
    private final RecordQueue queue = new RecordQueue(
            new TopicPartition(this.topics[0], 1),
            this.mockSourceNodeWithMetrics,
            this.timestampExtractor,
            new LogAndFailExceptionHandler(),
            this.context,
            new LogContext());

    /** Queue under test whose deserialization handler is {@link LogAndContinueExceptionHandler}. */
    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(
            new TopicPartition(this.topics[0], 1),
            this.mockSourceNodeWithMetrics,
            this.timestampExtractor,
            new LogAndContinueExceptionHandler(),
            this.context,
            new LogContext());

    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);

    /**
     * Builds a raw record for topic "topic", partition 1, with CREATE_TIME timestamp type
     * and zeroed checksum/size fields.
     *
     * @param offset    record offset
     * @param timestamp record timestamp
     * @param key       serialized key bytes
     * @param value     serialized value bytes
     * @return the raw consumer record
     */
    private static ConsumerRecord<byte[], byte[]> rawRecord(final long offset,
                                                            final long timestamp,
                                                            final byte[] key,
                                                            final byte[] value) {
        return new ConsumerRecord<>("topic", 1, offset, timestamp, TimestampType.CREATE_TIME, 0L, 0, 0, key, value);
    }

    /**
     * Builds a raw record with the given offset, record timestamp 0 and the default
     * Integer-serialized key and value of this test.
     *
     * @param offset record offset
     * @return the raw consumer record
     */
    private ConsumerRecord<byte[], byte[]> recordAtOffset(final long offset) {
        return rawRecord(offset, 0L, this.recordKey, this.recordValue);
    }

    /**
     * Returns a Long-serialized payload. The deserialization tests feed it to the
     * Integer deserializers of the source node and expect it to be rejected.
     *
     * @return the serialized bytes of the long value 1
     */
    private static byte[] longSerializedBytes() {
        return Serdes.Long().serializer().serialize("foo", 1L);
    }

    @Before
    public void before() {
        this.mockSourceNodeWithMetrics.init(this.context);
    }

    @After
    public void after() {
        this.mockSourceNodeWithMetrics.close();
    }

    /**
     * Walks the queue through out-of-order additions, polls and a clear, checking size,
     * tracked timestamp and time-tracker size after each step.
     *
     * NOTE(review): every record carries record-timestamp 0 while the asserted timestamps
     * equal the offsets, so MockTimestampExtractor presumably derives time from the
     * offset -- confirm against that class.
     */
    @Test
    public void testTimeTracking() {
        Assert.assertTrue(this.queue.isEmpty());
        Assert.assertEquals(0L, this.queue.size());
        Assert.assertEquals(-1L, this.queue.timestamp());

        // add three records with out-of-order offsets 2, 1, 3
        final List<ConsumerRecord<byte[], byte[]>> firstBatch = Arrays.asList(
                recordAtOffset(2L),
                recordAtOffset(1L),
                recordAtOffset(3L));
        this.queue.addRawRecords(firstBatch);
        Assert.assertEquals(3L, this.queue.size());
        Assert.assertEquals(1L, this.queue.timestamp());
        Assert.assertEquals(2L, this.queue.timeTracker().size());

        // poll the first record: 1 and 3 remain
        Assert.assertEquals(2L, this.queue.poll().timestamp);
        Assert.assertEquals(2L, this.queue.size());
        Assert.assertEquals(1L, this.queue.timestamp());
        Assert.assertEquals(2L, this.queue.timeTracker().size());

        // poll the second record: only 3 remains
        Assert.assertEquals(1L, this.queue.poll().timestamp);
        Assert.assertEquals(1L, this.queue.size());
        Assert.assertEquals(3L, this.queue.timestamp());
        Assert.assertEquals(1L, this.queue.timeTracker().size());

        // add three more records with offsets 4, 1, 2; tracked time stays at 3
        final List<ConsumerRecord<byte[], byte[]>> secondBatch = Arrays.asList(
                recordAtOffset(4L),
                recordAtOffset(1L),
                recordAtOffset(2L));
        this.queue.addRawRecords(secondBatch);
        Assert.assertEquals(4L, this.queue.size());
        Assert.assertEquals(3L, this.queue.timestamp());
        Assert.assertEquals(2L, this.queue.timeTracker().size());

        // drain the queue one record at a time
        Assert.assertEquals(3L, this.queue.poll().timestamp);
        Assert.assertEquals(3L, this.queue.size());
        Assert.assertEquals(3L, this.queue.timestamp());
        Assert.assertEquals(2L, this.queue.timeTracker().size());

        Assert.assertEquals(4L, this.queue.poll().timestamp);
        Assert.assertEquals(3L, this.queue.timestamp());
        Assert.assertEquals(2L, this.queue.timeTracker().size());

        Assert.assertEquals(1L, this.queue.poll().timestamp);
        Assert.assertEquals(3L, this.queue.timestamp());
        Assert.assertEquals(1L, this.queue.timeTracker().size());

        Assert.assertEquals(2L, this.queue.poll().timestamp);
        Assert.assertTrue(this.queue.isEmpty());
        Assert.assertEquals(0L, this.queue.size());
        Assert.assertEquals(3L, this.queue.timestamp());
        Assert.assertEquals(0L, this.queue.timeTracker().size());

        // add three in-order records with offsets 4, 5, 6
        final List<ConsumerRecord<byte[], byte[]>> thirdBatch = Arrays.asList(
                recordAtOffset(4L),
                recordAtOffset(5L),
                recordAtOffset(6L));
        this.queue.addRawRecords(thirdBatch);
        Assert.assertEquals(3L, this.queue.size());
        Assert.assertEquals(4L, this.queue.timestamp());

        Assert.assertEquals(4L, this.queue.poll().timestamp);
        Assert.assertEquals(2L, this.queue.size());
        Assert.assertEquals(5L, this.queue.timestamp());
        Assert.assertEquals(2L, this.queue.timeTracker().size());

        // clearing resets size, tracker and timestamp
        this.queue.clear();
        Assert.assertTrue(this.queue.isEmpty());
        Assert.assertEquals(0L, this.queue.size());
        Assert.assertEquals(0L, this.queue.timeTracker().size());
        Assert.assertEquals(-1L, this.queue.timestamp());

        // the queue is usable again after clear()
        this.queue.addRawRecords(thirdBatch);
        Assert.assertEquals(3L, this.queue.size());
        Assert.assertEquals(4L, this.queue.timestamp());
    }

    /** A key that cannot be deserialized must surface as a {@link StreamsException} on the fail-fast queue. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
        final List<ConsumerRecord<byte[], byte[]>> records =
                Collections.singletonList(rawRecord(1L, 0L, longSerializedBytes(), this.recordValue));
        this.queue.addRawRecords(records);
    }

    /** A value that cannot be deserialized must surface as a {@link StreamsException} on the fail-fast queue. */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
        final List<ConsumerRecord<byte[], byte[]>> records =
                Collections.singletonList(rawRecord(1L, 0L, this.recordKey, longSerializedBytes()));
        this.queue.addRawRecords(records);
    }

    /** With the log-and-continue handler, a record with a bad key is not enqueued and nothing is thrown. */
    @Test
    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
        final List<ConsumerRecord<byte[], byte[]>> records =
                Collections.singletonList(rawRecord(1L, 0L, longSerializedBytes(), this.recordValue));
        this.queueThatSkipsDeserializeErrors.addRawRecords(records);
        Assert.assertEquals(0L, this.queueThatSkipsDeserializeErrors.size());
    }

    /** With the log-and-continue handler, a record with a bad value is not enqueued and nothing is thrown. */
    @Test
    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
        final List<ConsumerRecord<byte[], byte[]>> records =
                Collections.singletonList(rawRecord(1L, 0L, this.recordKey, longSerializedBytes()));
        this.queueThatSkipsDeserializeErrors.addRawRecords(records);
        Assert.assertEquals(0L, this.queueThatSkipsDeserializeErrors.size());
    }

    /** A record with timestamp -1 must raise a {@link StreamsException} when {@link FailOnInvalidTimestamp} is used. */
    @Test(expected = StreamsException.class)
    public void shouldThrowOnNegativeTimestamp() {
        final List<ConsumerRecord<byte[], byte[]>> records =
                Collections.singletonList(rawRecord(1L, -1L, this.recordKey, this.recordValue));

        final RecordQueue failingQueue = new RecordQueue(
                new TopicPartition(this.topics[0], 1),
                new MockSourceNode(this.topics, this.intDeserializer, this.intDeserializer),
                new FailOnInvalidTimestamp(),
                new LogAndContinueExceptionHandler(),
                new InternalMockProcessorContext(),
                new LogContext());

        failingQueue.addRawRecords(records);
    }

    /** A record with timestamp -1 must not be enqueued when {@link LogAndSkipOnInvalidTimestamp} is used. */
    @Test
    public void shouldDropOnNegativeTimestamp() {
        final List<ConsumerRecord<byte[], byte[]>> records =
                Collections.singletonList(rawRecord(1L, -1L, this.recordKey, this.recordValue));

        // Keep a reference to the queue so its size can be asserted afterwards;
        // the previous code asserted on an undeclared variable and did not compile.
        final RecordQueue droppingQueue = new RecordQueue(
                new TopicPartition(this.topics[0], 1),
                new MockSourceNode(this.topics, this.intDeserializer, this.intDeserializer),
                new LogAndSkipOnInvalidTimestamp(),
                new LogAndContinueExceptionHandler(),
                new InternalMockProcessorContext(),
                new LogContext());

        droppingQueue.addRawRecords(records);
        Assert.assertEquals(0L, droppingQueue.size());
    }
}
