package org.apache.kafka.streams.kstream.internals;

import java.io.File;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Aggregator;
import org.apache.kafka.streams.kstream.Initializer;
import org.apache.kafka.streams.kstream.Merger;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.processor.internals.testutil.LogCaptureAppender;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StoreBuilder;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.NoOpRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.class */
public class KStreamSessionWindowAggregateProcessorTest {
    private static final long GAP_MS = 300000;
    private static final String STORE_NAME = "session-store";
    private final Initializer<Long> initializer = () -> {
        return 0L;
    };
    private final Aggregator<String, String, Long> aggregator = (str, str2, l) -> {
        return Long.valueOf(l.longValue() + 1);
    };
    private final Merger<String, Long> sessionMerger = (str, l, l2) -> {
        return Long.valueOf(l.longValue() + l2.longValue());
    };
    private final KStreamSessionWindowAggregate<String, String, Long> sessionAggregator = new KStreamSessionWindowAggregate<>(SessionWindows.with(Duration.ofMillis(GAP_MS)), STORE_NAME, this.initializer, this.aggregator, this.sessionMerger);
    private final List<KeyValue> results = new ArrayList();
    private final Processor<String, String> processor = this.sessionAggregator.get();
    private SessionStore<String, Long> sessionStore;
    private InternalMockProcessorContext context;
    private Metrics metrics;

    @Before
    public void initializeStore() {
        File tempDirectory = TestUtils.tempDirectory();
        this.metrics = new Metrics();
        MockStreamsMetrics mockStreamsMetrics = new MockStreamsMetrics(this.metrics);
        this.context = new InternalMockProcessorContext(tempDirectory, Serdes.String(), Serdes.String(), mockStreamsMetrics, new StreamsConfig(StreamsTestUtils.getStreamsConfig()), NoOpRecordCollector::new, new ThreadCache(new LogContext("testCache "), 100000L, mockStreamsMetrics)) { // from class: org.apache.kafka.streams.kstream.internals.KStreamSessionWindowAggregateProcessorTest.1
            @Override // org.apache.kafka.test.InternalMockProcessorContext
            public <K, V> void forward(K k, V v) {
                KStreamSessionWindowAggregateProcessorTest.this.results.add(KeyValue.pair(k, v));
            }
        };
        initStore(true);
        this.processor.init(this.context);
    }

    private void initStore(boolean z) {
        StoreBuilder withLoggingDisabled = Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, Duration.ofMillis(900000L)), Serdes.String(), Serdes.Long()).withLoggingDisabled();
        if (z) {
            withLoggingDisabled.withCachingEnabled();
        }
        this.sessionStore = withLoggingDisabled.build();
        this.sessionStore.init(this.context, this.sessionStore);
    }

    @After
    public void closeStore() {
        this.sessionStore.close();
    }

    @Test
    public void shouldCreateSingleSessionWhenWithinGap() {
        this.context.setTime(0L);
        this.processor.process("john", "first");
        this.context.setTime(500L);
        this.processor.process("john", "second");
        KeyValueIterator findSessions = this.sessionStore.findSessions("john", 0L, 2000L);
        Assert.assertTrue(findSessions.hasNext());
        Assert.assertEquals(2L, ((KeyValue) findSessions.next()).value);
    }

    @Test
    public void shouldMergeSessions() {
        this.context.setTime(0L);
        this.processor.process("mel", "first");
        Assert.assertTrue(this.sessionStore.findSessions("mel", 0L, 0L).hasNext());
        this.context.setTime(300001L);
        this.processor.process("mel", "second");
        Assert.assertTrue(this.sessionStore.findSessions("mel", 300001L, 300001L).hasNext());
        Assert.assertTrue(this.sessionStore.findSessions("mel", 0L, 0L).hasNext());
        this.context.setTime(150000L);
        this.processor.process("mel", "third");
        KeyValueIterator findSessions = this.sessionStore.findSessions("mel", 0L, 300001L);
        Assert.assertEquals(3L, ((KeyValue) findSessions.next()).value);
        Assert.assertFalse(findSessions.hasNext());
    }

    @Test
    public void shouldUpdateSessionIfTheSameTime() {
        this.context.setTime(0L);
        this.processor.process("mel", "first");
        this.processor.process("mel", "second");
        KeyValueIterator findSessions = this.sessionStore.findSessions("mel", 0L, 0L);
        Assert.assertEquals(2L, ((KeyValue) findSessions.next()).value);
        Assert.assertFalse(findSessions.hasNext());
    }

    /* JADX WARN: Type inference failed for: r0v13, types: [long, org.apache.kafka.test.InternalMockProcessorContext] */
    /* JADX WARN: Type inference failed for: r0v7, types: [long, org.apache.kafka.test.InternalMockProcessorContext] */
    @Test
    public void shouldHaveMultipleSessionsForSameIdWhenTimestampApartBySessionGap() {
        this.context.setTime(0L);
        this.processor.process("mel", "first");
        ?? r0 = this.context;
        r0.setTime(0 + 300001);
        this.processor.process("mel", "second");
        this.processor.process("mel", "second");
        ?? r02 = this.context;
        r02.setTime(r0 + 300001);
        this.processor.process("mel", "third");
        this.processor.process("mel", "third");
        this.processor.process("mel", "third");
        this.sessionStore.flush();
        Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("mel", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("mel", new SessionWindow(300001L, 300001L)), new Change(2L, (Object) null)), KeyValue.pair(new Windowed("mel", new SessionWindow((long) r02, (long) r02)), new Change(3L, (Object) null))), this.results);
    }

    @Test
    public void shouldRemoveMergedSessionsFromStateStore() {
        this.context.setTime(0L);
        this.processor.process("a", "1");
        Assert.assertEquals(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), 1L), this.sessionStore.findSessions("a", 0L, 0L).next());
        this.context.setTime(100L);
        this.processor.process("a", "2");
        KeyValueIterator findSessions = this.sessionStore.findSessions("a", 0L, 100L);
        Assert.assertEquals(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 100L)), 2L), findSessions.next());
        Assert.assertFalse(findSessions.hasNext());
    }

    @Test
    public void shouldHandleMultipleSessionsAndMerging() {
        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.processor.process("b", "1");
        this.processor.process("c", "1");
        this.processor.process("d", "1");
        this.context.setTime(150000L);
        this.processor.process("d", "2");
        this.context.setTime(300001L);
        this.processor.process("a", "2");
        this.processor.process("b", "2");
        this.context.setTime(450001L);
        this.processor.process("a", "3");
        this.processor.process("c", "3");
        this.sessionStore.flush();
        Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("b", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("c", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("d", new SessionWindow(0L, 150000L)), new Change(2L, (Object) null)), KeyValue.pair(new Windowed("b", new SessionWindow(300001L, 300001L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("a", new SessionWindow(300001L, 450001L)), new Change(2L, (Object) null)), KeyValue.pair(new Windowed("c", new SessionWindow(450001L, 450001L)), new Change(1L, (Object) null))), this.results);
    }

    @Test
    public void shouldGetAggregatedValuesFromValueGetter() {
        KTableValueGetter kTableValueGetter = this.sessionAggregator.view().get();
        kTableValueGetter.init(this.context);
        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.context.setTime(300001L);
        this.processor.process("a", "1");
        this.processor.process("a", "2");
        long longValue = ((Long) kTableValueGetter.get(new Windowed("a", new SessionWindow(0L, 0L)))).longValue();
        long longValue2 = ((Long) kTableValueGetter.get(new Windowed("a", new SessionWindow(300001L, 300001L)))).longValue();
        Assert.assertEquals(1L, longValue);
        Assert.assertEquals(2L, longValue2);
    }

    @Test
    public void shouldImmediatelyForwardNewSessionWhenNonCachedStore() {
        initStore(false);
        this.processor.init(this.context);
        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.processor.process("b", "1");
        this.processor.process("c", "1");
        Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("b", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("c", new SessionWindow(0L, 0L)), new Change(1L, (Object) null))), this.results);
    }

    @Test
    public void shouldImmediatelyForwardRemovedSessionsWhenMerging() {
        initStore(false);
        this.processor.init(this.context);
        this.context.setTime(0L);
        this.processor.process("a", "1");
        this.context.setTime(5L);
        this.processor.process("a", "1");
        Assert.assertEquals(Arrays.asList(KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), new Change(1L, (Object) null)), KeyValue.pair(new Windowed("a", new SessionWindow(0L, 0L)), new Change((Object) null, (Object) null)), KeyValue.pair(new Windowed("a", new SessionWindow(0L, 5L)), new Change(2L, (Object) null))), this.results);
    }

    @Test
    public void shouldLogAndMeterWhenSkippingNullKey() {
        initStore(false);
        this.processor.init(this.context);
        this.context.setRecordContext(new ProcessorRecordContext(-1L, -2L, -3, "topic", (Headers) null));
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister();
        this.processor.process((Object) null, "1");
        LogCaptureAppender.unregister(createAndRegister);
        Assert.assertEquals(Double.valueOf(1.0d), StreamsTestUtils.getMetricByName(this.context.metrics().metrics(), "skipped-records-total", "stream-metrics").metricValue());
        MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record due to null key. value=[1] topic=[topic] partition=[-3] offset=[-2]"));
    }

    @Test
    public void shouldLogAndMeterWhenSkippingLateRecord() {
        LogCaptureAppender.setClassLoggerToDebug(KStreamSessionWindowAggregate.class);
        LogCaptureAppender createAndRegister = LogCaptureAppender.createAndRegister();
        Processor processor = new KStreamSessionWindowAggregate(SessionWindows.with(Duration.ofMillis(10L)).grace(Duration.ofMillis(10L)), STORE_NAME, this.initializer, this.aggregator, this.sessionMerger).get();
        initStore(false);
        processor.init(this.context);
        this.context.setStreamTime(20L);
        this.context.setRecordContext(new ProcessorRecordContext(0L, -2L, -3, "topic", (Headers) null));
        processor.process("A", "1");
        this.context.setRecordContext(new ProcessorRecordContext(1L, -2L, -3, "topic", (Headers) null));
        processor.process("A", "1");
        LogCaptureAppender.unregister(createAndRegister);
        MatcherAssert.assertThat(((KafkaMetric) this.metrics.metrics().get(new MetricName("late-record-drop-total", "stream-processor-node-metrics", "The total number of occurrence of late-record-drop operations.", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("client-id", "test"), Utils.mkEntry("task-id", "0_0"), Utils.mkEntry("processor-node-id", "TESTING_NODE")})))).metricValue(), CoreMatchers.is(Double.valueOf(2.0d)));
        MatcherAssert.assertThat((Double) ((KafkaMetric) this.metrics.metrics().get(new MetricName("late-record-drop-rate", "stream-processor-node-metrics", "The average number of occurrence of late-record-drop operations.", Utils.mkMap(new Map.Entry[]{Utils.mkEntry("client-id", "test"), Utils.mkEntry("task-id", "0_0"), Utils.mkEntry("processor-node-id", "TESTING_NODE")})))).metricValue(), Matchers.greaterThan(Double.valueOf(0.0d)));
        MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
        MatcherAssert.assertThat(createAndRegister.getMessages(), CoreMatchers.hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
    }
}
