package org.apache.kafka.streams.processor.internals;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.InvalidOffsetException;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.internals.InternalNameProvider;
import org.apache.kafka.streams.kstream.internals.KTableSource;
import org.apache.kafka.streams.kstream.internals.KeyValueStoreMaterializer;
import org.apache.kafka.streams.kstream.internals.MaterializedInternal;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.internals.GlobalStreamThread;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.IsInstanceOf;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.class */
public class GlobalStreamThreadTest {
    private GlobalStreamThread globalStreamThread;
    private StreamsConfig config;
    private static final String GLOBAL_STORE_TOPIC_NAME = "foo";
    private static final String GLOBAL_STORE_NAME = "bar";
    private final InternalTopologyBuilder builder = new InternalTopologyBuilder();
    private final MockConsumer<byte[], byte[]> mockConsumer = new MockConsumer<>(OffsetResetStrategy.NONE);
    private final MockTime time = new MockTime();
    private final MockStateRestoreListener stateRestoreListener = new MockStateRestoreListener();
    private final TopicPartition topicPartition = new TopicPartition(GLOBAL_STORE_TOPIC_NAME, 0);

    @Before
    public void before() {
        MaterializedInternal materializedInternal = new MaterializedInternal(Materialized.with((Serde) null, (Serde) null));
        materializedInternal.generateStoreNameIfNeeded(new InternalNameProvider() { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.1
            public String newProcessorName(String str) {
                return "processorName";
            }

            public String newStoreName(String str) {
                return GlobalStreamThreadTest.GLOBAL_STORE_NAME;
            }
        }, "store-");
        this.builder.addGlobalStore(new KeyValueStoreMaterializer(materializedInternal).materialize().withLoggingDisabled(), "sourceName", (TimestampExtractor) null, (Deserializer) null, (Deserializer) null, GLOBAL_STORE_TOPIC_NAME, "processorName", new KTableSource(GLOBAL_STORE_NAME));
        HashMap hashMap = new HashMap();
        hashMap.put("bootstrap.servers", "blah");
        hashMap.put("application.id", "blah");
        hashMap.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        this.config = new StreamsConfig(hashMap);
        this.globalStreamThread = new GlobalStreamThread(this.builder.rewriteTopology(this.config).buildGlobalStateTopology(), this.config, this.mockConsumer, new StateDirectory(this.config, this.time), 0L, new Metrics(), new MockTime(), "clientId", this.stateRestoreListener);
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfThereIsAStreamsException() {
        try {
            this.globalStreamThread.start();
            Assert.fail("Should have thrown StreamsException if start up failed");
        } catch (StreamsException e) {
        }
        Assert.assertFalse(this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldThrowStreamsExceptionOnStartupIfExceptionOccurred() {
        this.globalStreamThread = new GlobalStreamThread(this.builder.buildGlobalStateTopology(), this.config, new MockConsumer(OffsetResetStrategy.EARLIEST) { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.2
            public List<PartitionInfo> partitionsFor(String str) {
                throw new RuntimeException("KABOOM!");
            }
        }, new StateDirectory(this.config, this.time), 0L, new Metrics(), new MockTime(), "clientId", this.stateRestoreListener);
        try {
            this.globalStreamThread.start();
            Assert.fail("Should have thrown StreamsException if start up failed");
        } catch (StreamsException e) {
            MatcherAssert.assertThat(e.getCause(), IsInstanceOf.instanceOf(RuntimeException.class));
            MatcherAssert.assertThat(e.getCause().getMessage(), CoreMatchers.equalTo("KABOOM!"));
        }
        Assert.assertFalse(this.globalStreamThread.stillRunning());
    }

    @Test
    public void shouldBeRunningAfterSuccessfulStart() {
        initializeConsumer();
        this.globalStreamThread.start();
        Assert.assertTrue(this.globalStreamThread.stillRunning());
    }

    @Test(timeout = IntegrationTestUtils.DEFAULT_TIMEOUT)
    public void shouldStopRunningWhenClosedByUser() throws InterruptedException {
        initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertEquals(GlobalStreamThread.State.DEAD, this.globalStreamThread.state());
    }

    @Test
    public void shouldCloseStateStoresOnClose() throws InterruptedException {
        initializeConsumer();
        this.globalStreamThread.start();
        StateStore stateStore = (StateStore) this.builder.globalStateStores().get(GLOBAL_STORE_NAME);
        Assert.assertTrue(stateStore.isOpen());
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertFalse(stateStore.isOpen());
    }

    @Test
    public void shouldTransitionToDeadOnClose() throws InterruptedException {
        initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        Assert.assertEquals(GlobalStreamThread.State.DEAD, this.globalStreamThread.state());
    }

    @Test
    public void shouldStayDeadAfterTwoCloses() throws InterruptedException {
        initializeConsumer();
        this.globalStreamThread.start();
        this.globalStreamThread.shutdown();
        this.globalStreamThread.join();
        this.globalStreamThread.shutdown();
        Assert.assertEquals(GlobalStreamThread.State.DEAD, this.globalStreamThread.state());
    }

    @Test
    public void shouldTransitionToRunningOnStart() throws InterruptedException {
        initializeConsumer();
        this.globalStreamThread.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.3
            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.globalStreamThread.state() == GlobalStreamThread.State.RUNNING;
            }
        }, 10000L, "Thread never started.");
        this.globalStreamThread.shutdown();
    }

    @Test
    public void shouldDieOnInvalidOffsetException() throws Exception {
        initializeConsumer();
        this.globalStreamThread.start();
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.4
            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.globalStreamThread.state() == GlobalStreamThread.State.RUNNING;
            }
        }, 10000L, "Thread never started.");
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 1L));
        this.mockConsumer.addRecord(new ConsumerRecord(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.5
            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.mockConsumer.position(GlobalStreamThreadTest.this.topicPartition) == 1;
            }
        }, 10000L, "Input record never consumed");
        this.mockConsumer.setException(new InvalidOffsetException("Try Again!") { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.6
            public Set<TopicPartition> partitions() {
                return Collections.singleton(GlobalStreamThreadTest.this.topicPartition);
            }
        });
        this.mockConsumer.addRecord(new ConsumerRecord(GLOBAL_STORE_TOPIC_NAME, 0, 0L, "K1".getBytes(), "V1".getBytes()));
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.processor.internals.GlobalStreamThreadTest.7
            public boolean conditionMet() {
                return GlobalStreamThreadTest.this.globalStreamThread.state() == GlobalStreamThread.State.DEAD;
            }
        }, 10000L, "GlobalStreamThread should have died.");
    }

    private void initializeConsumer() {
        this.mockConsumer.updatePartitions(GLOBAL_STORE_TOPIC_NAME, Collections.singletonList(new PartitionInfo(GLOBAL_STORE_TOPIC_NAME, 0, (Node) null, new Node[0], new Node[0])));
        this.mockConsumer.updateBeginningOffsets(Collections.singletonMap(this.topicPartition, 0L));
        this.mockConsumer.updateEndOffsets(Collections.singletonMap(this.topicPartition, 0L));
        this.mockConsumer.assign(Collections.singleton(this.topicPartition));
    }
}
