/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
import org.apache.kafka.streams.processor.internals.ProcessorStateManager;
import org.apache.kafka.streams.processor.internals.ProcessorTopology;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.StandbyTask;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTaskTest;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;

@RunWith(value=EasyMockRunner.class)
public class StandbyTaskTest {
    // Thread name label; the same literal is passed to StreamsMetricsImpl below.
    private final String threadName = "threadName";
    // Name of the thread that instantiates the test; used when registering the task-closed sensor.
    private final String threadId = Thread.currentThread().getName();
    // Identity of the single standby task exercised by every test.
    private final TaskId taskId = new TaskId(0, 0);
    private final String storeName1 = "store1";
    private final String storeName2 = "store2";
    private final String applicationId = "test-application";
    // Changelog topic names derived from the application id and the two store names.
    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"store1");
    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic((String)"test-application", (String)"store2");
    // Partition 0 of the first store's changelog; the only partition handed to the task.
    private final TopicPartition partition = new TopicPartition(this.storeChangelogTopicName1, 0);
    // Two mock stores that differ only in the boolean builder flag (presumably "persistent" - confirm against MockKeyValueStoreBuilder).
    private final MockKeyValueStore store1 = (MockKeyValueStore)new MockKeyValueStoreBuilder("store1", false).build();
    private final MockKeyValueStore store2 = (MockKeyValueStore)new MockKeyValueStoreBuilder("store2", true).build();
    // Topology holding both stores, each mapped to its changelog topic.
    private final ProcessorTopology topology = ProcessorTopologyFactories.withLocalStores(Arrays.asList(this.store1, this.store2), Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)"store1", (Object)this.storeChangelogTopicName1), Utils.mkEntry((Object)"store2", (Object)this.storeChangelogTopicName2)}));
    // Metrics registry shared by the task under test and by the metric helper methods.
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "threadName", "latest", (Time)new MockTime());
    // Temporary state directory root; created in setup() and deleted in cleanup().
    private File baseDir;
    private StreamsConfig config;
    private StateDirectory stateDirectory;
    // Task under test; cleanup() suspends and closes it when it is non-null.
    private StandbyTask task;
    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer(new IntegerSerializer(), new IntegerSerializer());
    // Nice mock: calls without a recorded expectation return default values instead of failing.
    @Mock(type=MockType.NICE)
    private ProcessorStateManager stateManager;

    /**
     * Builds the {@link StreamsConfig} used by the tests.
     *
     * @param baseDir directory whose canonical path becomes the {@code state.dir} setting
     * @return a config with the test application id, a dummy bootstrap server, a per-partition
     *         buffer of 3 records and {@link MockTimestampExtractor} as the timestamp extractor
     * @throws IOException if the canonical path of {@code baseDir} cannot be resolved
     */
    private StreamsConfig createConfig(File baseDir) throws IOException {
        final Map<String, String> settings = Utils.mkMap(
            Utils.mkEntry("application.id", "test-application"),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("buffered.records.per.partition", "3"),
            Utils.mkEntry("state.dir", baseDir.getCanonicalPath()),
            Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName())
        );
        return new StreamsConfig(Utils.mkProperties(settings));
    }

    /**
     * Prepares the fixtures shared by all tests: stubs the state manager's task id and task
     * type, registers three partitions for each changelog topic with the restore consumer, and
     * creates a fresh temporary state directory plus the matching config.
     */
    @Before
    public void setup() throws Exception {
        EasyMock.expect(stateManager.taskId()).andStubReturn(taskId);
        EasyMock.expect(stateManager.taskType()).andStubReturn(Task.TaskType.STANDBY);

        restoreStateConsumer.reset();
        // Both changelog topics get partitions 0..2, none of them with a leader or replicas.
        for (final String changelogTopic : Arrays.asList(storeChangelogTopicName1, storeChangelogTopicName2)) {
            restoreStateConsumer.updatePartitions(
                changelogTopic,
                Arrays.asList(
                    new PartitionInfo(changelogTopic, 0, Node.noNode(), new Node[0], new Node[0]),
                    new PartitionInfo(changelogTopic, 1, Node.noNode(), new Node[0], new Node[0]),
                    new PartitionInfo(changelogTopic, 2, Node.noNode(), new Node[0], new Node[0])
                )
            );
        }

        baseDir = TestUtils.tempDirectory();
        config = createConfig(baseDir);
        stateDirectory = new StateDirectory(config, new MockTime(), true);
    }

    /**
     * Tears down the task created by the test (if any) and removes the temporary state directory.
     *
     * <p>The task is suspended and then closed dirty. Suspending a task that the test already
     * closed raises an {@link IllegalStateException}; only that specific failure is ignored, any
     * other one is rethrown. The state directory is deleted in a {@code finally} block so that a
     * failing teardown does not leave temporary files behind.
     *
     * @throws IOException if the temporary state directory cannot be deleted
     */
    @After
    public void cleanup() throws IOException {
        try {
            if (task != null) {
                try {
                    task.suspend();
                } catch (final IllegalStateException maybeSwallow) {
                    final String message = maybeSwallow.getMessage();
                    // A missing message must not become an NPE that hides the real failure.
                    if (message == null || !message.startsWith("Illegal state CLOSED while suspending standby task")) {
                        throw maybeSwallow;
                    }
                }
                task.closeDirty();
                task = null;
            }
        } finally {
            Utils.delete(baseDir);
        }
    }

    /**
     * Initializing the task must fail with a {@link LockException} when the state directory
     * reports that the lock for the task could not be acquired.
     */
    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
        // Replace the real state directory with a mock whose lock attempt is refused.
        stateDirectory = EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(stateDirectory.lock(taskId)).andReturn(false);
        EasyMock.expect(stateManager.taskType()).andStubReturn(Task.TaskType.STANDBY);
        EasyMock.replay(stateDirectory, stateManager);

        task = createStandbyTask();

        Assert.assertThrows(LockException.class, () -> task.initializeIfNeeded());

        // With the field cleared, cleanup() skips the suspend/close of this task.
        task = null;
    }

    /**
     * A freshly created task is in {@code CREATED}; the first {@code initializeIfNeeded()} moves
     * it to {@code RUNNING}, and a second call leaves it in {@code RUNNING}. The state manager is
     * expected to have its stores registered.
     */
    @Test
    public void shouldTransitToRunningAfterInitialization() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        // Void call in record mode: registers the expectation that stores get registered.
        stateManager.registerStateStores(EasyMock.anyObject(), EasyMock.anyObject());
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        Assert.assertEquals(Task.State.CREATED, task.state());

        task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RUNNING, task.state());

        // Initializing again must not change the state.
        task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RUNNING, task.state());

        EasyMock.verify(stateManager);
    }

    /**
     * Preparing a commit on a task that has been suspended and closed must throw an
     * {@link IllegalStateException}.
     */
    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.suspend();
        task.closeClean();

        Assert.assertThrows(IllegalStateException.class, () -> task.prepareCommit());
    }

    /**
     * Runs three prepare/post-commit rounds while the state manager reports changelog offsets
     * 50, 11000 and 11000, then verifies the recorded flush and checkpoint expectations.
     *
     * <p>NOTE(review): the offsets look chosen so that only one round moves far enough to
     * trigger a checkpoint - confirm against the checkpoint threshold in StandbyTask.
     */
    @Test
    public void shouldFlushAndCheckpointStateManagerOnCommit() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        stateManager.flush();
        EasyMock.expectLastCall();
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(partition, 50L))
            .andReturn(Collections.singletonMap(partition, 11000L))
            .andReturn(Collections.singletonMap(partition, 11000L));
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.initializeIfNeeded();

        // Three identical commit rounds, none of them enforcing a checkpoint.
        task.prepareCommit();
        task.postCommit(false);

        task.prepareCommit();
        task.postCommit(false);

        task.prepareCommit();
        task.postCommit(false);

        EasyMock.verify(stateManager);
    }

    /**
     * The task's {@code changelogOffsets()} must equal the map reported by the state manager.
     */
    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
        EasyMock.replay(stateManager);

        task = createStandbyTask();

        Assert.assertEquals(Collections.singletonMap(partition, 50L), task.changelogOffsets());

        EasyMock.verify(stateManager);
    }

    /**
     * A dirty close must neither flush nor checkpoint, must not propagate the
     * {@link ProcessorStateException} thrown by the state manager's {@code close()}, must end in
     * {@code CLOSED}, and must record exactly one task close on the close-task metric.
     */
    @Test
    public void shouldNotFlushAndThrowOnCloseDirty() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        // Any flush or checkpoint during the dirty close fails the test immediately.
        stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();
        task.closeDirty();

        Assert.assertEquals(Task.State.CLOSED, task.state());

        final double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);
    }

    /**
     * A dirty close must complete without propagating a {@link RuntimeException} thrown by the
     * state manager's {@code close()}.
     */
    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.initializeIfNeeded();

        task.suspend();
        task.closeDirty();

        EasyMock.verify(stateManager);
    }

    /**
     * Walks the clean shutdown sequence - suspend, prepare commit, post commit with an enforced
     * checkpoint, close clean - and checks that the task ends in {@code CLOSED}, that one close
     * is recorded on the close-task metric, and that the state manager expectations
     * (close and checkpoint) were met.
     */
    @Test
    public void shouldSuspendAndCommitBeforeCloseClean() {
        stateManager.close();
        EasyMock.expectLastCall();
        stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 60L));
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();
        task.prepareCommit();
        task.postCommit(true);
        task.closeClean();

        Assert.assertEquals(Task.State.CLOSED, task.state());

        final double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);
    }

    /**
     * Closing a task clean straight from {@code CREATED} must throw an
     * {@link IllegalStateException}; after suspending it first, the clean close goes through.
     */
    @Test
    public void shouldRequireSuspendingCreatedTasksBeforeClose() {
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.CREATED));
        Assert.assertThrows(IllegalStateException.class, () -> task.closeClean());

        task.suspend();
        task.closeClean();
    }

    /**
     * {@code commitNeeded()} is expected to be false while the state manager reports offset 50
     * and true once it reports 10100; a subsequent commit round with an enforced checkpoint must
     * satisfy the recorded flush and checkpoint expectations.
     */
    @Test
    public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
        // First read yields 50, every later read yields 10100.
        EasyMock.expect(stateManager.changelogOffsets())
            .andReturn(Collections.singletonMap(partition, 50L))
            .andReturn(Collections.singletonMap(partition, 10100L)).anyTimes();
        stateManager.flush();
        EasyMock.expectLastCall();
        stateManager.checkpoint();
        EasyMock.expectLastCall();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.initializeIfNeeded();

        Assert.assertFalse(task.commitNeeded());
        Assert.assertTrue(task.commitNeeded());

        task.prepareCommit();
        task.postCommit(true);

        EasyMock.verify(stateManager);
    }

    /**
     * A clean close must propagate the {@link RuntimeException} thrown by the state manager's
     * {@code close()} and must not record a task close on the close-task metric.
     */
    @Test
    public void shouldThrowOnCloseCleanError() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();
        task.suspend();

        Assert.assertThrows(RuntimeException.class, () -> task.closeClean());

        final double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);

        // Re-arm the mock without the throwing close() so that cleanup() can close the task.
        EasyMock.reset(stateManager);
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.singleton(partition)).anyTimes();
        EasyMock.replay(stateManager);
    }

    /**
     * A post-commit with an enforced checkpoint must propagate the {@link RuntimeException}
     * thrown by the state manager's {@code checkpoint()}; the task must stay in {@code RUNNING}
     * and no task close may be recorded on the close-task metric.
     */
    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        EasyMock.expect(stateManager.changelogOffsets()).andReturn(Collections.singletonMap(partition, 50L));
        stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.initializeIfNeeded();

        task.prepareCommit();
        Assert.assertThrows(RuntimeException.class, () -> task.postCommit(true));

        Assert.assertEquals(Task.State.RUNNING, task.state());

        final double expectedCloseTaskMetric = 0.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);

        // Re-arm the mock without the throwing checkpoint() before cleanup() runs.
        EasyMock.reset(stateManager);
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);
    }

    /**
     * After a clean close no metric tagged with {@code task-id} may remain registered.
     */
    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.initializeIfNeeded();

        task.suspend();
        task.closeClean();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * After a dirty close no metric tagged with {@code task-id} may remain registered.
     */
    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.initializeIfNeeded();

        task.suspend();
        task.closeDirty();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    /**
     * Suspending and dirty-closing a task that was never initialized must still close the state
     * manager, record one task close on the close-task metric, and leave the task in
     * {@code CLOSED}.
     */
    @Test
    public void shouldCloseStateManagerOnTaskCreated() {
        stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        task = createStandbyTask();
        task.suspend();
        task.closeDirty();

        final double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);
        Assert.assertEquals(Task.State.CLOSED, task.state());
    }

    /**
     * With {@code processing.guarantee=exactly_once}, dirty-closing a task that was never
     * initialized must close the state manager and query its base directory (the test name
     * indicates this is for wiping the state directory), record one task close on the
     * close-task metric, and leave the task in {@code CLOSED}.
     */
    @Test
    public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
        stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        // Replace the default config created in setup() with an exactly-once one.
        config = new StreamsConfig(Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", "test-application"),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("processing.guarantee", "exactly_once")
        )));

        task = createStandbyTask();
        task.suspend();
        task.closeDirty();

        final double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);
        Assert.assertEquals(Task.State.CLOSED, task.state());
    }

    /**
     * With {@code processing.guarantee=exactly_once_beta}, dirty-closing a task that was never
     * initialized must close the state manager and query its base directory (the test name
     * indicates this is for wiping the state directory), record one task close on the
     * close-task metric, and leave the task in {@code CLOSED}.
     */
    @Test
    public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() {
        stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(stateManager.baseDir()).andReturn(baseDir);
        EasyMock.replay(stateManager);
        final MetricName metricName = setupCloseTaskMetric();

        // Replace the default config created in setup() with an exactly-once-beta one.
        config = new StreamsConfig(Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", "test-application"),
            Utils.mkEntry("bootstrap.servers", "localhost:2171"),
            Utils.mkEntry("processing.guarantee", "exactly_once_beta")
        )));

        task = createStandbyTask();
        task.suspend();
        task.closeDirty();

        final double expectedCloseTaskMetric = 1.0;
        verifyCloseTaskMetric(expectedCloseTaskMetric, streamsMetrics, metricName);

        EasyMock.verify(stateManager);
        Assert.assertEquals(Task.State.CLOSED, task.state());
    }

    /**
     * {@code closeCleanAndRecycleState()} must throw an {@link IllegalStateException} both before
     * initialization and while the task is initialized but not suspended; once suspended it must
     * succeed, recycle the state manager, and leave no metric tagged with {@code task-id}.
     */
    @Test
    public void shouldRecycleTask() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        // Void call in record mode: registers the expectation that the state manager is recycled.
        stateManager.recycle();
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        Assert.assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState());

        task.initializeIfNeeded();
        Assert.assertThrows(IllegalStateException.class, () -> task.closeCleanAndRecycleState());

        task.suspend();
        task.closeCleanAndRecycleState();

        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
        EasyMock.verify(stateManager);
    }

    /**
     * Suspending a task that is still in {@code CREATED} must move it to {@code SUSPENDED}.
     */
    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.CREATED));

        task.suspend();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * Suspending a task that is in {@code RUNNING} must move it to {@code SUSPENDED}.
     */
    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.replay(stateManager);

        task = createStandbyTask();
        task.initializeIfNeeded();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.RUNNING));

        task.suspend();
        MatcherAssert.assertThat(task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    /**
     * After the task timeout is initialized at time 0, a check at exactly five minutes must
     * still pass, while a check one millisecond later must throw a {@link TimeoutException}.
     *
     * <p>NOTE(review): five minutes is presumably the default {@code task.timeout.ms}; the
     * config built in setup() does not override it - confirm.
     */
    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        EasyMock.replay(stateManager);
        // Log under this test class so output is attributed to the right test.
        final Logger log = new LogContext().logger(StandbyTaskTest.class);

        task = createStandbyTask();

        task.maybeInitTaskTimeoutOrThrow(0L, null, log);
        task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).toMillis(), null, log);

        Assert.assertThrows(
            TimeoutException.class,
            () -> task.maybeInitTaskTimeoutOrThrow(
                Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(),
                null,
                log
            )
        );
    }

    /**
     * Once the task timeout has been cleared, a later check at five minutes plus one
     * millisecond must not throw (the same check throws when the timeout was not cleared, see
     * {@code shouldInitTaskTimeoutAndEventuallyThrow}).
     */
    @Test
    public void shouldCLearTaskTimeout() {
        EasyMock.replay(stateManager);
        // Log under this test class so output is attributed to the right test.
        final Logger log = new LogContext().logger(StandbyTaskTest.class);

        task = createStandbyTask();

        task.maybeInitTaskTimeoutOrThrow(0L, null, log);
        task.clearTaskTimeout(log);

        task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), null, log);
    }

    /**
     * Creates the task under test from the current fixture fields.
     *
     * <p>Reads {@code config}, {@code stateManager} and {@code stateDirectory} at call time, so
     * tests that replace any of them must do so before calling this method.
     *
     * @return a new {@link StandbyTask} for {@code taskId} with {@code partition} as its only
     *         input partition and a zero-sized thread cache
     */
    private StandbyTask createStandbyTask() {
        final String logPrefix = String.format("stream-thread [%s] ", Thread.currentThread().getName());
        final ThreadCache cache = new ThreadCache(new LogContext(logPrefix), 0L, streamsMetrics);
        final ProcessorContextImpl context =
            new ProcessorContextImpl(taskId, config, stateManager, streamsMetrics, cache);

        return new StandbyTask(
            taskId,
            Collections.singleton(partition),
            topology,
            config,
            streamsMetrics,
            stateManager,
            stateDirectory,
            cache,
            context
        );
    }

    /**
     * Attaches a cumulative-sum metric to the thread-level {@code task-closed} sensor.
     *
     * @return the name under which the metric was registered, for later lookup with
     *         {@code verifyCloseTaskMetric}
     */
    private MetricName setupCloseTaskMetric() {
        final MetricName closeMetricName = new MetricName("name", "group", "description", Collections.emptyMap());
        // No parent sensors are passed.
        final Sensor taskClosedSensor =
            streamsMetrics.threadLevelSensor(threadId, "task-closed", Sensor.RecordingLevel.INFO);
        taskClosedSensor.add(closeMetricName, new CumulativeSum());
        return closeMetricName;
    }

    /**
     * Asserts that the metric registered under {@code metricName} currently measures
     * {@code expected}.
     *
     * @param expected       expected value of the metric
     * @param streamsMetrics metrics registry to look the metric up in
     * @param metricName     name returned by {@code setupCloseTaskMetric}
     */
    private void verifyCloseTaskMetric(double expected, StreamsMetricsImpl streamsMetrics, MetricName metricName) {
        final KafkaMetric closeMetric = (KafkaMetric) streamsMetrics.metrics().get(metricName);
        final long now = System.currentTimeMillis();
        final double observedCloses = closeMetric.measurable().measure(closeMetric.config(), now);
        MatcherAssert.assertThat(observedCloses, CoreMatchers.equalTo(expected));
    }

    /**
     * Collects the names of all registered metrics that carry a {@code task-id} tag.
     *
     * @return the matching metric names; empty when no task-level metric is registered
     */
    private List<MetricName> getTaskMetrics() {
        return streamsMetrics.metrics()
            .keySet()
            .stream()
            .filter(name -> name.tags().containsKey("task-id"))
            .collect(Collectors.toList());
    }
}

