package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.internals.PartitionAssignor;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsMetrics;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockClientSupplier;
import org.apache.kafka.test.MockInternalTopicManager;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest.class */
public class StreamThreadTest {
    // Topic name used for the task0Assignment / task1Assignment fixtures below.
    private static final String TOPIC = "topic";
    // NOTE(review): in the part of this file reviewed, the tests pass the literals
    // "clientId" / "stream-thread-test" directly instead of reading these two fields.
    private final String clientId = "clientId";
    private final String applicationId = "stream-thread-test";
    // Manually advanced clock; handed to the StateDirectory and to the threads whose timing a test controls.
    private final MockTime mockTime = new MockTime();
    private final Metrics metrics = new Metrics();
    // Source of the mock consumer / restore consumer / producers injected into every StreamThread.
    private final MockClientSupplier clientSupplier = new MockClientSupplier();
    // Re-assigned with a fresh random value in setUp() before each test.
    private UUID processId = UUID.randomUUID();
    private final KStreamBuilder builder = new KStreamBuilder();
    // Default configuration, built via configProps(false).
    private final StreamsConfig config = new StreamsConfig(configProps(false));
    private final String stateDir = TestUtils.tempDirectory().getPath();
    // NOTE(review): the first argument is the literal string "applicationId", not the value of the
    // applicationId field ("stream-thread-test") — confirm this is intentional.
    private final StateDirectory stateDirectory = new StateDirectory("applicationId", this.stateDir, this.mockTime);
    private final ChangelogReader changelogReader = new StoreChangelogReader(this.clientSupplier.restoreConsumer);
    // Partitions 1 and 2 of topic1..topic3, used as rebalance input by the assignment tests.
    private final TopicPartition t1p1 = new TopicPartition("topic1", 1);
    private final TopicPartition t1p2 = new TopicPartition("topic1", 2);
    private final TopicPartition t2p1 = new TopicPartition("topic2", 1);
    private final TopicPartition t2p2 = new TopicPartition("topic2", 2);
    private final TopicPartition t3p1 = new TopicPartition("topic3", 1);
    private final TopicPartition t3p2 = new TopicPartition("topic3", 2);
    // Partition metadata for partitions 0..2 of topic1, topic2 and topic3; every entry uses
    // Node.noNode() and two empty Node arrays (presumably leader / replicas / in-sync replicas — TODO confirm).
    private final List<PartitionInfo> infos = Arrays.asList(new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]));
    // Cluster fixture named "cluster" built from the partition metadata above with a single Node.noNode() entry.
    private final Cluster metadata = new Cluster("cluster", Collections.singleton(Node.noNode()), this.infos, Collections.emptySet(), Collections.emptySet());
    // Subscription to topic1..topic3 whose user data comes from subscriptionUserData().
    private final PartitionAssignor.Subscription subscription = new PartitionAssignor.Subscription(Arrays.asList("topic1", "topic2", "topic3"), subscriptionUserData());
    // Task ids the assignment tests expect: task1/task2 are (0, 1)/(0, 2), task3/task4 are (1, 1)/(1, 2).
    private final TaskId task1 = new TaskId(0, 1);
    private final TaskId task2 = new TaskId(0, 2);
    private final TaskId task3 = new TaskId(1, 1);
    private final TaskId task4 = new TaskId(1, 2);
    // Single-partition assignments on TOPIC, swapped between two threads by the hand-over test.
    private final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0));
    private final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1));

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$MockStreamsPartitionAssignor.class */
    /**
     * Partition assignor test double that reports fixed, caller-supplied task assignments.
     * The maps are stored by reference, so a test can mutate them afterwards to simulate
     * a changed assignment.
     */
    public class MockStreamsPartitionAssignor extends StreamPartitionAssignor {
        /** Returned as-is by {@link #activeTasks()}. */
        private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment;
        /** Returned as-is by {@link #standbyTasks()}. */
        private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment;

        /**
         * Creates an assignor with the given active tasks and no standby tasks.
         *
         * @param streamThreadTest not used by the constructor body; kept because callers pass {@code this}
         * @param map              active task assignment to report
         */
        MockStreamsPartitionAssignor(StreamThreadTest streamThreadTest, Map<TaskId, Set<TopicPartition>> map) {
            activeTaskAssignment = map;
            standbyTaskAssignment = Collections.emptyMap();
        }

        /**
         * Creates an assignor with explicit active and standby assignments.
         *
         * @param map  active task assignment to report
         * @param map2 standby task assignment to report
         */
        MockStreamsPartitionAssignor(Map<TaskId, Set<TopicPartition>> map, Map<TaskId, Set<TopicPartition>> map2) {
            activeTaskAssignment = map;
            standbyTaskAssignment = map2;
        }

        /** @return the active assignment supplied at construction time */
        Map<TaskId, Set<TopicPartition>> activeTasks() {
            return activeTaskAssignment;
        }

        /** @return the standby assignment supplied at construction time (empty for the two-argument constructor taking the test instance) */
        Map<TaskId, Set<TopicPartition>> standbyTasks() {
            return standbyTaskAssignment;
        }

        /** Deliberately does nothing. */
        public void close() {
            // no resources held by the mock
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$StateListenerStub.class */
    /**
     * State listener that counts notifications and remembers the most recent transition.
     * It also checks that consecutive notifications chain: the state reported as "previous"
     * must be the one recorded as "new" by the preceding notification.
     */
    private static class StateListenerStub implements StreamThread.StateListener {
        /** Number of {@code onChange} calls received so far. */
        int numChanges = 0;
        /** Second state argument of the most recent notification; {@code null} before the first one. */
        ThreadStateTransitionValidator oldState = null;
        /** First state argument of the most recent notification; {@code null} before the first one. */
        ThreadStateTransitionValidator newState = null;

        /**
         * Records one state transition.
         *
         * @param thread                          the thread that changed state (not inspected)
         * @param threadStateTransitionValidator  state stored into {@link #newState}
         * @param threadStateTransitionValidator2 state stored into {@link #oldState}
         * @throws RuntimeException if a previous notification was recorded and its new state is not
         *                          the same instance as {@code threadStateTransitionValidator2}
         */
        public void onChange(Thread thread, ThreadStateTransitionValidator threadStateTransitionValidator, ThreadStateTransitionValidator threadStateTransitionValidator2) {
            numChanges += 1;
            final ThreadStateTransitionValidator lastRecorded = newState;
            if (lastRecorded != null) {
                // Identity comparison on purpose, as in the original check.
                if (lastRecorded != threadStateTransitionValidator2) {
                    throw new RuntimeException("State mismatch " + threadStateTransitionValidator2 + " different from " + lastRecorded);
                }
            }
            oldState = threadStateTransitionValidator2;
            newState = threadStateTransitionValidator;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/StreamThreadTest$TestStreamTask.class */
    /**
     * {@link StreamTask} subclass that tracks its own lifecycle flags so tests can observe
     * commits and so that illegal lifecycle calls fail fast with an
     * {@link IllegalStateException}.
     */
    private class TestStreamTask extends StreamTask {
        /** Set by {@link #commit(boolean)}; tests reset it to observe the next commit. */
        boolean committed;
        private boolean suspended;
        private boolean closed;
        private boolean closedStateManager;

        /**
         * Forwards to the {@code StreamTask} constructor, supplying a fresh {@code ThreadCache}
         * (empty name, size 0) and a fresh {@code MockTime}.
         */
        TestStreamTask(TaskId taskId, String str, Collection<TopicPartition> collection, ProcessorTopology processorTopology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, StreamsConfig streamsConfig, StreamsMetrics streamsMetrics, StateDirectory stateDirectory, ChangelogReader changelogReader) {
            super(taskId, str, collection, processorTopology, consumer, changelogReader, streamsConfig, streamsMetrics, stateDirectory, new ThreadCache("", 0L, streamsMetrics), new MockTime(), producer);
            // Explicit reset after the super constructor has run.
            committed = false;
        }

        /** Delegates to the superclass and then marks the task as committed. */
        void commit(boolean z) {
            super.commit(z);
            committed = true;
        }

        /** Intentionally empty in this test double. */
        protected void updateOffsetLimits() {
            // nothing to do
        }

        /**
         * Resumes the task.
         *
         * @throws IllegalStateException if the task is not currently suspended, or is closed
         */
        public void resume() {
            final boolean resumable = suspended && !closed;
            if (!resumable) {
                throw new IllegalStateException("Should not resume task that is not suspended or already closed.");
            }
            super.resume();
            suspended = false;
        }

        /**
         * Suspends the task.
         *
         * @throws IllegalStateException if the task is already suspended or closed
         */
        void suspend(boolean z) {
            final boolean alreadyInactive = closed || suspended;
            if (alreadyInactive) {
                throw new IllegalStateException("Should not suspend task that is already suspended or closed.");
            }
            super.suspend(z);
            suspended = true;
        }

        /**
         * Closes the task.
         *
         * @throws IllegalStateException if {@code z} is true and the task was already closed
         */
        public void close(boolean z, boolean z2) {
            if (z) {
                if (closed) {
                    throw new IllegalStateException("Should not close task that is already closed.");
                }
            }
            super.close(z, z2);
            closed = true;
        }

        /**
         * Closes a suspended task.
         * Note that, despite the wording of the message, only the {@code closed} flag is checked here;
         * the {@code suspended} flag is not consulted.
         *
         * @throws IllegalStateException if {@code z} is true and the task was already closed
         */
        public void closeSuspended(boolean z, boolean z2, RuntimeException runtimeException) {
            if (z) {
                if (closed) {
                    throw new IllegalStateException("Should not close task that is not suspended or already closed.");
                }
            }
            super.closeSuspended(z, z2, runtimeException);
            closed = true;
        }

        /** Delegates to the superclass and records that the state manager was closed. */
        void closeStateManager(boolean z) {
            super.closeStateManager(z);
            closedStateManager = true;
        }
    }

    /** Assigns a fresh random process id before each test method. */
    @Before
    public void setUp() throws Exception {
        final UUID freshProcessId = UUID.randomUUID();
        processId = freshProcessId;
    }

    /**
     * Builds the 28-byte user-data payload for the {@code subscription} fixture.
     * Layout, in write order: an int {@code 1}, the two longs of a random UUID
     * (most significant bits first), then two int zeros.
     * NOTE(review): presumably version / process id / two empty task-set sizes — confirm against
     * the subscription-info encoding.
     *
     * @return the filled buffer, rewound to position 0
     */
    private ByteBuffer subscriptionUserData() {
        final UUID uuid = UUID.randomUUID();
        // int + two longs + int + int
        final ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + 8 + 4 + 4);
        buffer.putInt(1)
            .putLong(uuid.getMostSignificantBits())
            .putLong(uuid.getLeastSignificantBits())
            .putInt(0)
            .putInt(0);
        buffer.rewind();
        return buffer;
    }

    /**
     * Builds the base Streams configuration used by the tests in this class.
     * A plain {@link Properties} instance is returned (rather than an anonymous subclass filled
     * by an instance initializer) so the result does not hold a reference to the test instance.
     *
     * @param z when {@code true}, additionally sets {@code processing.guarantee} to {@code exactly_once}
     * @return a new, mutable {@link Properties}; callers may override entries before use
     */
    private Properties configProps(final boolean z) {
        final Properties props = new Properties();
        props.setProperty("application.id", "stream-thread-test");
        props.setProperty("bootstrap.servers", "localhost:2171");
        props.setProperty("buffered.records.per.partition", "3");
        props.setProperty("default.timestamp.extractor", MockTimestampExtractor.class.getName());
        // Each call gets its own temporary state directory.
        props.setProperty("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        if (z) {
            props.setProperty("processing.guarantee", "exactly_once");
        }
        return props;
    }

    /**
     * Drives a single-topic-group thread through several rebalances (t1p1 only, t1p2 only,
     * both, none) and checks after each one that the thread's tasks match the assignment
     * reported by the stubbed partition assignor.
     */
    @Test
    public void testPartitionAssignmentChangeForSingleGroup() throws Exception {
        this.builder.addSource("source1", new String[]{"topic1"});
        final StreamThread thread = getStreamThread();

        // Mutable assignment the stubbed assignor reports; edited before each rebalance below.
        final Map<TaskId, Set<TopicPartition>> assignedTasks = new HashMap<>();
        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return assignedTasks;
            }
        });
        final StateListenerStub stateListener = new StateListenerStub();
        thread.setStateListener(stateListener);
        Assert.assertEquals(StreamThread.State.CREATED, thread.state());

        final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.RUNNING);
        Assert.assertTrue(thread.tasks().isEmpty());

        rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
        Assert.assertEquals(StreamThread.State.PARTITIONS_REVOKED, thread.state());

        // Rebalance 1: only t1p1 -> task 0_1.
        final List<TopicPartition> firstAssigned = Collections.singletonList(this.t1p1);
        final Set<TopicPartition> firstExpected = new HashSet<>(Collections.singleton(this.t1p1));
        assignedTasks.put(new TaskId(0, 1), firstExpected);
        rebalanceListener.onPartitionsAssigned(firstAssigned);
        Assert.assertEquals(3, stateListener.numChanges);
        Assert.assertEquals(StreamThread.State.PARTITIONS_REVOKED, stateListener.oldState);
        thread.runOnce(-1L);
        Assert.assertEquals(StreamThread.State.RUNNING, thread.state());
        Assert.assertTrue(thread.tasks().containsKey(this.task1));
        Assert.assertEquals(firstExpected, thread.tasks().get(this.task1).partitions());
        Assert.assertEquals(1, thread.tasks().size());

        assignedTasks.clear();
        rebalanceListener.onPartitionsRevoked(firstAssigned);
        Assert.assertFalse(thread.tasks().containsKey(this.task1));
        Assert.assertEquals(0, thread.tasks().size());

        // Rebalance 2: only t1p2 -> task 0_2.
        final List<TopicPartition> secondAssigned = Collections.singletonList(this.t1p2);
        final Set<TopicPartition> secondExpected = new HashSet<>(Collections.singleton(this.t1p2));
        assignedTasks.put(new TaskId(0, 2), secondExpected);
        rebalanceListener.onPartitionsAssigned(secondAssigned);
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().containsKey(this.task2));
        Assert.assertEquals(secondExpected, thread.tasks().get(this.task2).partitions());
        Assert.assertEquals(1, thread.tasks().size());

        assignedTasks.clear();
        rebalanceListener.onPartitionsRevoked(secondAssigned);

        // Rebalance 3: both partitions -> tasks 0_1 and 0_2.
        final List<TopicPartition> bothAssigned = Arrays.asList(this.t1p1, this.t1p2);
        final Set<TopicPartition> expectedTask1 = new HashSet<>(Collections.singleton(this.t1p1));
        final Set<TopicPartition> expectedTask2 = new HashSet<>(Collections.singleton(this.t1p2));
        assignedTasks.put(new TaskId(0, 1), expectedTask1);
        assignedTasks.put(new TaskId(0, 2), expectedTask2);
        rebalanceListener.onPartitionsAssigned(bothAssigned);
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().containsKey(this.task1));
        Assert.assertTrue(thread.tasks().containsKey(this.task2));
        Assert.assertEquals(expectedTask1, thread.tasks().get(this.task1).partitions());
        Assert.assertEquals(expectedTask2, thread.tasks().get(this.task2).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // Rebalance 4: the same assignor map, but an empty partition list is assigned.
        rebalanceListener.onPartitionsRevoked(bothAssigned);
        rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().isEmpty());

        thread.close();
        // assertEquals reports both states on failure, unlike assertTrue(a == b).
        Assert.assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state());
    }

    /**
     * Same scenario as the single-group test, but with a topology whose sources feed
     * tasks {@code 0_x} (topic1) and {@code 1_x} (topic2 + topic3): checks the thread's tasks
     * after each of four rebalances.
     */
    @Test
    public void testPartitionAssignmentChangeForMultipleGroups() throws Exception {
        this.builder.addSource("source1", new String[]{"topic1"});
        this.builder.addSource("source2", new String[]{"topic2"});
        this.builder.addSource("source3", new String[]{"topic3"});
        this.builder.addProcessor("processor", new MockProcessorSupplier(), new String[]{"source2", "source3"});
        final StreamThread thread = getStreamThread();

        // Mutable assignment the stubbed assignor reports; edited before each rebalance below.
        final Map<TaskId, Set<TopicPartition>> assignedTasks = new HashMap<>();
        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return assignedTasks;
            }
        });
        thread.setStateListener(new StateListenerStub());
        Assert.assertEquals(StreamThread.State.CREATED, thread.state());

        final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
        thread.setState(StreamThread.State.RUNNING);
        Assert.assertTrue(thread.tasks().isEmpty());

        rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
        Assert.assertEquals(StreamThread.State.PARTITIONS_REVOKED, thread.state());

        // Rebalance 1: tasks 1_1 and 1_2.
        final List<TopicPartition> firstAssigned = Arrays.asList(this.t2p1, this.t2p2, this.t3p1, this.t3p2);
        final Set<TopicPartition> firstTask3 = new HashSet<>(Arrays.asList(this.t2p1, this.t3p1));
        final Set<TopicPartition> firstTask4 = new HashSet<>(Arrays.asList(this.t2p2, this.t3p2));
        assignedTasks.put(new TaskId(1, 1), firstTask3);
        assignedTasks.put(new TaskId(1, 2), firstTask4);
        rebalanceListener.onPartitionsAssigned(firstAssigned);
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().containsKey(this.task3));
        Assert.assertTrue(thread.tasks().containsKey(this.task4));
        Assert.assertEquals(firstTask3, thread.tasks().get(this.task3).partitions());
        Assert.assertEquals(firstTask4, thread.tasks().get(this.task4).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // Rebalance 2: task 1_2 is dropped and task 0_1 is added; task 1_1 stays in the map.
        rebalanceListener.onPartitionsRevoked(firstAssigned);
        final List<TopicPartition> secondAssigned = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        final Set<TopicPartition> secondTask1 = new HashSet<>(Collections.singleton(this.t1p1));
        final Set<TopicPartition> secondTask3 = new HashSet<>(Arrays.asList(this.t2p1, this.t3p1));
        assignedTasks.put(new TaskId(0, 1), secondTask1);
        assignedTasks.remove(this.task4);
        rebalanceListener.onPartitionsAssigned(secondAssigned);
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().containsKey(this.task1));
        Assert.assertTrue(thread.tasks().containsKey(this.task3));
        Assert.assertEquals(secondTask1, thread.tasks().get(this.task1).partitions());
        Assert.assertEquals(secondTask3, thread.tasks().get(this.task3).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // Rebalance 3: the very same assignment is revoked and handed back.
        rebalanceListener.onPartitionsRevoked(secondAssigned);
        final List<TopicPartition> thirdAssigned = Arrays.asList(this.t1p1, this.t2p1, this.t3p1);
        final Set<TopicPartition> thirdTask1 = new HashSet<>(Collections.singleton(this.t1p1));
        final Set<TopicPartition> thirdTask3 = new HashSet<>(Arrays.asList(this.t2p1, this.t3p1));
        rebalanceListener.onPartitionsAssigned(thirdAssigned);
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().containsKey(this.task1));
        Assert.assertTrue(thread.tasks().containsKey(this.task3));
        Assert.assertEquals(thirdTask1, thread.tasks().get(this.task1).partitions());
        Assert.assertEquals(thirdTask3, thread.tasks().get(this.task3).partitions());
        Assert.assertEquals(2, thread.tasks().size());

        // Rebalance 4: an empty partition list is assigned.
        rebalanceListener.onPartitionsRevoked(thirdAssigned);
        rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList());
        thread.runOnce(-1L);
        Assert.assertTrue(thread.tasks().isEmpty());

        thread.close();
        Assert.assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state());
    }

    /**
     * Starts a real thread, waits for it to reach RUNNING, closes it and checks the
     * PENDING_SHUTDOWN -> DEAD progression; a second {@code close()} on a dead thread must
     * leave it DEAD.
     */
    @Test
    public void testStateChangeStartClose() throws InterruptedException {
        final StreamThread thread = new StreamThread(this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        thread.setStateListener(new StateListenerStub());
        thread.start();
        TestUtils.waitForCondition(new TestCondition() {
            public boolean conditionMet() {
                return thread.state() == StreamThread.State.RUNNING;
            }
        }, 10000L, "Thread never started.");

        thread.close();
        // Expected value first, so a failure message labels the two states correctly.
        Assert.assertEquals(StreamThread.State.PENDING_SHUTDOWN, thread.state());
        TestUtils.waitForCondition(new TestCondition() {
            public boolean conditionMet() {
                return thread.state() == StreamThread.State.DEAD;
            }
        }, 10000L, "Thread never shut down.");

        // Closing an already dead thread must not change its state.
        thread.close();
        Assert.assertEquals(StreamThread.State.DEAD, thread.state());
    }

    /**
     * Two threads sharing one state directory each own one task, then swap them in a
     * rebalance. The first thread's re-assignment runs on a helper thread while the second
     * thread's runs on the test thread; afterwards each thread must own the task the other
     * one had before.
     */
    @Test
    public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException {
        this.builder.addStateStore(Stores.create("store").withByteArrayKeys().withByteArrayValues().persistent().build(), new String[0]);
        this.builder.addSource("source", new String[]{TOPIC});

        final TopicPartition partition0 = new TopicPartition(TOPIC, 0);
        final TopicPartition partition1 = new TopicPartition(TOPIC, 1);
        this.clientSupplier.consumer.assign(Arrays.asList(partition0, partition1));
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(partition0, 0L);
        beginningOffsets.put(partition1, 0L);
        this.clientSupplier.consumer.updateBeginningOffsets(beginningOffsets);

        final StreamThread thread1 = new StreamThread(this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId1", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        final StreamThread thread2 = new StreamThread(this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId2", this.processId, this.metrics, Time.SYSTEM, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);

        final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), this.task0Assignment);
        final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), this.task1Assignment);
        // Mutable copies: the assignors keep these references, so editing them changes what they report.
        final Map<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<>(task0);
        final Map<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<>(task1);
        thread1.setPartitionAssignor(new MockStreamsPartitionAssignor(this, thread1Assignment));
        thread2.setPartitionAssignor(new MockStreamsPartitionAssignor(this, thread2Assignment));

        // Initial assignment: thread1 <- task 0_0, thread2 <- task 0_1.
        thread1.setState(StreamThread.State.RUNNING);
        thread2.setState(StreamThread.State.RUNNING);
        thread1.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
        thread2.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
        thread1.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread1.runOnce(-1L);
        thread2.rebalanceListener.onPartitionsAssigned(this.task1Assignment);
        thread2.runOnce(-1L);

        final Set<TaskId> originalTasksThread1 = new HashSet<>(thread1.tasks().keySet());
        final Set<TaskId> originalTasksThread2 = new HashSet<>(thread2.tasks().keySet());

        // Revoke everything; each thread must remember what it owned.
        thread1.rebalanceListener.onPartitionsRevoked(this.task0Assignment);
        thread2.rebalanceListener.onPartitionsRevoked(this.task1Assignment);
        Assert.assertThat(thread1.prevActiveTasks(), CoreMatchers.equalTo(originalTasksThread1));
        Assert.assertThat(thread2.prevActiveTasks(), CoreMatchers.equalTo(originalTasksThread2));

        // Swap the assignments.
        thread1Assignment.clear();
        thread1Assignment.putAll(task1);
        thread2Assignment.clear();
        thread2Assignment.putAll(task0);

        final Thread helper = new Thread(new Runnable() {
            @Override
            public void run() {
                thread1.rebalanceListener.onPartitionsAssigned(StreamThreadTest.this.task1Assignment);
                thread1.runOnce(-1L);
            }
        });
        helper.start();
        thread2.rebalanceListener.onPartitionsAssigned(this.task0Assignment);
        thread2.runOnce(-1L);
        helper.join();

        Assert.assertThat(thread1.tasks().keySet(), CoreMatchers.equalTo(originalTasksThread2));
        Assert.assertThat(thread2.tasks().keySet(), CoreMatchers.equalTo(originalTasksThread1));
    }

    /**
     * Checks that constructing a {@code StreamThread} registers the per-thread sensors and the
     * derived metrics (group {@code stream-metrics}, tagged with the thread's client id).
     * Each assertion carries the sensor/metric name so a failure identifies what is missing.
     */
    @Test
    public void testMetrics() {
        final StreamThread thread = new StreamThread(this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, this.mockTime, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);
        final String sensorPrefix = "thread." + thread.threadClientId();
        final Map<String, String> tags = Collections.singletonMap("client-id", thread.threadClientId());

        final String[] sensorSuffixes = {
            ".commit-latency",
            ".poll-latency",
            ".process-latency",
            ".punctuate-latency",
            ".task-created",
            ".task-closed",
            ".skipped-records"
        };
        for (final String suffix : sensorSuffixes) {
            final String sensorName = sensorPrefix + suffix;
            Assert.assertNotNull("missing sensor " + sensorName, this.metrics.getSensor(sensorName));
        }

        // {metric name, description} pairs, in the original assertion order.
        final String[][] expectedMetrics = {
            {"commit-latency-avg", "The average commit time in ms"},
            {"commit-latency-max", "The maximum commit time in ms"},
            {"commit-rate", "The average per-second number of commit calls"},
            {"poll-latency-avg", "The average poll time in ms"},
            {"poll-latency-max", "The maximum poll time in ms"},
            {"poll-rate", "The average per-second number of record-poll calls"},
            {"process-latency-avg", "The average process time in ms"},
            {"process-latency-max", "The maximum process time in ms"},
            {"process-rate", "The average per-second number of process calls"},
            {"punctuate-latency-avg", "The average punctuate time in ms"},
            {"punctuate-latency-max", "The maximum punctuate time in ms"},
            {"punctuate-rate", "The average per-second number of punctuate calls"},
            {"task-created-rate", "The average per-second number of newly created tasks"},
            {"task-closed-rate", "The average per-second number of closed tasks"},
            {"skipped-records-rate", "The average per-second number of skipped records."}
        };
        for (final String[] expected : expectedMetrics) {
            Assert.assertNotNull("missing metric " + expected[0], this.metrics.metrics().get(this.metrics.metricName(expected[0], "stream-metrics", expected[1], tags)));
        }
    }

    /**
     * Verifies commit timing with a 1000 ms commit interval on the mock clock: after 990 ms
     * {@code maybeCommit} must not commit any task, after a further 11 ms it must commit all
     * of them; the cycle is checked twice.
     */
    @Test
    public void testMaybeCommit() throws IOException, InterruptedException {
        final File baseDir = Files.createTempDirectory("test").toFile();
        try {
            final Properties props = configProps(false);
            props.setProperty("state.dir", baseDir.getCanonicalPath());
            props.setProperty("commit.interval.ms", Long.toString(1000L));
            final StreamsConfig streamsConfig = new StreamsConfig(props);
            this.builder.addSource("source1", new String[]{"topic1"});

            final StreamThread thread = new StreamThread(this.builder, streamsConfig, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, this.mockTime, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory) {
                // Re-declared as public so the test can invoke it directly.
                public void maybeCommit(long j) {
                    super.maybeCommit(j);
                }

                // Creates TestStreamTask instances so the test can read their 'committed' flag.
                protected StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
                    final ProcessorTopology topology = this.builder.build(Integer.valueOf(taskId.topicGroupId));
                    final Producer<byte[], byte[]> taskProducer = StreamThreadTest.this.clientSupplier.getProducer(new HashMap<String, Object>());
                    return new TestStreamTask(taskId, this.applicationId, collection, topology, this.consumer, taskProducer, this.config, new MockStreamsMetrics(new Metrics()), this.stateDirectory, this.storeChangelogReader);
                }
            };
            initPartitionGrouper(streamsConfig, thread, this.clientSupplier);

            final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener;
            final List<TopicPartition> revoked = Collections.emptyList();
            final List<TopicPartition> assigned = Arrays.asList(this.t1p1, this.t1p2);
            thread.setState(StreamThread.State.RUNNING);
            // Both revocation calls are kept exactly as in the original sequence.
            thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
            rebalanceListener.onPartitionsRevoked(revoked);
            rebalanceListener.onPartitionsAssigned(assigned);
            thread.runOnce(-1L);
            Assert.assertEquals(2L, thread.tasks().size());

            // First cycle: just short of the interval -> no commit.
            this.mockTime.sleep(990L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (final StreamTask task : thread.tasks().values()) {
                Assert.assertFalse(((TestStreamTask) task).committed);
            }

            // Past the interval -> every task committed; reset the flag for the next cycle.
            this.mockTime.sleep(11L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (final StreamTask task : thread.tasks().values()) {
                final TestStreamTask testTask = (TestStreamTask) task;
                Assert.assertTrue(testTask.committed);
                testTask.committed = false;
            }

            // Second cycle, same expectations.
            this.mockTime.sleep(990L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (final StreamTask task : thread.tasks().values()) {
                Assert.assertFalse(((TestStreamTask) task).committed);
            }

            this.mockTime.sleep(11L);
            thread.maybeCommit(this.mockTime.milliseconds());
            for (final StreamTask task : thread.tasks().values()) {
                final TestStreamTask testTask = (TestStreamTask) task;
                Assert.assertTrue(testTask.committed);
                testTask.committed = false;
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * With the default configuration (exactly-once not enabled) the client supplier must have
     * been asked for exactly one producer, which is the thread's producer and the producer of
     * every task; the consumer and restore consumer must be the supplier's instances.
     */
    @Test
    public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() {
        this.builder.addSource("source1", new String[]{"someTopic"});
        final StreamThread thread = new StreamThread(this.builder, this.config, this.clientSupplier, "stream-thread-test", "clientId", this.processId, this.metrics, this.mockTime, new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST), 0L, this.stateDirectory);

        final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>();
        assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignment));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));

        Assert.assertEquals(1L, this.clientSupplier.producers.size());
        final Producer<?, ?> sharedProducer = this.clientSupplier.producers.get(0);
        Assert.assertSame(sharedProducer, thread.threadProducer);
        for (final StreamTask task : thread.tasks().values()) {
            Assert.assertSame(sharedProducer, task.recordCollector().producer());
        }
        Assert.assertSame(this.clientSupplier.consumer, thread.consumer);
        Assert.assertSame(this.clientSupplier.restoreConsumer, thread.restoreConsumer);
    }

    /**
     * Builds a {@link StreamThread} with the config produced by {@code configProps(true)} (presumably enabling
     * exactly-once, as the test name states) and its own {@link MockClientSupplier}. After assigning partitions
     * 0 and 2 of "someTopic" and running one loop iteration it verifies that: there is no thread-level producer;
     * the supplier created as many producers as the thread has tasks; walking producers and tasks in parallel,
     * each task's record collector uses the matching producer; and the consumer and restore consumer are the
     * supplier's instances.
     */
    @Test
    public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() {
        builder.addSource("source1", new String[]{"someTopic"});
        MockClientSupplier eosSupplier = new MockClientSupplier("stream-thread-test");
        StreamThread thread = new StreamThread(
            builder, new StreamsConfig(configProps(true)), eosSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignorTasks.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        assignorTasks.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2)));
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        // Only partitions 0 and 2 are actually handed to this thread.
        Set<TopicPartition> assigned = new HashSet<TopicPartition>();
        assigned.add(new TopicPartition("someTopic", 0));
        assigned.add(new TopicPartition("someTopic", 2));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(assigned);
        thread.runOnce(-1L);

        Assert.assertNull(thread.threadProducer);
        Assert.assertEquals(thread.tasks().size(), eosSupplier.producers.size());
        Iterator<MockProducer> producers = eosSupplier.producers.iterator();
        for (StreamTask task : thread.tasks().values()) {
            Assert.assertSame(producers.next(), task.recordCollector().producer());
        }
        Assert.assertSame(eosSupplier.consumer, thread.consumer);
        Assert.assertSame(eosSupplier.restoreConsumer, thread.restoreConsumer);
    }

    /**
     * Builds a {@link StreamThread} with the config produced by {@code configProps(true)} (presumably enabling
     * exactly-once, as the test name states), drives one revoke/assign cycle, then calls {@code close()}
     * followed by {@code run()} on the calling thread. Afterwards every task still reported by
     * {@code tasks()} must have a record-collector producer that reports itself as closed.
     */
    @Test
    public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() {
        builder.addSource("source1", new String[]{"someTopic"});
        StreamThread thread = new StreamThread(
            builder, new StreamsConfig(configProps(true)), new MockClientSupplier("stream-thread-test"),
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignorTasks.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));

        // close() is requested first, then run() is invoked synchronously on the test thread.
        thread.close();
        thread.run();

        for (StreamTask task : thread.tasks().values()) {
            Assert.assertTrue(task.recordCollector().producer().closed());
        }
    }

    /**
     * Builds a {@link StreamThread} from the fixture's shared config and client supplier (presumably with
     * exactly-once disabled, as the test name states), drives one revoke/assign cycle, then calls
     * {@code close()} followed by {@code run()} on the calling thread and asserts that the thread-level
     * producer reports itself as closed.
     */
    @Test
    public void shouldCloseThreadProducerOnCloseIfEosDisabled() {
        builder.addSource("source1", new String[]{"someTopic"});
        StreamThread thread = new StreamThread(
            builder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0)));
        assignorTasks.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1)));
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0)));

        // close() is requested first, then run() is invoked synchronously on the test thread.
        thread.close();
        thread.run();

        Assert.assertTrue(thread.threadProducer.closed());
    }

    /**
     * Uses a topology made only of a source on {@code TOPIC} and a sink, and installs an assignor whose
     * {@code standbyTasks()} reports task 0_0 on partition 0 of {@code TOPIC}. It then drives a
     * revoke/assign cycle with empty partition lists. There are no assertions: the test passes as long as
     * nothing is thrown.
     */
    @Test
    public void shouldNotNullPointerWhenStandbyTasksAssignedAndNoStateStoresForTopology() throws Exception {
        builder.addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        StreamThread thread = new StreamThread(
            builder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                // A fresh map and set are built on every call, exactly one standby task.
                Set<TopicPartition> standbyPartitions = Utils.mkSet(new TopicPartition(StreamThreadTest.TOPIC, 0));
                return Collections.singletonMap(new TaskId(0, 0), standbyPartitions);
            }
        });

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
    }

    /**
     * Makes the thread always hand out one pre-built {@code TestStreamTask} and walks it through
     * assign, run, revoke and a final empty assignment. After the revoke the task must be flagged
     * suspended but not closed; after the active-task set has been emptied and an empty assignment is
     * delivered, the task must be flagged closed.
     * NOTE(review): the "not twice" part of the name presumably relies on {@code TestStreamTask} failing
     * on a second close; that is not visible from this method.
     */
    @Test
    public void shouldNotCloseSuspendedTaskswice() throws Exception {
        builder.addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        final TestStreamTask stubTask = new TestStreamTask(
            new TaskId(0, 0),
            "stream-thread-test",
            Utils.mkSet(new TopicPartition(TOPIC, 0)),
            builder.build(0),
            clientSupplier.consumer,
            clientSupplier.getProducer(new HashMap()),
            config,
            new MockStreamsMetrics(new Metrics()),
            new StateDirectory("stream-thread-test", config.getString("state.dir"), mockTime),
            changelogReader);
        StreamThread thread = new StreamThread(
            builder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory) {
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return stubTask;
            }
        };

        final Set<TopicPartition> fakeAssignedPartitions = new HashSet<TopicPartition>();
        fakeAssignedPartitions.add(new TopicPartition(TOPIC, 0));
        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> activeTasks() {
                // Shares the mutable set above, so clearing it later changes what is reported here.
                Map<TaskId, Set<TopicPartition>> active = new HashMap<TaskId, Set<TopicPartition>>();
                active.put(new TaskId(0, 0), fakeAssignedPartitions);
                return active;
            }
        });

        thread.setState(StreamThread.State.RUNNING);
        thread.setState(StreamThread.State.PARTITIONS_REVOKED);
        thread.rebalanceListener.onPartitionsAssigned(fakeAssignedPartitions);
        thread.runOnce(-1L);

        thread.rebalanceListener.onPartitionsRevoked(fakeAssignedPartitions);
        Assert.assertTrue(stubTask.suspended);
        Assert.assertFalse(stubTask.closed);

        fakeAssignedPartitions.clear();
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        Assert.assertTrue(stubTask.closed);
    }

    /**
     * Builds a two-count topology (stores "count-one" on "t1" and "count-two" on "t2"), registers partition 0
     * and a beginning offset of 0 for both changelog topics on the restore consumer, and feeds standby tasks
     * through a stub assignor. After the first rebalance (standby task 0_0 only) the restore consumer must be
     * assigned just the "count-one" changelog partition; after adding standby task 1_0 and rebalancing again
     * it must be assigned both changelog partitions.
     */
    @Test
    public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() throws Exception {
        KStreamBuilder streamsBuilder = new KStreamBuilder();
        streamsBuilder.setApplicationId("stream-thread-test");
        streamsBuilder.stream(new String[]{"t1"}).groupByKey().count("count-one");
        streamsBuilder.stream(new String[]{"t2"}).groupByKey().count("count-two");
        StreamThread thread = new StreamThread(
            streamsBuilder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(streamsBuilder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        String changelogOne = "stream-thread-test-count-one-changelog";
        String changelogTwo = "stream-thread-test-count-two-changelog";
        MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions(
            changelogOne,
            Collections.singletonList(new PartitionInfo(changelogOne, 0, (Node) null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions(
            changelogTwo,
            Collections.singletonList(new PartitionInfo(changelogTwo, 0, (Node) null, new Node[0], new Node[0])));

        TopicPartition changelogOnePartition = new TopicPartition(changelogOne, 0);
        TopicPartition changelogTwoPartition = new TopicPartition(changelogTwo, 0);
        Map<TopicPartition, Long> beginningOffsets = new HashMap<TopicPartition, Long>();
        beginningOffsets.put(changelogOnePartition, 0L);
        beginningOffsets.put(changelogTwoPartition, 0L);
        restoreConsumer.updateBeginningOffsets(beginningOffsets);

        // Mutable on purpose: the stub assignor returns this same map, so later puts are visible to it.
        final Map<TaskId, Set<TopicPartition>> fakeStandby = new HashMap<TaskId, Set<TopicPartition>>();
        fakeStandby.put(new TaskId(0, 0), Utils.mkSet(new TopicPartition("t1", 0)));
        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return fakeStandby;
            }
        });

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        thread.runOnce(-1L);
        Assert.assertThat(restoreConsumer.assignment(), CoreMatchers.equalTo(Utils.mkSet(changelogOnePartition)));

        fakeStandby.put(new TaskId(1, 0), Utils.mkSet(new TopicPartition("t2", 0)));
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(Collections.emptyList());
        thread.runOnce(-1L);
        Assert.assertThat(
            restoreConsumer.assignment(),
            CoreMatchers.equalTo(Utils.mkSet(changelogOnePartition, changelogTwoPartition)));
    }

    /**
     * Builds a two-count topology ("count-one" on "t1", "count-two" on "t2") and swaps the roles of the two
     * tasks across two rebalances: first 0_0 (t1) is standby and 1_0 (t2) is active, then 1_0 is standby and
     * 0_0 is active. The restore consumer is primed with partition metadata and begin/end offsets of 0 for
     * both changelog topics. There are no assertions: the test passes as long as the second rebalance
     * completes without throwing.
     */
    @Test
    public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception {
        KStreamBuilder streamsBuilder = new KStreamBuilder();
        streamsBuilder.setApplicationId("stream-thread-test");
        streamsBuilder.stream(new String[]{"t1"}).groupByKey().count("count-one");
        streamsBuilder.stream(new String[]{"t2"}).groupByKey().count("count-two");
        StreamThread thread = new StreamThread(
            streamsBuilder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(streamsBuilder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        String changelogOne = "stream-thread-test-count-one-changelog";
        String changelogTwo = "stream-thread-test-count-two-changelog";
        MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
        restoreConsumer.updatePartitions(
            changelogOne,
            Collections.singletonList(new PartitionInfo(changelogOne, 0, (Node) null, new Node[0], new Node[0])));
        restoreConsumer.updatePartitions(
            changelogTwo,
            Collections.singletonList(new PartitionInfo(changelogTwo, 0, (Node) null, new Node[0], new Node[0])));
        Map<TopicPartition, Long> changelogOffsets = new HashMap<TopicPartition, Long>();
        changelogOffsets.put(new TopicPartition(changelogOne, 0), 0L);
        changelogOffsets.put(new TopicPartition(changelogTwo, 0), 0L);
        restoreConsumer.updateEndOffsets(changelogOffsets);
        restoreConsumer.updateBeginningOffsets(changelogOffsets);

        // Initial layout: 0_0 (t1) is standby, 1_0 (t2) is active.
        final Map<TaskId, Set<TopicPartition>> fakeStandby = new HashMap<TaskId, Set<TopicPartition>>();
        TopicPartition t1Partition = new TopicPartition("t1", 0);
        Set<TopicPartition> t1Set = Utils.mkSet(t1Partition);
        fakeStandby.put(new TaskId(0, 0), t1Set);
        final Map<TaskId, Set<TopicPartition>> fakeActive = new HashMap<TaskId, Set<TopicPartition>>();
        TopicPartition t2Partition = new TopicPartition("t2", 0);
        Set<TopicPartition> t2Set = Utils.mkSet(t2Partition);
        fakeActive.put(new TaskId(1, 0), t2Set);
        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t2Partition, 0L));

        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> standbyTasks() {
                return fakeStandby;
            }

            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return fakeActive;
            }
        });

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        clientSupplier.consumer.assign(t2Set);
        thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2Partition));
        thread.runOnce(-1L);

        // Swap roles: 1_0 (t2) becomes standby, 0_0 (t1) becomes active.
        fakeStandby.clear();
        fakeActive.clear();
        fakeStandby.put(new TaskId(1, 0), Utils.mkSet(t2Partition));
        fakeActive.put(new TaskId(0, 0), Utils.mkSet(t1Partition));

        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        clientSupplier.consumer.assign(t1Set);
        thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1Partition));
    }

    /**
     * Uses a pattern-subscribed topology ({@code "t.*"} to "out") and a thread subclass that records every
     * {@code TestStreamTask} it creates, keyed by the partition collection it was created for. Task 0_0 is
     * first assigned {t1-0}; the same task id is then re-assigned with {t1-0, t2-0}. The test asserts that the
     * first task instance and its state manager were flagged closed, and that a task with id 0_0 was
     * recorded for the enlarged partition set. The subscribed-topic set of a {@code SubscriptionUpdates}
     * instance is modified through reflection on its "updatedTopicSubscriptions" field.
     */
    @Test
    public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception {
        KStreamBuilder streamsBuilder = new KStreamBuilder();
        streamsBuilder.setApplicationId("stream-thread-test");
        streamsBuilder.stream(Pattern.compile("t.*")).to("out");

        final Map<Collection<TopicPartition>, TestStreamTask> tasksByPartitions =
            new HashMap<Collection<TopicPartition>, TestStreamTask>();
        StreamThread thread = new StreamThread(
            streamsBuilder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(streamsBuilder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory) {
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                // Inside this subclass, this.* refers to the StreamThread's own members.
                TestStreamTask created = new TestStreamTask(
                    id,
                    this.applicationId,
                    partitions,
                    this.builder.build(Integer.valueOf(id.topicGroupId)),
                    this.consumer,
                    StreamThreadTest.this.clientSupplier.getProducer(new HashMap()),
                    this.config,
                    new MockStreamsMetrics(new Metrics()),
                    this.stateDirectory,
                    this.storeChangelogReader);
                tasksByPartitions.put(partitions, created);
                return created;
            }
        };

        final Map<TaskId, Set<TopicPartition>> fakeActive = new HashMap<TaskId, Set<TopicPartition>>();
        TopicPartition t1Partition = new TopicPartition("t1", 0);
        Set<TopicPartition> assigned = new HashSet<TopicPartition>();
        assigned.add(t1Partition);
        TaskId expectedId = new TaskId(0, 0);
        fakeActive.put(expectedId, assigned);
        thread.setPartitionAssignor(new StreamPartitionAssignor() {
            Map<TaskId, Set<TopicPartition>> activeTasks() {
                return fakeActive;
            }
        });

        // Reach into SubscriptionUpdates to register the topics matched by the pattern.
        StreamPartitionAssignor.SubscriptionUpdates updates = new StreamPartitionAssignor.SubscriptionUpdates();
        Field topicsField = updates.getClass().getDeclaredField("updatedTopicSubscriptions");
        topicsField.setAccessible(true);
        @SuppressWarnings("unchecked")
        Set<String> subscribedTopics = (Set<String>) topicsField.get(updates);
        subscribedTopics.add(t1Partition.topic());
        streamsBuilder.updateSubscriptions(updates, (String) null);

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(assigned);
        thread.runOnce(-1L);
        TestStreamTask firstTask = tasksByPartitions.get(assigned);
        Assert.assertThat(firstTask.id(), CoreMatchers.is(expectedId));

        // Grow the assignment of the same task id and rebalance again.
        assigned.add(new TopicPartition("t2", 0));
        subscribedTopics.add("t2");
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(assigned);

        Assert.assertTrue("task should have been closed as assignment has changed", firstTask.closed);
        Assert.assertTrue("tasks state manager should have been closed as assignment has changed", firstTask.closedStateManager);
        Assert.assertThat(tasksByPartitions.get(assigned).id(), CoreMatchers.is(expectedId));
    }

    /**
     * Runs a source-to-sink topology on a thread configured via {@code configProps(true)} (presumably
     * exactly-once, as the test name states). After one task is created, a record is fed in and the test
     * waits until the task's mock producer has one record in its history (without a committed transaction),
     * then until its commit count reaches 1. The producer is then fenced, another record is fed and one more
     * loop iteration is run; the test waits for the thread's task map to become empty and finally asserts
     * that the commit count is still 1.
     */
    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws Exception {
        builder.addSource("source", new String[]{TOPIC}).addSink("sink", "dummyTopic", new String[]{"source"});
        MockClientSupplier eosSupplier = new MockClientSupplier("stream-thread-test");
        final StreamThread thread = new StreamThread(
            builder, new StreamsConfig(configProps(true)), eosSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        MockConsumer<byte[], byte[]> consumer = eosSupplier.consumer;
        consumer.updatePartitions(
            TOPIC,
            Collections.singletonList(new PartitionInfo(TOPIC, 0, (Node) null, (Node[]) null, (Node[]) null)));

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(task1, task0Assignment);
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked((Collection<TopicPartition>) null);
        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
        thread.runOnce(-1L);
        Assert.assertThat(thread.tasks().size(), CoreMatchers.equalTo(1));

        final MockProducer taskProducer = eosSupplier.producers.get(0);
        consumer.updateBeginningOffsets(Collections.singletonMap(task0Assignment.iterator().next(), 0L));
        consumer.unsubscribe();
        consumer.assign(task0Assignment);
        consumer.addRecord(new ConsumerRecord<byte[], byte[]>(TOPIC, 0, 0L, new byte[0], new byte[0]));

        // One millisecond past the commit interval read from the fixture config.
        final long pastCommitInterval = config.getLong("commit.interval.ms").longValue() + 1;

        mockTime.sleep(pastCommitInterval);
        thread.runOnce(-1L);
        TestCondition outputProduced = new TestCondition() {
            public boolean conditionMet() {
                return taskProducer.history().size() == 1;
            }
        };
        TestUtils.waitForCondition(outputProduced, "StreamsThread did not produce output record.");
        Assert.assertFalse(taskProducer.transactionCommitted());

        mockTime.sleep(pastCommitInterval);
        TestCondition transactionCommitted = new TestCondition() {
            public boolean conditionMet() {
                return taskProducer.commitCount() == 1;
            }
        };
        TestUtils.waitForCondition(transactionCommitted, "StreamsThread did not commit transaction.");

        taskProducer.fenceProducer();
        mockTime.sleep(pastCommitInterval);
        consumer.addRecord(new ConsumerRecord<byte[], byte[]>(TOPIC, 0, 0L, new byte[0], new byte[0]));
        thread.runOnce(-1L);
        TestCondition zombieRemoved = new TestCondition() {
            public boolean conditionMet() {
                return thread.tasks().isEmpty();
            }
        };
        TestUtils.waitForCondition(zombieRemoved, "StreamsThread did not remove fenced zombie task.");

        Assert.assertThat(taskProducer.commitCount(), CoreMatchers.equalTo(1L));
    }

    /**
     * Creates one task on a thread configured via {@code configProps(true)} (presumably exactly-once, as the
     * test name states), revokes its partitions, fences the first producer created by the supplier, and then
     * re-assigns the same partitions. After that second assignment the thread's task map must be empty.
     * This thread is given its own fresh {@code Metrics} and {@code MockTime} rather than the fixture's.
     */
    @Test
    public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() throws Exception {
        builder.addSource("name", new String[]{TOPIC}).addSink("out", "output", new String[0]);
        MockClientSupplier eosSupplier = new MockClientSupplier("stream-thread-test");
        StreamThread thread = new StreamThread(
            builder, new StreamsConfig(configProps(true)), eosSupplier,
            "stream-thread-test", "clientId", processId,
            new Metrics(), new MockTime(),
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory);

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(task1, task0Assignment);
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked((Collection<TopicPartition>) null);
        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);
        thread.runOnce(-1L);
        Assert.assertThat(thread.tasks().size(), CoreMatchers.equalTo(1));

        // Fence the producer between the revoke and the re-assignment of the same partitions.
        thread.rebalanceListener.onPartitionsRevoked((Collection<TopicPartition>) null);
        eosSupplier.producers.get(0).fenceProducer();
        thread.rebalanceListener.onPartitionsAssigned(task0Assignment);

        Assert.assertTrue(thread.tasks().isEmpty());
    }

    /**
     * Makes the thread always hand out one {@code TestStreamTask} whose two-argument {@code close}
     * throws a {@code RuntimeException}. After a revoke/assign cycle the thread is closed and joined, and
     * the task must not be flagged as committed.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception {
        KStreamBuilder streamsBuilder = new KStreamBuilder();
        streamsBuilder.setApplicationId("stream-thread-test");
        streamsBuilder.stream(new String[]{"t1"}).groupByKey();

        final TestStreamTask stubTask = new TestStreamTask(
            new TaskId(0, 0),
            "stream-thread-test",
            Utils.mkSet(new TopicPartition("t1", 0)),
            streamsBuilder.build(0),
            clientSupplier.consumer,
            clientSupplier.getProducer(new HashMap()),
            config,
            new MockStreamsMetrics(new Metrics()),
            new StateDirectory("stream-thread-test", config.getString("state.dir"), mockTime),
            changelogReader) {
            @Override
            public void close(boolean ignored1, boolean ignored2) {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread(
            streamsBuilder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(streamsBuilder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory) {
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return stubTask;
            }
        };

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(stubTask.id(), stubTask.partitions);
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(stubTask.partitions);

        thread.close();
        thread.join();

        Assert.assertFalse("task shouldn't have been committed as there was an exception during shutdown", stubTask.committed);
    }

    /**
     * Makes the thread always hand out one {@code TestStreamTask} whose {@code flushState} throws a
     * {@code RuntimeException}, on a topology that counts "t1" into a mock state store named "foo".
     * After the task has been assigned and one loop iteration has run, the store must be open. The thread
     * is then shut down via {@code shutdown(true)}; afterwards the task must not be flagged as committed
     * and the store must no longer be open.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskFlushDuringShutdown() throws Exception {
        MockStateStoreSupplier.MockStateStore store = new MockStateStoreSupplier.MockStateStore("foo", false);
        builder.stream(new String[]{"t1"}).groupByKey().count(new MockStateStoreSupplier(store));
        TopicPartition t1Partition = new TopicPartition("t1", 0);

        final TestStreamTask stubTask = new TestStreamTask(
            new TaskId(0, 0),
            "stream-thread-test",
            Utils.mkSet(t1Partition),
            builder.build(0),
            clientSupplier.consumer,
            clientSupplier.getProducer(new HashMap()),
            config,
            new MockStreamsMetrics(metrics),
            new StateDirectory("stream-thread-test", config.getString("state.dir"), mockTime),
            changelogReader) {
            public void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread(
            builder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory) {
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return stubTask;
            }
        };

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(stubTask.id(), stubTask.partitions);
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        // The mock consumer needs the assignment and a beginning offset before the task is created.
        clientSupplier.consumer.assign(stubTask.partitions);
        clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1Partition, 0L));
        thread.rebalanceListener.onPartitionsAssigned(stubTask.partitions);
        thread.runOnce(-1L);
        Assert.assertTrue(store.isOpen());

        thread.shutdown(true);

        Assert.assertFalse("task shouldn't have been committed as there was an exception during shutdown", stubTask.committed);
        Assert.assertFalse(store.isOpen());
    }

    /**
     * Makes the thread always hand out one {@code TestStreamTask} whose {@code suspend} throws a
     * {@link CommitFailedException}. After the task has been assigned and one loop iteration has run, the
     * partitions are revoked again; that call is expected to return normally (the test would fail if the
     * exception escaped) and the task must not be flagged as committed.
     */
    @Test
    public void shouldCaptureCommitFailedExceptionOnTaskSuspension() throws Exception {
        builder.stream(new String[]{"t1"});

        final TestStreamTask stubTask = new TestStreamTask(
            new TaskId(0, 0),
            "stream-thread-test",
            Utils.mkSet(new TopicPartition("t1", 0)),
            builder.build(0),
            clientSupplier.consumer,
            clientSupplier.getProducer(new HashMap()),
            config,
            new MockStreamsMetrics(new Metrics()),
            new StateDirectory("stream-thread-test", config.getString("state.dir"), mockTime),
            changelogReader) {
            public void suspend() {
                throw new CommitFailedException();
            }
        };
        StreamThread thread = new StreamThread(
            builder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory) {
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return stubTask;
            }
        };

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(stubTask.id(), stubTask.partitions);
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(stubTask.partitions);
        thread.runOnce(-1L);

        // Second revoke is the call under test: it triggers the failing suspend().
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());

        Assert.assertFalse(stubTask.committed);
    }

    /**
     * Makes the thread always hand out one {@code TestStreamTask} whose {@code suspend} throws a
     * {@code RuntimeException}. After the task has been assigned and one loop iteration has run, the
     * partitions are revoked again, and the task must not be flagged as committed.
     * NOTE(review): as written, the test only passes if the second revoke returns normally, so the
     * listener presumably handles the exception itself; confirm against {@code StreamThread}.
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception {
        KStreamBuilder streamsBuilder = new KStreamBuilder();
        streamsBuilder.setApplicationId("stream-thread-test");
        streamsBuilder.stream(new String[]{"t1"}).groupByKey();

        final TestStreamTask stubTask = new TestStreamTask(
            new TaskId(0, 0),
            "stream-thread-test",
            Utils.mkSet(new TopicPartition("t1", 0)),
            streamsBuilder.build(0),
            clientSupplier.consumer,
            clientSupplier.getProducer(new HashMap()),
            config,
            new MockStreamsMetrics(new Metrics()),
            new StateDirectory("stream-thread-test", config.getString("state.dir"), mockTime),
            changelogReader) {
            public void suspend() {
                throw new RuntimeException("KABOOM!");
            }
        };
        StreamThread thread = new StreamThread(
            streamsBuilder, config, clientSupplier,
            "stream-thread-test", "clientId", processId,
            metrics, mockTime,
            new StreamsMetadataState(streamsBuilder, StreamsMetadataState.UNKNOWN_HOST),
            0L, stateDirectory) {
            protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
                return stubTask;
            }
        };

        HashMap assignorTasks = new HashMap();
        assignorTasks.put(stubTask.id(), stubTask.partitions);
        thread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, assignorTasks));

        thread.setState(StreamThread.State.RUNNING);
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());
        thread.rebalanceListener.onPartitionsAssigned(stubTask.partitions);
        thread.runOnce(-1L);

        // Second revoke is the call under test: it triggers the failing suspend().
        thread.rebalanceListener.onPartitionsRevoked(Collections.emptyList());

        Assert.assertFalse(stubTask.committed);
    }

    /**
     * Checks that a task whose {@code flushState()} throws is not flagged as committed
     * after its partitions are revoked.
     *
     * <p>Setup mirrors a normal assignment of one active task on {@code t1-0}: revoke
     * nothing, assign the task's partitions, run one loop iteration, then revoke again and
     * assert that {@code testStreamTask.committed} is {@code false}.
     *
     * @throws Exception if any of the thread or listener calls fails
     */
    @Test
    public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception {
        final KStreamBuilder topology = new KStreamBuilder();
        topology.setApplicationId("stream-thread-test");
        topology.stream("t1").groupByKey();

        final StateDirectory taskStateDirectory =
                new StateDirectory("stream-thread-test", this.config.getString("state.dir"), this.mockTime);

        // Task under test: flushing its state always fails.
        final TestStreamTask testStreamTask = new TestStreamTask(
                new TaskId(0, 0),
                "stream-thread-test",
                Utils.mkSet(new TopicPartition("t1", 0)),
                topology.build(0),
                this.clientSupplier.consumer,
                this.clientSupplier.getProducer(new HashMap<String, Object>()),
                this.config,
                new MockStreamsMetrics(new Metrics()),
                taskStateDirectory,
                this.changelogReader) {
            @Override
            protected void flushState() {
                throw new RuntimeException("KABOOM!");
            }
        };

        final StreamThread streamThread = new StreamThread(
                topology,
                this.config,
                this.clientSupplier,
                "stream-thread-test",
                "clientId",
                this.processId,
                this.metrics,
                this.mockTime,
                new StreamsMetadataState(topology, StreamsMetadataState.UNKNOWN_HOST),
                0L,
                this.stateDirectory) {
            @Override
            protected StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
                // Always return the failing task built above.
                return testStreamTask;
            }
        };

        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
        activeTasks.put(testStreamTask.id(), testStreamTask.partitions);
        streamThread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, activeTasks));

        streamThread.setState(StreamThread.State.RUNNING);
        streamThread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
        streamThread.rebalanceListener.onPartitionsAssigned(testStreamTask.partitions);
        streamThread.runOnce(-1L);

        streamThread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());
        Assert.assertFalse(testStreamTask.committed);
    }

    /**
     * Checks that the topology builder's {@code nodeToSourceTopics} entry for the
     * pattern-subscribed node {@code "source"} reflects the topics of the most recent
     * assignment delivered through {@link StreamPartitionAssignor#onAssignment}.
     *
     * <p>Three assignments are delivered in turn, covering one, two and three topics; after
     * each one the entry is re-read and its size and contents are asserted.
     *
     * @throws Exception if the reflective field access or the assignment handling fails
     */
    @Test
    public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception {
        final TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.addSource("source", Pattern.compile("t.*"));
        topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source");

        final StreamThread streamThread = new StreamThread(
                topologyBuilder,
                this.config,
                this.clientSupplier,
                "stream-thread-test",
                "clientId",
                this.processId,
                this.metrics,
                this.mockTime,
                new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST),
                0L,
                this.stateDirectory);

        final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
        final Map<String, Object> assignorConfigs = new HashMap<>();
        assignorConfigs.put("__stream.thread.instance__", streamThread);
        assignorConfigs.put("num.standby.replicas", 0);
        assignor.configure(assignorConfigs);
        streamThread.setPartitionAssignor(assignor);

        // Read the builder's nodeToSourceTopics field reflectively so it can be inspected below.
        final Field nodeToSourceTopicsField = topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics");
        nodeToSourceTopicsField.setAccessible(true);
        @SuppressWarnings("unchecked")
        final Map<String, List<String>> nodeToSourceTopics =
                (Map<String, List<String>>) nodeToSourceTopicsField.get(topologyBuilder);

        final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0);
        final TopicPartition topicPartition2 = new TopicPartition("topic-2", 0);
        final TopicPartition topicPartition3 = new TopicPartition("topic-3", 0);
        final TaskId taskId1 = new TaskId(0, 0);
        final TaskId taskId2 = new TaskId(0, 0);
        final TaskId taskId3 = new TaskId(0, 0);

        final List<TopicPartition> assignedPartitions = new ArrayList<>();
        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();

        // First assignment: one topic.
        // NOTE(review): the third AssignmentInfo argument stays a raw HashMap because its key
        // type is not among the imports visible for this file.
        final AssignmentInfo firstInfo = new AssignmentInfo(Utils.mkList(taskId1), standbyTasks, new HashMap());
        assignedPartitions.add(topicPartition1);
        assignor.onAssignment(new PartitionAssignor.Assignment(assignedPartitions, firstInfo.encode()));

        // The entry is looked up again after every assignment rather than cached across them.
        List<String> sourceTopics = nodeToSourceTopics.get("source");
        Assert.assertEquals(1, sourceTopics.size());
        Assert.assertTrue(sourceTopics.contains("topic-1"));

        // Second assignment: two topics.
        assignedPartitions.clear();
        final AssignmentInfo secondInfo = new AssignmentInfo(Arrays.asList(taskId1, taskId2), standbyTasks, new HashMap());
        assignedPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2));
        assignor.onAssignment(new PartitionAssignor.Assignment(assignedPartitions, secondInfo.encode()));

        sourceTopics = nodeToSourceTopics.get("source");
        Assert.assertEquals(2, sourceTopics.size());
        Assert.assertTrue(sourceTopics.contains("topic-1"));
        Assert.assertTrue(sourceTopics.contains("topic-2"));

        // Third assignment: three topics.
        assignedPartitions.clear();
        final AssignmentInfo thirdInfo = new AssignmentInfo(Arrays.asList(taskId1, taskId2, taskId3), standbyTasks, new HashMap());
        assignedPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3));
        assignor.onAssignment(new PartitionAssignor.Assignment(assignedPartitions, thirdInfo.encode()));

        sourceTopics = nodeToSourceTopics.get("source");
        Assert.assertEquals(3, sourceTopics.size());
        Assert.assertTrue(sourceTopics.contains("topic-1"));
        Assert.assertTrue(sourceTopics.contains("topic-2"));
        Assert.assertTrue(sourceTopics.contains("topic-3"));
    }

    /**
     * Revokes all partitions on the thread produced by {@code setupTest} and then verifies
     * the expectations recorded on the mocked {@link StateDirectory}.
     *
     * @throws Exception if building the mock or driving the thread fails
     */
    @Test
    public void shouldReleaseStateDirLockIfFailureOnTaskSuspend() throws Exception {
        final TaskId id = new TaskId(0, 0);
        final StateDirectory stateDirMock = mockStateDirInteractions(id);
        final StreamThread thread = setupTest(id, stateDirMock);

        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());

        EasyMock.verify(stateDirMock);
    }

    /**
     * Calls {@code close()} and {@code shutdown(true)} on the thread produced by
     * {@code setupTest} and then verifies the expectations recorded on the mocked
     * {@link StateDirectory}.
     *
     * @throws Exception if building the mock or driving the thread fails
     */
    @Test
    public void shouldReleaseStateDirLockIfFailureOnTaskCloseForSuspendedTask() throws Exception {
        final TaskId id = new TaskId(0, 0);
        final StateDirectory stateDirMock = mockStateDirInteractions(id);
        final StreamThread thread = setupTest(id, stateDirMock);

        thread.close();
        thread.shutdown(true);

        EasyMock.verify(stateDirMock);
    }

    /**
     * Builds a {@link StreamThread} over a single-table topology on {@code TOPIC} whose
     * stream tasks always throw from {@code suspend()}, assigns it partition 0 of
     * {@code TOPIC} as an active task and runs one loop iteration.
     *
     * @param taskId         id used for every stream task the thread creates and for the
     *                       active-task assignment
     * @param stateDirectory state directory passed to the thread's constructor
     * @return the prepared thread, after {@code runOnce(-1L)} has been called on it
     */
    private StreamThread setupTest(final TaskId taskId, StateDirectory stateDirectory) {
        final KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.setApplicationId("stream-thread-test");
        kStreamBuilder.table(TOPIC, TOPIC);

        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
        final TopicPartition topicPartition = new TopicPartition(TOPIC, 0);
        final Set<TopicPartition> assignedPartitions = Utils.mkSet(topicPartition);
        final Map<TopicPartition, Long> zeroOffsets = Collections.singletonMap(topicPartition, 0L);

        // Prime both mock consumers with the partition metadata and offsets.
        mockClientSupplier.restoreConsumer.updatePartitions(
                TOPIC,
                Collections.singletonList(new PartitionInfo(TOPIC, 0, (Node) null, (Node[]) null, (Node[]) null)));
        mockClientSupplier.restoreConsumer.updateBeginningOffsets(zeroOffsets);
        mockClientSupplier.restoreConsumer.updateEndOffsets(zeroOffsets);
        mockClientSupplier.consumer.assign(assignedPartitions);
        mockClientSupplier.consumer.updateBeginningOffsets(zeroOffsets);

        final StreamThread streamThread = new StreamThread(
                kStreamBuilder,
                this.config,
                mockClientSupplier,
                "stream-thread-test",
                "clientId",
                this.processId,
                new Metrics(),
                new MockTime(),
                new StreamsMetadataState(kStreamBuilder, StreamsMetadataState.UNKNOWN_HOST),
                0L,
                stateDirectory) {
            @Override
            protected StreamTask createStreamTask(TaskId ignoredId, Collection<TopicPartition> assigned) {
                // The id argument is not used; the task is created with the id captured
                // from the enclosing method.
                return new TestStreamTask(
                        taskId,
                        this.applicationId,
                        assigned,
                        this.builder.build(0),
                        mockClientSupplier.consumer,
                        mockClientSupplier.getProducer(new HashMap<String, Object>()),
                        this.config,
                        new MockStreamsMetrics(new Metrics()),
                        this.stateDirectory,
                        this.storeChangelogReader) {
                    @Override
                    public void suspend() {
                        throw new RuntimeException("KABOOM!!!");
                    }
                };
            }
        };

        final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
        activeTasks.put(taskId, assignedPartitions);
        streamThread.setPartitionAssignor(new MockStreamsPartitionAssignor(this, activeTasks));

        streamThread.setState(StreamThread.State.RUNNING);
        streamThread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
        streamThread.rebalanceListener.onPartitionsAssigned(assignedPartitions);
        streamThread.runOnce(-1L);
        return streamThread;
    }

    /**
     * Revokes all partitions on the thread produced by {@code setupStandbyTest} and then
     * verifies the expectations recorded on the mocked {@link StateDirectory}.
     *
     * @throws Exception if building the mock or driving the thread fails
     */
    @Test
    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskSuspend() throws Exception {
        final TaskId id = new TaskId(0, 0);
        final StateDirectory stateDirMock = mockStateDirInteractions(id);
        final StreamThread thread = setupStandbyTest(id, stateDirMock);

        thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList());

        EasyMock.verify(stateDirMock);
    }

    /**
     * Calls {@code close()} and {@code shutdown(true)} on the thread produced by
     * {@code setupStandbyTest} and then verifies the expectations recorded on the mocked
     * {@link StateDirectory}.
     *
     * @throws Exception if building the mock or driving the thread fails
     */
    @Test
    public void shouldReleaseStateDirLockIfFailureOnStandbyTaskCloseForUnassignedSuspendedStandbyTask() throws Exception {
        final TaskId id = new TaskId(0, 0);
        final StateDirectory stateDirMock = mockStateDirInteractions(id);
        final StreamThread thread = setupStandbyTest(id, stateDirMock);

        thread.close();
        thread.shutdown(true);

        EasyMock.verify(stateDirMock);
    }

    /**
     * Creates a nice EasyMock {@link StateDirectory} with recorded expectations for
     * {@code lock}, {@code directoryForTask} and {@code unlock} on the given task, and
     * switches it to replay mode.
     *
     * @param taskId task the recorded expectations are keyed on
     * @return the mock, already in replay state
     * @throws IOException declared because the mocked methods are invoked while recording
     */
    private StateDirectory mockStateDirInteractions(TaskId taskId) throws IOException {
        final StateDirectory stateDirMock = EasyMock.createNiceMock(StateDirectory.class);

        // lock: matches the given task id with any integer second argument.
        EasyMock.expect(stateDirMock.lock(EasyMock.eq(taskId), EasyMock.anyInt())).andReturn(true);
        // directoryForTask: answers with the test's state directory path.
        EasyMock.expect(stateDirMock.directoryForTask(taskId)).andReturn(new File(this.stateDir));
        // unlock: recorded with a true result, followed by the original expectLastCall().
        EasyMock.expect(stateDirMock.unlock(taskId)).andReturn(true);
        EasyMock.expectLastCall();

        EasyMock.replay(stateDirMock);
        return stateDirMock;
    }

    /**
     * Builds a {@link StreamThread} over a {@code topic1 -> groupByKey -> count("store")}
     * topology whose standby tasks always throw from {@code suspend()} and
     * {@code commit()}, gives it a partition assignor with no active tasks and one standby
     * task, and runs one loop iteration.
     *
     * @param taskId         id used for every standby task the thread creates and for the
     *                       standby-task assignment
     * @param stateDirectory state directory passed to the thread's constructor
     * @return the prepared thread, after {@code runOnce(-1L)} has been called on it
     */
    private StreamThread setupStandbyTest(final TaskId taskId, StateDirectory stateDirectory) {
        final KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.setApplicationId("stream-thread-test");
        kStreamBuilder.stream("topic1").groupByKey().count("store");

        final MockClientSupplier mockClientSupplier = new MockClientSupplier();
        final String changelogTopic = "stream-thread-test-store-changelog";
        // Same single-entry offsets for beginning and end, as setupTest does.
        final Map<TopicPartition, Long> changelogOffsets =
                Collections.singletonMap(new TopicPartition(changelogTopic, 0), 0L);

        mockClientSupplier.restoreConsumer.updatePartitions(
                changelogTopic,
                Collections.singletonList(new PartitionInfo(changelogTopic, 0, (Node) null, (Node[]) null, (Node[]) null)));
        mockClientSupplier.restoreConsumer.updateBeginningOffsets(changelogOffsets);
        mockClientSupplier.restoreConsumer.updateEndOffsets(changelogOffsets);

        final StreamThread streamThread = new StreamThread(
                kStreamBuilder,
                this.config,
                mockClientSupplier,
                "stream-thread-test",
                "clientId",
                this.processId,
                new Metrics(),
                new MockTime(),
                new StreamsMetadataState(kStreamBuilder, StreamsMetadataState.UNKNOWN_HOST),
                0L,
                stateDirectory) {
            @Override
            protected StandbyTask createStandbyTask(TaskId ignoredId, Collection<TopicPartition> assigned) {
                // The id argument is not used; the task is created with the id captured
                // from the enclosing method.
                return new StandbyTask(
                        taskId,
                        this.applicationId,
                        assigned,
                        this.builder.build(0),
                        mockClientSupplier.consumer,
                        new StoreChangelogReader(getName(), mockClientSupplier.restoreConsumer),
                        StreamThreadTest.this.config,
                        new StreamsMetricsImpl(new Metrics(), "groupName", Collections.<String, String>emptyMap()),
                        this.stateDirectory) {
                    @Override
                    public void suspend() {
                        throw new RuntimeException("KABOOM!!!");
                    }

                    @Override
                    public void commit() {
                        throw new RuntimeException("KABOOM!!!");
                    }
                };
            }
        };

        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
        standbyTasks.put(taskId, Collections.singleton(new TopicPartition(TOPIC, 0)));
        streamThread.setPartitionAssignor(
                new MockStreamsPartitionAssignor(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks));

        streamThread.setState(StreamThread.State.RUNNING);
        streamThread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptySet());
        streamThread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptySet());
        streamThread.runOnce(-1L);
        return streamThread;
    }

    /**
     * Creates a {@link StreamPartitionAssignor} configured from the given config and thread,
     * computes an assignment for the single member {@code "client"} from this test's
     * {@code metadata} and {@code subscription}, and feeds that assignment back into the
     * assignor via {@code onAssignment}.
     *
     * @param streamsConfig      source of the consumer configs used to configure the assignor
     * @param streamThread       thread whose application id, client id and config are used
     * @param mockClientSupplier supplies the restore consumer for the mock internal topic manager
     */
    private void initPartitionGrouper(StreamsConfig streamsConfig, StreamThread streamThread, MockClientSupplier mockClientSupplier) {
        final StreamPartitionAssignor assignor = new StreamPartitionAssignor();
        assignor.configure(
                streamsConfig.getConsumerConfigs(streamThread, streamThread.applicationId, streamThread.clientId));
        assignor.setInternalTopicManager(
                new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer));

        final Map<String, PartitionAssignor.Assignment> assignments =
                assignor.assign(this.metadata, Collections.singletonMap("client", this.subscription));
        assignor.onAssignment(assignments.get("client"));
    }

    /**
     * Creates a {@link StreamThread} from this test's builder, config, client supplier,
     * metrics and state directory, using {@code Time.SYSTEM}, whose stream tasks are
     * {@code TestStreamTask} instances.
     *
     * @return a new, not yet started thread
     */
    private StreamThread getStreamThread() {
        return new StreamThread(
                this.builder,
                this.config,
                this.clientSupplier,
                "stream-thread-test",
                "clientId",
                this.processId,
                this.metrics,
                Time.SYSTEM,
                new StreamsMetadataState(this.builder, StreamsMetadataState.UNKNOWN_HOST),
                0L,
                this.stateDirectory) {
            @Override
            protected StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
                // Inside this subclass, this.* refers to the thread's own fields; only the
                // producer is taken from the enclosing test's client supplier.
                return new TestStreamTask(
                        taskId,
                        this.applicationId,
                        collection,
                        this.builder.build(taskId.topicGroupId),
                        this.consumer,
                        StreamThreadTest.this.clientSupplier.getProducer(new HashMap<String, Object>()),
                        this.config,
                        new MockStreamsMetrics(new Metrics()),
                        this.stateDirectory,
                        this.storeChangelogReader);
            }
        };
    }
}
