/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.processor.StateRestoreListener;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.internals.AbstractTask;
import org.apache.kafka.streams.processor.internals.ChangelogReader;
import org.apache.kafka.streams.processor.internals.ProcessorTopologyFactories;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StoreChangelogReader;
import org.apache.kafka.test.InternalMockProcessorContext;
import org.apache.kafka.test.MockRestoreCallback;
import org.apache.kafka.test.MockStateRestoreListener;
import org.apache.kafka.test.TestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class AbstractTaskTest {
    private final TaskId id = new TaskId(0, 0);
    private StateDirectory stateDirectory = (StateDirectory)EasyMock.createMock(StateDirectory.class);
    private final TopicPartition storeTopicPartition1 = new TopicPartition("t1", 0);
    private final TopicPartition storeTopicPartition2 = new TopicPartition("t2", 0);
    private final TopicPartition storeTopicPartition3 = new TopicPartition("t3", 0);
    private final TopicPartition storeTopicPartition4 = new TopicPartition("t4", 0);
    private final Collection<TopicPartition> storeTopicPartitions = Utils.mkSet((Object[])new TopicPartition[]{this.storeTopicPartition1, this.storeTopicPartition2, this.storeTopicPartition3, this.storeTopicPartition4});

    @Before
    public void before() {
        EasyMock.expect((Object)this.stateDirectory.directoryForTask(this.id)).andReturn((Object)TestUtils.tempDirectory());
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenAuthorizationException() {
        Consumer consumer = this.mockConsumer((RuntimeException)new AuthorizationException("blah"));
        AbstractTask task = this.createTask(consumer, Collections.emptyMap());
        task.updateOffsetLimits();
    }

    @Test(expected=ProcessorStateException.class)
    public void shouldThrowProcessorStateExceptionOnInitializeOffsetsWhenKafkaException() {
        Consumer consumer = this.mockConsumer((RuntimeException)new KafkaException("blah"));
        AbstractTask task = this.createTask(consumer, Collections.emptyMap());
        task.updateOffsetLimits();
    }

    @Test(expected=WakeupException.class)
    public void shouldThrowWakeupExceptionOnInitializeOffsetsWhenWakeupException() {
        Consumer consumer = this.mockConsumer((RuntimeException)new WakeupException());
        AbstractTask task = this.createTask(consumer, Collections.emptyMap());
        task.updateOffsetLimits();
    }

    @Test
    public void shouldThrowLockExceptionIfFailedToLockStateDirectoryWhenTopologyHasStores() throws IOException {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        StateStore store = (StateStore)EasyMock.createNiceMock(StateStore.class);
        EasyMock.expect((Object)store.name()).andReturn((Object)"dummy-store-name").anyTimes();
        EasyMock.replay((Object[])new Object[]{store});
        EasyMock.expect((Object)this.stateDirectory.lock(this.id)).andReturn((Object)false);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});
        AbstractTask task = this.createTask(consumer, Collections.singletonMap(store, "dummy"));
        try {
            task.registerStateStores();
            Assert.fail((String)"Should have thrown LockException");
        }
        catch (LockException lockException) {
            // empty catch block
        }
    }

    @Test
    public void shouldNotAttemptToLockIfNoStores() {
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        EasyMock.replay((Object[])new Object[]{this.stateDirectory});
        AbstractTask task = this.createTask(consumer, Collections.emptyMap());
        task.registerStateStores();
        EasyMock.verify((Object[])new Object[]{this.stateDirectory});
    }

    @Test
    public void shouldDeleteAndRecreateStoreDirectoryOnReinitialize() throws IOException {
        StreamsConfig streamsConfig = new StreamsConfig((Map)new Properties(){
            {
                this.put("application.id", "app-id");
                this.put("bootstrap.servers", "localhost:9092");
                this.put("state.dir", TestUtils.tempDirectory().getAbsolutePath());
            }
        });
        Consumer consumer = (Consumer)EasyMock.createNiceMock(Consumer.class);
        final StateStore store1 = (StateStore)EasyMock.createNiceMock(StateStore.class);
        final StateStore store2 = (StateStore)EasyMock.createNiceMock(StateStore.class);
        final StateStore store3 = (StateStore)EasyMock.createNiceMock(StateStore.class);
        final StateStore store4 = (StateStore)EasyMock.createNiceMock(StateStore.class);
        String storeName1 = "storeName1";
        String storeName2 = "storeName2";
        String storeName3 = "storeName3";
        String storeName4 = "storeName4";
        EasyMock.expect((Object)store1.name()).andReturn((Object)"storeName1").anyTimes();
        EasyMock.replay((Object[])new Object[]{store1});
        EasyMock.expect((Object)store2.name()).andReturn((Object)"storeName2").anyTimes();
        EasyMock.replay((Object[])new Object[]{store2});
        EasyMock.expect((Object)store3.name()).andReturn((Object)"storeName3").anyTimes();
        EasyMock.replay((Object[])new Object[]{store3});
        EasyMock.expect((Object)store4.name()).andReturn((Object)"storeName4").anyTimes();
        EasyMock.replay((Object[])new Object[]{store4});
        StateDirectory stateDirectory = new StateDirectory(streamsConfig, (Time)new MockTime());
        AbstractTask task = this.createTask(consumer, (Map<StateStore, String>)new HashMap<StateStore, String>(){
            {
                this.put(store1, AbstractTaskTest.this.storeTopicPartition1.topic());
                this.put(store2, AbstractTaskTest.this.storeTopicPartition2.topic());
                this.put(store3, AbstractTaskTest.this.storeTopicPartition3.topic());
                this.put(store4, AbstractTaskTest.this.storeTopicPartition4.topic());
            }
        }, stateDirectory);
        String taskDir = stateDirectory.directoryForTask(task.id).getAbsolutePath();
        File storeDirectory1 = new File(taskDir + File.separator + "rocksdb" + File.separator + "storeName1");
        File storeDirectory2 = new File(taskDir + File.separator + "rocksdb" + File.separator + "storeName2");
        File storeDirectory3 = new File(taskDir + File.separator + "storeName3");
        File storeDirectory4 = new File(taskDir + File.separator + "storeName4");
        File testFile1 = new File(storeDirectory1.getAbsolutePath() + File.separator + "testFile");
        File testFile2 = new File(storeDirectory2.getAbsolutePath() + File.separator + "testFile");
        File testFile3 = new File(storeDirectory3.getAbsolutePath() + File.separator + "testFile");
        File testFile4 = new File(storeDirectory4.getAbsolutePath() + File.separator + "testFile");
        storeDirectory1.mkdirs();
        storeDirectory2.mkdirs();
        storeDirectory3.mkdirs();
        storeDirectory4.mkdirs();
        testFile1.createNewFile();
        Assert.assertTrue((boolean)testFile1.exists());
        testFile2.createNewFile();
        Assert.assertTrue((boolean)testFile2.exists());
        testFile3.createNewFile();
        Assert.assertTrue((boolean)testFile3.exists());
        testFile4.createNewFile();
        Assert.assertTrue((boolean)testFile4.exists());
        task.processorContext = new InternalMockProcessorContext(stateDirectory.directoryForTask(task.id), streamsConfig);
        task.stateMgr.register(store1, (StateRestoreCallback)new MockRestoreCallback());
        task.stateMgr.register(store2, (StateRestoreCallback)new MockRestoreCallback());
        task.stateMgr.register(store3, (StateRestoreCallback)new MockRestoreCallback());
        task.stateMgr.register(store4, (StateRestoreCallback)new MockRestoreCallback());
        task.reinitializeStateStoresForPartitions((Collection)Utils.mkSet((Object[])new TopicPartition[]{this.storeTopicPartition1, this.storeTopicPartition3}));
        Assert.assertFalse((boolean)testFile1.exists());
        Assert.assertTrue((boolean)testFile2.exists());
        Assert.assertFalse((boolean)testFile3.exists());
        Assert.assertTrue((boolean)testFile4.exists());
    }

    private AbstractTask createTask(Consumer consumer, Map<StateStore, String> stateStoresToChangelogTopics) {
        return this.createTask(consumer, stateStoresToChangelogTopics, this.stateDirectory);
    }

    private AbstractTask createTask(Consumer consumer, Map<StateStore, String> stateStoresToChangelogTopics, StateDirectory stateDirectory) {
        Properties properties = new Properties();
        properties.put("application.id", "app");
        properties.put("bootstrap.servers", "dummyhost:9092");
        StreamsConfig config = new StreamsConfig((Map)properties);
        HashMap<String, String> storeNamesToChangelogTopics = new HashMap<String, String>(stateStoresToChangelogTopics.size());
        for (Map.Entry<StateStore, String> e : stateStoresToChangelogTopics.entrySet()) {
            storeNamesToChangelogTopics.put(e.getKey().name(), e.getValue());
        }
        return new AbstractTask(this.id, this.storeTopicPartitions, ProcessorTopologyFactories.withLocalStores(new ArrayList<StateStore>(stateStoresToChangelogTopics.keySet()), storeNamesToChangelogTopics), consumer, (ChangelogReader)new StoreChangelogReader(consumer, Duration.ZERO, (StateRestoreListener)new MockStateRestoreListener(), new LogContext("stream-task-test ")), false, stateDirectory, config){

            public void resume() {
            }

            public void commit() {
            }

            public void suspend() {
            }

            public void close(boolean clean, boolean isZombie) {
            }

            public void closeSuspended(boolean clean, boolean isZombie, RuntimeException e) {
            }

            public boolean initializeStateStores() {
                return false;
            }

            public void initializeTopology() {
            }
        };
    }

    private Consumer mockConsumer(final RuntimeException toThrow) {
        return new MockConsumer(OffsetResetStrategy.EARLIEST){

            public OffsetAndMetadata committed(TopicPartition partition) {
                throw toThrow;
            }
        };
    }
}

