package org.apache.kafka.streams.kstream.internals;

import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.internals.CacheFlushListener;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
import org.easymock.EasyMock;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/SessionTupleForwarderTest.class */
public class SessionTupleForwarderTest {
    @Test
    public void shouldSetFlushListenerOnWrappedStateStore() {
        setFlushListener(true);
        setFlushListener(false);
    }

    private void setFlushListener(boolean z) {
        WrappedStateStore wrappedStateStore = (WrappedStateStore) EasyMock.mock(WrappedStateStore.class);
        SessionCacheFlushListener sessionCacheFlushListener = (SessionCacheFlushListener) EasyMock.mock(SessionCacheFlushListener.class);
        EasyMock.expect(Boolean.valueOf(wrappedStateStore.setFlushListener(sessionCacheFlushListener, z))).andReturn(false);
        EasyMock.replay(new Object[]{wrappedStateStore});
        new SessionTupleForwarder(wrappedStateStore, (ProcessorContext) null, sessionCacheFlushListener, z);
        EasyMock.verify(new Object[]{wrappedStateStore});
    }

    @Test
    public void shouldForwardRecordsIfWrappedStateStoreDoesNotCache() {
        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(false);
        shouldForwardRecordsIfWrappedStateStoreDoesNotCache(true);
    }

    private void shouldForwardRecordsIfWrappedStateStoreDoesNotCache(boolean z) {
        WrappedStateStore wrappedStateStore = (WrappedStateStore) EasyMock.mock(WrappedStateStore.class);
        ProcessorContext processorContext = (ProcessorContext) EasyMock.mock(ProcessorContext.class);
        EasyMock.expect(Boolean.valueOf(wrappedStateStore.setFlushListener((CacheFlushListener) null, z))).andReturn(false);
        if (z) {
            processorContext.forward(new Record(new Windowed("key", new SessionWindow(21L, 42L)), new Change("value", "oldValue"), 42L));
        } else {
            processorContext.forward(new Record(new Windowed("key", new SessionWindow(21L, 42L)), new Change("value", (Object) null), 42L));
        }
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{wrappedStateStore, processorContext});
        new SessionTupleForwarder(wrappedStateStore, processorContext, (CacheFlushListener) null, z).maybeForward(new Record(new Windowed("key", new SessionWindow(21L, 42L)), new Change("value", "oldValue"), 42L));
        EasyMock.verify(new Object[]{wrappedStateStore, processorContext});
    }

    @Test
    public void shouldNotForwardRecordsIfWrappedStateStoreDoesCache() {
        WrappedStateStore wrappedStateStore = (WrappedStateStore) EasyMock.mock(WrappedStateStore.class);
        ProcessorContext processorContext = (ProcessorContext) EasyMock.mock(ProcessorContext.class);
        EasyMock.expect(Boolean.valueOf(wrappedStateStore.setFlushListener((CacheFlushListener) null, false))).andReturn(true);
        EasyMock.replay(new Object[]{wrappedStateStore, processorContext});
        new SessionTupleForwarder(wrappedStateStore, processorContext, (CacheFlushListener) null, false).maybeForward(new Record(new Windowed("key", new SessionWindow(21L, 42L)), new Change("value", "oldValue"), 42L));
        EasyMock.verify(new Object[]{wrappedStateStore, processorContext});
    }
}
