package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.CumulativeSum;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.Task;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
import org.apache.kafka.streams.state.internals.ThreadCache;
import org.apache.kafka.test.MockKeyValueStore;
import org.apache.kafka.test.MockKeyValueStoreBuilder;
import org.apache.kafka.test.MockRestoreConsumer;
import org.apache.kafka.test.MockTimestampExtractor;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(EasyMockRunner.class)
/* loaded from: input_file:org/apache/kafka/streams/processor/internals/StandbyTaskTest.class */
public class StandbyTaskTest {
    private File baseDir;
    private StreamsConfig config;
    private StateDirectory stateDirectory;
    private StandbyTask task;

    @Mock(type = MockType.NICE)
    private ProcessorStateManager stateManager;
    private final String threadName = "threadName";
    private final String threadId = Thread.currentThread().getName();
    private final TaskId taskId = new TaskId(0, 0);
    private final String storeName1 = "store1";
    private final String storeName2 = "store2";
    private final String applicationId = "test-application";
    private final String storeChangelogTopicName1 = ProcessorStateManager.storeChangelogTopic("test-application", "store1");
    private final String storeChangelogTopicName2 = ProcessorStateManager.storeChangelogTopic("test-application", "store2");
    private final TopicPartition partition = new TopicPartition(this.storeChangelogTopicName1, 0);
    private final MockKeyValueStore store1 = (MockKeyValueStore) new MockKeyValueStoreBuilder("store1", false).m175build();
    private final MockKeyValueStore store2 = (MockKeyValueStore) new MockKeyValueStoreBuilder("store2", true).m175build();
    private final ProcessorTopology topology = ProcessorTopologyFactories.withLocalStores(Arrays.asList(this.store1, this.store2), Utils.mkMap(new Map.Entry[]{Utils.mkEntry("store1", this.storeChangelogTopicName1), Utils.mkEntry("store2", this.storeChangelogTopicName2)}));
    private final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(new Metrics(), "threadName", "latest", new MockTime());
    private final MockRestoreConsumer<Integer, Integer> restoreStateConsumer = new MockRestoreConsumer<>(new IntegerSerializer(), new IntegerSerializer());

    private StreamsConfig createConfig(File file) throws IOException {
        return new StreamsConfig(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "test-application"), Utils.mkEntry("bootstrap.servers", "localhost:2171"), Utils.mkEntry("buffered.records.per.partition", "3"), Utils.mkEntry("state.dir", file.getCanonicalPath()), Utils.mkEntry("default.timestamp.extractor", MockTimestampExtractor.class.getName())})));
    }

    @Before
    public void setup() throws Exception {
        EasyMock.expect(this.stateManager.taskId()).andStubReturn(this.taskId);
        EasyMock.expect(this.stateManager.taskType()).andStubReturn(Task.TaskType.STANDBY);
        this.restoreStateConsumer.reset();
        this.restoreStateConsumer.updatePartitions(this.storeChangelogTopicName1, Arrays.asList(new PartitionInfo(this.storeChangelogTopicName1, 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName1, 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName1, 2, Node.noNode(), new Node[0], new Node[0])));
        this.restoreStateConsumer.updatePartitions(this.storeChangelogTopicName2, Arrays.asList(new PartitionInfo(this.storeChangelogTopicName2, 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName2, 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo(this.storeChangelogTopicName2, 2, Node.noNode(), new Node[0], new Node[0])));
        this.baseDir = TestUtils.tempDirectory();
        this.config = createConfig(this.baseDir);
        this.stateDirectory = new StateDirectory(this.config, new MockTime(), true);
    }

    @After
    public void cleanup() throws IOException {
        if (this.task != null) {
            try {
                this.task.suspend();
            } catch (IllegalStateException e) {
                if (!e.getMessage().startsWith("Illegal state CLOSED while suspending standby task")) {
                    throw e;
                }
            }
            this.task.closeDirty();
            this.task = null;
        }
        Utils.delete(this.baseDir);
    }

    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectory() throws IOException {
        this.stateDirectory = (StateDirectory) EasyMock.createNiceMock(StateDirectory.class);
        EasyMock.expect(Boolean.valueOf(this.stateDirectory.lock(this.taskId))).andReturn(false);
        EasyMock.expect(this.stateManager.taskType()).andStubReturn(Task.TaskType.STANDBY);
        EasyMock.replay(new Object[]{this.stateDirectory, this.stateManager});
        this.task = createStandbyTask();
        Assert.assertThrows(LockException.class, () -> {
            this.task.initializeIfNeeded();
        });
        this.task = null;
    }

    @Test
    public void shouldTransitToRunningAfterInitialization() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        this.stateManager.registerStateStores((List) EasyMock.anyObject(), (InternalProcessorContext) EasyMock.anyObject());
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        Assert.assertEquals(Task.State.CREATED, this.task.state());
        this.task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RUNNING, this.task.state());
        this.task.initializeIfNeeded();
        Assert.assertEquals(Task.State.RUNNING, this.task.state());
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldThrowIfCommittingOnIllegalState() {
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.suspend();
        this.task.closeClean();
        StandbyTask standbyTask = this.task;
        standbyTask.getClass();
        Assert.assertThrows(IllegalStateException.class, standbyTask::prepareCommit);
    }

    @Test
    public void shouldFlushAndCheckpointStateManagerOnCommit() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        this.stateManager.flush();
        EasyMock.expectLastCall();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition, 50L)).andReturn(Collections.singletonMap(this.partition, 11000L)).andReturn(Collections.singletonMap(this.partition, 11000L));
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition)).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.prepareCommit();
        this.task.postCommit(false);
        this.task.prepareCommit();
        this.task.postCommit(false);
        this.task.prepareCommit();
        this.task.postCommit(false);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldReturnStateManagerChangelogOffsets() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition, 50L));
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        Assert.assertEquals(Collections.singletonMap(this.partition, 50L), this.task.changelogOffsets());
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldNotFlushAndThrowOnCloseDirty() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new ProcessorStateException("KABOOM!")).anyTimes();
        this.stateManager.flush();
        EasyMock.expectLastCall().andThrow(new AssertionError("Flush should not be called")).anyTimes();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new AssertionError("Checkpoint should not be called")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.closeDirty();
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldNotThrowFromStateManagerCloseInCloseDirty() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.closeDirty();
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldSuspendAndCommitBeforeCloseClean() {
        this.stateManager.close();
        EasyMock.expectLastCall();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().once();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition, 60L));
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition)).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.prepareCommit();
        this.task.postCommit(true);
        this.task.closeClean();
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldRequireSuspendingCreatedTasksBeforeClose() {
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.CREATED));
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.closeClean();
        });
        this.task.suspend();
        this.task.closeClean();
    }

    @Test
    public void shouldOnlyNeedCommitWhenChangelogOffsetChanged() {
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition)).anyTimes();
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition, 50L)).andReturn(Collections.singletonMap(this.partition, 10100L)).anyTimes();
        this.stateManager.flush();
        EasyMock.expectLastCall();
        this.stateManager.checkpoint();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        Assert.assertFalse(this.task.commitNeeded());
        Assert.assertTrue(this.task.commitNeeded());
        this.task.prepareCommit();
        this.task.postCommit(true);
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldThrowOnCloseCleanError() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        this.stateManager.close();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition)).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.suspend();
        Assert.assertThrows(RuntimeException.class, () -> {
            this.task.closeClean();
        });
        verifyCloseTaskMetric(0.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        EasyMock.reset(new Object[]{this.stateManager});
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.singleton(this.partition)).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
    }

    @Test
    public void shouldThrowOnCloseCleanCheckpointError() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andReturn(Collections.singletonMap(this.partition, 50L));
        this.stateManager.checkpoint();
        EasyMock.expectLastCall().andThrow(new RuntimeException("KABOOM!")).anyTimes();
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.prepareCommit();
        Assert.assertThrows(RuntimeException.class, () -> {
            this.task.postCommit(true);
        });
        Assert.assertEquals(Task.State.RUNNING, this.task.state());
        verifyCloseTaskMetric(0.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        EasyMock.reset(new Object[]{this.stateManager});
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
    }

    @Test
    public void shouldUnregisterMetricsInCloseClean() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.closeClean();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    @Test
    public void shouldUnregisterMetricsInCloseDirty() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.expect(this.stateManager.changelogPartitions()).andReturn(Collections.emptySet()).anyTimes();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        this.task.suspend();
        this.task.closeDirty();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
    }

    @Test
    public void shouldCloseStateManagerOnTaskCreated() {
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.task = createStandbyTask();
        this.task.suspend();
        this.task.closeDirty();
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
    }

    @Test
    public void shouldDeleteStateDirOnTaskCreatedAndEosAlphaUncleanClose() {
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(this.stateManager.baseDir()).andReturn(this.baseDir);
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.config = new StreamsConfig(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "test-application"), Utils.mkEntry("bootstrap.servers", "localhost:2171"), Utils.mkEntry("processing.guarantee", "exactly_once")})));
        this.task = createStandbyTask();
        this.task.suspend();
        this.task.closeDirty();
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
    }

    @Test
    public void shouldDeleteStateDirOnTaskCreatedAndEosBetaUncleanClose() {
        this.stateManager.close();
        EasyMock.expectLastCall();
        EasyMock.expect(this.stateManager.baseDir()).andReturn(this.baseDir);
        EasyMock.replay(new Object[]{this.stateManager});
        MetricName metricName = setupCloseTaskMetric();
        this.config = new StreamsConfig(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", "test-application"), Utils.mkEntry("bootstrap.servers", "localhost:2171"), Utils.mkEntry("processing.guarantee", "exactly_once_beta")})));
        this.task = createStandbyTask();
        this.task.suspend();
        this.task.closeDirty();
        verifyCloseTaskMetric(1.0d, this.streamsMetrics, metricName);
        EasyMock.verify(new Object[]{this.stateManager});
        Assert.assertEquals(Task.State.CLOSED, this.task.state());
    }

    @Test
    public void shouldRecycleTask() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        this.stateManager.recycle();
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.closeCleanAndRecycleState();
        });
        this.task.initializeIfNeeded();
        Assert.assertThrows(IllegalStateException.class, () -> {
            this.task.closeCleanAndRecycleState();
        });
        this.task.suspend();
        this.task.closeCleanAndRecycleState();
        MatcherAssert.assertThat(getTaskMetrics(), Matchers.empty());
        EasyMock.verify(new Object[]{this.stateManager});
    }

    @Test
    public void shouldAlwaysSuspendCreatedTasks() {
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.CREATED));
        this.task.suspend();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    @Test
    public void shouldAlwaysSuspendRunningTasks() {
        EasyMock.expect(this.stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.initializeIfNeeded();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.RUNNING));
        this.task.suspend();
        MatcherAssert.assertThat(this.task.state(), CoreMatchers.equalTo(Task.State.SUSPENDED));
    }

    @Test
    public void shouldInitTaskTimeoutAndEventuallyThrow() {
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.maybeInitTaskTimeoutOrThrow(0L, (Exception) null);
        this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).toMillis(), (Exception) null);
        Assert.assertThrows(TimeoutException.class, () -> {
            this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), (Exception) null);
        });
    }

    @Test
    public void shouldCLearTaskTimeout() {
        EasyMock.replay(new Object[]{this.stateManager});
        this.task = createStandbyTask();
        this.task.maybeInitTaskTimeoutOrThrow(0L, (Exception) null);
        this.task.clearTaskTimeout();
        this.task.maybeInitTaskTimeoutOrThrow(Duration.ofMinutes(5L).plus(Duration.ofMillis(1L)).toMillis(), (Exception) null);
    }

    private StandbyTask createStandbyTask() {
        ThreadCache threadCache = new ThreadCache(new LogContext(String.format("stream-thread [%s] ", Thread.currentThread().getName())), 0L, this.streamsMetrics);
        return new StandbyTask(this.taskId, Collections.singleton(this.partition), this.topology, this.config, this.streamsMetrics, this.stateManager, this.stateDirectory, threadCache, new ProcessorContextImpl(this.taskId, this.config, this.stateManager, this.streamsMetrics, threadCache));
    }

    private MetricName setupCloseTaskMetric() {
        MetricName metricName = new MetricName("name", "group", "description", Collections.emptyMap());
        this.streamsMetrics.threadLevelSensor(this.threadId, "task-closed", Sensor.RecordingLevel.INFO, new Sensor[0]).add(metricName, new CumulativeSum());
        return metricName;
    }

    private void verifyCloseTaskMetric(double d, StreamsMetricsImpl streamsMetricsImpl, MetricName metricName) {
        KafkaMetric kafkaMetric = (KafkaMetric) streamsMetricsImpl.metrics().get(metricName);
        MatcherAssert.assertThat(Double.valueOf(kafkaMetric.measurable().measure(kafkaMetric.config(), System.currentTimeMillis())), CoreMatchers.equalTo(Double.valueOf(d)));
    }

    private List<MetricName> getTaskMetrics() {
        return (List) this.streamsMetrics.metrics().keySet().stream().filter(metricName -> {
            return metricName.tags().containsKey("task-id");
        }).collect(Collectors.toList());
    }
}
