package org.apache.kafka.streams.state.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest.class */
public class RocksDBSessionStoreTest extends AbstractSessionBytesStoreTest {
    private static final String STORE_NAME = "rocksDB session store";

    @Parameterized.Parameter
    public StoreType storeType;

    /* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBSessionStoreTest$StoreType.class */
    enum StoreType {
        RocksDBSessionStore,
        RocksDBTimeOrderedSessionStoreWithIndex,
        RocksDBTimeOrderedSessionStoreWithoutIndex
    }

    @Parameterized.Parameters(name = "{0}")
    public static Collection<Object[]> getKeySchema() {
        return Arrays.asList(new Object[]{StoreType.RocksDBSessionStore}, new Object[]{StoreType.RocksDBTimeOrderedSessionStoreWithIndex}, new Object[]{StoreType.RocksDBTimeOrderedSessionStoreWithoutIndex});
    }

    @Override // org.apache.kafka.streams.state.internals.AbstractSessionBytesStoreTest
    <K, V> SessionStore<K, V> buildSessionStore(long j, Serde<K> serde, Serde<V> serde2) {
        switch (this.storeType) {
            case RocksDBSessionStore:
                return Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(j)), serde, serde2).build();
            case RocksDBTimeOrderedSessionStoreWithIndex:
                return Stores.sessionStoreBuilder(new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, j, true), serde, serde2).build();
            case RocksDBTimeOrderedSessionStoreWithoutIndex:
                return Stores.sessionStoreBuilder(new RocksDbTimeOrderedSessionBytesStoreSupplier(STORE_NAME, j, false), serde, serde2).build();
            default:
                throw new IllegalStateException("Unknown StoreType: " + this.storeType);
        }
    }

    @Test
    public void shouldRemoveExpired() {
        this.sessionStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
        this.sessionStore.put(new Windowed("aa", new SessionWindow(0L, IntegrationTestUtils.DEFAULT_TIMEOUT)), 2L);
        this.sessionStore.put(new Windowed("a", new SessionWindow(10L, IntegrationTestUtils.DEFAULT_TIMEOUT)), 3L);
        this.sessionStore.put(new Windowed("aa", new SessionWindow(10L, 120000L)), 4L);
        KeyValueIterator findSessions = this.sessionStore.findSessions("a", "b", 0L, Long.MAX_VALUE);
        Throwable th = null;
        try {
            Assert.assertEquals(StreamsTestUtils.valuesToSet(findSessions), new HashSet(Arrays.asList(2L, 3L, 4L)));
            if (findSessions != null) {
                if (0 == 0) {
                    findSessions.close();
                    return;
                }
                try {
                    findSessions.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (findSessions != null) {
                if (0 != 0) {
                    try {
                        findSessions.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    findSessions.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldMatchPositionAfterPut() {
        WrappedStateStore wrapped = this.sessionStore.wrapped().wrapped();
        this.context.setRecordContext(new ProcessorRecordContext(0L, 1L, 0, "", new RecordHeaders()));
        this.sessionStore.put(new Windowed("a", new SessionWindow(0L, 0L)), 1L);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 2L, 0, "", new RecordHeaders()));
        this.sessionStore.put(new Windowed("aa", new SessionWindow(0L, IntegrationTestUtils.DEFAULT_TIMEOUT)), 2L);
        this.context.setRecordContext(new ProcessorRecordContext(0L, 3L, 0, "", new RecordHeaders()));
        this.sessionStore.put(new Windowed("a", new SessionWindow(10L, IntegrationTestUtils.DEFAULT_TIMEOUT)), 3L);
        Assert.assertEquals(Position.fromMap(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("", Utils.mkMap(new Map.Entry[]{Utils.mkEntry(0, 3L)}))})), wrapped.getPosition());
    }
}
