package org.apache.kafka.streams.processor.internals;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for {@link AbstractTask}.
 *
 * <p>Two areas are exercised through a minimal anonymous subclass:
 * <ul>
 *   <li>how {@code updateOffsetLimits()} reacts when the consumer's
 *       {@code committed(TopicPartition)} call throws, and</li>
 *   <li>whether {@code initializeStateStores()} interacts with the mocked
 *       {@link StateDirectory} lock, depending on whether the topology has stores.</li>
 * </ul>
 */
public class AbstractTaskTest {
    private final TaskId id = new TaskId(0, 0);
    private StateDirectory stateDirectory = EasyMock.createMock(StateDirectory.class);

    /**
     * Records the expectation that the mocked state directory hands out a temporary
     * directory for the task under test. Tests that need the mock in replay mode
     * call {@code EasyMock.replay} themselves.
     */
    @Before
    public void before() {
        EasyMock.expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory());
    }

    /** An {@link AuthorizationException} from the consumer must surface as a {@link ProcessorStateException}. */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() throws Exception {
        Consumer failingConsumer = mockConsumer(new AuthorizationException("blah"));
        AbstractTask task = createTask(failingConsumer, Collections.emptyList());
        task.updateOffsetLimits();
    }

    /** A generic {@link KafkaException} from the consumer must surface as a {@link ProcessorStateException}. */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() throws Exception {
        Consumer failingConsumer = mockConsumer(new KafkaException("blah"));
        AbstractTask task = createTask(failingConsumer, Collections.emptyList());
        task.updateOffsetLimits();
    }

    /** A {@link WakeupException} from the consumer must propagate unchanged. */
    @Test(expected = WakeupException.class)
    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() throws Exception {
        Consumer failingConsumer = mockConsumer(new WakeupException());
        AbstractTask task = createTask(failingConsumer, Collections.emptyList());
        task.updateOffsetLimits();
    }

    /**
     * With one store in the topology and a state directory whose {@code lock} returns
     * {@code false}, initializing the state stores must fail with a {@link LockException}.
     */
    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException {
        Consumer consumer = EasyMock.createNiceMock(Consumer.class);
        StateStore store = EasyMock.createNiceMock(StateStore.class);
        EasyMock.expect(stateDirectory.lock(id, 5)).andReturn(false);
        EasyMock.replay(stateDirectory);

        try {
            createTask(consumer, Collections.singletonList(store)).initializeStateStores();
        } catch (LockException expected) {
            // This is the outcome the test is looking for.
            return;
        }
        Assert.fail("Should have thrown LockException");
    }

    /**
     * With no stores in the topology, initializing the state stores must not try to lock
     * the state directory.
     */
    @Test
    public void shouldNotAttemptToLockIfNoStores() throws IOException {
        Consumer consumer = EasyMock.createNiceMock(Consumer.class);
        // No lock(...) expectation is recorded before replay, so a lock call on this
        // (non-nice) mock would be an unexpected invocation.
        EasyMock.replay(stateDirectory);

        AbstractTask task = createTask(consumer, Collections.emptyList());
        task.initializeStateStores();

        EasyMock.verify(stateDirectory);
    }

    /**
     * Builds an {@link AbstractTask} whose abstract operations are all no-op stubs.
     *
     * @param consumer    consumer handed to both the task and its changelog reader
     * @param stateStores stores placed into the otherwise empty topology
     * @return a task for partition {@code t-0} backed by the mocked state directory
     */
    private AbstractTask createTask(Consumer consumer, List<StateStore> stateStores) {
        Properties props = new Properties();
        props.put("application.id", "app-id");
        props.put("bootstrap.servers", "dummyhost:9092");

        List<TopicPartition> partitions = Collections.singletonList(new TopicPartition("t", 0));
        ProcessorTopology topology = topologyWith(stateStores);
        StoreChangelogReader changelogReader = new StoreChangelogReader(consumer, Time.SYSTEM, 5000L);
        StreamsConfig config = new StreamsConfig(props);

        return new AbstractTask(id, "app", partitions, topology, consumer, changelogReader, false, stateDirectory, config) {
            public void resume() {
                // no-op stub
            }

            public void commit() {
                // no-op stub
            }

            public void suspend() {
                // no-op stub
            }

            public void close(boolean z, boolean z2) {
                // no-op stub
            }

            public boolean initialize() {
                return false;
            }

            boolean process() {
                return false;
            }

            boolean maybePunctuate() {
                return false;
            }

            boolean commitNeeded() {
                return false;
            }
        };
    }

    /** Creates a topology that is empty apart from the given state stores. */
    private ProcessorTopology topologyWith(List<StateStore> stateStores) {
        return new ProcessorTopology(
                Collections.emptyList(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                stateStores,
                Collections.emptyMap(),
                Collections.emptyList());
    }

    /**
     * Creates a {@link MockConsumer} whose {@code committed(TopicPartition)} always fails.
     *
     * @param toThrow the exception thrown on every {@code committed} call
     * @return the failing consumer
     */
    private Consumer mockConsumer(final RuntimeException toThrow) {
        return new MockConsumer(OffsetResetStrategy.EARLIEST) {
            public OffsetAndMetadata committed(TopicPartition topicPartition) {
                throw toThrow;
            }
        };
    }
}
