/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Window;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.Change;
import org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregate;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.internals.RocksDBSessionStoreSupplier;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class KStreamSessionWindowAggregateProcessorTest {
    private static final long GAP_MS = 300000L;
    private static final String STORE_NAME = "session-store";
    private final Initializer<Long> initializer = new Initializer<Long>(){

        public Long apply() {
            return 0L;
        }
    };
    private final Aggregator<String, String, Long> aggregator = new Aggregator<String, String, Long>(){

        public Long apply(String aggKey, String value, Long aggregate) {
            return aggregate + 1L;
        }
    };
    private final Merger<String, Long> sessionMerger = new Merger<String, Long>(){

        public Long apply(String aggKey, Long aggOne, Long aggTwo) {
            return aggOne + aggTwo;
        }
    };
    private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator = new KStreamSessionWindowAggregate(SessionWindows.with((long)300000L).until(900000L), "session-store", this.initializer, this.aggregator, this.sessionMerger);
    private final List<KeyValue> results = new ArrayList<KeyValue>();
    private Processor<String, String> processor = this.sessionAggregator.get();
    private SessionStore<String, Long> sessionStore;
    private MockProcessorContext context;

    @Before
    public void initializeStore() {
        File stateDir = TestUtils.tempDirectory();
        this.context = new MockProcessorContext(stateDir, Serdes.String(), Serdes.String(), new NoOpRecordCollector(), new ThreadCache(new LogContext("testCache "), 100000L, (StreamsMetrics)new MockStreamsMetrics(new Metrics()))){

            @Override
            public <K, V> void forward(K key, V value) {
                KStreamSessionWindowAggregateProcessorTest.this.results.add(KeyValue.pair(key, value));
            }
        };
        this.initStore(true);
        this.processor.init((ProcessorContext)this.context);
    }

    private void initStore(boolean enableCaching) {
        RocksDBSessionStoreSupplier supplier = new RocksDBSessionStoreSupplier(STORE_NAME, 900000L, Serdes.String(), Serdes.Long(), false, Collections.emptyMap(), enableCaching);
        this.sessionStore = supplier.get();
        this.sessionStore.init((ProcessorContext)this.context, this.sessionStore);
    }

    @After
    public void closeStore() {
        this.context.close();
        this.sessionStore.close();
    }

    @Test
    public void shouldCreateSingleSessionWhenWithinGap() {
        this.context.setTime(0L);
        this.processor.process((Object)"john", (Object)"first");
        this.context.setTime(500L);
        this.processor.process((Object)"john", (Object)"second");
        KeyValueIterator values = this.sessionStore.findSessions((Object)"john", 0L, 2000L);
        Assert.assertTrue((boolean)values.hasNext());
        Assert.assertEquals((Object)2L, (Object)((KeyValue)values.next()).value);
    }

    @Test
    public void shouldMergeSessions() {
        this.context.setTime(0L);
        String sessionId = "mel";
        this.processor.process((Object)"mel", (Object)"first");
        Assert.assertTrue((boolean)this.sessionStore.findSessions((Object)"mel", 0L, 0L).hasNext());
        this.context.setTime(300001L);
        this.processor.process((Object)"mel", (Object)"second");
        Assert.assertTrue((boolean)this.sessionStore.findSessions((Object)"mel", 300001L, 300001L).hasNext());
        Assert.assertTrue((boolean)this.sessionStore.findSessions((Object)"mel", 0L, 0L).hasNext());
        this.context.setTime(150000L);
        this.processor.process((Object)"mel", (Object)"third");
        KeyValueIterator iterator = this.sessionStore.findSessions((Object)"mel", 0L, 300001L);
        KeyValue kv = (KeyValue)iterator.next();
        Assert.assertEquals((Object)3L, (Object)kv.value);
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldUpdateSessionIfTheSameTime() {
        this.context.setTime(0L);
        this.processor.process((Object)"mel", (Object)"first");
        this.processor.process((Object)"mel", (Object)"second");
        KeyValueIterator iterator = this.sessionStore.findSessions((Object)"mel", 0L, 0L);
        Assert.assertEquals((Object)2L, (Object)((KeyValue)iterator.next()).value);
        Assert.assertFalse((boolean)iterator.hasNext());
    }

    @Test
    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
        String sessionId = "mel";
        long time = 0L;
        this.context.setTime(time);
        this.processor.process((Object)"mel", (Object)"first");
        this.context.setTime(time += 300001L);
        this.processor.process((Object)"mel", (Object)"second");
        this.processor.process((Object)"mel", (Object)"second");
        this.context.setTime(time += 300001L);
        this.processor.process((Object)"mel", (Object)"third");
        this.processor.process((Object)"mel", (Object)"third");
        this.processor.process((Object)"mel", (Object)"third");
        this.sessionStore.flush();
        Assert.assertEquals(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"mel", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"mel", (Window)new SessionWindow(300001L, 300001L)), (Object)new Change((Object)2L, null)), KeyValue.pair((Object)new Windowed((Object)"mel", (Window)new SessionWindow(time, time)), (Object)new Change((Object)3L, null))), this.results);
    }

    @Test
    public void shouldRemoveMergedSessionsFromStateStore() {
        this.context.setTime(0L);
        this.processor.process((Object)"a", (Object)"1");
        KeyValueIterator a1 = this.sessionStore.findSessions((Object)"a", 0L, 0L);
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)1L), (Object)a1.next());
        this.context.setTime(100L);
        this.processor.process((Object)"a", (Object)"2");
        KeyValueIterator a2 = this.sessionStore.findSessions((Object)"a", 0L, 100L);
        Assert.assertEquals((Object)KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 100L)), (Object)2L), (Object)a2.next());
        Assert.assertFalse((boolean)a2.hasNext());
    }

    @Test
    public void shouldHandleMultipleSessionsAndMerging() {
        this.context.setTime(0L);
        this.processor.process((Object)"a", (Object)"1");
        this.processor.process((Object)"b", (Object)"1");
        this.processor.process((Object)"c", (Object)"1");
        this.processor.process((Object)"d", (Object)"1");
        this.context.setTime(150000L);
        this.processor.process((Object)"d", (Object)"2");
        this.context.setTime(300001L);
        this.processor.process((Object)"a", (Object)"2");
        this.processor.process((Object)"b", (Object)"2");
        this.context.setTime(450001L);
        this.processor.process((Object)"a", (Object)"3");
        this.processor.process((Object)"c", (Object)"3");
        this.sessionStore.flush();
        Assert.assertEquals(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"c", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"d", (Window)new SessionWindow(0L, 150000L)), (Object)new Change((Object)2L, null)), KeyValue.pair((Object)new Windowed((Object)"b", (Window)new SessionWindow(300001L, 300001L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(300001L, 450001L)), (Object)new Change((Object)2L, null)), KeyValue.pair((Object)new Windowed((Object)"c", (Window)new SessionWindow(450001L, 450001L)), (Object)new Change((Object)1L, null))), this.results);
    }

    @Test
    public void shouldGetAggregatedValuesFromValueGetter() {
        KTableValueGetter getter = this.sessionAggregator.view().get();
        getter.init((ProcessorContext)this.context);
        this.context.setTime(0L);
        this.processor.process((Object)"a", (Object)"1");
        this.context.setTime(300001L);
        this.processor.process((Object)"a", (Object)"1");
        this.processor.process((Object)"a", (Object)"2");
        long t0 = (Long)getter.get((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)));
        long t1 = (Long)getter.get((Object)new Windowed((Object)"a", (Window)new SessionWindow(300001L, 300001L)));
        Assert.assertEquals((long)1L, (long)t0);
        Assert.assertEquals((long)2L, (long)t1);
    }

    @Test
    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
        this.initStore(false);
        this.processor.init((ProcessorContext)this.context);
        this.context.setTime(0L);
        this.processor.process((Object)"a", (Object)"1");
        this.processor.process((Object)"b", (Object)"1");
        this.processor.process((Object)"c", (Object)"1");
        Assert.assertEquals(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"b", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"c", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null))), this.results);
    }

    @Test
    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
        this.initStore(false);
        this.processor.init((ProcessorContext)this.context);
        this.context.setTime(0L);
        this.processor.process((Object)"a", (Object)"1");
        this.context.setTime(5L);
        this.processor.process((Object)"a", (Object)"1");
        Assert.assertEquals(Arrays.asList(KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)new Change((Object)1L, null)), KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 0L)), (Object)new Change(null, null)), KeyValue.pair((Object)new Windowed((Object)"a", (Window)new SessionWindow(0L, 5L)), (Object)new Change((Object)2L, null))), this.results);
    }
}

