/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.TopologyWrapper;
import org.apache.kafka.streams.errors.InvalidStateStoreException;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.ChangelogRegister;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.RecordCollector;
import org.apache.kafka.streams.processor.internals.RecordCollectorImpl;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsProducer;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.QueryableStoreType;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlySessionStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.SessionBytesStoreSupplier;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.WindowBytesStoreSupplier;
import org.apache.kafka.streams.state.internals.StreamThreadStateStoreProvider;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockApiProcessorSupplier;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link StreamThreadStateStoreProvider}.
 *
 * <p>The fixture ({@link #before()}) builds a topology with one source, one processor and five
 * in-memory stores attached to that processor (plain and timestamped key-value, plain and
 * timestamped window, and session). Two {@link StreamTask}s, {@code 0_0} and {@code 0_1}, are
 * created and initialized, and are handed to the provider through a mocked {@link StreamThread}.
 *
 * <p>Every test first calls {@link #mockThread(boolean)} to decide whether the mocked thread
 * reports itself as running, then queries the provider and checks what comes back.
 */
public class StreamThreadStateStoreProviderTest {
    private static final String APPLICATION_ID = "applicationId";
    private static final String SOURCE_NAME = "the-source";
    private static final String PROCESSOR_NAME = "the-processor";

    private static final String KV_STORE = "kv-store";
    private static final String TIMESTAMPED_KV_STORE = "timestamped-kv-store";
    private static final String WINDOW_STORE = "window-store";
    private static final String TIMESTAMPED_WINDOW_STORE = "timestamped-window-store";
    private static final String SESSION_STORE = "session-store";

    private StreamTask taskOne;
    private StreamThreadStateStoreProvider provider;
    private StateDirectory stateDirectory;
    private File stateDir;
    private final String topicName = "topic";
    private StreamThread threadMock;
    private Map<TaskId, Task> tasks;

    /**
     * Builds the topology, the streams configuration (with a fresh temporary state directory),
     * the two initialized tasks and the provider under test.
     */
    @Before
    public void before() {
        final TopologyWrapper topology = new TopologyWrapper();
        topology.addSource(SOURCE_NAME, topicName);
        topology.addProcessor(PROCESSOR_NAME, new MockApiProcessorSupplier<>(), SOURCE_NAME);
        topology.addStateStore(
            Stores.keyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(KV_STORE),
                Serdes.String(),
                Serdes.String()),
            PROCESSOR_NAME);
        topology.addStateStore(
            Stores.timestampedKeyValueStoreBuilder(
                Stores.inMemoryKeyValueStore(TIMESTAMPED_KV_STORE),
                Serdes.String(),
                Serdes.String()),
            PROCESSOR_NAME);
        topology.addStateStore(
            Stores.windowStoreBuilder(
                Stores.inMemoryWindowStore(WINDOW_STORE, Duration.ofMillis(10L), Duration.ofMillis(2L), false),
                Serdes.String(),
                Serdes.String()),
            PROCESSOR_NAME);
        topology.addStateStore(
            Stores.timestampedWindowStoreBuilder(
                Stores.inMemoryWindowStore(TIMESTAMPED_WINDOW_STORE, Duration.ofMillis(10L), Duration.ofMillis(2L), false),
                Serdes.String(),
                Serdes.String()),
            PROCESSOR_NAME);
        topology.addStateStore(
            Stores.sessionStoreBuilder(
                Stores.inMemorySessionStore(SESSION_STORE, Duration.ofMillis(10L)),
                Serdes.String(),
                Serdes.String()),
            PROCESSOR_NAME);

        final Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, APPLICATION_ID);
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        stateDir = TestUtils.tempDirectory();
        properties.put(StreamsConfig.STATE_DIR_CONFIG, stateDir.getPath());
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        final MockClientSupplier clientSupplier = new MockClientSupplier();
        // NOTE(review): only two of the five stores' changelog topics are registered with the
        // mock clients; presumably the others are not needed by these tests - confirm.
        configureClients(clientSupplier, APPLICATION_ID + "-" + KV_STORE + "-changelog");
        configureClients(clientSupplier, APPLICATION_ID + "-" + WINDOW_STORE + "-changelog");

        final InternalTopologyBuilder internalTopologyBuilder = topology.getInternalBuilder(APPLICATION_ID);
        final ProcessorTopology processorTopology = internalTopologyBuilder.buildTopology();

        tasks = new HashMap<>();
        stateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);

        final TaskId taskOneId = new TaskId(0, 0);
        taskOne = createStreamsTask(streamsConfig, clientSupplier, processorTopology, taskOneId);
        taskOne.initializeIfNeeded();
        tasks.put(taskOneId, taskOne);

        final TaskId taskTwoId = new TaskId(0, 1);
        final StreamTask taskTwo = createStreamsTask(streamsConfig, clientSupplier, processorTopology, taskTwoId);
        taskTwo.initializeIfNeeded();
        tasks.put(taskTwoId, taskTwo);

        threadMock = EasyMock.createNiceMock(StreamThread.class);
        provider = new StreamThreadStateStoreProvider(threadMock);
    }

    /** Removes the temporary state directory created in {@link #before()}. */
    @After
    public void cleanUp() throws IOException {
        Utils.delete(stateDir);
    }

    @Test
    public void shouldFindKeyValueStores() {
        mockThread(true);
        final List<ReadOnlyKeyValueStore<String, String>> kvStores =
            provider.stores(StoreQueryParameters.fromNameAndType(KV_STORE, QueryableStoreTypes.keyValueStore()));
        Assert.assertEquals(2, kvStores.size());
        for (final ReadOnlyKeyValueStore<String, String> store : kvStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyKeyValueStore.class));
            MatcherAssert.assertThat(store, Matchers.not(Matchers.instanceOf(TimestampedKeyValueStore.class)));
        }
    }

    @Test
    public void shouldFindTimestampedKeyValueStores() {
        mockThread(true);
        // The value type is left as a wildcard: only the store classes are checked here.
        final List<ReadOnlyKeyValueStore<String, ?>> tkvStores =
            provider.stores(StoreQueryParameters.fromNameAndType(TIMESTAMPED_KV_STORE, QueryableStoreTypes.timestampedKeyValueStore()));
        Assert.assertEquals(2, tkvStores.size());
        for (final ReadOnlyKeyValueStore<String, ?> store : tkvStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyKeyValueStore.class));
            MatcherAssert.assertThat(store, Matchers.instanceOf(TimestampedKeyValueStore.class));
        }
    }

    @Test
    public void shouldNotFindKeyValueStoresAsTimestampedStore() {
        mockThread(true);
        final InvalidStateStoreException exception = Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(KV_STORE, QueryableStoreTypes.timestampedKeyValueStore())));
        MatcherAssert.assertThat(
            exception.getMessage(),
            Matchers.is("Cannot get state store kv-store because the queryable store type [class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedKeyValueStoreType] does not accept the actual store type [class org.apache.kafka.streams.state.internals.MeteredKeyValueStore]."));
    }

    @Test
    public void shouldFindTimestampedKeyValueStoresAsKeyValueStores() {
        mockThread(true);
        final List<ReadOnlyKeyValueStore<String, String>> tkvStores =
            provider.stores(StoreQueryParameters.fromNameAndType(TIMESTAMPED_KV_STORE, QueryableStoreTypes.keyValueStore()));
        Assert.assertEquals(2, tkvStores.size());
        for (final ReadOnlyKeyValueStore<String, String> store : tkvStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyKeyValueStore.class));
            MatcherAssert.assertThat(store, Matchers.not(Matchers.instanceOf(TimestampedKeyValueStore.class)));
        }
    }

    @Test
    public void shouldFindWindowStores() {
        mockThread(true);
        final List<ReadOnlyWindowStore<String, String>> windowStores =
            provider.stores(StoreQueryParameters.fromNameAndType(WINDOW_STORE, QueryableStoreTypes.windowStore()));
        Assert.assertEquals(2, windowStores.size());
        for (final ReadOnlyWindowStore<String, String> store : windowStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyWindowStore.class));
            MatcherAssert.assertThat(store, Matchers.not(Matchers.instanceOf(TimestampedWindowStore.class)));
        }
    }

    @Test
    public void shouldFindTimestampedWindowStores() {
        mockThread(true);
        // The value type is left as a wildcard: only the store classes are checked here.
        final List<ReadOnlyWindowStore<String, ?>> windowStores =
            provider.stores(StoreQueryParameters.fromNameAndType(TIMESTAMPED_WINDOW_STORE, QueryableStoreTypes.timestampedWindowStore()));
        Assert.assertEquals(2, windowStores.size());
        for (final ReadOnlyWindowStore<String, ?> store : windowStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyWindowStore.class));
            MatcherAssert.assertThat(store, Matchers.instanceOf(TimestampedWindowStore.class));
        }
    }

    @Test
    public void shouldNotFindWindowStoresAsTimestampedStore() {
        mockThread(true);
        final InvalidStateStoreException exception = Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(WINDOW_STORE, QueryableStoreTypes.timestampedWindowStore())));
        MatcherAssert.assertThat(
            exception.getMessage(),
            Matchers.is("Cannot get state store window-store because the queryable store type [class org.apache.kafka.streams.state.QueryableStoreTypes$TimestampedWindowStoreType] does not accept the actual store type [class org.apache.kafka.streams.state.internals.MeteredWindowStore]."));
    }

    @Test
    public void shouldFindTimestampedWindowStoresAsWindowStore() {
        mockThread(true);
        final List<ReadOnlyWindowStore<String, String>> windowStores =
            provider.stores(StoreQueryParameters.fromNameAndType(TIMESTAMPED_WINDOW_STORE, QueryableStoreTypes.windowStore()));
        Assert.assertEquals(2, windowStores.size());
        for (final ReadOnlyWindowStore<String, String> store : windowStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyWindowStore.class));
            MatcherAssert.assertThat(store, Matchers.not(Matchers.instanceOf(TimestampedWindowStore.class)));
        }
    }

    @Test
    public void shouldFindSessionStores() {
        mockThread(true);
        final List<ReadOnlySessionStore<String, String>> sessionStores =
            provider.stores(StoreQueryParameters.fromNameAndType(SESSION_STORE, QueryableStoreTypes.sessionStore()));
        Assert.assertEquals(2, sessionStores.size());
        for (final ReadOnlySessionStore<String, String> store : sessionStores) {
            MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlySessionStore.class));
        }
    }

    // The "store closed" tests wrap only the provider call in assertThrows, so an exception
    // raised by the setup lines cannot make them pass by accident.

    @Test
    public void shouldThrowInvalidStoreExceptionIfKVStoreClosed() {
        mockThread(true);
        taskOne.getStore(KV_STORE).close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(KV_STORE, QueryableStoreTypes.keyValueStore())));
    }

    @Test
    public void shouldThrowInvalidStoreExceptionIfTsKVStoreClosed() {
        mockThread(true);
        taskOne.getStore(TIMESTAMPED_KV_STORE).close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(TIMESTAMPED_KV_STORE, QueryableStoreTypes.timestampedKeyValueStore())));
    }

    @Test
    public void shouldThrowInvalidStoreExceptionIfWindowStoreClosed() {
        mockThread(true);
        taskOne.getStore(WINDOW_STORE).close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(WINDOW_STORE, QueryableStoreTypes.windowStore())));
    }

    @Test
    public void shouldThrowInvalidStoreExceptionIfTsWindowStoreClosed() {
        mockThread(true);
        taskOne.getStore(TIMESTAMPED_WINDOW_STORE).close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(TIMESTAMPED_WINDOW_STORE, QueryableStoreTypes.timestampedWindowStore())));
    }

    @Test
    public void shouldThrowInvalidStoreExceptionIfSessionStoreClosed() {
        mockThread(true);
        taskOne.getStore(SESSION_STORE).close();
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(SESSION_STORE, QueryableStoreTypes.sessionStore())));
    }

    @Test
    public void shouldReturnEmptyListIfNoStoresFoundWithName() {
        mockThread(true);
        Assert.assertEquals(
            Collections.emptyList(),
            provider.stores(StoreQueryParameters.fromNameAndType("not-a-store", QueryableStoreTypes.keyValueStore())));
    }

    @Test
    public void shouldReturnSingleStoreForPartition() {
        mockThread(true);
        // Partitions 0 and 1 are the two tasks registered in before(); each must yield one store.
        for (int partition = 0; partition < 2; partition++) {
            final List<ReadOnlyKeyValueStore<String, String>> kvStores = provider.stores(
                StoreQueryParameters.fromNameAndType(KV_STORE, QueryableStoreTypes.keyValueStore()).withPartition(partition));
            Assert.assertEquals(1, kvStores.size());
            for (final ReadOnlyKeyValueStore<String, String> store : kvStores) {
                MatcherAssert.assertThat(store, Matchers.instanceOf(ReadOnlyKeyValueStore.class));
                MatcherAssert.assertThat(store, Matchers.not(Matchers.instanceOf(TimestampedKeyValueStore.class)));
            }
        }
    }

    @Test
    public void shouldReturnEmptyListForInvalidPartitions() {
        mockThread(true);
        Assert.assertEquals(
            Collections.emptyList(),
            provider.stores(StoreQueryParameters.fromNameAndType(KV_STORE, QueryableStoreTypes.keyValueStore()).withPartition(2)));
    }

    @Test
    public void shouldThrowInvalidStoreExceptionIfNotAllStoresAvailable() {
        mockThread(false);
        Assert.assertThrows(
            InvalidStateStoreException.class,
            () -> provider.stores(StoreQueryParameters.fromNameAndType(KV_STORE, QueryableStoreTypes.keyValueStore())));
    }

    /**
     * Creates (but does not initialize) an active {@link StreamTask} for {@code taskId}, reading
     * the partition of the source topic that matches {@code taskId.partition}.
     *
     * @param streamsConfig  configuration shared by all tasks of the fixture
     * @param clientSupplier source of the mock consumer, restore consumer, admin client and producer
     * @param topology       processor topology the task runs
     * @param taskId         id of the task to create
     * @return the new task, backed by the fixture's {@link StateDirectory}
     */
    private StreamTask createStreamsTask(final StreamsConfig streamsConfig,
                                         final MockClientSupplier clientSupplier,
                                         final ProcessorTopology topology,
                                         final TaskId taskId) {
        final Metrics metrics = new Metrics();
        final LogContext logContext = new LogContext("test-stream-task ");
        final Set<TopicPartition> partitions = Collections.singleton(new TopicPartition(topicName, taskId.partition));
        final ProcessorStateManager stateManager = new ProcessorStateManager(
            taskId,
            Task.TaskType.ACTIVE,
            StreamThread.eosEnabled(streamsConfig),
            logContext,
            stateDirectory,
            new StoreChangelogReader(
                new MockTime(),
                streamsConfig,
                logContext,
                clientSupplier.adminClient,
                clientSupplier.restoreConsumer,
                new MockStateRestoreListener()),
            topology.storeToChangelogTopic(),
            partitions);
        // NOTE(review): the producer is always created for task 0_0, whatever taskId is;
        // presumably harmless for these tests - confirm before reusing this helper elsewhere.
        final RecordCollector recordCollector = new RecordCollectorImpl(
            logContext,
            taskId,
            new StreamsProducer(
                streamsConfig,
                "threadId",
                clientSupplier,
                new TaskId(0, 0),
                UUID.randomUUID(),
                logContext),
            streamsConfig.defaultProductionExceptionHandler(),
            new MockStreamsMetrics(metrics));
        final StreamsMetricsImpl streamsMetrics = new MockStreamsMetrics(metrics);
        final InternalProcessorContext context =
            new ProcessorContextImpl(taskId, streamsConfig, stateManager, streamsMetrics, null);
        return new StreamTask(
            taskId,
            partitions,
            topology,
            clientSupplier.consumer,
            streamsConfig,
            streamsMetrics,
            stateDirectory,
            EasyMock.createNiceMock(ThreadCache.class),
            new MockTime(),
            stateManager,
            recordCollector,
            context);
    }

    /**
     * Records the expectations on the mocked {@link StreamThread} and switches it to replay mode.
     * Must be called before the provider is queried.
     *
     * @param initialized {@code true} to make the thread report {@code isRunning() == true} and
     *                    state {@code RUNNING}; {@code false} to report not running and state
     *                    {@code PARTITIONS_ASSIGNED}
     */
    private void mockThread(final boolean initialized) {
        EasyMock.expect(threadMock.isRunning()).andReturn(initialized);
        EasyMock.expect(threadMock.allTasks()).andStubReturn(tasks);
        EasyMock.expect(threadMock.activeTaskMap()).andStubReturn(tasks);
        EasyMock.expect(threadMock.activeTasks()).andStubReturn(new ArrayList<>(tasks.values()));
        final StreamThread.State state =
            initialized ? StreamThread.State.RUNNING : StreamThread.State.PARTITIONS_ASSIGNED;
        EasyMock.expect(threadMock.state()).andReturn(state).anyTimes();
        EasyMock.replay(threadMock);
    }

    /**
     * Registers partitions 0 and 1 of {@code topic} with the mock restore consumer, assigns them
     * to it, and sets their beginning and end offsets to 0 on both the restore consumer and the
     * admin client.
     *
     * @param clientSupplier supplier whose mock clients are configured
     * @param topic          topic name to register
     */
    private void configureClients(final MockClientSupplier clientSupplier, final String topic) {
        final List<PartitionInfo> partitions = Arrays.asList(
            new PartitionInfo(topic, 0, null, null, null),
            new PartitionInfo(topic, 1, null, null, null));
        clientSupplier.restoreConsumer.updatePartitions(topic, partitions);

        final TopicPartition tp1 = new TopicPartition(topic, 0);
        final TopicPartition tp2 = new TopicPartition(topic, 1);
        clientSupplier.restoreConsumer.assign(Arrays.asList(tp1, tp2));

        final Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(tp1, 0L);
        offsets.put(tp2, 0L);
        clientSupplier.restoreConsumer.updateBeginningOffsets(offsets);
        clientSupplier.restoreConsumer.updateEndOffsets(offsets);
        clientSupplier.adminClient.updateBeginningOffsets(offsets);
        clientSupplier.adminClient.updateEndOffsets(offsets);
    }
}

