package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.SimpleTimeZone;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.internals.SegmentedBytesStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Parameterized tests for {@link RocksDBSegmentedBytesStore}.
 *
 * <p>Every test runs once per {@link SegmentedBytesStore.KeySchema} supplied by
 * {@link #getKeySchemas()}, i.e. once with a {@link SessionKeySchema} and once with a
 * {@link WindowKeySchema}. {@link #before()} prepares four windows of the kind that matches
 * the schema under test; the tests write records for the single key {@code "a"} into those
 * windows and verify what the store returns and which segment directories exist on disk.
 */
@RunWith(Parameterized.class)
public class RocksDBSegmentedBytesStoreTest {
    /** The only record key used by these tests. */
    private static final String KEY = "a";

    /** Topic name handed to the serializers and deserializers used by the helpers below. */
    private static final String SERDES_TOPIC = "dummy";

    private InternalMockProcessorContext context;
    private RocksDBSegmentedBytesStore bytesStore;
    private File stateDir;

    /** Key schema under test; must stay public so the {@code Parameterized} runner can inject it. */
    @Parameterized.Parameter
    public SegmentedBytesStore.KeySchema schema;

    private final long retention = 1000;
    private final int numSegments = 3;
    private final String storeName = "bytes-store";
    private final long windowSizeForTimeWindow = 500;
    private final Window[] windows = new Window[4];

    /**
     * Supplies the key schemas every test is executed against.
     *
     * @return one {@link SessionKeySchema} and one {@link WindowKeySchema}
     */
    @Parameterized.Parameters(name = "{0}")
    public static Object[] getKeySchemas() {
        return new Object[]{new SessionKeySchema(), new WindowKeySchema()};
    }

    /**
     * Initializes the schema, builds the four test windows matching the schema type, and
     * creates and initializes a fresh store backed by a temporary state directory.
     */
    @Before
    public void before() {
        schema.init("topic");
        if (schema instanceof SessionKeySchema) {
            windows[0] = new SessionWindow(10L, 10L);
            windows[1] = new SessionWindow(500L, 1000L);
            windows[2] = new SessionWindow(1000L, 1500L);
            // NOTE(review): the session start is borrowed from an integration-test timeout
            // constant, which looks like a constant-folding artifact -- confirm the intended
            // literal and replace it so this test no longer depends on that timeout value.
            windows[3] = new SessionWindow(IntegrationTestUtils.DEFAULT_TIMEOUT, 60000L);
        }
        if (schema instanceof WindowKeySchema) {
            windows[0] = WindowKeySchema.timeWindowForSize(10L, windowSizeForTimeWindow);
            windows[1] = WindowKeySchema.timeWindowForSize(500L, windowSizeForTimeWindow);
            windows[2] = WindowKeySchema.timeWindowForSize(1000L, windowSizeForTimeWindow);
            windows[3] = WindowKeySchema.timeWindowForSize(60000L, windowSizeForTimeWindow);
        }
        bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, numSegments, schema);
        stateDir = TestUtils.tempDirectory();
        context = new InternalMockProcessorContext(
            stateDir,
            Serdes.String(),
            Serdes.Long(),
            new NoOpRecordCollector(),
            new ThreadCache(new LogContext("testCache "), 0L, new MockStreamsMetrics(new Metrics())));
        bytesStore.init(context, bytesStore);
    }

    /** Closes the store that the test finished with. */
    @After
    public void close() {
        bytesStore.close();
    }

    @Test
    public void shouldPutAndFetch() {
        put(windows[0], 10L);
        put(windows[1], 50L);
        put(windows[2], 100L);

        final List<KeyValue<Windowed<String>, Long>> expected =
            Arrays.asList(entry(windows[0], 10L), entry(windows[1], 50L));
        Assert.assertEquals(expected, toList(bytesStore.fetch(keyBytes(), 0L, 500L)));
    }

    @Test
    public void shouldFindValuesWithinRange() {
        put(windows[0], 10L);
        put(windows[1], 50L);
        put(windows[2], 100L);

        final List<KeyValue<Windowed<String>, Long>> expected =
            Arrays.asList(entry(windows[0], 10L), entry(windows[1], 50L));
        Assert.assertEquals(expected, toList(bytesStore.fetch(keyBytes(), 1L, 999L)));
    }

    @Test
    public void shouldRemove() {
        put(windows[0], 30L);
        put(windows[1], 50L);

        bytesStore.remove(serializeKey(new Windowed<>(KEY, windows[0])));

        final KeyValueIterator<Bytes, byte[]> remaining = bytesStore.fetch(keyBytes(), 0L, 100L);
        try {
            Assert.assertFalse(remaining.hasNext());
        } finally {
            // Release the iterator even when the assertion fails.
            remaining.close();
        }
    }

    @Test
    public void shouldRollSegments() {
        final Segments segments = new Segments(storeName, retention, numSegments);
        put(windows[0], 50L);
        put(windows[1], 100L);
        put(windows[2], 500L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        // Writing into the last window is expected to create the directory of segment 1.
        put(windows[3], 1000L);
        Assert.assertEquals(
            Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
            segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected = Arrays.asList(
            entry(windows[0], 50L), entry(windows[1], 100L), entry(windows[2], 500L));
        Assert.assertEquals(expected, toList(bytesStore.fetch(keyBytes(), 0L, 1500L)));
    }

    @Test
    public void shouldGetAllSegments() {
        final Segments segments = new Segments(storeName, retention, numSegments);
        put(windows[0], 50L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        put(windows[3], 100L);
        Assert.assertEquals(
            Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
            segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected =
            Arrays.asList(entry(windows[0], 50L), entry(windows[3], 100L));
        Assert.assertEquals(expected, toList(bytesStore.all()));
    }

    @Test
    public void shouldFetchAllSegments() {
        final Segments segments = new Segments(storeName, retention, numSegments);
        put(windows[0], 50L);
        Assert.assertEquals(Collections.singleton(segments.segmentName(0L)), segmentDirs());

        put(windows[3], 100L);
        Assert.assertEquals(
            Utils.mkSet(segments.segmentName(0L), segments.segmentName(1L)),
            segmentDirs());

        final List<KeyValue<Windowed<String>, Long>> expected =
            Arrays.asList(entry(windows[0], 50L), entry(windows[3], 100L));
        Assert.assertEquals(expected, toList(bytesStore.fetchAll(0L, 60000L)));
    }

    /**
     * Renames the directory of segment 0 to {@code <prefix>-<yyyyMMddHHmm>} (formatted in UTC)
     * and verifies that a newly created store still returns both records.
     */
    @Test
    public void shouldLoadSegementsWithOldStyleDateFormattedName() {
        final Segments segments = new Segments(storeName, retention, numSegments);
        put(windows[0], 50L);
        put(windows[3], 100L);
        bytesStore.close();

        final String segmentName = segments.segmentName(0L);
        final String[] nameParts = segmentName.split("\\.");
        final long segmentId = Long.parseLong(nameParts[1]);
        final SimpleDateFormat formatter = new SimpleDateFormat("yyyyMMddHHmm");
        formatter.setTimeZone(new SimpleTimeZone(0, "UTC"));
        final String formatted =
            formatter.format(new Date(segmentId * Segments.segmentInterval(retention, numSegments)));

        final File parent = new File(stateDir, storeName);
        final File oldStyleDir = new File(parent, nameParts[0] + "-" + formatted);
        Assert.assertTrue(new File(parent, segmentName).renameTo(oldStyleDir));

        bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, numSegments, schema);
        bytesStore.init(context, bytesStore);

        final List<KeyValue<Windowed<String>, Long>> expected =
            Arrays.asList(entry(windows[0], 50L), entry(windows[3], 100L));
        MatcherAssert.assertThat(
            toList(bytesStore.fetch(keyBytes(), 0L, 60000L)),
            CoreMatchers.equalTo(expected));
    }

    /**
     * Renames the directory of segment 0 to {@code <prefix>:<segmentId>} and verifies that a
     * newly created store still returns both records.
     */
    @Test
    public void shouldLoadSegementsWithOldStyleColonFormattedName() {
        final Segments segments = new Segments(storeName, retention, numSegments);
        put(windows[0], 50L);
        put(windows[3], 100L);
        bytesStore.close();

        final String segmentName = segments.segmentName(0L);
        final String[] nameParts = segmentName.split("\\.");
        final File parent = new File(stateDir, storeName);
        final File oldStyleDir = new File(parent, nameParts[0] + ":" + Long.parseLong(nameParts[1]));
        Assert.assertTrue(new File(parent, segmentName).renameTo(oldStyleDir));

        bytesStore = new RocksDBSegmentedBytesStore(storeName, retention, numSegments, schema);
        bytesStore.init(context, bytesStore);

        final List<KeyValue<Windowed<String>, Long>> expected =
            Arrays.asList(entry(windows[0], 50L), entry(windows[3], 100L));
        MatcherAssert.assertThat(
            toList(bytesStore.fetch(keyBytes(), 0L, 60000L)),
            CoreMatchers.equalTo(expected));
    }

    /** Passes as long as closing, re-initializing and writing again does not throw. */
    @Test
    public void shouldBeAbleToWriteToReInitializedStore() {
        put(windows[0], 50L);
        bytesStore.close();
        bytesStore.init(context, bytesStore);
        put(windows[1], 100L);
    }

    /** Stores {@code value} under the test key in the given window. */
    private void put(final Window window, final long value) {
        bytesStore.put(serializeKey(new Windowed<>(KEY, window)), serializeValue(value));
    }

    /** Builds the deserialized record the store is expected to return for the given window. */
    private KeyValue<Windowed<String>, Long> entry(final Window window, final long value) {
        return KeyValue.pair(new Windowed<>(KEY, window), value);
    }

    /** Returns the raw lookup key, encoded as UTF-8 independently of the platform charset. */
    private Bytes keyBytes() {
        return Bytes.wrap(KEY.getBytes(StandardCharsets.UTF_8));
    }

    /**
     * Lists the entries of the store's directory inside the state directory.
     *
     * @return the entry names, or an empty set when the directory cannot be listed, so that the
     *         calling assertion fails with a readable message instead of a NullPointerException
     */
    private Set<String> segmentDirs() {
        final String[] names = new File(stateDir, storeName).list();
        if (names == null) {
            return new HashSet<>();
        }
        return new HashSet<>(Arrays.asList(names));
    }

    /** Serializes a value with the {@code Long} serde. */
    private byte[] serializeValue(long j) {
        return Serdes.Long().serializer().serialize("", j);
    }

    /** Serializes a windowed key in the binary format of the schema under test. */
    private Bytes serializeKey(Windowed<String> windowed) {
        final StateSerdes<String, Long> serdes =
            StateSerdes.withBuiltinTypes(SERDES_TOPIC, String.class, Long.class);
        if (schema instanceof SessionKeySchema) {
            return Bytes.wrap(SessionKeySchema.toBinary(windowed, serdes.keySerializer(), SERDES_TOPIC));
        }
        return WindowKeySchema.toStoreKeyBinary(windowed, 0, serdes);
    }

    /**
     * Drains the iterator into a list of deserialized records and closes it afterwards.
     *
     * @param keyValueIterator iterator over raw store entries; closed by this method
     * @return the deserialized records in iteration order
     */
    private List<KeyValue<Windowed<String>, Long>> toList(KeyValueIterator<Bytes, byte[]> keyValueIterator) {
        final List<KeyValue<Windowed<String>, Long>> results = new ArrayList<>();
        final StateSerdes<String, Long> serdes =
            StateSerdes.withBuiltinTypes(SERDES_TOPIC, String.class, Long.class);
        try {
            while (keyValueIterator.hasNext()) {
                final KeyValue<Bytes, byte[]> next = keyValueIterator.next();
                final Windowed<String> windowedKey;
                if (schema instanceof WindowKeySchema) {
                    windowedKey = WindowKeySchema.fromStoreKey(next.key.get(), windowSizeForTimeWindow, serdes);
                } else {
                    windowedKey = SessionKeySchema.from(next.key.get(), serdes.keyDeserializer(), SERDES_TOPIC);
                }
                final Long value = serdes.valueDeserializer().deserialize(SERDES_TOPIC, next.value);
                results.add(KeyValue.pair(windowedKey, value));
            }
        } finally {
            // Release the underlying store iterator even if deserialization throws.
            keyValueIterator.close();
        }
        return results;
    }
}
