package org.apache.kafka.streams.processor.internals;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.errors.LogAndFailExceptionHandler;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.FailOnInvalidTimestamp;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.MockSourceNode;
import org.apache.kafka.test.MockTimestampExtractor;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.core.IsInstanceOf;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

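/**
 * Unit tests for {@link RecordQueue}: size/head-record bookkeeping, partition-time tracking,
 * deserialization-error handling and invalid-timestamp handling.
 *
 * <p>Decompiled from org/apache/kafka/streams/processor/internals/RecordQueueTest.class.
 */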
public class RecordQueueTest {
    // Serializer used below to build the raw key/value bytes of the test records.
    private final Serializer<Integer> intSerializer = new IntegerSerializer();
    // Deserializer handed to the mock source node for both keys and values.
    private final Deserializer<Integer> intDeserializer = new IntegerDeserializer();
    // Timestamp extractor shared by the two class-level queues.
    private final TimestampExtractor timestampExtractor = new MockTimestampExtractor();
    // Processor context shared by the source node and the class-level queues.
    final InternalMockProcessorContext context = new InternalMockProcessorContext((StateSerdes<?, ?>) StateSerdes.withBuiltinTypes("anyName", Bytes.class, Bytes.class), new MockRecordCollector());
    // Source node that deserializes both key and value as Integer; initialized in before() and closed in after().
    private final MockSourceNode<Integer, Integer, ?, ?> mockSourceNodeWithMetrics = new MockSourceNode<>(this.intDeserializer, this.intDeserializer);
    // Queue for partition ("topic", 1) configured with LogAndFailExceptionHandler.
    private final RecordQueue queue = new RecordQueue(new TopicPartition("topic", 1), this.mockSourceNodeWithMetrics, this.timestampExtractor, new LogAndFailExceptionHandler(), this.context, new LogContext());
    // Same setup as above, but configured with LogAndContinueExceptionHandler.
    private final RecordQueue queueThatSkipsDeserializeErrors = new RecordQueue(new TopicPartition("topic", 1), this.mockSourceNodeWithMetrics, this.timestampExtractor, new LogAndContinueExceptionHandler(), this.context, new LogContext());
    // Serialized Integer 10, used as the value of well-formed test records.
    private final byte[] recordValue = this.intSerializer.serialize((String) null, 10);
    // Serialized Integer 1, used as the key of well-formed test records.
    private final byte[] recordKey = this.intSerializer.serialize((String) null, 1);

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/RecordQueueTest$PartitionTimeTrackingTimestampExtractor.class */
    /**
     * Timestamp extractor that remembers the most recent partition time it was handed and
     * throws if a later call supplies a smaller one. The extracted timestamp is the record's offset.
     */
    private static class PartitionTimeTrackingTimestampExtractor implements TimestampExtractor {
        /** Last partition time passed to {@link #extract}; {@code -1} until the first call. */
        private long partitionTime = -1L;

        /**
         * Records the supplied partition time and returns the record's offset as its timestamp.
         *
         * @param record               the record being processed
         * @param currentPartitionTime the partition time supplied by the caller
         * @return the offset of {@code record}
         * @throws IllegalStateException if {@code currentPartitionTime} is smaller than the
         *                               previously recorded partition time
         */
        public long extract(ConsumerRecord<Object, Object> record, long currentPartitionTime) {
            if (this.partitionTime > currentPartitionTime) {
                throw new IllegalStateException("Partition time should not decrease");
            }
            this.partitionTime = currentPartitionTime;
            return record.offset();
        }
    }

    /** Initializes the shared mock source node with the test processor context before each test. */
    @Before
    public void before() {
        mockSourceNodeWithMetrics.init(context);
    }

    /** Closes the shared mock source node after each test. */
    @After
    public void after() {
        mockSourceNodeWithMetrics.close();
    }

    /**
     * Walks the queue through several add/poll/clear cycles and checks that its size,
     * head-record timestamp and head-record offset are reported correctly at every step,
     * including for batches whose offsets are not in ascending order.
     *
     * <p>All records carry a raw timestamp of {@code 0}; the assertions expect the extracted
     * timestamp to equal the record offset.
     */
    @Test
    public void testTimeTracking() {
        // A fresh queue is empty and reports no head record.
        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());
        Assert.assertNull(queue.headRecordOffset());

        // First batch: offsets 2, 1, 3 (deliberately out of order).
        final List<ConsumerRecord<byte[], byte[]>> firstBatch = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
        queue.addRawRecords(firstBatch);

        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(2L, queue.headRecordTimestamp());
        Assert.assertEquals(2L, queue.headRecordOffset().longValue());

        // Records come out in insertion order, not timestamp order.
        Assert.assertEquals(2L, queue.poll().timestamp);
        Assert.assertEquals(2L, queue.size());
        Assert.assertEquals(1L, queue.headRecordTimestamp());
        Assert.assertEquals(1L, queue.headRecordOffset().longValue());

        Assert.assertEquals(1L, queue.poll().timestamp);
        Assert.assertEquals(1L, queue.size());
        Assert.assertEquals(3L, queue.headRecordTimestamp());
        Assert.assertEquals(3L, queue.headRecordOffset().longValue());

        // Second batch (offsets 4, 1, 2) is appended behind the one record still queued.
        final List<ConsumerRecord<byte[], byte[]>> secondBatch = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
        queue.addRawRecords(secondBatch);

        Assert.assertEquals(4L, queue.size());
        Assert.assertEquals(3L, queue.headRecordTimestamp());
        Assert.assertEquals(3L, queue.headRecordOffset().longValue());

        Assert.assertEquals(3L, queue.poll().timestamp);
        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(4L, queue.headRecordTimestamp());
        Assert.assertEquals(4L, queue.headRecordOffset().longValue());

        Assert.assertEquals(4L, queue.poll().timestamp);
        Assert.assertEquals(1L, queue.headRecordTimestamp());
        Assert.assertEquals(1L, queue.headRecordOffset().longValue());

        Assert.assertEquals(1L, queue.poll().timestamp);
        Assert.assertEquals(2L, queue.headRecordTimestamp());
        Assert.assertEquals(2L, queue.headRecordOffset().longValue());

        // Draining the last record returns the queue to its empty state.
        Assert.assertEquals(2L, queue.poll().timestamp);
        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());
        Assert.assertNull(queue.headRecordOffset());

        // Third batch: offsets 4, 5, 6; reused after clear() below.
        final List<ConsumerRecord<byte[], byte[]>> thirdBatch = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 5L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 6L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
        queue.addRawRecords(thirdBatch);

        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(4L, queue.headRecordTimestamp());
        Assert.assertEquals(4L, queue.headRecordOffset().longValue());

        Assert.assertEquals(4L, queue.poll().timestamp);
        Assert.assertEquals(2L, queue.size());
        Assert.assertEquals(5L, queue.headRecordTimestamp());
        Assert.assertEquals(5L, queue.headRecordOffset().longValue());

        // clear() drops the remaining records and resets the partition time as well.
        queue.clear();
        Assert.assertTrue(queue.isEmpty());
        Assert.assertEquals(0L, queue.size());
        Assert.assertEquals(-1L, queue.headRecordTimestamp());
        Assert.assertEquals(-1L, queue.partitionTime());
        Assert.assertNull(queue.headRecordOffset());

        // The cleared queue accepts the same batch again from scratch.
        queue.addRawRecords(thirdBatch);
        Assert.assertEquals(3L, queue.size());
        Assert.assertEquals(4L, queue.headRecordTimestamp());
        Assert.assertEquals(4L, queue.headRecordOffset().longValue());
    }

    /**
     * Verifies that the partition time is unchanged by adding records, advances when a record
     * with a larger timestamp is polled, and does not move backwards when an older record is
     * polled afterwards.
     */
    @Test
    public void shouldTrackPartitionTimeAsMaxProcessedTimestamp() {
        Assert.assertTrue(queue.isEmpty());
        MatcherAssert.assertThat(queue.size(), CoreMatchers.is(0));
        MatcherAssert.assertThat(queue.headRecordTimestamp(), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(-1L));

        // Offsets 2, 1, 3, 4: the second record is older than the first.
        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
        queue.addRawRecords(records);

        // Adding records alone must not move the partition time.
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(-1L));

        queue.poll();
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(2L));

        // Polling the older record (timestamp 1) must not move partition time backwards.
        queue.poll();
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(2L));

        queue.poll();
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(3L));
    }

    /**
     * Verifies that {@code setPartitionTime} overrides the tracked partition time and that
     * subsequent polls only ever raise it, never lower it.
     */
    @Test
    public void shouldSetTimestampAndRespectMaxTimestampPolicy() {
        Assert.assertTrue(queue.isEmpty());
        MatcherAssert.assertThat(queue.size(), CoreMatchers.is(0));
        MatcherAssert.assertThat(queue.headRecordTimestamp(), CoreMatchers.is(-1L));
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(-1L));

        queue.setPartitionTime(150L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(150L));

        // Offsets 200, 100, 300, 400.
        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 200L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 100L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 300L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 400L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));
        queue.addRawRecords(records);

        // Adding records alone must not move the partition time.
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(150L));

        // Polling a record newer than the explicitly set time raises it.
        queue.poll();
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(200L));

        queue.setPartitionTime(500L);
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(500L));

        // Polling a record older than the explicitly set time leaves it untouched.
        queue.poll();
        MatcherAssert.assertThat(queue.partitionTime(), CoreMatchers.is(500L));
    }

    /**
     * A key serialized as a Long cannot be read by the Integer deserializer; with the
     * fail-fast handler the queue must surface this as a {@link StreamsException} caused by a
     * {@link SerializationException}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenKeyDeserializationFails() {
        final byte[] malformedKey = Serdes.Long().serializer().serialize("foo", 1L);
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, malformedKey, recordValue));

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> queue.addRawRecords(records));

        MatcherAssert.assertThat(exception.getCause(), IsInstanceOf.instanceOf(SerializationException.class));
    }

    /**
     * A value serialized as a Long cannot be read by the Integer deserializer; with the
     * fail-fast handler the queue must surface this as a {@link StreamsException} caused by a
     * {@link SerializationException}.
     */
    @Test
    public void shouldThrowStreamsExceptionWhenValueDeserializationFails() {
        final byte[] malformedValue = Serdes.Long().serializer().serialize("foo", 1L);
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, malformedValue));

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> queue.addRawRecords(records));

        MatcherAssert.assertThat(exception.getCause(), IsInstanceOf.instanceOf(SerializationException.class));
    }

    /**
     * With the log-and-continue handler, a record whose key cannot be deserialized is dropped
     * instead of failing: no exception is thrown and the queue stays empty.
     */
    @Test
    public void shouldNotThrowStreamsExceptionWhenKeyDeserializationFailsWithSkipHandler() {
        final byte[] malformedKey = Serdes.Long().serializer().serialize("foo", 1L);
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, malformedKey, recordValue));

        queueThatSkipsDeserializeErrors.addRawRecords(records);

        Assert.assertEquals(0L, queueThatSkipsDeserializeErrors.size());
    }

    /**
     * With the log-and-continue handler, a record whose value cannot be deserialized is dropped
     * instead of failing: no exception is thrown and the queue stays empty.
     */
    @Test
    public void shouldNotThrowStreamsExceptionWhenValueDeserializationFailsWithSkipHandler() {
        final byte[] malformedValue = Serdes.Long().serializer().serialize("foo", 1L);
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, malformedValue));

        queueThatSkipsDeserializeErrors.addRawRecords(records);

        Assert.assertEquals(0L, queueThatSkipsDeserializeErrors.size());
    }

    /**
     * With {@link FailOnInvalidTimestamp}, adding a record whose timestamp is {@code -1} must
     * fail with a {@link StreamsException} carrying the exact diagnostic message asserted below.
     */
    @Test
    public void shouldThrowOnNegativeTimestamp() {
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));

        final RecordQueue failingQueue = new RecordQueue(
            new TopicPartition("topic", 1),
            mockSourceNodeWithMetrics,
            new FailOnInvalidTimestamp(),
            new LogAndContinueExceptionHandler(),
            new InternalMockProcessorContext(),
            new LogContext());

        final StreamsException exception = Assert.assertThrows(
            StreamsException.class,
            () -> failingQueue.addRawRecords(records));

        // Compile-time constant concatenation: the resulting string is one single literal.
        final String expectedMessage =
            "Input record ConsumerRecord(topic = topic, partition = 1, leaderEpoch = null, offset = 1, CreateTime = -1, "
                + "serialized key size = 0, serialized value size = 0, "
                + "headers = RecordHeaders(headers = [], isReadOnly = false), key = 1, value = 10) "
                + "has invalid (negative) timestamp. "
                + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding "
                + "a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. "
                + "Use a different TimestampExtractor to process this data.";

        MatcherAssert.assertThat(exception.getMessage(), Matchers.equalTo(expectedMessage));
    }

    /**
     * With {@link LogAndSkipOnInvalidTimestamp}, a record whose timestamp is {@code -1} is
     * dropped: adding it does not throw and leaves the queue empty.
     */
    @Test
    public void shouldDropOnNegativeTimestamp() {
        final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList(
            new ConsumerRecord<>("topic", 1, 1L, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));

        // Keep a reference to the queue so its size can be asserted after the add.
        final RecordQueue skippingQueue = new RecordQueue(
            new TopicPartition("topic", 1),
            mockSourceNodeWithMetrics,
            new LogAndSkipOnInvalidTimestamp(),
            new LogAndContinueExceptionHandler(),
            new InternalMockProcessorContext(),
            new LogContext());

        skippingQueue.addRawRecords(records);

        Assert.assertEquals(0L, skippingQueue.size());
    }

    /**
     * Verifies that the queue hands its current partition time to the timestamp extractor.
     * The tracking extractor stores the last value it received, so after each poll it should
     * hold the partition time that was current when the next head record was extracted.
     */
    @Test
    public void shouldPassPartitionTimeToTimestampExtractor() {
        final PartitionTimeTrackingTimestampExtractor extractor = new PartitionTimeTrackingTimestampExtractor();
        final RecordQueue trackedQueue = new RecordQueue(
            new TopicPartition("topic", 1),
            mockSourceNodeWithMetrics,
            extractor,
            new LogAndFailExceptionHandler(),
            context,
            new LogContext());

        Assert.assertTrue(trackedQueue.isEmpty());
        Assert.assertEquals(0L, trackedQueue.size());
        Assert.assertEquals(-1L, trackedQueue.headRecordTimestamp());

        // Offsets 2, 1, 3, 4: the second record is older than the first.
        final List<ConsumerRecord<byte[], byte[]>> records = Arrays.asList(
            new ConsumerRecord<>("topic", 1, 2L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 1L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 3L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue),
            new ConsumerRecord<>("topic", 1, 4L, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue));

        Assert.assertEquals(-1L, extractor.partitionTime);

        // Nothing has been polled yet, so the extractor has only ever seen the initial time.
        trackedQueue.addRawRecords(records);
        Assert.assertEquals(-1L, extractor.partitionTime);

        trackedQueue.poll();
        Assert.assertEquals(2L, extractor.partitionTime);

        // Polling the older record (offset 1) leaves the observed partition time at 2.
        trackedQueue.poll();
        Assert.assertEquals(2L, extractor.partitionTime);

        trackedQueue.poll();
        Assert.assertEquals(3L, extractor.partitionTime);
    }
}
