package org.apache.kafka.streams.processor.internals;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.errors.LockException;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.processor.StateRestoreCallback;
import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
import org.apache.kafka.test.MockProcessorNode;
import org.apache.kafka.test.NoOpProcessorContext;
import org.apache.kafka.test.NoOpReadOnlyStore;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.class */
public class GlobalStateManagerImplTest {
    // Manager under test; built in before() from the topology, the mock consumer and the state directory.
    private GlobalStateManagerImpl stateManager;
    // Context handed to stateManager.initialize(...) and stateManager.flush(...).
    private NoOpProcessorContext context;
    // State directory for "appId" rooted at stateDirPath; its global-state lock is released in after().
    private StateDirectory stateDirectory;
    // Path of the temporary directory obtained from TestUtils.tempDirectory() in before().
    private String stateDirPath;
    // Global store named "t1-store" (mapped to topic "t1" in before()).
    private NoOpReadOnlyStore<Object, Object> store1;
    // Global store named "t2-store" (mapped to topic "t2" in before()).
    private NoOpReadOnlyStore store2;
    // Mock consumer created with OffsetResetStrategy.EARLIEST; populated by initializeConsumer(...).
    private MockConsumer<byte[], byte[]> consumer;
    // The ".checkpoint" file inside stateManager.baseDir().
    private File checkpointFile;
    // Partition 1 of topic "t1".
    private final TopicPartition t1 = new TopicPartition("t1", 1);
    // Partition 1 of topic "t2".
    private final TopicPartition t2 = new TopicPartition("t2", 1);
    // Callback instance reused by the tests that do not inspect the restored records.
    private final TheStateRestoreCallback stateRestoreCallback = new TheStateRestoreCallback();

    /* loaded from: input_file:org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest$TheStateRestoreCallback.class */
    private static class TheStateRestoreCallback implements StateRestoreCallback {
        private final List<KeyValue<byte[], byte[]>> restored;

        private TheStateRestoreCallback() {
            this.restored = new ArrayList();
        }

        public void restore(byte[] bArr, byte[] bArr2) {
            this.restored.add(KeyValue.pair(bArr, bArr2));
        }
    }

    /**
     * Builds a fresh fixture before every test: two global stores ("t1-store" and "t2-store"),
     * a topology that maps them to the names "t1" and "t2", a state directory under a new
     * temporary directory, a mock consumer, and the {@link GlobalStateManagerImpl} under test.
     *
     * @throws IOException declared by the original signature
     */
    @Before
    public void before() throws IOException {
        // Store name -> topic name, passed to the topology below.
        final Map<String, String> storeToTopic = new HashMap<>();
        storeToTopic.put("t1-store", "t1");
        storeToTopic.put("t2-store", "t2");

        // Store -> processor node. Populated as in the original but not read again in this method.
        final Map<Object, Object> storeToProcessorNode = new HashMap<>();
        store1 = new NoOpReadOnlyStore<>("t1-store");
        storeToProcessorNode.put(store1, new MockProcessorNode(-1L));
        store2 = new NoOpReadOnlyStore("t2-store");
        storeToProcessorNode.put(store2, new MockProcessorNode(-1L));

        final ProcessorTopology topology = new ProcessorTopology(
                Collections.emptyList(),
                Collections.emptyMap(),
                Collections.emptyMap(),
                Collections.emptyList(),
                storeToTopic,
                Arrays.asList(store1, store2));

        context = new NoOpProcessorContext();
        stateDirPath = TestUtils.tempDirectory().getPath();
        stateDirectory = new StateDirectory("appId", stateDirPath);
        consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
        stateManager = new GlobalStateManagerImpl(topology, consumer, stateDirectory);
        checkpointFile = new File(stateManager.baseDir(), ".checkpoint");
    }

    /**
     * Runs after every test and calls {@code unlockGlobalState()} on the fixture's state directory.
     *
     * @throws IOException declared by the original signature
     */
    @After
    public void after() throws IOException {
        stateDirectory.unlockGlobalState();
    }

    /** After {@code initialize}, a ".lock" file must exist in the global state directory. */
    @Test
    public void shouldLockGlobalStateDirectory() throws Exception {
        stateManager.initialize(context);

        final File lockFile = new File(stateDirectory.globalStateDir(), ".lock");
        Assert.assertTrue(lockFile.exists());
    }

    /**
     * Locks the global state through a second {@link StateDirectory} on the same path and then
     * initializes the manager; the test expects a {@link LockException}.
     */
    @Test(expected = LockException.class)
    public void shouldThrowStreamsExceptionIfCantGetLock() throws Exception {
        final StateDirectory competingDirectory = new StateDirectory("appId", stateDirPath);
        try {
            competingDirectory.lockGlobalState(1);
            stateManager.initialize(context);
        } finally {
            // Always release the competing lock, whether or not initialize() threw.
            competingDirectory.unlockGlobalState();
        }
    }

    /** Offsets written to the checkpoint file must be reported by {@code checkpointedOffsets()} after initialize. */
    @Test
    public void shouldReadCheckpointOffsets() throws Exception {
        final Map<TopicPartition, Long> expectedOffsets = writeCheckpoint();

        stateManager.initialize(context);

        Assert.assertEquals(expectedOffsets, stateManager.checkpointedOffsets());
    }

    /** The checkpoint file written before initialize must no longer exist once initialize has run. */
    @Test
    public void shouldDeleteCheckpointFileAfteLoaded() throws Exception {
        writeCheckpoint();

        stateManager.initialize(context);

        final boolean stillPresent = checkpointFile.exists();
        Assert.assertFalse(stillPresent);
    }

    /**
     * Writes the text {@code "0\n1\nblah"} into the ".checkpoint" file and then initializes the
     * manager; the test expects a {@link StreamsException}.
     */
    @Test(expected = StreamsException.class)
    public void shouldThrowStreamsExceptionIfFailedToReadCheckpointedOffsets() throws Exception {
        final File corruptCheckpoint = new File(stateManager.baseDir(), ".checkpoint");
        // The stream is closed before initialize() runs, exactly once.
        try (FileOutputStream out = new FileOutputStream(corruptCheckpoint)) {
            out.write("0\n1\nblah".getBytes());
        }
        stateManager.initialize(context);
    }

    /** Both global stores must report {@code initialized} after the manager is initialized. */
    @Test
    public void shouldInitializeStateStores() throws Exception {
        stateManager.initialize(context);

        final boolean firstInitialized = store1.initialized;
        Assert.assertTrue(firstInitialized);
        final boolean secondInitialized = store2.initialized;
        Assert.assertTrue(secondInitialized);
    }

    /** {@code initialize} must return the set made of the two global store names. */
    @Test
    public void shouldReturnInitializedStoreNames() throws Exception {
        // The names are read before initialize() is invoked, as in the original expression order.
        final String[] expectedNames = {store1.name(), store2.name()};
        Assert.assertEquals(Utils.mkSet(expectedNames), stateManager.initialize(context));
    }

    /** Registering a store named "not-in-topology" must raise an {@link IllegalArgumentException}. */
    @Test
    public void shouldThrowIllegalArgumentIfTryingToRegisterStoreThatIsNotGlobal() throws Exception {
        stateManager.initialize(context);

        try {
            stateManager.register(new NoOpReadOnlyStore("not-in-topology"), false, new TheStateRestoreCallback());
            Assert.fail("should have raised an illegal argument exception as store is not in the topology");
        } catch (IllegalArgumentException expected) {
            // expected: this is the outcome the test is looking for
        }
    }

    /** A second {@code register} call for the same store must raise an {@link IllegalArgumentException}. */
    @Test
    public void shouldThrowIllegalArgumenExceptionIfAttemptingToRegisterStoreTwice() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(2L, 1L, t1);

        // First registration is expected to succeed.
        stateManager.register(store1, false, new TheStateRestoreCallback());

        try {
            stateManager.register(store1, false, new TheStateRestoreCallback());
            Assert.fail("should have raised an illegal argument exception as store has already been registered");
        } catch (IllegalArgumentException expected) {
            // expected: the duplicate registration was rejected
        }
    }

    /**
     * Registers a store without first calling {@code initializeConsumer}; a
     * {@link StreamsException} is expected.
     */
    @Test
    public void shouldThrowStreamsExceptionIfNoPartitionsFoundForStore() throws Exception {
        stateManager.initialize(context);

        try {
            stateManager.register(store1, false, new TheStateRestoreCallback());
            Assert.fail("Should have raised a StreamsException as there are no partition for the store");
        } catch (StreamsException expected) {
            // expected: this is the outcome the test is looking for
        }
    }

    /**
     * Seeds the consumer with two records for {@code t1} and registers {@code store1};
     * the restore callback must have received exactly two records.
     */
    @Test
    public void shouldRestoreRecordsUpToHighwatermark() throws Exception {
        initializeConsumer(2L, 1L, t1);
        stateManager.initialize(context);

        // Keep a reference to the callback so its recorded pairs can be inspected afterwards.
        final TheStateRestoreCallback restoreCallback = new TheStateRestoreCallback();
        stateManager.register(store1, false, restoreCallback);

        Assert.assertEquals(2, restoreCallback.restored.size());
    }

    /**
     * Seeds the consumer with five records for {@code t1} starting at offset 6, writes a
     * checkpoint of offset 6 for {@code t1}, and registers {@code store1}; the restore callback
     * must have received exactly five records.
     */
    @Test
    public void shouldRestoreRecordsFromCheckpointToHighwatermark() throws Exception {
        initializeConsumer(5L, 6L, t1);

        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ".checkpoint"));
        checkpoint.write(Collections.singletonMap(t1, 6L));

        stateManager.initialize(context);

        // Keep a reference to the callback so its recorded pairs can be inspected afterwards.
        final TheStateRestoreCallback restoreCallback = new TheStateRestoreCallback();
        stateManager.register(store1, false, restoreCallback);

        Assert.assertEquals(5, restoreCallback.restored.size());
    }

    /** Flushing the manager must mark both registered stores as {@code flushed}. */
    @Test
    public void shouldFlushStateStores() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();

        // Register each store right after seeding the consumer for its partition.
        initializeConsumer(1L, 1L, t1);
        stateManager.register(store1, false, callback);
        initializeConsumer(1L, 1L, t2);
        stateManager.register(store2, false, callback);

        stateManager.flush(context);

        Assert.assertTrue(store1.flushed);
        Assert.assertTrue(store2.flushed);
    }

    /**
     * Registers a store whose {@code flush()} throws a {@link RuntimeException}; flushing the
     * manager is expected to raise a {@link ProcessorStateException}.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreFlushFailed() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(1L, 1L, t1);

        // Same name as store1, but flushing it always fails.
        final NoOpReadOnlyStore failingStore = new NoOpReadOnlyStore(store1.name()) {
            @Override
            public void flush() {
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(failingStore, false, callback);

        stateManager.flush(context);
    }

    /** Closing the manager must leave both registered stores not open. */
    @Test
    public void shouldCloseStateStores() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();

        // Register each store right after seeding the consumer for its partition.
        initializeConsumer(1L, 1L, t1);
        stateManager.register(store1, false, callback);
        initializeConsumer(1L, 1L, t2);
        stateManager.register(store2, false, callback);

        stateManager.close(Collections.emptyMap());

        Assert.assertFalse(store1.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    /** Offsets passed to {@code close} must be readable back from the ".checkpoint" file. */
    @Test
    public void shouldWriteCheckpointsOnClose() throws Exception {
        stateManager.initialize(context);
        final TheStateRestoreCallback callback = new TheStateRestoreCallback();
        initializeConsumer(1L, 1L, t1);
        stateManager.register(store1, false, callback);

        final Map<TopicPartition, Long> expectedOffsets = Collections.singletonMap(t1, 25L);
        stateManager.close(expectedOffsets);

        final OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(stateManager.baseDir(), ".checkpoint"));
        Assert.assertEquals(expectedOffsets, checkpoint.read());
    }

    /**
     * Registers a store whose {@code close()} throws a {@link RuntimeException}; closing the
     * manager is expected to raise a {@link ProcessorStateException}.
     */
    @Test(expected = ProcessorStateException.class)
    public void shouldThrowProcessorStateStoreExceptionIfStoreCloseFailed() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(1L, 1L, t1);

        // Same name as store1, but closing it always fails.
        final NoOpReadOnlyStore failingStore = new NoOpReadOnlyStore(store1.name()) {
            @Override
            public void close() {
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(failingStore, false, stateRestoreCallback);

        stateManager.close(Collections.emptyMap());
    }

    /** Registering a store with a null restore callback must raise an {@link IllegalArgumentException}. */
    @Test
    public void shouldThrowIllegalArgumentExceptionIfCallbackIsNull() throws Exception {
        stateManager.initialize(context);

        final StateRestoreCallback missingCallback = null;
        try {
            stateManager.register(store1, false, missingCallback);
            Assert.fail("should have thrown due to null callback");
        } catch (IllegalArgumentException expected) {
            // expected: the null callback was rejected
        }
    }

    /**
     * After the manager is initialized and closed, a second {@link StateDirectory} on the same
     * path must be able to take the global state lock.
     */
    @Test
    public void shouldUnlockGlobalStateDirectoryOnClose() throws Exception {
        stateManager.initialize(context);
        stateManager.close(Collections.emptyMap());

        final StateDirectory probeDirectory = new StateDirectory("appId", stateDirPath);
        try {
            Assert.assertTrue(probeDirectory.lockGlobalState(1));
        } finally {
            // Always release the probe's lock, whether or not the assertion passed.
            probeDirectory.unlockGlobalState();
        }
    }

    /**
     * Registers a store that throws if {@code close()} is called while it is not open, then
     * closes the manager twice; the test passes only if no exception escapes.
     */
    @Test
    public void shouldNotCloseStoresIfCloseAlreadyCalled() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(1L, 1L, t1);

        final NoOpReadOnlyStore closeOnceStore = new NoOpReadOnlyStore("t1-store") {
            @Override
            public void close() {
                if (isOpen()) {
                    super.close();
                    return;
                }
                throw new RuntimeException("store already closed");
            }
        };
        stateManager.register(closeOnceStore, false, stateRestoreCallback);

        stateManager.close(Collections.emptyMap());
        stateManager.close(Collections.emptyMap());
    }

    /**
     * Registers one store whose {@code close()} closes itself and then throws, plus
     * {@code store2}; after closing the manager (tolerating a {@link ProcessorStateException})
     * both stores must be not open.
     */
    @Test
    public void shouldAttemptToCloseAllStoresEvenWhenSomeException() throws Exception {
        stateManager.initialize(context);
        initializeConsumer(1L, 1L, t1);
        initializeConsumer(1L, 1L, t2);

        final NoOpReadOnlyStore throwingStore = new NoOpReadOnlyStore("t1-store") {
            @Override
            public void close() {
                super.close();
                throw new RuntimeException("KABOOM!");
            }
        };
        stateManager.register(throwingStore, false, stateRestoreCallback);
        stateManager.register(store2, false, stateRestoreCallback);

        try {
            stateManager.close(Collections.emptyMap());
        } catch (ProcessorStateException ignored) {
            // tolerated: only the resulting store states are checked below
        }

        Assert.assertFalse(throwingStore.isOpen());
        Assert.assertFalse(store2.isOpen());
    }

    /**
     * Writes the text {@code "0\n1\nblah"} into the ".checkpoint" file, initializes the manager
     * (tolerating a {@link StreamsException}), and then verifies that a second
     * {@link StateDirectory} on the same path can take the global state lock.
     */
    @Test
    public void shouldReleaseLockIfExceptionWhenLoadingCheckpoints() throws Exception {
        final File corruptCheckpoint = new File(stateManager.baseDir(), ".checkpoint");
        // The stream is closed before initialize() runs, exactly once.
        try (FileOutputStream out = new FileOutputStream(corruptCheckpoint)) {
            out.write("0\n1\nblah".getBytes());
        }

        try {
            stateManager.initialize(context);
        } catch (StreamsException ignored) {
            // tolerated: only the lock state afterwards is under test
        }

        final StateDirectory probeDirectory = new StateDirectory("appId", stateDirPath);
        try {
            Assert.assertTrue(probeDirectory.lockGlobalState(1));
        } finally {
            // Always release the probe's lock, whether or not the assertion passed.
            probeDirectory.unlockGlobalState();
        }
    }

    /**
     * Prepares the mock consumer for one partition: registers partition metadata, assigns the
     * partition, sets its beginning offset to 1 and its end offset to {@code j2 + j - 1}, and
     * adds {@code j} records with key "key" and value "value" at offsets {@code j2 .. j2 + j - 1}.
     *
     * @param j              number of records to add
     * @param j2             offset of the first record added
     * @param topicPartition partition the metadata, offsets and records are set up for
     */
    private void initializeConsumer(long j, long j2, TopicPartition topicPartition) {
        final Map<TopicPartition, Long> beginningOffsets = new HashMap<>();
        beginningOffsets.put(topicPartition, 1L);
        final Map<TopicPartition, Long> endOffsets = new HashMap<>();
        endOffsets.put(topicPartition, j2 + j - 1);

        final PartitionInfo partitionInfo = new PartitionInfo(
                topicPartition.topic(), topicPartition.partition(), (Node) null, (Node[]) null, (Node[]) null);
        consumer.updatePartitions(topicPartition.topic(), Collections.singletonList(partitionInfo));
        consumer.assign(Collections.singletonList(topicPartition));
        consumer.updateEndOffsets(endOffsets);
        consumer.updateBeginningOffsets(beginningOffsets);

        // long index: the bound is a long, so an int counter could overflow before reaching it.
        for (long i = 0; i < j; i++) {
            consumer.addRecord(new ConsumerRecord<>(
                    topicPartition.topic(), topicPartition.partition(), j2 + i, "key".getBytes(), "value".getBytes()));
        }
    }

    /**
     * Writes a checkpoint containing offset 1 for {@code t1} to the fixture's checkpoint file.
     *
     * @return the offsets that were written
     * @throws IOException declared by the original signature
     */
    private Map<TopicPartition, Long> writeCheckpoint() throws IOException {
        final Map<TopicPartition, Long> offsets = Collections.singletonMap(t1, 1L);
        new OffsetCheckpoint(checkpointFile).write(offsets);
        return offsets;
    }
}
