package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.class */
/**
 * Tests for {@link StreamThreadStateStoreProvider}.
 *
 * <p>Every test runs against two initialized stream tasks, {@code TaskId(0, 0)} and
 * {@code TaskId(0, 1)}, built from one topology that registers four in-memory stores:
 * {@code kv-store}, {@code timestamped-kv-store}, {@code window-store} and
 * {@code timestamped-window-store}. The provider under test is handed a nice EasyMock
 * {@link StreamThread}; each test programs that mock through {@link #mockThread(boolean)}.
 */
public class StreamThreadStateStoreProviderTest {
    private StreamTask taskOne;
    private StreamThreadStateStoreProvider provider;
    private StateDirectory stateDirectory;
    private File stateDir;
    private final String topicName = "topic";
    private StreamThread threadMock;
    private Map<TaskId, Task> tasks;

    /**
     * Builds the topology, the mock clients, the two tasks and the provider used by every test.
     */
    @Before
    public void before() {
        final TopologyWrapper topology = new TopologyWrapper();
        topology.addSource("the-source", topicName);
        topology.addProcessor("the-processor", new MockProcessorSupplier(), "the-source");
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("kv-store"),
                Serdes.String(),
                Serdes.String()),
            "the-processor");
        topology.addStateStore(
            Stores.timestampedKeyValueStoreBuilder(
                Stores.inMemoryKeyValueStore("timestamped-kv-store"),
                Serdes.String(),
                Serdes.String()),
            "the-processor");
        topology.addStateStore(
            Stores.windowStoreBuilder(
                Stores.inMemoryWindowStore("window-store", Duration.ofMillis(10L), Duration.ofMillis(2L), false),
                Serdes.String(),
                Serdes.String()),
            "the-processor");
        topology.addStateStore(
            Stores.timestampedWindowStoreBuilder(
                Stores.inMemoryWindowStore("timestamped-window-store", Duration.ofMillis(10L), Duration.ofMillis(2L), false),
                Serdes.String(),
                Serdes.String()),
            "the-processor");

        final Properties properties = new Properties();
        properties.put("application.id", "applicationId");
        properties.put("bootstrap.servers", "localhost:9092");
        stateDir = TestUtils.tempDirectory();
        properties.put("state.dir", stateDir.getPath());
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        final MockClientSupplier clientSupplier = new MockClientSupplier();
        configureClients(clientSupplier, "applicationId-kv-store-changelog");
        configureClients(clientSupplier, "applicationId-window-store-changelog");

        final ProcessorTopology processorTopology = topology.getInternalBuilder("applicationId").buildTopology();

        tasks = new HashMap<>();
        stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);

        final TaskId firstTaskId = new TaskId(0, 0);
        taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, firstTaskId);
        taskOne.initializeIfNeeded();
        tasks.put(firstTaskId, taskOne);

        final TaskId secondTaskId = new TaskId(0, 1);
        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, secondTaskId);
        taskTwo.initializeIfNeeded();
        tasks.put(secondTaskId, taskTwo);

        threadMock = EasyMock.createNiceMock(StreamThread.class);
        provider = new StreamThreadStateStoreProvider(threadMock);
    }

    /**
     * Deletes the temporary state directory created in {@link #before()}.
     *
     * @throws IOException if the directory cannot be deleted
     */
    @After
    public void cleanUp() throws IOException {
        Utils.delete(stateDir);
    }

    /** A plain key-value lookup returns one non-timestamped facade per task. */
    @Test
    public void shouldFindKeyValueStores() {
        mockThread(true);
        final List<?> stores = provider.stores(
            StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()));
        assertNonTimestampedStores(stores, 2, ReadOnlyKeyValueStore.class, TimestampedKeyValueStore.class);
    }

    /** A timestamped key-value lookup returns one timestamped store per task. */
    @Test
    public void shouldFindTimestampedKeyValueStores() {
        mockThread(true);
        final List<?> stores = provider.stores(
            StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore()));
        assertTimestampedStores(stores, 2, ReadOnlyKeyValueStore.class, TimestampedKeyValueStore.class);
    }

    /** Asking for a plain key-value store through the timestamped store type is rejected with a descriptive message. */
    @Test
    public void shouldNotFindKeyValueStoresAsTimestampedStore() {
        mockThread(true);
        final InvalidStateStoreException exception = Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.timestampedKeyValueStore())));
        MatcherAssert.assertThat(
            exception.getMessage(),
            Matchers.is("Cannot get state store kv-store because the queryable store type [class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreType] does not accept the actual store type [class org.apache.kafka.streams.state.internals.MeteredKeyValueStore]."));
    }

    /** A timestamped key-value store queried through the plain store type is exposed without the timestamped interface. */
    @Test
    public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
        mockThread(true);
        final List<?> stores = provider.stores(
            StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.keyValueStore()));
        assertNonTimestampedStores(stores, 2, ReadOnlyKeyValueStore.class, TimestampedKeyValueStore.class);
    }

    /** A plain window lookup returns one non-timestamped facade per task. */
    @Test
    public void shouldFindWindowStores() {
        mockThread(true);
        final List<?> stores = provider.stores(
            StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore()));
        assertNonTimestampedStores(stores, 2, ReadOnlyWindowStore.class, TimestampedWindowStore.class);
    }

    /** A timestamped window lookup returns one timestamped store per task. */
    @Test
    public void shouldFindTimestampedWindowStores() {
        mockThread(true);
        final List<?> stores = provider.stores(
            StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore()));
        assertTimestampedStores(stores, 2, ReadOnlyWindowStore.class, TimestampedWindowStore.class);
    }

    /** Asking for a plain window store through the timestamped store type is rejected with a descriptive message. */
    @Test
    public void shouldNotFindWindowStoresAsTimestampedStore() {
        mockThread(true);
        final InvalidStateStoreException exception = Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.timestampedWindowStore())));
        MatcherAssert.assertThat(
            exception.getMessage(),
            Matchers.is("Cannot get state store window-store because the queryable store type [class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreType] does not accept the actual store type [class org.apache.kafka.streams.state.internals.MeteredWindowStore]."));
    }

    /** A timestamped window store queried through the plain store type is exposed without the timestamped interface. */
    @Test
    public void shouldFindTimestampedWindowStoresAsWindowStore() {
        mockThread(true);
        final List<?> stores = provider.stores(
            StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.windowStore()));
        assertNonTimestampedStores(stores, 2, ReadOnlyWindowStore.class, TimestampedWindowStore.class);
    }

    /** Closing {@code kv-store} on the first task makes the lookup fail. */
    @Test
    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
        mockThread(true);
        taskOne.getStore("kv-store").close();
        // assertThrows pins the exception to the lookup itself, not to the setup lines above.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore())));
    }

    /** Closing {@code timestamped-kv-store} on the first task makes the lookup fail. */
    @Test
    public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() {
        mockThread(true);
        taskOne.getStore("timestamped-kv-store").close();
        // assertThrows pins the exception to the lookup itself, not to the setup lines above.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("timestamped-kv-store", QueryableStoreTypes.timestampedKeyValueStore())));
    }

    /** Closing {@code window-store} on the first task makes the lookup fail. */
    @Test
    public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() {
        mockThread(true);
        taskOne.getStore("window-store").close();
        // assertThrows pins the exception to the lookup itself, not to the setup lines above.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("window-store", QueryableStoreTypes.windowStore())));
    }

    /** Closing {@code timestamped-window-store} on the first task makes the lookup fail. */
    @Test
    public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
        mockThread(true);
        taskOne.getStore("timestamped-window-store").close();
        // assertThrows pins the exception to the lookup itself, not to the setup lines above.
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("timestamped-window-store", QueryableStoreTypes.timestampedWindowStore())));
    }

    /** An unknown store name yields an empty list rather than an exception. */
    @Test
    public void shouldReturnEmptyListIfNoStoresFoundWithName() {
        mockThread(true);
        Assert.assertEquals(
            Collections.emptyList(),
            provider.stores(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())));
    }

    /** Restricting the query to partition 0 or to partition 1 yields exactly one store each time. */
    @Test
    public void shouldReturnSingleStoreForPartition() {
        mockThread(true);

        final List<?> partitionZeroStores = provider.stores(
            StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(0));
        assertNonTimestampedStores(partitionZeroStores, 1, ReadOnlyKeyValueStore.class, TimestampedKeyValueStore.class);

        final List<?> partitionOneStores = provider.stores(
            StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(1));
        assertNonTimestampedStores(partitionOneStores, 1, ReadOnlyKeyValueStore.class, TimestampedKeyValueStore.class);
    }

    /** Partition 2 has no task in this fixture, so the query yields an empty list. */
    @Test
    public void shouldReturnEmptyListForInvalidPartitions() {
        mockThread(true);
        Assert.assertEquals(
            Collections.emptyList(),
            provider.stores(
                StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore()).withPartition(2)));
    }

    /** With the thread mock reporting {@code PARTITIONS_ASSIGNED}, the lookup is rejected. */
    @Test
    public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() {
        mockThread(false);
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(
                StoreQueryParameters.fromNameAndType("kv-store", QueryableStoreTypes.keyValueStore())));
    }

    /**
     * Asserts the list size and that every element implements {@code readOnlyType} but not
     * {@code timestampedType}.
     *
     * @param stores          stores returned by the provider
     * @param expectedCount   number of stores expected in the list
     * @param readOnlyType    interface every store must implement
     * @param timestampedType interface no store may implement
     */
    private static void assertNonTimestampedStores(final List<?> stores,
                                                   final int expectedCount,
                                                   final Class<?> readOnlyType,
                                                   final Class<?> timestampedType) {
        Assert.assertEquals(expectedCount, stores.size());
        for (final Object store : stores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(readOnlyType));
            MatcherAssert.assertThat(store, Matchers.not(Matchers.instanceOf(timestampedType)));
        }
    }

    /**
     * Asserts the list size and that every element implements both {@code readOnlyType} and
     * {@code timestampedType}.
     *
     * @param stores          stores returned by the provider
     * @param expectedCount   number of stores expected in the list
     * @param readOnlyType    interface every store must implement
     * @param timestampedType second interface every store must implement
     */
    private static void assertTimestampedStores(final List<?> stores,
                                                final int expectedCount,
                                                final Class<?> readOnlyType,
                                                final Class<?> timestampedType) {
        Assert.assertEquals(expectedCount, stores.size());
        for (final Object store : stores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(readOnlyType));
            MatcherAssert.assertThat(store, Matchers.instanceOf(timestampedType));
        }
    }

    /**
     * Creates an active {@link StreamTask} for {@code taskId} whose single input partition is
     * {@code topicName} at the task's partition number.
     *
     * @param streamsConfig      configuration shared by all collaborators
     * @param mockClientSupplier supplier of the mock consumer, restore consumer and admin client
     * @param processorTopology  topology the task executes
     * @param taskId             id of the task to create
     * @return the new task; the caller is responsible for initializing it
     */
    private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                                         final MockClientSupplier mockClientSupplier,
                                         final ProcessorTopology processorTopology,
                                         final TaskId taskId) {
        final Metrics metrics = new Metrics();
        final LogContext logContext = new LogContext("test-stream-task ");
        final Set<TopicPartition> partitions =
            Collections.singleton(new TopicPartition(topicName, taskId.partition));

        final StoreChangelogReader changelogReader = new StoreChangelogReader(
            new MockTime(),
            streamsConfig,
            logContext,
            mockClientSupplier.adminClient,
            mockClientSupplier.restoreConsumer,
            new MockStateRestoreListener());
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Task.TaskType.ACTIVE,
            StreamThread.eosEnabled(streamsConfig),
            logContext,
            stateDirectory,
            changelogReader,
            processorTopology.storeToChangelogTopic(),
            partitions);

        // NOTE(review): the producer is always created for TaskId(0, 0), not for taskId;
        // presumably this only matters with exactly-once enabled - confirm before changing.
        final StreamsProducer streamsProducer = new StreamsProducer(
            streamsConfig,
            "threadId",
            mockClientSupplier,
            new TaskId(0, 0),
            UUID.randomUUID(),
            logContext);
        final RecordCollectorImpl recordCollector = new RecordCollectorImpl(
            logContext,
            taskId,
            streamsProducer,
            streamsConfig.defaultProductionExceptionHandler(),
            new MockStreamsMetrics(metrics));

        final MockStreamsMetrics streamsMetrics = new MockStreamsMetrics(metrics);
        return new StreamTask(
            taskId,
            partitions,
            processorTopology,
            mockClientSupplier.consumer,
            streamsConfig,
            streamsMetrics,
            stateDirectory,
            EasyMock.createNiceMock(ThreadCache.class),
            new MockTime(),
            stateManager,
            recordCollector,
            // The processor context is given no thread cache.
            new ProcessorContextImpl(taskId, streamsConfig, stateManager, streamsMetrics, (ThreadCache) null));
    }

    /**
     * Programs the {@link StreamThread} mock and switches it to replay mode.
     *
     * @param initialized when {@code true} the mock reports itself as running and in state
     *                    {@code RUNNING}; otherwise as not running and in state
     *                    {@code PARTITIONS_ASSIGNED}
     */
    private void mockThread(final boolean initialized) {
        // Stubbed rather than expected once: some tests query the provider more than once,
        // and an exhausted one-shot expectation would fall back to the nice mock's default.
        EasyMock.expect(threadMock.isRunning()).andStubReturn(initialized);
        EasyMock.expect(threadMock.allTasks()).andStubReturn(tasks);
        EasyMock.expect(threadMock.activeTaskMap()).andStubReturn(tasks);
        EasyMock.expect(threadMock.activeTasks()).andStubReturn(new ArrayList<>(tasks.values()));
        final StreamThread.State state =
            initialized ? StreamThread.State.RUNNING : StreamThread.State.PARTITIONS_ASSIGNED;
        EasyMock.expect(threadMock.state()).andReturn(state).anyTimes();
        EasyMock.replay(threadMock);
    }

    /**
     * Registers partitions 0 and 1 of {@code topic} with the mock restore consumer, assigns
     * them to it, and sets beginning and end offsets of both partitions to 0 on the restore
     * consumer and the admin client.
     *
     * @param mockClientSupplier supplier whose restore consumer and admin client are configured
     * @param topic              name of the topic to register
     */
    private void configureClients(final MockClientSupplier mockClientSupplier, final String topic) {
        final TopicPartition partitionZero = new TopicPartition(topic, 0);
        final TopicPartition partitionOne = new TopicPartition(topic, 1);

        mockClientSupplier.restoreConsumer.updatePartitions(
            topic,
            Arrays.asList(
                new PartitionInfo(topic, 0, (Node) null, (Node[]) null, (Node[]) null),
                new PartitionInfo(topic, 1, (Node) null, (Node[]) null, (Node[]) null)));
        mockClientSupplier.restoreConsumer.assign(Arrays.asList(partitionZero, partitionOne));

        final Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(partitionZero, 0L);
        offsets.put(partitionOne, 0L);

        mockClientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
        mockClientSupplier.restoreConsumer.updateEndOffsets(offsets);
        mockClientSupplier.adminClient.updateBeginningOffsets(offsets);
        mockClientSupplier.adminClient.updateEndOffsets(offsets);
    }
}
