package org.apache.bookkeeper.stream.storage.impl.sc;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.bookkeeper.clients.utils.NetUtils;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.testing.MoreAsserts;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.cluster.ClusterAssignmentData;
import org.apache.bookkeeper.stream.proto.cluster.ServerAssignmentData;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainer;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerFactory;
import org.apache.bookkeeper.stream.storage.api.sc.StorageContainerRegistry;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.cluster.ZkClusterMetadataStore;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.distributedlog.ZooKeeperClusterTestCase;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/sc/ZkStorageContainerManagerTest.class */
/**
 * Tests for {@link ZkStorageContainerManager}.
 *
 * <p>Each test publishes {@link ClusterAssignmentData} through a spied
 * {@link ZkClusterMetadataStore} and then checks which containers the manager asks the spied
 * {@link StorageContainerRegistry} to start or stop, and what it reports as live or pending.
 * The Curator client connects to {@code zkServers}, which is not declared in this class
 * (presumably inherited from {@link ZooKeeperClusterTestCase}).
 */
public class ZkStorageContainerManagerTest extends ZooKeeperClusterTestCase {
    private static final int NUM_STORAGE_CONTAINERS = 32;

    /** Upper bound, in milliseconds, for asynchronous registry interactions to be observed. */
    private static final long VERIFY_TIMEOUT_MS = 10000L;

    /** Window, in milliseconds, during which an interaction is required NOT to happen. */
    private static final long QUIET_PERIOD_MS = 200L;

    @Rule
    public final TestName runtime = new TestName();
    private final Endpoint myEndpoint = Endpoint.newBuilder().setHostname("127.0.0.1").setPort(4181).build();
    private CuratorFramework curatorClient;
    private StorageContainerFactory mockScFactory;
    private StorageContainerRegistry scRegistry;
    private ZkClusterMetadataStore clusterMetadataStore;
    private ZkStorageContainerManager scManager;

    /**
     * Starts a Curator client, initializes cluster metadata under a root path named after the
     * running test method, and builds (but does not start) the manager under test.
     */
    @Before
    public void setup() {
        curatorClient = CuratorFrameworkFactory.newClient(zkServers, new ExponentialBackoffRetry(200, 10, 5000));
        curatorClient.start();

        // One root path per test method keeps the tests from seeing each other's metadata.
        String zkRootPath = "/" + runtime.getMethodName();
        clusterMetadataStore = Mockito.spy(new ZkClusterMetadataStore(curatorClient, zkServers, zkRootPath));
        clusterMetadataStore.initializeCluster(NUM_STORAGE_CONTAINERS);

        mockScFactory = Mockito.mock(StorageContainerFactory.class);
        scRegistry = Mockito.spy(new StorageContainerRegistryImpl(mockScFactory));
        scManager = newScManager();
    }

    /**
     * Closes the manager, the Curator client and the metadata store, in that order. Every
     * close is attempted even when an earlier one throws, so a failing close cannot leak the
     * remaining resources into later tests.
     */
    @After
    public void teardown() {
        try {
            if (null != scManager) {
                scManager.close();
            }
        } finally {
            try {
                if (null != curatorClient) {
                    curatorClient.close();
                }
            } finally {
                if (null != clusterMetadataStore) {
                    clusterMetadataStore.close();
                }
            }
        }
    }

    /**
     * Builds a manager for {@link #myEndpoint} from the current {@code clusterMetadataStore}
     * and {@code scRegistry} fields, with a one second cluster controller schedule interval.
     */
    private ZkStorageContainerManager newScManager() {
        StorageConfiguration conf = new StorageConfiguration(new CompositeConfiguration())
            .setClusterControllerScheduleInterval(1L, TimeUnit.SECONDS);
        return new ZkStorageContainerManager(
            myEndpoint, conf, clusterMetadataStore, scRegistry, NullStatsLogger.INSTANCE);
    }

    /**
     * Publishes an assignment in which {@link #myEndpoint} is the only server and owns exactly
     * one container.
     *
     * @param containerId the container assigned to this endpoint
     * @throws Exception if the metadata store rejects the update
     */
    private void assignContainerToMe(long containerId) throws Exception {
        ServerAssignmentData serverData = ServerAssignmentData.newBuilder()
            .addContainers(containerId)
            .build();
        ClusterAssignmentData clusterData = ClusterAssignmentData.newBuilder()
            .putServers(NetUtils.endpointToString(myEndpoint), serverData)
            .build();
        clusterMetadataStore.updateClusterAssignmentData(clusterData);
    }

    /**
     * Creates a mock container whose lifecycle is driven by the supplied futures.
     *
     * @param j the id returned by {@code getId()}
     * @param completableFuture the future returned by {@code start()}
     * @param completableFuture2 the future returned by {@code stop()}
     * @return the stubbed mock
     */
    private static StorageContainer createStorageContainer(long j, CompletableFuture<StorageContainer> completableFuture, CompletableFuture<Void> completableFuture2) {
        StorageContainer container = Mockito.mock(StorageContainer.class);
        Mockito.when(container.getId()).thenReturn(j);
        Mockito.when(container.start()).thenReturn(completableFuture);
        Mockito.when(container.stop()).thenReturn(completableFuture2);
        return container;
    }

    /**
     * Assigns container 11, waits for it to become live, then replaces the assignment with
     * container 22 and expects 11 to be stopped and 22 to become the only live container.
     */
    @Test
    public void testBasicOps() throws Exception {
        scManager.start();

        final long firstId = 11L;
        final long secondId = 22L;

        CompletableFuture<StorageContainer> firstStart = new CompletableFuture<>();
        CompletableFuture<Void> firstStop = new CompletableFuture<>();
        CompletableFuture<StorageContainer> secondStart = new CompletableFuture<>();
        CompletableFuture<Void> secondStop = new CompletableFuture<>();

        StorageContainer first = createStorageContainer(firstId, firstStart, firstStop);
        Mockito.when(mockScFactory.createStorageContainer(ArgumentMatchers.eq(firstId))).thenReturn(first);
        StorageContainer second = createStorageContainer(secondId, secondStart, secondStop);
        Mockito.when(mockScFactory.createStorageContainer(ArgumentMatchers.eq(secondId))).thenReturn(second);

        // Phase 1: assign the first container and let its start complete.
        assignContainerToMe(firstId);
        firstStart.complete(first);

        Mockito.verify(scRegistry, Mockito.timeout(VERIFY_TIMEOUT_MS).times(1))
            .startStorageContainer(ArgumentMatchers.eq(firstId));
        MoreAsserts.assertUtil(ignored -> scManager.getLiveContainers().size() >= 1, () -> null);
        Assert.assertEquals(1L, scManager.getLiveContainers().size());
        Assert.assertTrue(scManager.getLiveContainers().containsKey(firstId));

        // Phase 2: swap the assignment to the second container; release the pending stop of
        // the first and the pending start of the second.
        assignContainerToMe(secondId);
        FutureUtils.complete(firstStop, null);
        secondStart.complete(second);

        Mockito.verify(scRegistry, Mockito.timeout(VERIFY_TIMEOUT_MS).times(1))
            .stopStorageContainer(ArgumentMatchers.eq(firstId), ArgumentMatchers.same(first));
        Mockito.verify(scRegistry, Mockito.timeout(VERIFY_TIMEOUT_MS).times(1))
            .startStorageContainer(ArgumentMatchers.eq(secondId));
        MoreAsserts.assertUtil(
            ignored -> !scManager.getLiveContainers().containsKey(firstId)
                && scManager.getLiveContainers().containsKey(secondId),
            () -> null);
        Assert.assertEquals(1L, scManager.getLiveContainers().size());
        Assert.assertFalse(scManager.getLiveContainers().containsKey(firstId));
        Assert.assertTrue(scManager.getLiveContainers().containsKey(secondId));
    }

    /**
     * Removes a container from the assignment while its start is still pending. The stop must
     * not be requested until the start future completes; afterwards the container must end up
     * neither live nor pending.
     */
    @Test
    public void testShutdownPendingStartStorageContainer() throws Exception {
        scManager.start();

        final long containerId = 11L;
        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
        CompletableFuture<Void> stopFuture = new CompletableFuture<>();
        StorageContainer container = createStorageContainer(containerId, startFuture, stopFuture);
        Mockito.when(mockScFactory.createStorageContainer(ArgumentMatchers.eq(containerId))).thenReturn(container);

        assignContainerToMe(containerId);

        // The start is requested but its future is deliberately left incomplete.
        Mockito.verify(scRegistry, Mockito.timeout(VERIFY_TIMEOUT_MS).times(1))
            .startStorageContainer(ArgumentMatchers.eq(containerId));
        Assert.assertEquals(0L, scManager.getLiveContainers().size());
        Assert.assertEquals(1L, scManager.getPendingStartStopContainers().size());
        Assert.assertTrue(scManager.getPendingStartStopContainers().contains(containerId));

        // Publish an assignment with no servers at all.
        clusterMetadataStore.updateClusterAssignmentData(ClusterAssignmentData.newBuilder().build());

        // after(...).never() observes the whole quiet period; timeout(...).times(0) would
        // return on its first (immediately successful) check and wait for nothing.
        Mockito.verify(scRegistry, Mockito.after(QUIET_PERIOD_MS).never())
            .stopStorageContainer(ArgumentMatchers.eq(containerId), ArgumentMatchers.same(container));
        Assert.assertEquals(1L, scManager.getPendingStartStopContainers().size());
        Assert.assertTrue(scManager.getPendingStartStopContainers().contains(containerId));

        // Let the start finish, and pre-complete the stop so it can finish once requested.
        FutureUtils.complete(startFuture, container);
        FutureUtils.complete(stopFuture, null);

        Mockito.verify(scRegistry, Mockito.timeout(VERIFY_TIMEOUT_MS).times(1))
            .stopStorageContainer(ArgumentMatchers.eq(containerId), ArgumentMatchers.same(container));
        MoreAsserts.assertUtil(ignored -> scManager.getPendingStartStopContainers().size() == 0, () -> null);
        Assert.assertEquals(0L, scManager.getLiveContainers().size());
        Assert.assertEquals(0L, scManager.getPendingStartStopContainers().size());
    }

    /**
     * Uses a factory whose containers fail to start until a flag is flipped, and expects the
     * container to become live once the factory starts handing out a startable container.
     */
    @Test
    public void testStartContainerOnFailures() throws Exception {
        // Replace the manager built in setup() with one wired to a hand-written factory.
        scManager.close();

        final long containerId = 11L;
        AtomicBoolean returnGoodContainer = new AtomicBoolean(false);
        CompletableFuture<StorageContainer> startFuture = new CompletableFuture<>();
        StorageContainer goodContainer = createStorageContainer(containerId, startFuture, FutureUtils.Void());

        mockScFactory = scId -> {
            if (returnGoodContainer.get()) {
                return goodContainer;
            }
            // Until the flag flips, every created container fails its start().
            return createStorageContainer(
                scId, FutureUtils.exception(new Exception("Failed to start")), FutureUtils.Void());
        };
        scRegistry = Mockito.spy(new StorageContainerRegistryImpl(mockScFactory));
        scManager = newScManager();
        scManager.start();

        assignContainerToMe(containerId);

        Mockito.verify(scRegistry, Mockito.timeout(VERIFY_TIMEOUT_MS).atLeastOnce())
            .startStorageContainer(ArgumentMatchers.eq(containerId));
        Assert.assertEquals(0L, scManager.getLiveContainers().size());

        // Switch the factory to the good container and complete its start.
        returnGoodContainer.set(true);
        FutureUtils.complete(startFuture, goodContainer);

        MoreAsserts.assertUtil(ignored -> scManager.getLiveContainers().size() >= 1, () -> null);
        Assert.assertEquals(1L, scManager.getLiveContainers().size());
        Assert.assertTrue(scManager.getLiveContainers().containsKey(containerId));
    }
}
