package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.SimpleTimeZone;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.internals.ChangelogRecordDeserializationHelper;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.PrefixedWindowKeySchemas;
import org.apache.kafka.streams.state.internals.Segment;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import org.rocksdb.WriteBatch;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/AbstractDualSchemaRocksDBSegmentedBytesStoreTest.class */
public abstract class AbstractDualSchemaRocksDBSegmentedBytesStoreTest<S extends Segment> {
    // Mock processor context handed to the store; rebuilt in before() for every test.
    private InternalMockProcessorContext context;
    // Store under test, obtained from getBytesStore() in before() and closed in close().
    private AbstractDualSchemaRocksDBSegmentedBytesStore<S> bytesStore;
    // Temporary directory passed to the mock context as its state directory.
    private File stateDir;
    // Window starting at 61000 ms, assigned in before() only for the time-first base schema.
    // NOTE(review): presumably the first window that lands in the following segment — confirm.
    private Window nextSegmentWindow;
    // NOTE(review): matches the 500 ms size literal used when building the windows in before() — confirm intent.
    private final long windowSizeForTimeWindow = 500;
    // Four fixture windows; populated in before() only when the base schema is time-first,
    // otherwise the entries stay null.
    private final Window[] windows = new Window[4];
    // NOTE(review): presumably the store retention period in milliseconds; used outside this view.
    final long retention = 1000;
    // NOTE(review): presumably the segment interval in milliseconds; initialised from the
    // integration-test timeout constant, which looks like a decompiler constant-folding artifact.
    final long segmentInterval = IntegrationTestUtils.DEFAULT_TIMEOUT;
    // NOTE(review): presumably the name given to the store by subclasses; not referenced in this view.
    final String storeName = "bytes-store";

    /**
     * Builds the per-test fixture: the window array (for the time-first base schema only),
     * the store under test, a temporary state directory and a mock processor context,
     * then initialises the store against that context.
     *
     * <p>Window sizes and the segment boundaries are derived from the class-level
     * {@code windowSizeForTimeWindow}, {@code segmentInterval} and {@code retention}
     * fields so the fixture stays consistent with them.
     */
    @Before
    public void before() {
        if (getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
            this.windows[0] = WindowKeySchema.timeWindowForSize(10L, this.windowSizeForTimeWindow);
            this.windows[1] = WindowKeySchema.timeWindowForSize(500L, this.windowSizeForTimeWindow);
            this.windows[2] = WindowKeySchema.timeWindowForSize(1000L, this.windowSizeForTimeWindow);
            this.windows[3] = WindowKeySchema.timeWindowForSize(this.segmentInterval, this.windowSizeForTimeWindow);
            // Starts one retention period after the segment interval (60000 + 1000 = 61000 ms).
            this.nextSegmentWindow =
                WindowKeySchema.timeWindowForSize(this.segmentInterval + this.retention, this.windowSizeForTimeWindow);
        }

        this.bytesStore = getBytesStore();
        this.stateDir = TestUtils.tempDirectory();

        // Zero-sized cache: the thread cache is created with a max size of 0 bytes.
        final ThreadCache cache =
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics()));
        this.context = new InternalMockProcessorContext(
            this.stateDir,
            Serdes.String(),
            Serdes.Long(),
            new MockRecordCollector(),
            cache);
        this.bytesStore.init(this.context, this.bytesStore);
    }

    /**
     * Closes the store under test after each test.
     *
     * <p>The store may be unset if {@link #before()} failed before assigning it; in that
     * case there is nothing to close, and skipping avoids adding a
     * {@link NullPointerException} on top of the original setup failure.
     */
    @After
    public void close() {
        if (this.bytesStore != null) {
            this.bytesStore.close();
        }
    }

    /**
     * Supplies the store instance to be tested; called once per test from {@link #before()}.
     *
     * @return the store under test
     */
    abstract AbstractDualSchemaRocksDBSegmentedBytesStore<S> getBytesStore();

    /**
     * Supplies a segments container of the concrete segment type.
     * NOTE(review): not called in the visible part of this class — confirm how later tests use it.
     *
     * @return a segments instance
     */
    abstract AbstractSegments<S> newSegments();

    /**
     * Supplies the base key schema of the store under test. {@link #before()} only populates
     * the window fixtures when this is a {@code PrefixedWindowKeySchemas.TimeFirstWindowKeySchema}.
     *
     * @return the base key schema
     */
    abstract SegmentedBytesStore.KeySchema getBaseSchema();

    /**
     * Supplies the index key schema of the store under test.
     * NOTE(review): not called in the visible part of this class; may be null for stores
     * without an index — confirm against subclasses.
     *
     * @return the index key schema
     */
    abstract SegmentedBytesStore.KeySchema getIndexSchema();

    /**
     * Stores four windowed records ("a" in windows 0 and 1, "b" in window 2, "c" in window 3)
     * and checks the forward fetch results for a single key, a bounded key range, ranges
     * open at either end, and a fully open range.
     *
     * <p>Each iterator is opened in its own try-with-resources block so that it is closed
     * exactly once, whether or not the assertion passes.
     */
    @Test
    public void shouldPutAndFetch() {
        final Bytes keyA = Bytes.wrap("a".getBytes());
        final Bytes keyB = Bytes.wrap("b".getBytes());

        final KeyValue<Windowed<String>, Long> a0 = KeyValue.pair(new Windowed<>("a", this.windows[0]), 10L);
        final KeyValue<Windowed<String>, Long> a1 = KeyValue.pair(new Windowed<>("a", this.windows[1]), 50L);
        final KeyValue<Windowed<String>, Long> b2 = KeyValue.pair(new Windowed<>("b", this.windows[2]), 100L);
        final KeyValue<Windowed<String>, Long> c3 = KeyValue.pair(new Windowed<>("c", this.windows[3]), 200L);

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("c", this.windows[3])), serializeValue(200L));

        // Single key "a".
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch(keyA, 0L, this.windows[2].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(a0, a1);
            Assert.assertEquals(expected, toList(values));
        }

        // Key range ["a", "b"].
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch(keyA, keyB, 0L, this.windows[2].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(a0, a1, b2);
            Assert.assertEquals(expected, toList(values));
        }

        // No lower key bound, upper bound "b".
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch((Bytes) null, keyB, 0L, this.windows[2].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(a0, a1, b2);
            Assert.assertEquals(expected, toList(values));
        }

        // Lower bound "b", no upper key bound.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch(keyB, (Bytes) null, 0L, this.windows[3].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(b2, c3);
            Assert.assertEquals(expected, toList(values));
        }

        // No key bounds at all.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch((Bytes) null, (Bytes) null, 0L, this.windows[3].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(a0, a1, b2, c3);
            Assert.assertEquals(expected, toList(values));
        }
    }

    /**
     * Stores four windowed records ("a" in windows 0 and 1, "b" in window 2, "c" in window 3)
     * and checks that backward fetches return them in the reverse of the forward order, for a
     * single key, a bounded key range, ranges open at either end, and a fully open range.
     *
     * <p>Each iterator is opened in its own try-with-resources block so that it is closed
     * exactly once, whether or not the assertion passes.
     */
    @Test
    public void shouldPutAndBackwardFetch() {
        final Bytes keyA = Bytes.wrap("a".getBytes());
        final Bytes keyB = Bytes.wrap("b".getBytes());

        final KeyValue<Windowed<String>, Long> a0 = KeyValue.pair(new Windowed<>("a", this.windows[0]), 10L);
        final KeyValue<Windowed<String>, Long> a1 = KeyValue.pair(new Windowed<>("a", this.windows[1]), 50L);
        final KeyValue<Windowed<String>, Long> b2 = KeyValue.pair(new Windowed<>("b", this.windows[2]), 100L);
        final KeyValue<Windowed<String>, Long> c3 = KeyValue.pair(new Windowed<>("c", this.windows[3]), 200L);

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("c", this.windows[3])), serializeValue(200L));

        // Single key "a".
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.backwardFetch(keyA, 0L, this.windows[2].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(a1, a0);
            Assert.assertEquals(expected, toList(values));
        }

        // Key range ["a", "b"].
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.backwardFetch(keyA, keyB, 0L, this.windows[2].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(b2, a1, a0);
            Assert.assertEquals(expected, toList(values));
        }

        // No lower key bound, upper bound "b".
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.backwardFetch((Bytes) null, keyB, 0L, this.windows[2].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(b2, a1, a0);
            Assert.assertEquals(expected, toList(values));
        }

        // Lower bound "b", no upper key bound.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.backwardFetch(keyB, (Bytes) null, 0L, this.windows[3].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(c3, b2);
            Assert.assertEquals(expected, toList(values));
        }

        // No key bounds at all.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.backwardFetch((Bytes) null, (Bytes) null, 0L, this.windows[3].start())) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(c3, b2, a1, a0);
            Assert.assertEquals(expected, toList(values));
        }
    }

    /**
     * Stores the keys "a", "aa" and "aaa" (each a prefix of the next) in the same window
     * ending at {@code Long.MAX_VALUE}, first asserting that their serialized forms sort in
     * descending order (serialized "a" compares greater than "aa", which compares greater
     * than "aaa"). It then checks the forward fetch results for a single key, a bounded key
     * range, ranges open at either end, and a fully open range.
     *
     * <p>Each iterator is opened in its own try-with-resources block so that it is closed
     * exactly once, whether or not the assertion passes.
     */
    @Test
    public void shouldPutAndFetchWithPrefixKey() {
        final TimeWindow window = new TimeWindow(Long.MAX_VALUE - 1, Long.MAX_VALUE);

        final Bytes serializedA = serializeKey(new Windowed<>("a", window), false, Integer.MAX_VALUE);
        final Bytes serializedAA = serializeKey(new Windowed<>("aa", window), false, Integer.MAX_VALUE);
        final Bytes serializedAAA = serializeKey(new Windowed<>("aaa", window), false, Integer.MAX_VALUE);

        // The serialized keys are expected to sort in descending order of key length.
        Assert.assertTrue(serializedA.compareTo(serializedAA) > 0);
        Assert.assertTrue(serializedAA.compareTo(serializedAAA) > 0);

        this.bytesStore.put(serializedA, serializeValue(10L));
        this.bytesStore.put(serializedAA, serializeValue(50L));
        this.bytesStore.put(serializedAAA, serializeValue(100L));

        final Bytes keyA = Bytes.wrap("a".getBytes());
        final Bytes keyAA = Bytes.wrap("aa".getBytes());

        final KeyValue<Windowed<String>, Long> a = KeyValue.pair(new Windowed<>("a", window), 10L);
        final KeyValue<Windowed<String>, Long> aa = KeyValue.pair(new Windowed<>("aa", window), 50L);
        final KeyValue<Windowed<String>, Long> aaa = KeyValue.pair(new Windowed<>("aaa", window), 100L);

        // Single key "a": the longer keys sharing the prefix must not be returned.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch(keyA, 0L, Long.MAX_VALUE)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(a);
            Assert.assertEquals(expected, toList(values));
        }

        // Key range ["a", "aa"].
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch(keyA, keyAA, 0L, Long.MAX_VALUE)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(aa, a);
            Assert.assertEquals(expected, toList(values));
        }

        // No lower key bound, upper bound "aa".
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch((Bytes) null, keyAA, 0L, Long.MAX_VALUE)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(aa, a);
            Assert.assertEquals(expected, toList(values));
        }

        // Lower bound "aa", no upper key bound.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch(keyAA, (Bytes) null, 0L, Long.MAX_VALUE)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(aaa, aa);
            Assert.assertEquals(expected, toList(values));
        }

        // No key bounds at all.
        try (KeyValueIterator<Bytes, byte[]> values =
                 this.bytesStore.fetch((Bytes) null, (Bytes) null, 0L, Long.MAX_VALUE)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(aaa, aa, a);
            Assert.assertEquals(expected, toList(values));
        }
    }

    @Test
    public void shouldPutAndBackwardFetchWithPrefix() {
        KeyValueIterator<Bytes, byte[]> backwardFetch;
        Throwable th;
        TimeWindow timeWindow = new TimeWindow(9223372036854775806L, Long.MAX_VALUE);
        Bytes serializeKey = serializeKey(new Windowed<>("a", timeWindow), false, Integer.MAX_VALUE);
        Bytes serializeKey2 = serializeKey(new Windowed<>("aa", timeWindow), false, Integer.MAX_VALUE);
        Bytes serializeKey3 = serializeKey(new Windowed<>("aaa", timeWindow), false, Integer.MAX_VALUE);
        Assert.assertTrue(serializeKey.compareTo(serializeKey2) > 0);
        Assert.assertTrue(serializeKey2.compareTo(serializeKey3) > 0);
        this.bytesStore.put(serializeKey, serializeValue(10L));
        this.bytesStore.put(serializeKey2, serializeValue(50L));
        this.bytesStore.put(serializeKey3, serializeValue(100L));
        KeyValueIterator<Bytes, byte[]> backwardFetch2 = this.bytesStore.backwardFetch(Bytes.wrap("a".getBytes()), 0L, Long.MAX_VALUE);
        Throwable th2 = null;
        try {
            try {
                Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", timeWindow), 10L)), toList(backwardFetch2));
                if (backwardFetch2 != null) {
                    if (0 != 0) {
                        try {
                            backwardFetch2.close();
                        } catch (Throwable th3) {
                            th2.addSuppressed(th3);
                        }
                    } else {
                        backwardFetch2.close();
                    }
                }
                backwardFetch = this.bytesStore.backwardFetch(Bytes.wrap("a".getBytes()), Bytes.wrap("aa".getBytes()), 0L, Long.MAX_VALUE);
                th = null;
            } catch (Throwable th4) {
                th2 = th4;
                throw th4;
            }
            try {
                try {
                    Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", timeWindow), 10L), KeyValue.pair(new Windowed("aa", timeWindow), 50L)), toList(backwardFetch));
                    if (backwardFetch != null) {
                        if (0 != 0) {
                            try {
                                backwardFetch.close();
                            } catch (Throwable th5) {
                                th.addSuppressed(th5);
                            }
                        } else {
                            backwardFetch.close();
                        }
                    }
                    KeyValueIterator<Bytes, byte[]> backwardFetch3 = this.bytesStore.backwardFetch((Bytes) null, Bytes.wrap("aa".getBytes()), 0L, Long.MAX_VALUE);
                    Throwable th6 = null;
                    try {
                        try {
                            Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", timeWindow), 10L), KeyValue.pair(new Windowed("aa", timeWindow), 50L)), toList(backwardFetch3));
                            if (backwardFetch3 != null) {
                                if (0 != 0) {
                                    try {
                                        backwardFetch3.close();
                                    } catch (Throwable th7) {
                                        th6.addSuppressed(th7);
                                    }
                                } else {
                                    backwardFetch3.close();
                                }
                            }
                            backwardFetch3 = this.bytesStore.backwardFetch(Bytes.wrap("aa".getBytes()), (Bytes) null, 0L, Long.MAX_VALUE);
                            Throwable th8 = null;
                            try {
                                try {
                                    Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("aa", timeWindow), 50L), KeyValue.pair(new Windowed("aaa", timeWindow), 100L)), toList(backwardFetch3));
                                    if (backwardFetch3 != null) {
                                        if (0 != 0) {
                                            try {
                                                backwardFetch3.close();
                                            } catch (Throwable th9) {
                                                th8.addSuppressed(th9);
                                            }
                                        } else {
                                            backwardFetch3.close();
                                        }
                                    }
                                    backwardFetch = this.bytesStore.backwardFetch((Bytes) null, (Bytes) null, 0L, Long.MAX_VALUE);
                                    Throwable th10 = null;
                                    try {
                                        try {
                                            Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", timeWindow), 10L), KeyValue.pair(new Windowed("aa", timeWindow), 50L), KeyValue.pair(new Windowed("aaa", timeWindow), 100L)), toList(backwardFetch));
                                            if (backwardFetch != null) {
                                                if (0 == 0) {
                                                    backwardFetch.close();
                                                    return;
                                                }
                                                try {
                                                    backwardFetch.close();
                                                } catch (Throwable th11) {
                                                    th10.addSuppressed(th11);
                                                }
                                            }
                                        } catch (Throwable th12) {
                                            th10 = th12;
                                            throw th12;
                                        }
                                    } finally {
                                    }
                                } catch (Throwable th13) {
                                    th8 = th13;
                                    throw th13;
                                }
                            } finally {
                            }
                        } catch (Throwable th14) {
                            th6 = th14;
                            throw th14;
                        }
                    } finally {
                        if (backwardFetch3 != null) {
                            if (th6 != null) {
                                try {
                                    backwardFetch3.close();
                                } catch (Throwable th15) {
                                    th6.addSuppressed(th15);
                                }
                            } else {
                                backwardFetch3.close();
                            }
                        }
                    }
                } catch (Throwable th16) {
                    th = th16;
                    throw th16;
                }
            } finally {
                if (backwardFetch != null) {
                    if (th != null) {
                        try {
                            backwardFetch.close();
                        } catch (Throwable th17) {
                            th.addSuppressed(th17);
                        }
                    } else {
                        backwardFetch.close();
                    }
                }
            }
        } catch (Throwable th18) {
            if (backwardFetch2 != null) {
                if (th2 != null) {
                    try {
                        backwardFetch2.close();
                    } catch (Throwable th19) {
                        th2.addSuppressed(th19);
                    }
                } else {
                    backwardFetch2.close();
                }
            }
            throw th18;
        }
    }

    /**
     * Verifies that a range fetch skips an index entry that has no matching base record and
     * that the entry is gone from the index afterwards.
     *
     * <p>When {@code getIndexSchema()} returns {@code null} the test only checks that
     * {@code putIndex} throws an {@link IllegalStateException}.
     */
    @Test
    public void shouldSkipAndRemoveDanglingIndex() {
        if (getIndexSchema() == null) {
            Assertions.assertThrows(IllegalStateException.class, () -> {
                this.bytesStore.putIndex(Bytes.wrap("a".getBytes()), new byte[0]);
            });
            return;
        }

        // Write an index entry for ("a", windows[1]) without a corresponding base record.
        final Bytes danglingIndexKey = serializeKeyForIndex(new Windowed<>("a", this.windows[1]));
        this.bytesStore.putIndex(danglingIndexKey, new byte[0]);
        MatcherAssert.assertThat(
            Bytes.wrap(this.bytesStore.getIndex(danglingIndexKey)),
            CoreMatchers.is(Bytes.wrap(new byte[0])));

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[2])), serializeValue(20L));

        // try-with-resources closes the iterator exactly once, whether or not the assertion passes.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), Bytes.wrap("b".getBytes()), 1L, 2000L)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 10L),
                KeyValue.pair(new Windowed<>("b", this.windows[2]), 20L));
            Assert.assertEquals(expected, toList(results));
        }

        // The dangling entry must no longer be present once the fetch has run.
        MatcherAssert.assertThat(
            this.bytesStore.getIndex(danglingIndexKey),
            CoreMatchers.is(CoreMatchers.nullValue()));
    }

    /**
     * Verifies that a single-key fetch over the time range {@code [1, 999]} returns the records
     * stored for {@code windows[0]} and {@code windows[1]} but not the one for {@code windows[2]}.
     */
    @Test
    public void shouldFindValuesWithinRange() {
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(10L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[2])), serializeValue(100L));

        // try-with-resources closes the iterator exactly once, whether or not the assertion passes.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 1L, 999L)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 10L),
                KeyValue.pair(new Windowed<>("a", this.windows[1]), 50L));
            Assert.assertEquals(expected, toList(results));
        }
    }

    /**
     * Verifies that a removed record is no longer returned by a fetch over {@code [0, 100]} and,
     * when an index schema is configured, that its index entry is gone as well.
     */
    @Test
    public void shouldRemove() {
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(30L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(50L));

        this.bytesStore.remove(serializeKey(new Windowed<>("a", this.windows[0])));

        // try-with-resources closes the iterator exactly once, whether or not the assertion passes.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, 100L)) {
            Assert.assertFalse(results.hasNext());
        }

        if (getIndexSchema() != null) {
            final Bytes removedIndexKey = serializeKeyForIndex(new Windowed<>("a", this.windows[0]));
            MatcherAssert.assertThat(
                this.bytesStore.getIndex(removedIndexKey),
                CoreMatchers.is(CoreMatchers.nullValue()));
        }
    }

    /**
     * Verifies the segment directories that exist after each batch of writes: only
     * {@code segmentName(0)} after the first three puts, and both {@code segmentName(0)} and
     * {@code segmentName(1)} after the put for {@code windows[3]}. A fetch over
     * {@code [0, 1500]} must then return the first three records.
     */
    @Test
    public void shouldRollSegments() {
        final AbstractSegments<S> segments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[1])), serializeValue(100L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[2])), serializeValue(500L));
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[3])), serializeValue(1000L));
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, 1500L)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 50L),
                KeyValue.pair(new Windowed<>("a", this.windows[1]), 100L),
                KeyValue.pair(new Windowed<>("a", this.windows[2]), 500L));
            Assert.assertEquals(expected, toList(results));
        }

        segments.close();
    }

    /**
     * Verifies that {@code all()} returns records from both segment directories, in the order
     * ("a", windows[0]) then ("b", windows[3]).
     */
    @Test
    public void shouldGetAllSegments() {
        final AbstractSegments<S> segments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[3])), serializeValue(100L));
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.all()) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 50L),
                KeyValue.pair(new Windowed<>("b", this.windows[3]), 100L));
            Assert.assertEquals(expected, toList(results));
        }

        segments.close();
    }

    /**
     * Verifies that {@code backwardAll()} returns records from both segment directories in
     * reverse order: ("b", windows[3]) first, then ("a", windows[0]).
     */
    @Test
    public void shouldGetAllBackwards() {
        final AbstractSegments<S> segments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        this.bytesStore.put(serializeKey(new Windowed<>("b", this.windows[3])), serializeValue(100L));
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.backwardAll()) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                KeyValue.pair(new Windowed<>("b", this.windows[3]), 100L),
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 50L));
            Assert.assertEquals(expected, toList(results));
        }

        segments.close();
    }

    /**
     * Verifies that {@code fetchAll} over a time range starting at 0 returns the records written
     * to both segment directories.
     */
    @Test
    public void shouldFetchAllSegments() {
        final AbstractSegments<S> segments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[3])), serializeValue(100L));
        Assert.assertEquals(Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)), segmentDirs());

        // NOTE(review): DEFAULT_TIMEOUT is used here as the upper time bound of the fetch, not as
        // a timeout - presumably a constant that happens to have the intended value; confirm.
        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetchAll(0L, IntegrationTestUtils.DEFAULT_TIMEOUT)) {
            final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 50L),
                KeyValue.pair(new Windowed<>("a", this.windows[3]), 100L));
            Assert.assertEquals(expected, toList(results));
        }

        segments.close();
    }

    /**
     * Verifies that a segment directory renamed to the {@code <prefix>-yyyyMMddHHmm} form is
     * still loaded when a fresh store instance is initialised over the same state directory.
     */
    @Test
    public void shouldLoadSegmentsWithOldStyleDateFormattedName() {
        final AbstractSegments<S> segments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[3])), serializeValue(100L));
        this.bytesStore.close();

        // Segment names have the form "<prefix>.<number>"; the number is turned into a UTC date.
        final String segmentName = segments.segmentName(0L);
        final String[] nameParts = segmentName.split("\\.");
        final long segmentNumber = Long.parseLong(nameParts[1]);
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
        formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
        // NOTE(review): DEFAULT_TIMEOUT is used as the multiplier that turns the segment number
        // into a timestamp - presumably it equals the segment interval in ms; confirm.
        final String formattedDate =
            formatter.format(new Date(segmentNumber * IntegrationTestUtils.DEFAULT_TIMEOUT));

        final File storeDir = new File(this.stateDir, "bytes-store");
        final File oldStyleDir = new File(storeDir, nameParts[0] + "-" + formattedDate);
        Assert.assertTrue(new File(storeDir, segmentName).renameTo(oldStyleDir));

        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, IntegrationTestUtils.DEFAULT_TIMEOUT)) {
            final List<KeyValue<Windowed<String>, Long>> actual = toList(results);
            MatcherAssert.assertThat(actual, CoreMatchers.equalTo(Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 50L),
                KeyValue.pair(new Windowed<>("a", this.windows[3]), 100L))));
        }

        segments.close();
    }

    /**
     * Verifies that a segment directory renamed to the {@code <prefix>:<number>} form is still
     * loaded when a fresh store instance is initialised over the same state directory.
     */
    @Test
    public void shouldLoadSegmentsWithOldStyleColonFormattedName() {
        final AbstractSegments<S> segments = newSegments();

        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(50L));
        this.bytesStore.put(serializeKey(new Windowed<>("a", this.windows[3])), serializeValue(100L));
        this.bytesStore.close();

        // Segment names have the form "<prefix>.<number>"; swap the '.' separator for ':'.
        final String segmentName = segments.segmentName(0L);
        final String[] nameParts = segmentName.split("\\.");
        final File storeDir = new File(this.stateDir, "bytes-store");
        final File oldStyleDir = new File(storeDir, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
        Assert.assertTrue(new File(storeDir, segmentName).renameTo(oldStyleDir));

        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results =
                 this.bytesStore.fetch(Bytes.wrap("a".getBytes()), 0L, IntegrationTestUtils.DEFAULT_TIMEOUT)) {
            final List<KeyValue<Windowed<String>, Long>> actual = toList(results);
            MatcherAssert.assertThat(actual, CoreMatchers.equalTo(Arrays.asList(
                KeyValue.pair(new Windowed<>("a", this.windows[0]), 50L),
                KeyValue.pair(new Windowed<>("a", this.windows[3]), 100L))));
        }

        segments.close();
    }

    /**
     * Writes one record, closes the store, initialises it again with the same context and
     * writes a second record; the test passes if none of these calls throws.
     */
    @Test
    public void shouldBeAbleToWriteToReInitializedStore() {
        final Bytes keyBeforeClose = serializeKey(new Windowed<>("a", windows[0]));
        bytesStore.put(keyBeforeClose, serializeValue(50L));

        bytesStore.close();
        bytesStore.init(context, bytesStore);

        final Bytes keyAfterReinit = serializeKey(new Windowed<>("a", windows[1]));
        bytesStore.put(keyAfterReinit, serializeValue(100L));
    }

    /**
     * Verifies that {@code getWriteBatches} produces two batches for two records in
     * {@code windows[0]} and {@code windows[3]}, and that each batch holds one entry when no
     * index schema is configured and two entries otherwise.
     */
    @Test
    public void shouldCreateWriteBatches() {
        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        final byte[] firstKey = serializeKey(new Windowed<>("a", windows[0]), true).get();
        records.add(new ConsumerRecord<>("", 0, 0L, firstKey, serializeValue(50L)));
        final byte[] secondKey = serializeKey(new Windowed<>("a", windows[3]), true).get();
        records.add(new ConsumerRecord<>("", 0, 0L, secondKey, serializeValue(100L)));

        final Map<?, WriteBatch> batchesBySegment = bytesStore.getWriteBatches(records);
        Assert.assertEquals(2L, batchesBySegment.size());

        final int expectedEntriesPerBatch;
        if (getIndexSchema() == null) {
            expectedEntriesPerBatch = 1;
        } else {
            expectedEntriesPerBatch = 2;
        }
        for (final WriteBatch batch : batchesBySegment.values()) {
            Assert.assertEquals(expectedEntriesPerBatch, batch.count());
        }
    }

    /** Runs the shared restore scenario with the task type set to {@code ACTIVE}. */
    @Test
    public void shouldRestoreToByteStoreForActiveTask() {
        final Task.TaskType activeType = Task.TaskType.ACTIVE;
        shouldRestoreToByteStore(activeType);
    }

    /**
     * Calls {@code transitionToStandby(null)} on the context and then runs the shared restore
     * scenario with the task type set to {@code STANDBY}.
     */
    @Test
    public void shouldRestoreToByteStoreForStandbyTask() {
        context.transitionToStandby(null);
        final Task.TaskType standbyType = Task.TaskType.STANDBY;
        shouldRestoreToByteStore(standbyType);
    }

    /**
     * Shared restore scenario: initialises the store, restores two changelog records and checks
     * that two segments exist and that {@code all()} returns both records.
     *
     * @param taskType the task type the calling test is exercising; not read by this method
     */
    private void shouldRestoreToByteStore(Task.TaskType taskType) {
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        records.add(new ConsumerRecord<>("", 0, 0L, serializeKey(new Windowed<>("a", this.windows[0]), true).get(), serializeValue(50L)));
        records.add(new ConsumerRecord<>("", 0, 0L, serializeKey(new Windowed<>("a", this.windows[3]), true).get(), serializeValue(100L)));
        this.bytesStore.restoreAllInternal(records);
        Assert.assertEquals(2L, this.bytesStore.getSegments().size());

        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[0]), 50L));
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[3]), 100L));

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.all()) {
            Assert.assertEquals(expected, toList(results));
        }
    }

    /**
     * Puts four records, each under a record context with topic {@code ""}, partition 0 and
     * offsets 1 through 4, then checks that the store's position maps topic {@code ""},
     * partition 0 to offset 4.
     */
    @Test
    public void shouldMatchPositionAfterPut() {
        bytesStore.init(context, bytesStore);

        context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders()));
        final Bytes firstKey = serializeKey(new Windowed<>("a", windows[0]));
        bytesStore.put(firstKey, serializeValue(10L));

        context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders()));
        final Bytes secondKey = serializeKey(new Windowed<>("a", windows[1]));
        bytesStore.put(secondKey, serializeValue(50L));

        context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders()));
        final Bytes thirdKey = serializeKey(new Windowed<>("b", windows[2]));
        bytesStore.put(thirdKey, serializeValue(100L));

        context.setRecordContext(new ProcessorRecordContext(0L, 4L, 0, "", new RecordHeaders()));
        final Bytes fourthKey = serializeKey(new Windowed<>("c", windows[3]));
        bytesStore.put(fourthKey, serializeValue(200L));

        final Position expectedPosition =
            Position.fromMap(Utils.mkMap(Utils.mkEntry("", Utils.mkMap(Utils.mkEntry(0, 4L)))));
        Assert.assertEquals(expectedPosition, bytesStore.getPosition());
    }

    /**
     * With the consistency-vector config flag enabled, restores the records from
     * {@code getChangelogRecords()} and checks the segment count, the restored records and that
     * the store position maps topic {@code ""}, partition 0 to offset 3.
     */
    @Test
    public void shouldRestoreRecordsAndConsistencyVectorSingleTopic() {
        final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()), new StreamsConfig(streamsConfig), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())), Time.SYSTEM);
        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        this.bytesStore.restoreAllInternal(getChangelogRecords());
        Assert.assertEquals(2L, this.bytesStore.getSegments().size());

        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[0]), 50L));
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[2]), 100L));
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[3]), 200L));

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.all()) {
            Assert.assertEquals(expected, toList(results));
        }

        MatcherAssert.assertThat(this.bytesStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions(""), Matchers.notNullValue());
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions(""), Matchers.hasEntry(0, 3L));
    }

    /**
     * With the consistency-vector config flag enabled, restores the records from
     * {@code getChangelogRecordsMultipleTopics()} and checks the segment count, the restored
     * records and that the store position holds offset 3 for topic {@code "A"} and offset 2 for
     * topic {@code "B"}, both on partition 0.
     */
    @Test
    public void shouldRestoreRecordsAndConsistencyVectorMultipleTopics() {
        final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()), new StreamsConfig(streamsConfig), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())), Time.SYSTEM);
        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        this.bytesStore.restoreAllInternal(getChangelogRecordsMultipleTopics());
        Assert.assertEquals(2L, this.bytesStore.getSegments().size());

        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[0]), 50L));
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[2]), 100L));
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[3]), 200L));

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.all()) {
            Assert.assertEquals(expected, toList(results));
        }

        MatcherAssert.assertThat(this.bytesStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions("A"), Matchers.notNullValue());
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions("A"), Matchers.hasEntry(0, 3L));
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions("B"), Matchers.notNullValue());
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions("B"), Matchers.hasEntry(0, 2L));
    }

    /**
     * With the consistency-vector config flag enabled, restores the records from
     * {@code getChangelogRecordsWithTombstones()} and checks that one segment exists, that only
     * the non-null record is returned by {@code all()} and that the store position maps topic
     * {@code "A"}, partition 0 to offset 2.
     */
    @Test
    public void shouldHandleTombstoneRecords() {
        final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        streamsConfig.put("__iq.consistency.offset.vector.enabled__", true);
        this.context = new InternalMockProcessorContext(TestUtils.tempDirectory(), Serdes.String(), Serdes.String(), new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()), new StreamsConfig(streamsConfig), MockRecordCollector::new, new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())), Time.SYSTEM);
        this.bytesStore = getBytesStore();
        this.bytesStore.init(this.context, this.bytesStore);
        Assert.assertEquals(0L, this.bytesStore.getSegments().size());

        this.bytesStore.restoreAllInternal(getChangelogRecordsWithTombstones());
        Assert.assertEquals(1L, this.bytesStore.getSegments().size());

        final List<KeyValue<Windowed<String>, Long>> expected = new ArrayList<>();
        expected.add(new KeyValue<>(new Windowed<>("a", this.windows[0]), 50L));

        // Close the iterator after draining it so the test does not leak it.
        try (KeyValueIterator<Bytes, byte[]> results = this.bytesStore.all()) {
            Assert.assertEquals(expected, toList(results));
        }

        MatcherAssert.assertThat(this.bytesStore.getPosition(), Matchers.notNullValue());
        MatcherAssert.assertThat(this.bytesStore.getPosition().getPartitionPositions("A"), Matchers.hasEntry(0, 2L));
    }

    /**
     * With the consistency-vector config flag enabled, restores a record that carries no
     * headers and checks that the store position is still the empty position.
     */
    @Test
    public void shouldNotThrowWhenRestoringOnMissingHeaders() {
        final Properties props = StreamsTestUtils.getStreamsConfig();
        props.put("__iq.consistency.offset.vector.enabled__", true);

        context = new InternalMockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.String(),
            new StreamsMetricsImpl(new Metrics(), "mock", "latest", new MockTime()),
            new StreamsConfig(props),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())),
            Time.SYSTEM);

        bytesStore = getBytesStore();
        bytesStore.init(context, bytesStore);
        bytesStore.restoreAllInternal(getChangelogRecordsWithoutHeaders());

        final Position emptyPosition = Position.emptyPosition();
        MatcherAssert.assertThat(bytesStore.getPosition(), CoreMatchers.is(emptyPosition));
    }

    /**
     * Builds three changelog records for key {@code "a"} in {@code windows[0]},
     * {@code windows[2]} and {@code windows[3]} with values 50, 100 and 200.
     *
     * <p>Each record gets its own {@link RecordHeaders} holding the consistency version header
     * and a {@code "c"} header with the serialized position for topic {@code ""}, partition 0 at
     * offsets 1, 2 and 3 respectively. Separate header instances are used so that building a
     * later record cannot change the headers attached to an earlier one.
     *
     * @return the three records, in the order described above
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecords() {
        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();

        Position position = Position.emptyPosition().withComponent("", 0, 1L);
        final RecordHeaders firstHeaders = new RecordHeaders();
        firstHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        firstHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[0]), true).get(), serializeValue(50L), firstHeaders, Optional.empty()));

        position = position.withComponent("", 0, 2L);
        final RecordHeaders secondHeaders = new RecordHeaders();
        secondHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        secondHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[2]), true).get(), serializeValue(100L), secondHeaders, Optional.empty()));

        position = position.withComponent("", 0, 3L);
        final RecordHeaders thirdHeaders = new RecordHeaders();
        thirdHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        thirdHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[3]), true).get(), serializeValue(200L), thirdHeaders, Optional.empty()));

        return records;
    }

    /**
     * Builds three changelog records for key {@code "a"} in {@code windows[0]},
     * {@code windows[2]} and {@code windows[3]} with values 50, 100 and 200.
     *
     * <p>Each record gets its own {@link RecordHeaders} holding the consistency version header
     * and a {@code "c"} header with the serialized position after successively applying
     * ("A", 0, 1), ("B", 0, 2) and ("A", 0, 3). Separate header instances are used so that
     * building a later record cannot change the headers attached to an earlier one.
     *
     * @return the three records, in the order described above
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsMultipleTopics() {
        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();

        Position position = Position.emptyPosition().withComponent("A", 0, 1L);
        final RecordHeaders firstHeaders = new RecordHeaders();
        firstHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        firstHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[0]), true).get(), serializeValue(50L), firstHeaders, Optional.empty()));

        position = position.withComponent("B", 0, 2L);
        final RecordHeaders secondHeaders = new RecordHeaders();
        secondHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        secondHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[2]), true).get(), serializeValue(100L), secondHeaders, Optional.empty()));

        position = position.withComponent("A", 0, 3L);
        final RecordHeaders thirdHeaders = new RecordHeaders();
        thirdHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        thirdHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[3]), true).get(), serializeValue(200L), thirdHeaders, Optional.empty()));

        return records;
    }

    /**
     * Builds two changelog records for key {@code "a"}: a value record (50) in
     * {@code windows[0]} followed by a tombstone (null value) in {@code windows[2]}.
     *
     * <p>Each record gets its own {@link RecordHeaders} with exactly one consistency version
     * header and one {@code "c"} header, carrying the serialized position for topic {@code "A"},
     * partition 0 at offset 1 for the value record and offset 2 for the tombstone. Previously
     * both records shared one header instance to which the second pair of headers was appended,
     * so the value record ended up carrying the tombstone's headers too.
     *
     * @return the value record followed by the tombstone
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsWithTombstones() {
        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();

        Position position = Position.emptyPosition().withComponent("A", 0, 1L);
        final RecordHeaders valueHeaders = new RecordHeaders();
        valueHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        valueHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        records.add(new ConsumerRecord<>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[0]), true).get(), serializeValue(50L), valueHeaders, Optional.empty()));

        position = position.withComponent("A", 0, 2L);
        final RecordHeaders tombstoneHeaders = new RecordHeaders();
        tombstoneHeaders.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
        tombstoneHeaders.add(new RecordHeader("c", PositionSerde.serialize(position).array()));
        // A null value marks this record as a tombstone.
        records.add(new ConsumerRecord<byte[], byte[]>("", 0, 0L, -1L, TimestampType.NO_TIMESTAMP_TYPE, -1, -1, serializeKey(new Windowed<>("a", this.windows[2]), true).get(), null, tombstoneHeaders, Optional.empty()));

        return records;
    }

    /**
     * Builds a single changelog record for key {@code "a"} in {@code windows[2]} with value 50,
     * using the {@link ConsumerRecord} constructor that takes no headers.
     *
     * @return a mutable list containing that one record
     */
    private List<ConsumerRecord<byte[], byte[]>> getChangelogRecordsWithoutHeaders() {
        final byte[] rawKey = serializeKey(new Windowed<>("a", windows[2])).get();
        final byte[] rawValue = serializeValue(50L);
        final List<ConsumerRecord<byte[], byte[]>> records = new ArrayList<>();
        records.add(new ConsumerRecord<>("", 0, 0L, rawKey, rawValue));
        return records;
    }

    /**
     * Verifies that a put which is skipped for an expired segment is logged with the message
     * {@code "Skipping record for expired segment."} and counted by the
     * {@code dropped-records-total} and {@code dropped-records-rate} task metrics.
     */
    @Test
    public void shouldLogAndMeasureExpiredRecords() {
        final Properties streamsConfig = StreamsTestUtils.getStreamsConfig();
        final AbstractDualSchemaRocksDBSegmentedBytesStore<S> store = getBytesStore();
        final InternalMockProcessorContext mockContext =
            new InternalMockProcessorContext(TestUtils.tempDirectory(), new StreamsConfig(streamsConfig));
        mockContext.setSystemTimeMs(new SystemTime().milliseconds());
        store.init(mockContext, store);

        // try-with-resources closes the appender exactly once, even if the log assertion fails.
        try (LogCaptureAppender appender = LogCaptureAppender.createAndRegister()) {
            // The first put goes to nextSegmentWindow; the second, for windows[0], is the one
            // expected to be skipped and logged.
            store.put(serializeKey(new Windowed<>("dummy", this.nextSegmentWindow)), serializeValue(0L));
            store.put(serializeKey(new Windowed<>("a", this.windows[0])), serializeValue(5L));

            MatcherAssert.assertThat(
                appender.getMessages(),
                CoreMatchers.hasItem("Skipping record for expired segment."));
        }

        final Map<MetricName, ? extends Metric> metrics = mockContext.metrics().metrics();
        final String threadId = Thread.currentThread().getName();
        final Metric droppedTotal = metrics.get(new MetricName(
            "dropped-records-total",
            "stream-task-metrics",
            "",
            Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", "0_0"))));
        final Metric droppedRate = metrics.get(new MetricName(
            "dropped-records-rate",
            "stream-task-metrics",
            "",
            Utils.mkMap(Utils.mkEntry("thread-id", threadId), Utils.mkEntry("task-id", "0_0"))));

        Assert.assertEquals(Double.valueOf(1.0d), droppedTotal.metricValue());
        Assert.assertNotEquals(Double.valueOf(0.0d), droppedRate.metricValue());

        store.close();
    }

    /**
     * Lists the entries of the {@code bytes-store} directory under the state directory.
     *
     * @return the names returned by {@link File#list()} for that directory, as a set
     * @throws NullPointerException if {@link File#list()} returns {@code null}
     */
    private Set<String> segmentDirs() {
        final File storeDir = new File(this.stateDir, "bytes-store");
        // Pass the String[] through unchanged so the resulting set is typed as Set<String>;
        // casting it to Object[] would make the generic call produce a set of Object instead.
        final String[] entries = Objects.requireNonNull(storeDir.list());
        return Utils.mkSet(entries);
    }

    /**
     * Serializes a windowed key by delegating to the two-argument overload with its flag set
     * to {@code false}.
     *
     * @param windowed the windowed key to serialize
     * @return the serialized key
     */
    private Bytes serializeKey(Windowed<String> windowed) {
        final boolean changelogFormat = false;
        return serializeKey(windowed, changelogFormat);
    }

    /**
     * Serializes a windowed key by delegating to the three-argument overload with its
     * {@code int} argument set to 0.
     *
     * @param windowed the windowed key to serialize
     * @param z        forwarded unchanged to the three-argument overload
     * @return the serialized key
     */
    private Bytes serializeKey(Windowed<String> windowed, boolean z) {
        final int defaultSeqnum = 0;
        return serializeKey(windowed, z, defaultSeqnum);
    }

    /**
     * Serializes a windowed key with either {@code WindowKeySchema} or the time-first prefixed
     * schema.
     *
     * @param windowed the windowed key to serialize
     * @param z        when {@code true}, {@code WindowKeySchema.toStoreKeyBinary} is used
     *                 regardless of the base schema
     * @param i        passed as the second argument of {@code toStoreKeyBinary}
     * @return the serialized key
     * @throws IllegalStateException if {@code z} is {@code false} and the base schema is not a
     *         {@code TimeFirstWindowKeySchema}
     */
    private Bytes serializeKey(Windowed<String> windowed, boolean z, int i) {
        final StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
        if (!z) {
            if (!(getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema)) {
                throw new IllegalStateException("Unrecognized serde schema");
            }
            return PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.toStoreKeyBinary(windowed, i, serdes);
        }
        return WindowKeySchema.toStoreKeyBinary(windowed, i, serdes);
    }

    /**
     * Serializes a windowed key with the key-first prefixed schema, passing 0 as the second
     * argument of {@code toStoreKeyBinary}.
     *
     * @param windowed the windowed key to serialize
     * @return the serialized index key
     * @throws IllegalStateException if the index schema is not a {@code KeyFirstWindowKeySchema}
     */
    private Bytes serializeKeyForIndex(Windowed<String> windowed) {
        final StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
        if (!(getIndexSchema() instanceof PrefixedWindowKeySchemas.KeyFirstWindowKeySchema)) {
            throw new IllegalStateException("Unrecognized serde schema");
        }
        return PrefixedWindowKeySchemas.KeyFirstWindowKeySchema.toStoreKeyBinary(windowed, 0, serdes);
    }

    /**
     * Serializes a {@code long} with the serializer of {@code Serdes.Long()}, using an empty
     * topic name.
     *
     * @param j the value to serialize
     * @return the serialized bytes
     */
    private byte[] serializeValue(long j) {
        final Long boxed = Long.valueOf(j);
        return Serdes.Long().serializer().serialize("", boxed);
    }

    /**
     * Drains an iterator of raw store entries into a list of deserialized key/value pairs.
     *
     * <p>Keys are decoded with {@code TimeFirstWindowKeySchema.fromStoreKey} using 500 as its
     * second argument; values are decoded as {@code Long}. The iterator is not closed here.
     *
     * @param keyValueIterator the iterator to drain
     * @return the deserialized entries in iteration order
     * @throws IllegalStateException if an entry is read while the base schema is not a
     *         {@code TimeFirstWindowKeySchema}
     */
    private List<KeyValue<Windowed<String>, Long>> toList(KeyValueIterator<Bytes, byte[]> keyValueIterator) {
        final StateSerdes<String, Long> serdes = StateSerdes.withBuiltinTypes("dummy", String.class, Long.class);
        final List<KeyValue<Windowed<String>, Long>> decoded = new ArrayList<>();
        while (keyValueIterator.hasNext()) {
            final KeyValue<Bytes, byte[]> rawEntry = keyValueIterator.next();
            // The schema is checked per entry, so an empty iterator never triggers the exception.
            if (getBaseSchema() instanceof PrefixedWindowKeySchemas.TimeFirstWindowKeySchema) {
                final Windowed<String> windowedKey = PrefixedWindowKeySchemas.TimeFirstWindowKeySchema.fromStoreKey(
                    rawEntry.key.get(), 500L, serdes.keyDeserializer(), serdes.topic());
                final Long value = serdes.valueDeserializer().deserialize("dummy", rawEntry.value);
                decoded.add(KeyValue.pair(windowedKey, value));
            } else {
                throw new IllegalStateException("Unrecognized serde schema");
            }
        }
        return decoded;
    }
}
