package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.FullChangeSerde;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.RecordBatchingStateRestoreCallback;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer;
import org.apache.kafka.streams.state.internals.TimeOrderedKeyValueBuffer;
import org.apache.kafka.test.MockInternalProcessorContext;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/state/internals/TimeOrderedKeyValueBufferTest.class */
public class TimeOrderedKeyValueBufferTest<B extends TimeOrderedKeyValueBuffer<String, String>> {
    private static final RecordHeaders V_2_CHANGELOG_HEADERS = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{2})});
    private static final String APP_ID = "test-app";
    private final Function<String, B> bufferSupplier;
    private final String testName;

    @Parameterized.Parameters(name = "{index}: test={0}")
    public static Collection<Object[]> parameters() {
        return Collections.singletonList(new Object[]{"in-memory buffer", str -> {
            return new InMemoryTimeOrderedKeyValueBuffer.Builder(str, Serdes.String(), Serdes.String()).build();
        }});
    }

    public TimeOrderedKeyValueBufferTest(String str, Function<String, B> function) {
        this.testName = str + "_" + new Random().nextInt(Integer.MAX_VALUE);
        this.bufferSupplier = function;
    }

    private static MockInternalProcessorContext makeContext() {
        Properties properties = new Properties();
        properties.setProperty("application.id", APP_ID);
        properties.setProperty("bootstrap.servers", "");
        MockInternalProcessorContext mockInternalProcessorContext = new MockInternalProcessorContext(properties, new TaskId(0, 0), TestUtils.tempDirectory());
        mockInternalProcessorContext.setRecordCollector(new MockInternalProcessorContext.MockRecordCollector());
        return mockInternalProcessorContext;
    }

    private static void cleanup(MockInternalProcessorContext mockInternalProcessorContext, TimeOrderedKeyValueBuffer<String, String> timeOrderedKeyValueBuffer) {
        try {
            timeOrderedKeyValueBuffer.close();
            Utils.delete(mockInternalProcessorContext.stateDir());
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Test
    public void shouldInit() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldAcceptData() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 0L, 0L, "asdf", "2p93nf");
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldRejectNullValues() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        try {
            apply.put(0L, "asdf", (Change) null, getContext(0L));
            Assert.fail("expected an exception");
        } catch (NullPointerException e) {
        }
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldRemoveData() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 0L, 0L, "asdf", "qwer");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(1));
        apply.evictWhile(() -> {
            return true;
        }, eviction -> {
        });
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(0));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldRespectEvictionPredicate() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 0L, 0L, "asdf", "eyt");
        putRecord(apply, makeContext, 1L, 0L, "zxcv", "rtg");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
        LinkedList linkedList = new LinkedList();
        Supplier supplier = () -> {
            return Boolean.valueOf(apply.numRecords() > 1);
        };
        linkedList.getClass();
        apply.evictWhile(supplier, (v1) -> {
            r2.add(v1);
        });
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(1));
        MatcherAssert.assertThat(linkedList, Matchers.is(Collections.singletonList(new TimeOrderedKeyValueBuffer.Eviction("asdf", new Change("eyt", (Object) null), getContext(0L)))));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldTrackCount() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 0L, 0L, "asdf", "oin");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(1));
        putRecord(apply, makeContext, 1L, 0L, "asdf", "wekjn");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(1));
        putRecord(apply, makeContext, 0L, 0L, "zxcv", "24inf");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldTrackSize() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 0L, 0L, "asdf", "23roni");
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(51L));
        putRecord(apply, makeContext, 1L, 0L, "asdf", "3l");
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(47L));
        putRecord(apply, makeContext, 0L, 0L, "zxcv", "qfowin");
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(98L));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldTrackMinTimestamp() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 1L, 0L, "asdf", "2093j");
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(1L));
        putRecord(apply, makeContext, 0L, 0L, "zxcv", "3gon4i");
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(0L));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldEvictOldestAndUpdateSizeAndCountAndMinTimestamp() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 1L, 0L, "zxcv", "o23i4");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(1));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(50L));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(1L));
        putRecord(apply, makeContext, 0L, 0L, "asdf", "3ng");
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(98L));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(0L));
        AtomicInteger atomicInteger = new AtomicInteger(0);
        apply.evictWhile(() -> {
            return true;
        }, eviction -> {
            switch (atomicInteger.incrementAndGet()) {
                case 1:
                    MatcherAssert.assertThat(eviction.key(), Matchers.is("asdf"));
                    MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
                    MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(98L));
                    MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(0L));
                    return;
                case 2:
                    MatcherAssert.assertThat(eviction.key(), Matchers.is("zxcv"));
                    MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(1));
                    MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(50L));
                    MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(1L));
                    return;
                default:
                    Assert.fail("too many invocations");
                    return;
            }
        });
        MatcherAssert.assertThat(Integer.valueOf(atomicInteger.get()), Matchers.is(2));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(0));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(Long.MAX_VALUE));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldReturnUndefinedOnPriorValueForNotBufferedKey() {
        B apply = this.bufferSupplier.apply(this.testName);
        apply.init(makeContext(), apply);
        MatcherAssert.assertThat(apply.priorValueForBuffered("ASDF"), Matchers.is(Maybe.undefined()));
    }

    @Test
    public void shouldReturnPriorValueForBufferedKey() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        ProcessorRecordContext context = getContext(0L);
        makeContext.setRecordContext(context);
        apply.put(1L, "A", new Change("new-value", "old-value"), context);
        apply.put(1L, "B", new Change("new-value", (Object) null), context);
        MatcherAssert.assertThat(apply.priorValueForBuffered("A"), Matchers.is(Maybe.defined(ValueAndTimestamp.make("old-value", -1L))));
        MatcherAssert.assertThat(apply.priorValueForBuffered("B"), Matchers.is(Maybe.defined((Object) null)));
    }

    @Test
    public void shouldFlush() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        putRecord(apply, makeContext, 2L, 0L, "asdf", "2093j");
        putRecord(apply, makeContext, 1L, 1L, "zxcv", "3gon4i");
        putRecord(apply, makeContext, 0L, 2L, "deleteme", "deadbeef");
        apply.evictWhile(() -> {
            return Boolean.valueOf(apply.minTimestamp() < 1);
        }, eviction -> {
        });
        apply.flush();
        MatcherAssert.assertThat((List) ((MockInternalProcessorContext.MockRecordCollector) makeContext.recordCollector()).collected().stream().map(producerRecord -> {
            KeyValue keyValue;
            if (producerRecord.value() == null) {
                keyValue = null;
            } else {
                ByteBuffer wrap = ByteBuffer.wrap((byte[]) producerRecord.value());
                keyValue = new KeyValue(Long.valueOf(wrap.getLong()), BufferValue.deserialize(wrap));
            }
            return new ProducerRecord(producerRecord.topic(), producerRecord.partition(), producerRecord.timestamp(), new String((byte[]) producerRecord.key(), StandardCharsets.UTF_8), keyValue, producerRecord.headers());
        }).collect(Collectors.toList()), Matchers.is(Arrays.asList(new ProducerRecord("test-app-" + this.testName + "-changelog", 0, (Long) null, "deleteme", (Object) null, new RecordHeaders()), new ProducerRecord("test-app-" + this.testName + "-changelog", 0, (Long) null, "zxcv", new KeyValue(1L, getBufferValue("3gon4i", 1L)), V_2_CHANGELOG_HEADERS), new ProducerRecord("test-app-" + this.testName + "-changelog", 0, (Long) null, "asdf", new KeyValue(2L, getBufferValue("2093j", 0L)), V_2_CHANGELOG_HEADERS))));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldRestoreOldFormat() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        RecordBatchingStateRestoreCallback stateRestoreCallback = makeContext.stateRestoreCallback(this.testName);
        makeContext.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", (Headers) null));
        Serializer serializer = FullChangeSerde.castOrWrap(Serdes.String()).serializer();
        byte[] serialize = serializer.serialize((String) null, new Change("doomed", (Object) null));
        byte[] serialize2 = serializer.serialize((String) null, new Change("qwer", (Object) null));
        byte[] serialize3 = serializer.serialize((String) null, new Change("eo4im", "previous"));
        byte[] serialize4 = serializer.serialize((String) null, new Change("next", "eo4im"));
        stateRestoreCallback.restoreBatch(Arrays.asList(new ConsumerRecord("changelog-topic", 0, 0L, 0L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + serialize.length).putLong(0L).put(serialize).array()), new ConsumerRecord("changelog-topic", 0, 1L, 1L, TimestampType.CREATE_TIME, -1L, -1, -1, "asdf".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + serialize2.length).putLong(2L).put(serialize2).array()), new ConsumerRecord("changelog-topic", 0, 2L, 2L, TimestampType.CREATE_TIME, -1L, -1, -1, "zxcv".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + serialize3.length).putLong(1L).put(serialize3).array()), new ConsumerRecord("changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1, "zxcv".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + serialize4.length).putLong(1L).put(serialize4).array())));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(3));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(196L));
        stateRestoreCallback.restoreBatch(Collections.singletonList(new ConsumerRecord("changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), (Object) null)));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(1L));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(131L));
        MatcherAssert.assertThat(apply.priorValueForBuffered("todelete"), Matchers.is(Maybe.undefined()));
        MatcherAssert.assertThat(apply.priorValueForBuffered("asdf"), Matchers.is(Maybe.defined((Object) null)));
        MatcherAssert.assertThat(apply.priorValueForBuffered("zxcv"), Matchers.is(Maybe.defined(ValueAndTimestamp.make("previous", -1L))));
        LinkedList linkedList = new LinkedList();
        Supplier supplier = () -> {
            return true;
        };
        linkedList.getClass();
        apply.evictWhile(supplier, (v1) -> {
            r2.add(v1);
        });
        MatcherAssert.assertThat(linkedList, Matchers.is(Arrays.asList(new TimeOrderedKeyValueBuffer.Eviction("zxcv", new Change("next", "eo4im"), new ProcessorRecordContext(3L, 3L, 0, "changelog-topic", new RecordHeaders())), new TimeOrderedKeyValueBuffer.Eviction("asdf", new Change("qwer", (Object) null), new ProcessorRecordContext(1L, 1L, 0, "changelog-topic", new RecordHeaders())))));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldRestoreV1Format() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        RecordBatchingStateRestoreCallback stateRestoreCallback = makeContext.stateRestoreCallback(this.testName);
        makeContext.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", (Headers) null));
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{1})});
        byte[] array = getContextualRecord("doomed", 0L).serialize(0).array();
        byte[] array2 = getContextualRecord("qwer", 1L).serialize(0).array();
        byte[] array3 = new ContextualRecord(FullChangeSerde.castOrWrap(Serdes.String()).serializer().serialize((String) null, new Change("3o4im", "previous")), getContext(2L)).serialize(0).array();
        byte[] array4 = new ContextualRecord(FullChangeSerde.castOrWrap(Serdes.String()).serializer().serialize((String) null, new Change("next", "3o4im")), getContext(3L)).serialize(0).array();
        stateRestoreCallback.restoreBatch(Arrays.asList(new ConsumerRecord("changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array.length).putLong(0L).put(array).array(), recordHeaders), new ConsumerRecord("changelog-topic", 0, 1L, 9999L, TimestampType.CREATE_TIME, -1L, -1, -1, "asdf".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array2.length).putLong(2L).put(array2).array(), recordHeaders), new ConsumerRecord("changelog-topic", 0, 2L, 99L, TimestampType.CREATE_TIME, -1L, -1, -1, "zxcv".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array3.length).putLong(1L).put(array3).array(), recordHeaders), new ConsumerRecord("changelog-topic", 0, 3L, 100L, TimestampType.CREATE_TIME, -1L, -1, -1, "zxcv".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array4.length).putLong(1L).put(array4).array(), recordHeaders)));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(3));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(166L));
        stateRestoreCallback.restoreBatch(Collections.singletonList(new ConsumerRecord("changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), (Object) null)));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(1L));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(111L));
        MatcherAssert.assertThat(apply.priorValueForBuffered("todelete"), Matchers.is(Maybe.undefined()));
        MatcherAssert.assertThat(apply.priorValueForBuffered("asdf"), Matchers.is(Maybe.defined((Object) null)));
        MatcherAssert.assertThat(apply.priorValueForBuffered("zxcv"), Matchers.is(Maybe.defined(ValueAndTimestamp.make("previous", -1L))));
        LinkedList linkedList = new LinkedList();
        Supplier supplier = () -> {
            return true;
        };
        linkedList.getClass();
        apply.evictWhile(supplier, (v1) -> {
            r2.add(v1);
        });
        MatcherAssert.assertThat(linkedList, Matchers.is(Arrays.asList(new TimeOrderedKeyValueBuffer.Eviction("zxcv", new Change("next", "3o4im"), getContext(3L)), new TimeOrderedKeyValueBuffer.Eviction("asdf", new Change("qwer", (Object) null), getContext(1L)))));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldRestoreV2Format() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        RecordBatchingStateRestoreCallback stateRestoreCallback = makeContext.stateRestoreCallback(this.testName);
        makeContext.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", (Headers) null));
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{2})});
        byte[] array = getBufferValue("doomed", 0L).serialize(0).array();
        byte[] array2 = getBufferValue("qwer", 1L).serialize(0).array();
        byte[] array3 = new BufferValue(new ContextualRecord(FullChangeSerde.castOrWrap(Serdes.String()).serializer().serialize((String) null, new Change("3o4im", "IGNORED")), getContext(2L)), Serdes.String().serializer().serialize((String) null, "previous")).serialize(0).array();
        byte[] array4 = new BufferValue(new ContextualRecord(FullChangeSerde.castOrWrap(Serdes.String()).serializer().serialize((String) null, new Change("next", "3o4im")), getContext(3L)), Serdes.String().serializer().serialize((String) null, "previous")).serialize(0).array();
        stateRestoreCallback.restoreBatch(Arrays.asList(new ConsumerRecord("changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array.length).put(array).putLong(0L).array(), recordHeaders), new ConsumerRecord("changelog-topic", 0, 1L, 9999L, TimestampType.CREATE_TIME, -1L, -1, -1, "asdf".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array2.length).put(array2).putLong(2L).array(), recordHeaders), new ConsumerRecord("changelog-topic", 0, 2L, 99L, TimestampType.CREATE_TIME, -1L, -1, -1, "zxcv".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array3.length).put(array3).putLong(1L).array(), recordHeaders), new ConsumerRecord("changelog-topic", 0, 2L, 100L, TimestampType.CREATE_TIME, -1L, -1, -1, "zxcv".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array4.length).put(array4).putLong(1L).array(), recordHeaders)));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(3));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(0L));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(166L));
        stateRestoreCallback.restoreBatch(Collections.singletonList(new ConsumerRecord("changelog-topic", 0, 3L, 3L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), (Object) null)));
        MatcherAssert.assertThat(Integer.valueOf(apply.numRecords()), Matchers.is(2));
        MatcherAssert.assertThat(Long.valueOf(apply.minTimestamp()), Matchers.is(1L));
        MatcherAssert.assertThat(Long.valueOf(apply.bufferSize()), Matchers.is(111L));
        MatcherAssert.assertThat(apply.priorValueForBuffered("todelete"), Matchers.is(Maybe.undefined()));
        MatcherAssert.assertThat(apply.priorValueForBuffered("asdf"), Matchers.is(Maybe.defined((Object) null)));
        MatcherAssert.assertThat(apply.priorValueForBuffered("zxcv"), Matchers.is(Maybe.defined(ValueAndTimestamp.make("previous", -1L))));
        LinkedList linkedList = new LinkedList();
        Supplier supplier = () -> {
            return true;
        };
        linkedList.getClass();
        apply.evictWhile(supplier, (v1) -> {
            r2.add(v1);
        });
        MatcherAssert.assertThat(linkedList, Matchers.is(Arrays.asList(new TimeOrderedKeyValueBuffer.Eviction("zxcv", new Change("next", "3o4im"), getContext(3L)), new TimeOrderedKeyValueBuffer.Eviction("asdf", new Change("qwer", (Object) null), getContext(1L)))));
        cleanup(makeContext, apply);
    }

    @Test
    public void shouldNotRestoreUnrecognizedVersionRecord() {
        B apply = this.bufferSupplier.apply(this.testName);
        MockInternalProcessorContext makeContext = makeContext();
        apply.init(makeContext, apply);
        RecordBatchingStateRestoreCallback stateRestoreCallback = makeContext.stateRestoreCallback(this.testName);
        makeContext.setRecordContext(new ProcessorRecordContext(0L, 0L, 0, "", (Headers) null));
        RecordHeaders recordHeaders = new RecordHeaders(new Header[]{new RecordHeader("v", new byte[]{-1})});
        byte[] array = getBufferValue("doomed", 0L).serialize(0).array();
        try {
            stateRestoreCallback.restoreBatch(Collections.singletonList(new ConsumerRecord("changelog-topic", 0, 0L, 999L, TimestampType.CREATE_TIME, -1L, -1, -1, "todelete".getBytes(StandardCharsets.UTF_8), ByteBuffer.allocate(8 + array.length).putLong(0L).put(array).array(), recordHeaders)));
            Assert.fail("expected an exception");
            cleanup(makeContext, apply);
        } catch (IllegalArgumentException e) {
            cleanup(makeContext, apply);
        } catch (Throwable th) {
            cleanup(makeContext, apply);
            throw th;
        }
    }

    private static void putRecord(TimeOrderedKeyValueBuffer<String, String> timeOrderedKeyValueBuffer, MockInternalProcessorContext mockInternalProcessorContext, long j, long j2, String str, String str2) {
        ProcessorRecordContext context = getContext(j2);
        mockInternalProcessorContext.setRecordContext(context);
        timeOrderedKeyValueBuffer.put(j, str, new Change(str2, (Object) null), context);
    }

    private static BufferValue getBufferValue(String str, long j) {
        return new BufferValue(getContextualRecord(str, j), (byte[]) null);
    }

    private static ContextualRecord getContextualRecord(String str, long j) {
        return new ContextualRecord(FullChangeSerde.castOrWrap(Serdes.String()).serializer().serialize((String) null, new Change(str, (Object) null)), getContext(j));
    }

    private static ProcessorRecordContext getContext(long j) {
        return new ProcessorRecordContext(j, 0L, 0, "topic", (Headers) null);
    }
}
