package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/AbstractTaskTest.class */
/**
 * Unit tests for {@link AbstractTask}: error translation in {@code updateOffsetLimits()},
 * state-directory locking in {@code registerStateStores()}, and on-disk cleanup performed by
 * {@code reinitializeStateStoresForPartitions(...)}.
 */
public class AbstractTaskTest {
    private final TaskId id = new TaskId(0, 0);
    private final StateDirectory stateDirectory = EasyMock.createMock(StateDirectory.class);
    private final TopicPartition storeTopicPartition1 = new TopicPartition("t1", 0);
    private final TopicPartition storeTopicPartition2 = new TopicPartition("t2", 0);
    private final TopicPartition storeTopicPartition3 = new TopicPartition("t3", 0);
    private final TopicPartition storeTopicPartition4 = new TopicPartition("t4", 0);
    private final Collection<TopicPartition> storeTopicPartitions = Utils.mkSet(
        storeTopicPartition1, storeTopicPartition2, storeTopicPartition3, storeTopicPartition4);

    /**
     * Records the expectation that the mocked state directory hands out a temp directory for
     * the task. The mock is deliberately left in record state here; tests that depend on the
     * mock's behavior add their own expectations and call {@code replay} themselves.
     */
    @Before
    public void before() {
        EasyMock.expect(stateDirectory.directoryForTask(id)).andReturn(TestUtils.tempDirectory());
    }

    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() {
        final Consumer consumer = mockConsumer(new AuthorizationException("blah"));
        final AbstractTask task = createTask(consumer, Collections.emptyMap());
        task.updateOffsetLimits();
    }

    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() {
        final Consumer consumer = mockConsumer(new KafkaException("blah"));
        final AbstractTask task = createTask(consumer, Collections.emptyMap());
        task.updateOffsetLimits();
    }

    @Test(expected = WakeupException.class)
    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
        final Consumer consumer = mockConsumer(new WakeupException());
        final AbstractTask task = createTask(consumer, Collections.emptyMap());
        task.updateOffsetLimits();
    }

    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException {
        final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
        final StateStore store = mockStore("dummy-store-name");
        // Simulate another owner holding the task directory lock.
        EasyMock.expect(stateDirectory.lock(id)).andReturn(false);
        EasyMock.replay(stateDirectory);

        final AbstractTask task = createTask(consumer, Collections.singletonMap(store, "dummy"));
        try {
            task.registerStateStores();
            Assert.fail("Should have thrown LockException");
        } catch (final LockException expected) {
            // expected: the lock attempt was configured to fail
        }
    }

    @Test
    public void shouldNotAttemptToLockIfNoStores() {
        final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
        // No lock(...) expectation is recorded, so the strict mock rejects any lock attempt.
        EasyMock.replay(stateDirectory);

        createTask(consumer, Collections.emptyMap()).registerStateStores();

        EasyMock.verify(stateDirectory);
    }

    /**
     * Registers four stores, drops a marker file into each store directory, then re-initializes
     * the stores whose changelog partitions are {@code t1} and {@code t3}. The marker files of
     * those two stores must be gone afterwards while the other two must survive.
     */
    @Test
    public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException {
        final Properties props = new Properties();
        props.put("application.id", "app-id");
        props.put("bootstrap.servers", "localhost:9092");
        props.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
        final StreamsConfig streamsConfig = new StreamsConfig(props);

        final Consumer consumer = EasyMock.createNiceMock(Consumer.class);
        final StateStore store1 = mockStore("storeName1");
        final StateStore store2 = mockStore("storeName2");
        final StateStore store3 = mockStore("storeName3");
        final StateStore store4 = mockStore("storeName4");

        final Map<StateStore, String> storesToChangelogTopics = new HashMap<>();
        storesToChangelogTopics.put(store1, storeTopicPartition1.topic());
        storesToChangelogTopics.put(store2, storeTopicPartition2.topic());
        storesToChangelogTopics.put(store3, storeTopicPartition3.topic());
        storesToChangelogTopics.put(store4, storeTopicPartition4.topic());

        // A real StateDirectory is used here because the test inspects files on disk.
        final StateDirectory realStateDirectory = new StateDirectory(streamsConfig, new MockTime(), true);
        final AbstractTask task = createTask(consumer, storesToChangelogTopics, realStateDirectory);

        final File taskDir = new File(realStateDirectory.directoryForTask(task.id).getAbsolutePath());
        final File rocksDbDir = new File(taskDir, "rocksdb");
        // Stores 1 and 2 live under the "rocksdb" sub-directory, stores 3 and 4 directly in the task dir.
        final File testFile1 = createTestFile(new File(rocksDbDir, "storeName1"));
        final File testFile2 = createTestFile(new File(rocksDbDir, "storeName2"));
        final File testFile3 = createTestFile(new File(taskDir, "storeName3"));
        final File testFile4 = createTestFile(new File(taskDir, "storeName4"));

        task.processorContext =
            new InternalMockProcessorContext(realStateDirectory.directoryForTask(task.id), streamsConfig);
        task.stateMgr.register(store1, new MockRestoreCallback());
        task.stateMgr.register(store2, new MockRestoreCallback());
        task.stateMgr.register(store3, new MockRestoreCallback());
        task.stateMgr.register(store4, new MockRestoreCallback());

        task.reinitializeStateStoresForPartitions(Utils.mkSet(storeTopicPartition1, storeTopicPartition3));

        Assert.assertFalse(testFile1.exists());
        Assert.assertTrue(testFile2.exists());
        Assert.assertFalse(testFile3.exists());
        Assert.assertTrue(testFile4.exists());
    }

    /**
     * Creates a nice {@link StateStore} mock, already in replay state, whose {@code name()}
     * returns {@code storeName} any number of times.
     */
    private static StateStore mockStore(final String storeName) {
        final StateStore store = EasyMock.createNiceMock(StateStore.class);
        EasyMock.expect(store.name()).andReturn(storeName).anyTimes();
        EasyMock.replay(store);
        return store;
    }

    /**
     * Creates {@code storeDir} (including missing parents) and an empty file named
     * {@code testFile} inside it, asserting that the file exists afterwards.
     *
     * @return the marker file that was created
     * @throws IOException if the file cannot be created
     */
    private static File createTestFile(final File storeDir) throws IOException {
        storeDir.mkdirs();
        final File testFile = new File(storeDir, "testFile");
        testFile.createNewFile();
        Assert.assertTrue(testFile.exists());
        return testFile;
    }

    /** Builds a task backed by the mocked {@link StateDirectory} field. */
    private AbstractTask createTask(Consumer consumer, Map<StateStore, String> map) {
        return createTask(consumer, map, stateDirectory);
    }

    /**
     * Builds an {@link AbstractTask} whose topology contains the given stores.
     *
     * @param consumer       consumer handed to both the task and its changelog reader
     * @param map            state stores mapped to their changelog topic names
     * @param stateDirectory state directory the task should use
     * @return an anonymous task subclass whose lifecycle methods are stubbed out
     */
    private AbstractTask createTask(Consumer consumer, Map<StateStore, String> map, StateDirectory stateDirectory) {
        final Properties properties = new Properties();
        properties.put("application.id", "app");
        properties.put("bootstrap.servers", "dummyhost:9092");
        final StreamsConfig streamsConfig = new StreamsConfig(properties);

        // The topology factory wants store *names* mapped to changelog topics.
        final Map<String, String> storeNamesToChangelogTopics = new HashMap<>(map.size());
        for (final Map.Entry<StateStore, String> entry : map.entrySet()) {
            storeNamesToChangelogTopics.put(entry.getKey().name(), entry.getValue());
        }

        final ProcessorTopology topology = ProcessorTopologyFactories.withLocalStores(
            new ArrayList<>(map.keySet()), storeNamesToChangelogTopics);
        final StoreChangelogReader changelogReader = new StoreChangelogReader(
            consumer, Duration.ZERO, new MockStateRestoreListener(), new LogContext("stream-task-test "));

        return new AbstractTask(id, storeTopicPartitions, topology, consumer, changelogReader,
                                false, stateDirectory, streamsConfig) {
            @Override
            public void resume() {
            }

            @Override
            public void commit() {
            }

            @Override
            public void suspend() {
            }

            @Override
            public void close(final boolean clean, final boolean isZombie) {
            }

            @Override
            public void closeSuspended(final boolean clean, final boolean isZombie, final RuntimeException e) {
            }

            @Override
            public boolean initializeStateStores() {
                return false;
            }

            @Override
            public void initializeTopology() {
            }
        };
    }

    /**
     * Returns a {@link MockConsumer} whose {@code committed(TopicPartition)} always throws the
     * supplied exception.
     */
    private Consumer mockConsumer(final RuntimeException runtimeException) {
        return new MockConsumer(OffsetResetStrategy.EARLIEST) {
            @Override
            public OffsetAndMetadata committed(final TopicPartition topicPartition) {
                throw runtimeException;
            }
        };
    }
}
