package org.apache.kafka.streams.state.internals;

import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRecordCollector;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.nio.charset.StandardCharsets;

/**
 * Unit tests for {@link MeteredTimestampedWindowStore}.
 *
 * <p>The wrapped (inner) {@link WindowStore} is a Mockito mock, so the tests check how the
 * metered store delegates to it and which serdes / topic names it uses, not the behaviour
 * of a real window store.
 */
@RunWith(MockitoJUnitRunner.StrictStubs.class)
public class MeteredTimestampedWindowStoreTest {
    private static final String STORE_NAME = "mocked-store";
    private static final String STORE_TYPE = "scope";
    private static final String CHANGELOG_TOPIC = "changelog-topic";
    private static final String KEY = "key";
    private static final Bytes KEY_BYTES = Bytes.wrap(KEY.getBytes(StandardCharsets.UTF_8));
    // 97 is the ASCII code of 'a', which keeps the byte fixture below readable.
    private static final long TIMESTAMP = 97L;
    private static final ValueAndTimestamp<String> VALUE_AND_TIMESTAMP = ValueAndTimestamp.make("value", TIMESTAMP);
    // Seven NUL bytes followed by 'a' (0x61 == 97 == TIMESTAMP) form an 8-byte big-endian long,
    // followed by the bytes of "value". Presumably this mirrors the timestamp-then-value layout
    // produced by ValueAndTimestampSerde; in this class it is only used as an opaque token that
    // the mocked serializers/deserializers and the mocked inner store hand back and forth.
    private static final byte[] VALUE_AND_TIMESTAMP_BYTES =
        "\0\0\0\0\0\0\0avalue".getBytes(StandardCharsets.UTF_8);
    private static final int WINDOW_SIZE_MS = 10; // any size; only passed through to the store constructor

    private InternalMockProcessorContext context;

    @Mock
    private WindowStore<Bytes, byte[]> innerStoreMock;
    private MeteredTimestampedWindowStore<String, String> store;
    private final TaskId taskId = new TaskId(0, 0, "My-Topology");
    private final Metrics metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.DEBUG));

    /**
     * Builds a mock processor context (String key serde, Long value serde) and a metered store
     * that wraps {@link #innerStoreMock} with a value serde that does not handle {@code null}.
     */
    @Before
    public void setUp() {
        final StreamsMetricsImpl streamsMetrics =
            new StreamsMetricsImpl(metrics, "test", "latest", new MockTime());

        context = new InternalMockProcessorContext(
            TestUtils.tempDirectory(),
            Serdes.String(),
            Serdes.Long(),
            streamsMetrics,
            new StreamsConfig(StreamsTestUtils.getStreamsConfig()),
            MockRecordCollector::new,
            new ThreadCache(new LogContext("testCache "), 0L, streamsMetrics),
            Time.SYSTEM,
            taskId
        );

        Mockito.when(innerStoreMock.name()).thenReturn(STORE_NAME);

        store = new MeteredTimestampedWindowStore<>(
            innerStoreMock,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
        );
    }

    /**
     * Initialising through the {@link ProcessorContext} overload must be forwarded to the
     * inner store through the same overload.
     */
    @SuppressWarnings("deprecation")
    @Test
    public void shouldDelegateDeprecatedInit() {
        @SuppressWarnings("unchecked")
        final WindowStore<Bytes, byte[]> inner = Mockito.mock(WindowStore.class);
        final MeteredTimestampedWindowStore<String, String> outer = new MeteredTimestampedWindowStore<>(
            inner,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
        );
        Mockito.when(inner.name()).thenReturn("store");

        // The explicit cast selects the ProcessorContext overload; without it the call is
        // ambiguous because the mock context implements both context interfaces.
        outer.init((ProcessorContext) context, outer);

        Mockito.verify(inner).init((ProcessorContext) context, outer);
    }

    /**
     * Initialising through the {@link StateStoreContext} overload must be forwarded to the
     * inner store through the same overload.
     */
    @Test
    public void shouldDelegateInit() {
        @SuppressWarnings("unchecked")
        final WindowStore<Bytes, byte[]> inner = Mockito.mock(WindowStore.class);
        final MeteredTimestampedWindowStore<String, String> outer = new MeteredTimestampedWindowStore<>(
            inner,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(new SerdeThatDoesntHandleNull())
        );
        Mockito.when(inner.name()).thenReturn("store");

        outer.init((StateStoreContext) context, outer);

        Mockito.verify(inner).init((StateStoreContext) context, outer);
    }

    /** With a changelog registered for the store, the serdes must be called with that topic name. */
    @Test
    public void shouldPassChangelogTopicNameToStateStoreSerde() {
        context.addChangelogForStore(STORE_NAME, CHANGELOG_TOPIC);
        doShouldPassChangelogTopicNameToStateStoreSerde(CHANGELOG_TOPIC);
    }

    /**
     * Without a registered changelog, the serdes must be called with the topic name computed by
     * {@link ProcessorStateManager#storeChangelogTopic}.
     */
    @Test
    public void shouldPassDefaultChangelogTopicNameToStateStoreSerdeIfLoggingDisabled() {
        final String defaultChangelogTopicName =
            ProcessorStateManager.storeChangelogTopic(context.applicationId(), STORE_NAME, taskId.topologyName());
        doShouldPassChangelogTopicNameToStateStoreSerde(defaultChangelogTopicName);
    }

    /**
     * Replaces {@link #store} with one backed by mocked serdes that are stubbed only for
     * {@code topic}, then performs one fetch and one put and verifies that the inner store
     * received the serialized key and value.
     *
     * @param topic the topic name the metered store is expected to hand to the serdes
     */
    @SuppressWarnings("unchecked")
    private void doShouldPassChangelogTopicNameToStateStoreSerde(final String topic) {
        final Serde<String> keySerde = Mockito.mock(Serde.class);
        final Serializer<String> keySerializer = Mockito.mock(Serializer.class);
        final Serde<ValueAndTimestamp<String>> valueSerde = Mockito.mock(Serde.class);
        final Deserializer<ValueAndTimestamp<String>> valueDeserializer = Mockito.mock(Deserializer.class);
        final Serializer<ValueAndTimestamp<String>> valueSerializer = Mockito.mock(Serializer.class);

        Mockito.when(keySerde.serializer()).thenReturn(keySerializer);
        Mockito.when(keySerializer.serialize(topic, KEY)).thenReturn(KEY.getBytes(StandardCharsets.UTF_8));
        Mockito.when(valueSerde.deserializer()).thenReturn(valueDeserializer);
        Mockito.when(valueDeserializer.deserialize(topic, VALUE_AND_TIMESTAMP_BYTES)).thenReturn(VALUE_AND_TIMESTAMP);
        Mockito.when(valueSerde.serializer()).thenReturn(valueSerializer);
        Mockito.when(valueSerializer.serialize(topic, VALUE_AND_TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);
        Mockito.when(innerStoreMock.fetch(KEY_BYTES, TIMESTAMP)).thenReturn(VALUE_AND_TIMESTAMP_BYTES);

        store = new MeteredTimestampedWindowStore<>(
            innerStoreMock,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            keySerde,
            valueSerde
        );
        store.init((StateStoreContext) context, store);

        store.fetch(KEY, TIMESTAMP);
        store.put(KEY, VALUE_AND_TIMESTAMP, TIMESTAMP);

        Mockito.verify(innerStoreMock).fetch(KEY_BYTES, TIMESTAMP);
        Mockito.verify(innerStoreMock).put(KEY_BYTES, VALUE_AND_TIMESTAMP_BYTES, TIMESTAMP);
    }

    /** Closing the metered store must close the inner store. */
    @Test
    public void shouldCloseUnderlyingStore() {
        store.init((StateStoreContext) context, store);

        store.close();

        Mockito.verify(innerStoreMock).close();
    }

    /** A {@code null} result from the inner store must surface as {@code null}, not as an exception. */
    @Test
    public void shouldNotExceptionIfFetchReturnsNull() {
        Mockito.when(innerStoreMock.fetch(Bytes.wrap("a".getBytes(StandardCharsets.UTF_8)), 0L)).thenReturn(null);
        store.init((StateStoreContext) context, store);

        Assert.assertNull(store.fetch("a", 0L));
    }

    /**
     * With {@code null} serdes passed to the constructor, a put of a String key and Long value
     * must not fail with a {@link ClassCastException} wrapped in a {@link StreamsException}
     * (the context created in {@link #setUp()} is configured with String/Long serdes).
     */
    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromProcessorContext() {
        Mockito.when(innerStoreMock.name()).thenReturn(STORE_NAME);
        final MeteredTimestampedWindowStore<String, Long> storeUnderTest = new MeteredTimestampedWindowStore<>(
            innerStoreMock,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            null,
            null
        );
        storeUnderTest.init((StateStoreContext) context, innerStoreMock);

        // NOTE(review): a timeout constant is reused as the record timestamp; the value itself
        // is not asserted on. Kept to preserve the exact value used before.
        final long timestamp = IntegrationTestUtils.DEFAULT_TIMEOUT;
        try {
            storeUnderTest.put(KEY, ValueAndTimestamp.make(42L, timestamp), timestamp);
        } catch (final StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail("Serdes are not correctly set from processor context.");
            }
            throw exception;
        }
    }

    /**
     * With String / Long serdes passed to the constructor, a put of a String key and Long value
     * must not fail with a {@link ClassCastException} wrapped in a {@link StreamsException}.
     */
    @Test
    public void shouldNotThrowExceptionIfSerdesCorrectlySetFromConstructorParameters() {
        Mockito.when(innerStoreMock.name()).thenReturn(STORE_NAME);
        final MeteredTimestampedWindowStore<String, Long> storeUnderTest = new MeteredTimestampedWindowStore<>(
            innerStoreMock,
            WINDOW_SIZE_MS,
            STORE_TYPE,
            new MockTime(),
            Serdes.String(),
            new ValueAndTimestampSerde<>(Serdes.Long())
        );
        storeUnderTest.init((StateStoreContext) context, innerStoreMock);

        // NOTE(review): a timeout constant is reused as the record timestamp; the value itself
        // is not asserted on. Kept to preserve the exact value used before.
        final long timestamp = IntegrationTestUtils.DEFAULT_TIMEOUT;
        try {
            storeUnderTest.put(KEY, ValueAndTimestamp.make(42L, timestamp), timestamp);
        } catch (final StreamsException exception) {
            if (exception.getCause() instanceof ClassCastException) {
                Assert.fail("Serdes are not correctly set from constructor parameters.");
            }
            throw exception;
        }
    }
}
