/*
 * Decompiled with CFR 0.152.
 */
package org.apache.distributedlog.impl.metadata;

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.distributedlog.DLMTestUtil;
import org.apache.distributedlog.DistributedLogConfiguration;
import org.apache.distributedlog.DistributedLogConstants;
import org.apache.distributedlog.LogSegmentMetadata;
import org.apache.distributedlog.TestZooKeeperClientBuilder;
import org.apache.distributedlog.ZooKeeperClient;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.apache.distributedlog.api.MetadataAccessor;
import org.apache.distributedlog.api.namespace.Namespace;
import org.apache.distributedlog.api.namespace.NamespaceBuilder;
import org.apache.distributedlog.exceptions.LockingException;
import org.apache.distributedlog.exceptions.LogExistsException;
import org.apache.distributedlog.exceptions.LogNotFoundException;
import org.apache.distributedlog.exceptions.ZKException;
import org.apache.distributedlog.impl.metadata.BKDLConfig;
import org.apache.distributedlog.impl.metadata.ZKLogStreamMetadataStore;
import org.apache.distributedlog.metadata.DLMetadata;
import org.apache.distributedlog.metadata.LogMetadata;
import org.apache.distributedlog.metadata.LogMetadataForWriter;
import org.apache.distributedlog.util.DLUtils;
import org.apache.distributedlog.util.Utils;
import org.apache.pulsar.shade.com.google.common.collect.Lists;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.common.util.OrderedScheduler;
import org.apache.pulsar.shade.org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.pulsar.shade.org.apache.bookkeeper.util.ZkUtils;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.LongVersion;
import org.apache.pulsar.shade.org.apache.bookkeeper.versioning.Versioned;
import org.apache.pulsar.shade.org.apache.zookeeper.AsyncCallback;
import org.apache.pulsar.shade.org.apache.zookeeper.CreateMode;
import org.apache.pulsar.shade.org.apache.zookeeper.KeeperException;
import org.apache.pulsar.shade.org.apache.zookeeper.Transaction;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooDefs;
import org.apache.pulsar.shade.org.apache.zookeeper.ZooKeeper;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestZKLogStreamMetadataStore
extends ZooKeeperClusterTestCase {
    private static final Logger logger = LoggerFactory.getLogger(TestZKLogStreamMetadataStore.class);
    private static final int sessionTimeoutMs = 30000;
    @Rule
    public TestName testName = new TestName();
    private ZooKeeperClient zkc;
    private URI uri;
    private OrderedScheduler scheduler;
    private ZKLogStreamMetadataStore metadataStore;

    private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier) throws Exception {
        String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
        String logSegmentsPath = logRootPath + "/ledgers";
        String maxTxIdPath = logRootPath + "/maxtxid";
        String lockPath = logRootPath + "/lock";
        String readLockPath = logRootPath + "/readLock";
        String versionPath = logRootPath + "/version";
        String allocationPath = logRootPath + "/allocation";
        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0], zk.getDefaultACL(), CreateMode.PERSISTENT);
        Transaction txn = zk.get().transaction();
        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(0L), zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L), zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(versionPath, ZKLogStreamMetadataStore.intToBytes(-1), zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.commit();
    }

    private static void createLog(ZooKeeperClient zk, URI uri, String logName, String logIdentifier, int numSegments) throws Exception {
        String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
        String logSegmentsPath = logRootPath + "/ledgers";
        String maxTxIdPath = logRootPath + "/maxtxid";
        String lockPath = logRootPath + "/lock";
        String readLockPath = logRootPath + "/readLock";
        String versionPath = logRootPath + "/version";
        String allocationPath = logRootPath + "/allocation";
        Utils.zkCreateFullPathOptimistic(zk, logRootPath, new byte[0], zk.getDefaultACL(), CreateMode.PERSISTENT);
        Transaction txn = zk.get().transaction();
        txn.create(logSegmentsPath, DLUtils.serializeLogSegmentSequenceNumber(0L), zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(maxTxIdPath, DLUtils.serializeTransactionId(0L), zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(lockPath, DistributedLogConstants.EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(readLockPath, DistributedLogConstants.EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(versionPath, ZKLogStreamMetadataStore.intToBytes(-1), zk.getDefaultACL(), CreateMode.PERSISTENT);
        txn.create(allocationPath, DistributedLogConstants.EMPTY_BYTES, zk.getDefaultACL(), CreateMode.PERSISTENT);
        for (int i = 0; i < numSegments; ++i) {
            LogSegmentMetadata segment = DLMTestUtil.completedLogSegment(logSegmentsPath, (long)i + 1L, 1L + (long)i * 1000L, (long)(i + 1) * 1000L, 1000, (long)i + 1L, 999L, 0L);
            txn.create(segment.getZkPath(), segment.getFinalisedData().getBytes(StandardCharsets.UTF_8), zk.getDefaultACL(), CreateMode.PERSISTENT);
        }
        txn.commit();
    }

    @Before
    public void setup() throws Exception {
        this.zkc = TestZooKeeperClientBuilder.newBuilder().name("zkc").uri(DLMTestUtil.createDLMURI(zkPort, "/")).sessionTimeoutMs(30000).build();
        this.uri = DLMTestUtil.createDLMURI(zkPort, "");
        try {
            ZkUtils.createFullPathOptimistic(this.zkc.get(), this.uri.getPath(), new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
        catch (KeeperException.NodeExistsException nee) {
            logger.debug("The namespace uri already exists.");
        }
        this.scheduler = (OrderedScheduler)OrderedScheduler.newSchedulerBuilder().name("test-scheduler").numThreads(1).build();
        this.metadataStore = new ZKLogStreamMetadataStore("test-logstream-metadata-store", new DistributedLogConfiguration(), this.zkc, this.scheduler, NullStatsLogger.INSTANCE);
    }

    @After
    public void teardown() throws Exception {
        if (null != this.metadataStore) {
            this.metadataStore.close();
        }
        if (null != this.scheduler) {
            this.scheduler.shutdown();
        }
        this.zkc.close();
    }

    @Test(timeout=60000L)
    public void testCheckLogMetadataPathsWithAllocator() throws Exception {
        String logRootPath = "/" + this.testName.getMethodName();
        List<Versioned<byte[]>> metadatas = Utils.ioResult(ZKLogStreamMetadataStore.checkLogMetadataPaths(this.zkc.get(), logRootPath, true));
        Assert.assertEquals((String)"Should have 8 paths", (long)8L, (long)metadatas.size());
        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
            Assert.assertNull((Object)path.getValue());
            Assert.assertNull((Object)path.getVersion());
        }
    }

    @Test(timeout=60000L)
    public void testCheckLogMetadataPathsWithoutAllocator() throws Exception {
        String logRootPath = "/" + this.testName.getMethodName();
        List<Versioned<byte[]>> metadatas = Utils.ioResult(ZKLogStreamMetadataStore.checkLogMetadataPaths(this.zkc.get(), logRootPath, false));
        Assert.assertEquals((String)"Should have 7 paths", (long)7L, (long)metadatas.size());
        for (Versioned<byte[]> path : metadatas.subList(2, metadatas.size())) {
            Assert.assertNull((Object)path.getValue());
            Assert.assertNull((Object)path.getVersion());
        }
    }

    private void testCreateLogMetadataWithMissingPaths(URI uri, String logName, String logIdentifier, List<String> pathsToDelete, boolean ownAllocator, boolean createLogFirst) throws Exception {
        if (createLogFirst) {
            TestZKLogStreamMetadataStore.createLog(this.zkc, uri, logName, logIdentifier);
        }
        for (String path : pathsToDelete) {
            this.zkc.get().delete(path, -1);
        }
        LogMetadataForWriter logMetadata = Utils.ioResult(ZKLogStreamMetadataStore.getLog(uri, logName, logIdentifier, this.zkc, ownAllocator, true));
        String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
        List<Versioned<byte[]>> metadatas = Utils.ioResult(ZKLogStreamMetadataStore.checkLogMetadataPaths(this.zkc.get(), logRootPath, ownAllocator));
        if (ownAllocator) {
            Assert.assertEquals((String)("Should have 8 paths : ownAllocator = " + ownAllocator), (long)8L, (long)metadatas.size());
        } else {
            Assert.assertEquals((String)("Should have 7 paths : ownAllocator = " + ownAllocator), (long)7L, (long)metadatas.size());
        }
        for (Versioned<byte[]> metadata : metadatas) {
            Assert.assertTrue((boolean)ZKLogStreamMetadataStore.pathExists(metadata));
            Assert.assertTrue((((LongVersion)metadata.getVersion()).getLongVersion() >= 0L ? 1 : 0) != 0);
        }
        Versioned<byte[]> logSegmentsData = logMetadata.getMaxLSSNData();
        Assert.assertEquals((long)0L, (long)DLUtils.deserializeLogSegmentSequenceNumber(logSegmentsData.getValue()));
        Versioned<byte[]> maxTxIdData = logMetadata.getMaxTxIdData();
        Assert.assertEquals((long)0L, (long)DLUtils.deserializeTransactionId(maxTxIdData.getValue()));
        if (ownAllocator) {
            Versioned<byte[]> allocationData = logMetadata.getAllocationData();
            Assert.assertEquals((long)0L, (long)allocationData.getValue().length);
        }
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingLogSegmentsPath() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/ledgers");
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, false, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingMaxTxIdPath() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/maxtxid");
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, false, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingLockPath() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/lock");
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, false, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingReadLockPath() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/readLock");
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, false, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingVersionPath() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/version");
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, false, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingAllocatorPath() throws Exception {
        URI uri = DLMTestUtil.createDLMURI(zkPort, "");
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/allocation");
        this.testCreateLogMetadataWithMissingPaths(uri, logName, logIdentifier, pathsToDelete, true, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataMissingAllPath() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        ArrayList<String> pathsToDelete = Lists.newArrayList(logRootPath + "/ledgers", logRootPath + "/maxtxid", logRootPath + "/lock", logRootPath + "/readLock", logRootPath + "/version", logRootPath + "/allocation");
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, true, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataOnExistedLog() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        ArrayList<String> pathsToDelete = Lists.newArrayList();
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, true, true);
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadata() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        ArrayList<String> pathsToDelete = Lists.newArrayList();
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, true, false);
    }

    @Test(timeout=60000L, expected=LogNotFoundException.class)
    public void testCreateLogMetadataWithCreateIfNotExistsSetToFalse() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        Utils.ioResult(ZKLogStreamMetadataStore.getLog(this.uri, logName, logIdentifier, this.zkc, true, false));
    }

    @Test(timeout=60000L)
    public void testCreateLogMetadataWithCustomMetadata() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        ArrayList<String> pathsToDelete = Lists.newArrayList();
        DLMetadata.create(new BKDLConfig(zkServers, "/ledgers")).update(this.uri);
        Namespace namespace = NamespaceBuilder.newBuilder().conf(new DistributedLogConfiguration()).uri(this.uri).build();
        MetadataAccessor accessor = namespace.getNamespaceDriver().getMetadataAccessor(logName);
        accessor.createOrUpdateMetadata(logName.getBytes("UTF-8"));
        accessor.close();
        this.testCreateLogMetadataWithMissingPaths(this.uri, logName, logIdentifier, pathsToDelete, true, false);
    }

    @Test(timeout=60000L, expected=LogNotFoundException.class)
    public void testGetLogSegmentsLogNotFound() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        String logSegmentsPath = LogMetadata.getLogSegmentsPath(this.uri, logName, logIdentifier);
        FutureUtils.result(ZKLogStreamMetadataStore.getLogSegments(this.zkc, logSegmentsPath));
    }

    @Test(timeout=60000L)
    public void testGetLogSegmentsZKExceptions() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        ZooKeeper mockZk = (ZooKeeper)Mockito.mock(ZooKeeper.class);
        ZooKeeperClient mockZkc = (ZooKeeperClient)Mockito.mock(ZooKeeperClient.class);
        Mockito.when((Object)mockZkc.get()).thenReturn((Object)mockZk);
        ((ZooKeeper)Mockito.doAnswer(invocationOnMock -> {
            String path = (String)invocationOnMock.getArguments()[0];
            AsyncCallback.Children2Callback callback = (AsyncCallback.Children2Callback)invocationOnMock.getArguments()[2];
            callback.processResult(KeeperException.Code.BADVERSION.intValue(), path, null, null, null);
            return null;
        }).when((Object)mockZk)).getChildren(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), (AsyncCallback.Children2Callback)ArgumentMatchers.any(AsyncCallback.Children2Callback.class), ArgumentMatchers.any());
        String logSegmentsPath = LogMetadata.getLogSegmentsPath(this.uri, logName, logIdentifier);
        try {
            FutureUtils.result(ZKLogStreamMetadataStore.getLogSegments(mockZkc, logSegmentsPath));
            Assert.fail((String)"Should fail to get log segments when encountering zk exceptions");
        }
        catch (ZKException zke) {
            Assert.assertEquals((Object)KeeperException.Code.BADVERSION, (Object)zke.getKeeperExceptionCode());
        }
    }

    @Test(timeout=60000L)
    public void testGetLogSegments() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        TestZKLogStreamMetadataStore.createLog(this.zkc, this.uri, logName, logIdentifier, 5);
        List<LogSegmentMetadata> segments = FutureUtils.result(ZKLogStreamMetadataStore.getLogSegments(this.zkc, LogMetadata.getLogSegmentsPath(this.uri, logName, logIdentifier)));
        Assert.assertEquals((long)5L, (long)segments.size());
        for (int i = 0; i < 5; ++i) {
            Assert.assertEquals((long)(1L + (long)i), (long)segments.get(i).getLogSegmentSequenceNumber());
        }
    }

    @Test(timeout=60000L)
    public void testGetMissingPathsRecursive() throws Exception {
        List<String> missingPaths = FutureUtils.result(ZKLogStreamMetadataStore.getMissingPaths(this.zkc, this.uri, "path_missing/to/log"));
        Assert.assertEquals(Lists.newArrayList(this.uri.getPath() + "/path_missing/to/log", this.uri.getPath() + "/path_missing/to", this.uri.getPath() + "/path_missing"), missingPaths);
    }

    @Test(timeout=60000L)
    public void testGetMissingPathsRecursive2() throws Exception {
        String path = this.uri.getPath() + "/path_missing2/to/log";
        ZkUtils.createFullPathOptimistic(this.zkc.get(), path, DistributedLogConstants.EMPTY_BYTES, this.zkc.getDefaultACL(), CreateMode.PERSISTENT);
        List<String> missingPaths = FutureUtils.result(ZKLogStreamMetadataStore.getMissingPaths(this.zkc, this.uri, "path_missing2/to/log"));
        Assert.assertEquals(Collections.emptyList(), missingPaths);
    }

    @Test(timeout=60000L)
    public void testGetMissingPathsFailure() throws Exception {
        ZooKeeper mockZk = (ZooKeeper)Mockito.mock(ZooKeeper.class);
        ZooKeeperClient mockZkc = (ZooKeeperClient)Mockito.mock(ZooKeeperClient.class);
        Mockito.when((Object)mockZkc.get()).thenReturn((Object)mockZk);
        ((ZooKeeper)Mockito.doAnswer(invocationOnMock -> {
            String path = (String)invocationOnMock.getArguments()[0];
            AsyncCallback.StatCallback callback = (AsyncCallback.StatCallback)invocationOnMock.getArguments()[2];
            callback.processResult(KeeperException.Code.BADVERSION.intValue(), path, null, null);
            return null;
        }).when((Object)mockZk)).exists(ArgumentMatchers.anyString(), ArgumentMatchers.anyBoolean(), (AsyncCallback.StatCallback)ArgumentMatchers.any(AsyncCallback.StatCallback.class), ArgumentMatchers.any());
        try {
            FutureUtils.result(ZKLogStreamMetadataStore.getMissingPaths(mockZkc, this.uri, "path_failure/to/log_failure"));
            Assert.fail((String)"Should fail on getting missing paths on zookeeper exceptions.");
        }
        catch (ZKException zke) {
            Assert.assertEquals((Object)KeeperException.Code.BADVERSION, (Object)zke.getKeeperExceptionCode());
        }
    }

    @Test(timeout=60000L)
    public void testRenameLog() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        int numSegments = 5;
        TestZKLogStreamMetadataStore.createLog(this.zkc, this.uri, logName, logIdentifier, numSegments);
        String newLogName = "path_rename/to/new/" + logName;
        FutureUtils.result(this.metadataStore.renameLog(this.uri, logName, newLogName));
    }

    @Test(timeout=60000L, expected=LogExistsException.class)
    public void testRenameLogExists() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        int numSegments = 5;
        TestZKLogStreamMetadataStore.createLog(this.zkc, this.uri, logName, logIdentifier, numSegments);
        String newLogName = "path_rename_exists/to/new/" + logName;
        TestZKLogStreamMetadataStore.createLog(this.zkc, this.uri, newLogName, logIdentifier, 3);
        FutureUtils.result(this.metadataStore.renameLog(this.uri, logName, newLogName));
    }

    @Test(timeout=60000L, expected=LockingException.class)
    public void testRenameLockedLog() throws Exception {
        String logName = this.testName.getMethodName();
        String logIdentifier = "<default>";
        int numSegments = 5;
        TestZKLogStreamMetadataStore.createLog(this.zkc, this.uri, logName, logIdentifier, numSegments);
        String logRootPath = LogMetadata.getLogRootPath(this.uri, logName, logIdentifier);
        String lockPath = logRootPath + "/lock";
        this.zkc.get().create(lockPath + "/test", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        String newLogName = "path_rename_locked/to/new/" + logName;
        FutureUtils.result(this.metadataStore.renameLog(this.uri, logName, newLogName));
    }
}

