package org.apache.bookkeeper.stream.storage.impl;

import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ClientInterceptor;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.bookkeeper.clients.impl.container.StorageContainerClientInterceptor;
import org.apache.bookkeeper.clients.impl.internal.api.StorageServerClientManager;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.common.grpc.proxy.ProxyHandlerRegistry;
import org.apache.bookkeeper.common.util.ListenableFutures;
import org.apache.bookkeeper.statelib.api.mvcc.MVCCAsyncStore;
import org.apache.bookkeeper.stats.NullStatsLogger;
import org.apache.bookkeeper.stream.proto.NamespaceConfiguration;
import org.apache.bookkeeper.stream.proto.StreamName;
import org.apache.bookkeeper.stream.proto.StreamProperties;
import org.apache.bookkeeper.stream.proto.common.Endpoint;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.DeleteRangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.PutResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeRequest;
import org.apache.bookkeeper.stream.proto.kv.rpc.RangeResponse;
import org.apache.bookkeeper.stream.proto.kv.rpc.ResponseHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.RoutingHeader;
import org.apache.bookkeeper.stream.proto.kv.rpc.TableServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.CreateStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.DeleteStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesRequest;
import org.apache.bookkeeper.stream.proto.storage.GetActiveRangesResponse;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceRequest;
import org.apache.bookkeeper.stream.proto.storage.GetNamespaceResponse;
import org.apache.bookkeeper.stream.proto.storage.GetStreamRequest;
import org.apache.bookkeeper.stream.proto.storage.GetStreamResponse;
import org.apache.bookkeeper.stream.proto.storage.MetaRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.RootRangeServiceGrpc;
import org.apache.bookkeeper.stream.proto.storage.StatusCode;
import org.apache.bookkeeper.stream.protocol.ProtocolConstants;
import org.apache.bookkeeper.stream.protocol.util.ProtoUtils;
import org.apache.bookkeeper.stream.storage.StorageResources;
import org.apache.bookkeeper.stream.storage.api.metadata.RangeStoreService;
import org.apache.bookkeeper.stream.storage.api.service.RangeStoreServiceFactory;
import org.apache.bookkeeper.stream.storage.conf.StorageConfiguration;
import org.apache.bookkeeper.stream.storage.impl.grpc.GrpcServices;
import org.apache.bookkeeper.stream.storage.impl.sc.LocalStorageContainerManager;
import org.apache.bookkeeper.stream.storage.impl.service.RangeStoreContainerServiceFactoryImpl;
import org.apache.bookkeeper.stream.storage.impl.store.MVCCStoreFactory;
import org.apache.commons.configuration.CompositeConfiguration;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/bookkeeper/stream/storage/impl/TestStorageContainerStoreImpl.class */
public class TestStorageContainerStoreImpl {
    // ---- Shared constants (initialized once, when the class is loaded) ----

    /** Logger for this test class. */
    private static final Logger log = LoggerFactory.getLogger(TestStorageContainerStoreImpl.class);

    /**
     * Sample stream properties using the default stream configuration. Both ids are read from
     * the wall clock at class-initialization time.
     */
    private static final StreamProperties streamProps = StreamProperties.newBuilder()
        .setStorageContainerId(System.currentTimeMillis())
        .setStreamId(System.currentTimeMillis())
        .setStreamName("test-create-add-stream-request")
        .setStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF)
        .build();

    /** Routing key placed in {@link #TEST_ROUTING_HEADER}. */
    private static final ByteString TEST_ROUTING_KEY = ByteString.copyFromUtf8("test-routing-key");

    /** Key used by the put/range/delete request helpers. */
    private static final ByteString TEST_KEY = ByteString.copyFromUtf8("test-key");

    /** Value used by the put request helper. */
    private static final ByteString TEST_VAL = ByteString.copyFromUtf8("test-val");

    /** Routing header shared by every table-service request built in this class. */
    private static final RoutingHeader TEST_ROUTING_HEADER = RoutingHeader.newBuilder()
        .setRKey(TEST_ROUTING_KEY)
        .setStreamId(1234)
        .setRangeId(1256)
        .build();

    /** Response header template; the helpers copy it and set the status code. */
    private static final ResponseHeader TEST_RESP_HEADER = ResponseHeader.newBuilder()
        .setRoutingHeader(TEST_ROUTING_HEADER)
        .build();

    // ---- Per-test fixture state ----

    /** Backing configuration wrapped by {@link #storageConf}. */
    private final CompositeConfiguration compConf = new CompositeConfiguration();

    /** Storage configuration handed to the store under test. */
    private final StorageConfiguration storageConf = new StorageConfiguration(compConf);

    /** Namespace configuration used by the create-namespace tests. */
    private final NamespaceConfiguration namespaceConf = NamespaceConfiguration.newBuilder()
        .setDefaultStreamConf(ProtocolConstants.DEFAULT_STREAM_CONF)
        .build();

    private final StorageResources resources = StorageResources.create();

    /** Mock returned by the mocked service factory for every container. */
    private RangeStoreService mockRangeStoreService;

    /** The store under test. */
    private StorageContainerStoreImpl rangeStore;

    /** In-process gRPC server fronting {@link #rangeStore}. */
    private Server server;

    /** Client channel used to build the three stubs below. */
    private Channel channel;

    private TableServiceGrpc.TableServiceFutureStub tableService;
    private RootRangeServiceGrpc.RootRangeServiceFutureStub rootRangeService;
    private MetaRangeServiceGrpc.MetaRangeServiceFutureStub metaRangeService;

    /** Storage container id chosen in {@code setUp} and passed to the client interceptor. */
    private long scId;

    /**
     * Builds a {@link StreamName} whose namespace is {@code str + "_col"} and whose stream name
     * is {@code str + "_stream"}.
     *
     * @param str common prefix for both names
     * @return the assembled stream name
     */
    private static StreamName createStreamName(String str) {
        final String namespaceName = str + "_col";
        final String streamName = str + "_stream";
        StreamName.Builder nameBuilder = StreamName.newBuilder();
        nameBuilder.setNamespaceName(namespaceName);
        nameBuilder.setStreamName(streamName);
        return nameBuilder.build();
    }

    /**
     * Builds an {@link Endpoint} from a hostname and a port.
     *
     * @param str hostname of the endpoint
     * @param i port of the endpoint
     * @return the assembled endpoint
     */
    private static Endpoint createEndpoint(String str, int i) {
        Endpoint.Builder endpointBuilder = Endpoint.newBuilder();
        endpointBuilder.setHostname(str);
        endpointBuilder.setPort(i);
        return endpointBuilder.build();
    }

    /**
     * Builds a put request carrying the shared routing header, {@code TEST_KEY} and
     * {@code TEST_VAL}.
     *
     * @return a new put request
     */
    private static PutRequest createPutRequest() {
        PutRequest.Builder requestBuilder = PutRequest.newBuilder();
        requestBuilder.setHeader(TEST_ROUTING_HEADER);
        requestBuilder.setKey(TEST_KEY);
        requestBuilder.setValue(TEST_VAL);
        return requestBuilder.build();
    }

    /**
     * Builds a put response whose header is a copy of {@code TEST_RESP_HEADER} with the given
     * status code.
     *
     * @param statusCode status code to place in the response header
     * @return a new put response
     */
    private static PutResponse createPutResponse(StatusCode statusCode) {
        ResponseHeader header = ResponseHeader.newBuilder(TEST_RESP_HEADER)
            .setCode(statusCode)
            .build();
        PutResponse.Builder responseBuilder = PutResponse.newBuilder();
        responseBuilder.setHeader(header);
        return responseBuilder.build();
    }

    /**
     * Builds a range request carrying the shared routing header and {@code TEST_KEY}.
     *
     * @return a new range request
     */
    private static RangeRequest createRangeRequest() {
        RangeRequest.Builder requestBuilder = RangeRequest.newBuilder();
        requestBuilder.setHeader(TEST_ROUTING_HEADER);
        requestBuilder.setKey(TEST_KEY);
        return requestBuilder.build();
    }

    /**
     * Builds a range response with a count of zero whose header is a copy of
     * {@code TEST_RESP_HEADER} with the given status code.
     *
     * @param statusCode status code to place in the response header
     * @return a new range response
     */
    private static RangeResponse createRangeResponse(StatusCode statusCode) {
        ResponseHeader header = ResponseHeader.newBuilder(TEST_RESP_HEADER)
            .setCode(statusCode)
            .build();
        RangeResponse.Builder responseBuilder = RangeResponse.newBuilder();
        responseBuilder.setHeader(header);
        responseBuilder.setCount(0L);
        return responseBuilder.build();
    }

    /**
     * Builds a delete-range request carrying the shared routing header and {@code TEST_KEY}.
     *
     * @return a new delete-range request
     */
    private static DeleteRangeRequest createDeleteRequest() {
        DeleteRangeRequest.Builder requestBuilder = DeleteRangeRequest.newBuilder();
        requestBuilder.setHeader(TEST_ROUTING_HEADER);
        requestBuilder.setKey(TEST_KEY);
        return requestBuilder.build();
    }

    /**
     * Builds a delete-range response with a deleted count of zero whose header is a copy of
     * {@code TEST_RESP_HEADER} with the given status code.
     *
     * @param statusCode status code to place in the response header
     * @return a new delete-range response
     */
    private static DeleteRangeResponse createDeleteResponse(StatusCode statusCode) {
        ResponseHeader header = ResponseHeader.newBuilder(TEST_RESP_HEADER)
            .setCode(statusCode)
            .build();
        DeleteRangeResponse.Builder responseBuilder = DeleteRangeResponse.newBuilder();
        responseBuilder.setHeader(header);
        responseBuilder.setDeleted(0L);
        return responseBuilder.build();
    }

    /**
     * The in-process channel created by {@link #setUp()}. It is held separately from
     * {@code channel} because that field ends up pointing at the intercepted wrapper, which is a
     * plain {@link Channel} and cannot be shut down; {@link #tearDown()} closes this one.
     */
    private ManagedChannel managedChannel;

    /**
     * Builds the fixture used by every test.
     *
     * <ol>
     *   <li>Mocks a {@link RangeStoreServiceFactory} that returns {@code mockRangeStoreService}
     *       for any container id.</li>
     *   <li>Creates and starts the {@link StorageContainerStoreImpl} under test with a
     *       {@link LocalStorageContainerManager}.</li>
     *   <li>Starts an in-process gRPC server named {@code "test-server"} whose fallback registry
     *       uses the store as its channel finder.</li>
     *   <li>Opens an in-process client channel, wraps it with a
     *       {@link StorageContainerClientInterceptor} for a randomly chosen container id
     *       (0 or 1), and creates the three future stubs on top of it.</li>
     * </ol>
     *
     * @throws Exception if the store or the in-process server fails to start
     */
    @Before
    public void setUp() throws Exception {
        final Endpoint endpoint = createEndpoint("127.0.0.1", 0);

        // Every container created by the store is backed by the same mocked range store service.
        RangeStoreServiceFactory serviceFactory = Mockito.mock(RangeStoreServiceFactory.class);
        this.mockRangeStoreService = Mockito.mock(RangeStoreService.class);
        Mockito.when(this.mockRangeStoreService.start()).thenReturn(FutureUtils.Void());
        Mockito.when(this.mockRangeStoreService.stop()).thenReturn(FutureUtils.Void());
        Mockito.when(serviceFactory.createService(ArgumentMatchers.anyLong())).thenReturn(this.mockRangeStoreService);

        this.rangeStore = new StorageContainerStoreImpl(
            this.storageConf,
            (conf, registry) -> new LocalStorageContainerManager(endpoint, conf, registry, 2),
            new RangeStoreContainerServiceFactoryImpl(serviceFactory),
            (StorageServerClientManager) null,
            NullStatsLogger.INSTANCE);
        this.rangeStore.start();

        // Register the gRPC service definitions with the proxy registry. Chaining the call keeps
        // the element type of the returned collection instead of degrading it to a raw type.
        ProxyHandlerRegistry.Builder registryBuilder = ProxyHandlerRegistry.newBuilder();
        GrpcServices.create((RangeStoreService) null).forEach(registryBuilder::addService);
        this.server = InProcessServerBuilder.forName("test-server")
            .fallbackHandlerRegistry(registryBuilder.setChannelFinder(this.rangeStore).build())
            .directExecutor()
            .build()
            .start();

        // Keep a handle on the managed channel so it can be shut down after the test.
        this.managedChannel = InProcessChannelBuilder.forName("test-server").usePlaintext().build();
        this.scId = ThreadLocalRandom.current().nextInt(2);
        this.channel = ClientInterceptors.intercept(this.managedChannel, new StorageContainerClientInterceptor(this.scId));

        this.tableService = TableServiceGrpc.newFutureStub(this.channel);
        this.metaRangeService = MetaRangeServiceGrpc.newFutureStub(this.channel);
        this.rootRangeService = RootRangeServiceGrpc.newFutureStub(this.channel);
    }

    /**
     * Releases whatever {@link #setUp()} managed to create: the client channel, the in-process
     * server and the store under test, in that order. Each step is skipped if the corresponding
     * resource was never created.
     */
    @After
    public void tearDown() {
        if (null != this.managedChannel) {
            this.managedChannel.shutdown();
        }
        if (null != this.server) {
            this.server.shutdown();
        }
        if (null != this.rangeStore) {
            this.rangeStore.close();
        }
    }

    /**
     * Asserts that {@code completableFuture} fails with a {@link StatusRuntimeException} whose
     * status code matches the code of {@code status}.
     *
     * <p>The assertion fails if the future completes normally, if the failure cause is not a
     * {@link StatusRuntimeException}, or if the status codes differ. Codes are compared rather
     * than {@link Status} objects because {@code Status} does not define value equality.
     *
     * @param completableFuture the pending call result to inspect
     * @param status the status whose code the failure is expected to carry
     * @param <T> result type of the future
     * @throws InterruptedException if interrupted while waiting for the future
     */
    private <T> void verifyNotFoundException(CompletableFuture<T> completableFuture, Status status) throws InterruptedException {
        try {
            completableFuture.get();
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            Assert.assertTrue("expected a StatusRuntimeException but got " + cause, cause instanceof StatusRuntimeException);
            Assert.assertEquals(status.getCode(), ((StatusRuntimeException) cause).getStatus().getCode());
            return;
        }
        // Reaching this point means no exception was raised, which must not pass silently.
        Assert.fail("expected the call to fail with " + status.getCode() + " but it completed normally");
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a create-namespace call and
     * hands the pending result to {@code verifyNotFoundException} with {@link Status#NOT_FOUND}.
     */
    @Test
    public void testCreateNamespaceNoRootStorageContainerStore() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        final String namespaceName = "test-create-namespace-no-root-storage-container-store";
        CreateNamespaceRequest request = ProtoUtils.createCreateNamespaceRequest(namespaceName, this.namespaceConf);
        CompletableFuture<CreateNamespaceResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.createNamespace(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a delete-namespace call and
     * hands the pending result to {@code verifyNotFoundException} with {@link Status#NOT_FOUND}.
     */
    @Test
    public void testDeleteNamespaceNoRootStorageContainerStore() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        final String namespaceName = "test-delete-namespace-no-root-storage-container-store";
        DeleteNamespaceRequest request = ProtoUtils.createDeleteNamespaceRequest(namespaceName);
        CompletableFuture<DeleteNamespaceResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.deleteNamespace(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a get-namespace call and
     * hands the pending result to {@code verifyNotFoundException} with {@link Status#NOT_FOUND}.
     */
    @Test
    public void testGetNamespaceNoRootStorageContainerStore() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        final String namespaceName = "test-get-namespace-no-root-storage-container-store";
        GetNamespaceRequest request = ProtoUtils.createGetNamespaceRequest(namespaceName);
        CompletableFuture<GetNamespaceResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.getNamespace(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stubs {@code createNamespace} on the mocked range store service and checks that a
     * create-namespace call through the root-range stub yields the very same response instance
     * and reaches the mock exactly once.
     */
    @Test
    public void testCreateNamespaceMockRootStorageContainerStore() throws Exception {
        CreateNamespaceResponse expected = CreateNamespaceResponse.newBuilder().setCode(StatusCode.NAMESPACE_EXISTS).build();
        CreateNamespaceRequest request = ProtoUtils.createCreateNamespaceRequest("test-create-namespace-mock-root-storage-container-store", this.namespaceConf);
        Mockito.when(this.mockRangeStoreService.createNamespace(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<CreateNamespaceResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.createNamespace(request));

        // Identity, not equality: the stubbed instance itself must come back.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).createNamespace(request);
    }

    /**
     * Stubs {@code deleteNamespace} on the mocked range store service and checks that a
     * delete-namespace call through the root-range stub yields the very same response instance
     * and reaches the mock exactly once.
     */
    @Test
    public void testDeleteNamespaceMockRootStorageContainerStore() throws Exception {
        DeleteNamespaceResponse expected = DeleteNamespaceResponse.newBuilder().setCode(StatusCode.NAMESPACE_NOT_FOUND).build();
        DeleteNamespaceRequest request = ProtoUtils.createDeleteNamespaceRequest("test-delete-namespace-no-root-storage-container-store");
        Mockito.when(this.mockRangeStoreService.deleteNamespace(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<DeleteNamespaceResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.deleteNamespace(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).deleteNamespace(request);
    }

    /**
     * Stubs {@code getNamespace} on the mocked range store service and checks that a
     * get-namespace call through the root-range stub yields the very same response instance
     * and reaches the mock exactly once.
     */
    @Test
    public void testGetNamespaceMockRootStorageContainerStore() throws Exception {
        GetNamespaceResponse expected = GetNamespaceResponse.newBuilder().setCode(StatusCode.NAMESPACE_NOT_FOUND).build();
        GetNamespaceRequest request = ProtoUtils.createGetNamespaceRequest("test-get-namespace-no-root-storage-container-store");
        Mockito.when(this.mockRangeStoreService.getNamespace(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<GetNamespaceResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.getNamespace(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).getNamespace(request);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a create-stream call and
     * hands the pending result to {@code verifyNotFoundException} with {@link Status#NOT_FOUND}.
     */
    @Test
    public void testCreateStreamNoRootStorageContainerStore() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        // The same literal serves as both namespace name and stream name.
        final String name = "test-create-namespace-no-root-storage-container-store";
        CreateStreamRequest request = ProtoUtils.createCreateStreamRequest(name, name, ProtocolConstants.DEFAULT_STREAM_CONF);
        CompletableFuture<CreateStreamResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.createStream(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a delete-stream call and
     * hands the pending result to {@code verifyNotFoundException} with {@link Status#NOT_FOUND}.
     */
    @Test
    public void testDeleteStreamNoRootStorageContainerStore() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        // The same literal serves as both namespace name and stream name.
        final String name = "test-delete-namespace-no-root-storage-container-store";
        DeleteStreamRequest request = ProtoUtils.createDeleteStreamRequest(name, name);
        CompletableFuture<DeleteStreamResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.deleteStream(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a get-stream call and
     * hands the pending result to {@code verifyNotFoundException} with {@link Status#NOT_FOUND}.
     */
    @Test
    public void testGetStreamNoRootStorageContainerStore() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        // The same literal serves as both namespace name and stream name.
        final String name = "test-get-namespace-no-root-storage-container-store";
        GetStreamRequest request = ProtoUtils.createGetStreamRequest(name, name);
        CompletableFuture<GetStreamResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.getStream(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stubs {@code createStream} on the mocked range store service and checks that a
     * create-stream call through the root-range stub yields the very same response instance
     * and reaches the mock exactly once.
     */
    @Test
    public void testCreateStreamMockRootStorageContainerStore() throws Exception {
        CreateStreamResponse expected = CreateStreamResponse.newBuilder().setCode(StatusCode.STREAM_EXISTS).build();
        CreateStreamRequest request = ProtoUtils.createCreateStreamRequest("test-create-namespace-mock-root-storage-container-store", "test-create-namespace-mock-root-storage-container-store", ProtocolConstants.DEFAULT_STREAM_CONF);
        Mockito.when(this.mockRangeStoreService.createStream(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<CreateStreamResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.createStream(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).createStream(request);
    }

    /**
     * Stubs {@code deleteStream} on the mocked range store service and checks that a
     * delete-stream call through the root-range stub yields the very same response instance
     * and reaches the mock exactly once.
     */
    @Test
    public void testDeleteStreamMockRootStorageContainerStore() throws Exception {
        DeleteStreamResponse expected = DeleteStreamResponse.newBuilder().setCode(StatusCode.STREAM_NOT_FOUND).build();
        DeleteStreamRequest request = ProtoUtils.createDeleteStreamRequest("test-delete-namespace-no-root-storage-container-store", "test-delete-namespace-no-root-storage-container-store");
        Mockito.when(this.mockRangeStoreService.deleteStream(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<DeleteStreamResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.deleteStream(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).deleteStream(request);
    }

    /**
     * Stubs {@code getStream} on the mocked range store service and checks that a get-stream
     * call through the root-range stub yields the very same response instance and reaches the
     * mock exactly once.
     */
    @Test
    public void testGetStreamMockRootStorageContainerStore() throws Exception {
        GetStreamResponse expected = GetStreamResponse.newBuilder().setCode(StatusCode.STREAM_NOT_FOUND).build();
        GetStreamRequest request = ProtoUtils.createGetStreamRequest("test-get-namespace-no-root-storage-container-store", "test-get-namespace-no-root-storage-container-store");
        Mockito.when(this.mockRangeStoreService.getStream(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<GetStreamResponse> pending =
            ListenableFutures.fromListenableFuture(this.rootRangeService.getStream(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).getStream(request);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a get-active-ranges call
     * for stream id 34 and hands the pending result to {@code verifyNotFoundException} with
     * {@link Status#NOT_FOUND}.
     */
    @Test
    public void testGetActiveRangesNoManager() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        GetActiveRangesRequest request = ProtoUtils.createGetActiveRangesRequest(34L);
        CompletableFuture<GetActiveRangesResponse> pending =
            ListenableFutures.fromListenableFuture(this.metaRangeService.getActiveRanges(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stubs {@code getActiveRanges} on the mocked range store service and checks that a
     * get-active-ranges call through the meta-range stub yields the very same response instance
     * and reaches the mock exactly once.
     */
    @Test
    public void testGetActiveRangesMockManager() throws Exception {
        GetActiveRangesResponse expected = GetActiveRangesResponse.newBuilder().setCode(StatusCode.STREAM_NOT_FOUND).build();
        GetActiveRangesRequest request = ProtoUtils.createGetActiveRangesRequest(34L);
        Mockito.when(this.mockRangeStoreService.getActiveRanges(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<GetActiveRangesResponse> pending =
            ListenableFutures.fromListenableFuture(this.metaRangeService.getActiveRanges(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).getActiveRanges(request);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a put through the table
     * stub and hands the pending result to {@code verifyNotFoundException} with
     * {@link Status#NOT_FOUND}.
     */
    @Test
    public void testPutNoStorageContainer() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        PutRequest request = createPutRequest();
        CompletableFuture<PutResponse> pending =
            ListenableFutures.fromListenableFuture(this.tableService.put(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a delete through the table
     * stub and hands the pending result to {@code verifyNotFoundException} with
     * {@link Status#NOT_FOUND}.
     */
    @Test
    public void testDeleteNoStorageContainer() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        DeleteRangeRequest request = createDeleteRequest();
        CompletableFuture<DeleteRangeResponse> pending =
            ListenableFutures.fromListenableFuture(this.tableService.delete(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stops the storage container identified by {@code scId}, issues a range read through the
     * table stub and hands the pending result to {@code verifyNotFoundException} with
     * {@link Status#NOT_FOUND}.
     */
    @Test
    public void testRangeNoStorageContainer() throws Exception {
        this.rangeStore.getRegistry().stopStorageContainer(this.scId).join();

        RangeRequest request = createRangeRequest();
        CompletableFuture<RangeResponse> pending =
            ListenableFutures.fromListenableFuture(this.tableService.range(request));

        verifyNotFoundException(pending, Status.NOT_FOUND);
    }

    /**
     * Stubs {@code range} on the mocked range store service and checks that a range read
     * through the table stub yields the very same response instance and reaches the mock
     * exactly once.
     */
    @Test
    public void testRangeMockStorageContainer() throws Exception {
        RangeResponse expected = createRangeResponse(StatusCode.SUCCESS);
        RangeRequest request = createRangeRequest();
        Mockito.when(this.mockRangeStoreService.range(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<RangeResponse> pending =
            ListenableFutures.fromListenableFuture(this.tableService.range(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).range(request);
    }

    /**
     * Stubs {@code delete} on the mocked range store service and checks that a delete through
     * the table stub yields the very same response instance and reaches the mock exactly once.
     */
    @Test
    public void testDeleteMockStorageContainer() throws Exception {
        DeleteRangeResponse expected = createDeleteResponse(StatusCode.SUCCESS);
        DeleteRangeRequest request = createDeleteRequest();
        Mockito.when(this.mockRangeStoreService.delete(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<DeleteRangeResponse> pending =
            ListenableFutures.fromListenableFuture(this.tableService.delete(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).delete(request);
    }

    /**
     * Stubs {@code put} on the mocked range store service and checks that a put through the
     * table stub yields the very same response instance and reaches the mock exactly once.
     */
    @Test
    public void testPutMockStorageContainer() throws Exception {
        PutResponse expected = createPutResponse(StatusCode.SUCCESS);
        PutRequest request = createPutRequest();
        Mockito.when(this.mockRangeStoreService.put(request)).thenReturn(CompletableFuture.completedFuture(expected));

        CompletableFuture<PutResponse> pending =
            ListenableFutures.fromListenableFuture(this.tableService.put(request));

        // Await the response first: once it has arrived the mock must already have been invoked,
        // so the verification below cannot race with the call.
        Assert.assertSame(expected, pending.get());
        Mockito.verify(this.mockRangeStoreService, Mockito.times(1)).put(request);
    }
}
