/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.function.Supplier;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.test.TestUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class SessionStoreFetchTest {
    private static final String STORE_NAME = "store";
    private static final int DATA_SIZE = 5;
    private static final long WINDOW_SIZE = 500L;
    private static final long RETENTION_MS = 10000L;
    private StoreType storeType;
    private boolean enableLogging;
    private boolean enableCaching;
    private boolean forward;
    private LinkedList<KeyValue<Windowed<String>, Long>> expectedRecords;
    private LinkedList<KeyValue<String, String>> records;
    private Properties streamsConfig;
    @Rule
    public TestName testName = new TestName();

    public SessionStoreFetchTest(StoreType storeType, boolean enableLogging, boolean enableCaching, boolean forward) {
        this.storeType = storeType;
        this.enableLogging = enableLogging;
        this.enableCaching = enableCaching;
        this.forward = forward;
        this.records = new LinkedList();
        this.expectedRecords = new LinkedList();
        int m = 2;
        for (int i = 0; i < 5; ++i) {
            String keyStr = i < 2 ? "a" : "b";
            String key = "key-" + keyStr;
            String key2 = "key-" + keyStr + keyStr;
            String value = "val-" + i;
            KeyValue r = new KeyValue((Object)key, (Object)value);
            KeyValue r2 = new KeyValue((Object)key2, (Object)value);
            this.records.add((KeyValue<String, String>)r);
            this.records.add((KeyValue<String, String>)r2);
        }
        this.expectedRecords.add((KeyValue<Windowed<String>, Long>)new KeyValue((Object)new Windowed((Object)"key-a", (Window)new SessionWindow(0L, 500L)), (Object)4L));
        this.expectedRecords.add((KeyValue<Windowed<String>, Long>)new KeyValue((Object)new Windowed((Object)"key-aa", (Window)new SessionWindow(0L, 500L)), (Object)4L));
        this.expectedRecords.add((KeyValue<Windowed<String>, Long>)new KeyValue((Object)new Windowed((Object)"key-b", (Window)new SessionWindow(1500L, 2000L)), (Object)6L));
        this.expectedRecords.add((KeyValue<Windowed<String>, Long>)new KeyValue((Object)new Windowed((Object)"key-bb", (Window)new SessionWindow(1500L, 2000L)), (Object)6L));
    }

    @Parameterized.Parameters(name="storeType={0}, enableLogging={1}, enableCaching={2}, forward={3}")
    public static Collection<Object[]> data() {
        List<StoreType> types = Arrays.asList(StoreType.InMemory, StoreType.RocksDB);
        List<Boolean> logging = Arrays.asList(true, false);
        List<Boolean> caching = Arrays.asList(true, false);
        List<Boolean> forward = Arrays.asList(true, false);
        return SessionStoreFetchTest.buildParameters(types, logging, caching, forward);
    }

    @Before
    public void setup() {
        this.streamsConfig = Utils.mkProperties((Map)Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"state.dir", (Object)TestUtils.tempDirectory().getPath())}));
    }

    @Test
    public void testStoreConfig() {
        Materialized<String, Long, SessionStore<Bytes, byte[]>> stateStoreConfig = this.getStoreConfig(this.storeType, STORE_NAME, this.enableLogging, this.enableCaching);
        StreamsBuilder builder = new StreamsBuilder();
        KStream stream = builder.stream("input", Consumed.with((Serde)Serdes.String(), (Serde)Serdes.String()));
        stream.groupByKey(Grouped.with((Serde)Serdes.String(), (Serde)Serdes.String())).windowedBy(SessionWindows.ofInactivityGapWithNoGrace((Duration)Duration.ofMillis(500L))).count(stateStoreConfig).toStream().to("output");
        Topology topology = builder.build();
        try (TopologyTestDriver driver = new TopologyTestDriver(topology);){
            Iterator<Object> dataIterator;
            TestInputTopic input = driver.createInputTopic("input", (Serializer)new StringSerializer(), (Serializer)new StringSerializer());
            SessionStore stateStore = driver.getSessionStore(STORE_NAME);
            int medium = 4;
            for (int i = 0; i < this.records.size(); ++i) {
                KeyValue<String, String> kv = this.records.get(i);
                long windowStartTime = i < 4 ? 0L : 1500L;
                input.pipeInput(kv.key, kv.value, windowStartTime);
                input.pipeInput(kv.key, kv.value, windowStartTime + 500L);
            }
            try (KeyValueIterator scanIterator = this.forward ? stateStore.fetch((Object)"key-a", (Object)"key-bb") : stateStore.backwardFetch((Object)"key-a", (Object)"key-bb");){
                dataIterator = this.forward ? this.expectedRecords.iterator() : this.expectedRecords.descendingIterator();
                TestUtils.checkEquals((Iterator)scanIterator, dataIterator);
            }
            scanIterator = this.forward ? stateStore.findSessions((Object)"key-a", (Object)"key-bb", 0L, Long.MAX_VALUE) : stateStore.backwardFindSessions((Object)"key-a", (Object)"key-bb", 0L, Long.MAX_VALUE);
            var11_14 = null;
            try {
                dataIterator = this.forward ? this.expectedRecords.iterator() : this.expectedRecords.descendingIterator();
                TestUtils.checkEquals((Iterator)scanIterator, dataIterator);
            }
            catch (Throwable throwable) {
                var11_14 = throwable;
                throw throwable;
            }
            finally {
                if (scanIterator != null) {
                    if (var11_14 != null) {
                        try {
                            scanIterator.close();
                        }
                        catch (Throwable throwable) {
                            var11_14.addSuppressed(throwable);
                        }
                    } else {
                        scanIterator.close();
                    }
                }
            }
        }
    }

    private static Collection<Object[]> buildParameters(List<?> ... argOptions) {
        List<Object[]> result = new LinkedList<Object[]>();
        result.add(new Object[0]);
        for (List<?> argOption : argOptions) {
            result = SessionStoreFetchTest.times(result, argOption);
        }
        return result;
    }

    private static List<Object[]> times(List<Object[]> left, List<?> right) {
        LinkedList<Object[]> result = new LinkedList<Object[]>();
        for (Object[] args : left) {
            for (Object rightElem : right) {
                Object[] resArgs = new Object[args.length + 1];
                System.arraycopy(args, 0, resArgs, 0, args.length);
                resArgs[args.length] = rightElem;
                result.add(resArgs);
            }
        }
        return result;
    }

    private Materialized<String, Long, SessionStore<Bytes, byte[]>> getStoreConfig(StoreType type, String name, boolean cachingEnabled, boolean loggingEnabled) {
        Supplier<SessionBytesStoreSupplier> createStore = () -> {
            if (type == StoreType.InMemory) {
                return Stores.inMemorySessionStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L));
            }
            if (type == StoreType.RocksDB) {
                return Stores.persistentSessionStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L));
            }
            return Stores.inMemorySessionStore((String)STORE_NAME, (Duration)Duration.ofMillis(10000L));
        };
        SessionBytesStoreSupplier stateStoreSupplier = createStore.get();
        Materialized stateStoreConfig = Materialized.as((SessionBytesStoreSupplier)stateStoreSupplier).withKeySerde(Serdes.String()).withValueSerde(Serdes.Long());
        if (cachingEnabled) {
            stateStoreConfig.withCachingEnabled();
        } else {
            stateStoreConfig.withCachingDisabled();
        }
        if (loggingEnabled) {
            stateStoreConfig.withLoggingEnabled(new HashMap());
        } else {
            stateStoreConfig.withLoggingDisabled();
        }
        return stateStoreConfig;
    }

    private static enum StoreType {
        InMemory,
        RocksDB;

    }
}

