/*
 * Decompiled with CFR 0.152.
 */
package org.apache.bookkeeper.clients.utils;

import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.clients.impl.channel.StorageServerChannel;
import org.apache.bookkeeper.clients.impl.container.StorageContainerChannel;
import org.apache.bookkeeper.clients.utils.ClientConstants;
import org.apache.bookkeeper.clients.utils.ListenableFutureRpcProcessor;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ListenableFutureRpcProcessorTest {
    private ListenableFutureRpcProcessor<String, String, String> processor;
    private StorageContainerChannel scChannel;
    private ScheduledExecutorService executor;

    @Before
    public void setup() {
        this.executor = Executors.newSingleThreadScheduledExecutor();
        this.scChannel = (StorageContainerChannel)Mockito.mock(StorageContainerChannel.class);
        this.processor = (ListenableFutureRpcProcessor)Mockito.spy((Object)new ListenableFutureRpcProcessor<String, String, String>(this.scChannel, this.executor, ClientConstants.DEFAULT_INFINIT_BACKOFF_POLICY){

            protected String createRequest() {
                return null;
            }

            protected ListenableFuture<String> sendRPC(StorageServerChannel rsChannel, String s) {
                return null;
            }

            protected String processResponse(String response) throws Exception {
                return null;
            }
        });
    }

    @Test
    public void testFailToConnect() {
        CompletableFuture serverFuture = new CompletableFuture();
        Mockito.when((Object)this.scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
        CompletableFuture resultFuture = this.processor.process();
        ((StorageContainerChannel)Mockito.verify((Object)this.scChannel, (VerificationMode)Mockito.times((int)1))).getStorageContainerChannelFuture();
        Exception testExc = new Exception("test-exception");
        serverFuture.completeExceptionally(testExc);
        try {
            FutureUtils.result((CompletableFuture)resultFuture);
            Assert.fail((String)"Should fail the process if failed to connect to storage server");
        }
        catch (Exception e) {
            Assert.assertSame((Object)testExc, (Object)e);
        }
    }

    @Test
    public void testProcessSuccessfully() throws Exception {
        String request = "request";
        String response = "response";
        String result = "result";
        StorageServerChannel serverChannel = (StorageServerChannel)Mockito.mock(StorageServerChannel.class);
        CompletableFuture serverFuture = new CompletableFuture();
        Mockito.when((Object)this.scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
        SettableFuture rpcFuture = SettableFuture.create();
        Mockito.when((Object)this.processor.createRequest()).thenReturn((Object)request);
        Mockito.when((Object)this.processor.sendRPC((StorageServerChannel)ArgumentMatchers.same((Object)serverChannel), ArgumentMatchers.eq((Object)request))).thenReturn((Object)rpcFuture);
        Mockito.when((Object)this.processor.processResponse(ArgumentMatchers.eq((Object)response))).thenReturn((Object)result);
        CompletableFuture resultFuture = this.processor.process();
        ((StorageContainerChannel)Mockito.verify((Object)this.scChannel, (VerificationMode)Mockito.times((int)1))).getStorageContainerChannelFuture();
        FutureUtils.complete(serverFuture, (Object)serverChannel);
        rpcFuture.set((Object)response);
        Assert.assertEquals((Object)result, resultFuture.get());
    }

    @Test
    public void testProcessResponseException() throws Exception {
        String request = "request";
        String response = "response";
        StorageServerChannel serverChannel = (StorageServerChannel)Mockito.mock(StorageServerChannel.class);
        CompletableFuture serverFuture = new CompletableFuture();
        Mockito.when((Object)this.scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
        SettableFuture rpcFuture = SettableFuture.create();
        Exception testException = new Exception("test-exception");
        Mockito.when((Object)this.processor.createRequest()).thenReturn((Object)request);
        Mockito.when((Object)this.processor.sendRPC((StorageServerChannel)ArgumentMatchers.same((Object)serverChannel), ArgumentMatchers.eq((Object)request))).thenReturn((Object)rpcFuture);
        Mockito.when((Object)this.processor.processResponse(ArgumentMatchers.eq((Object)response))).thenThrow(new Throwable[]{testException});
        CompletableFuture resultFuture = this.processor.process();
        ((StorageContainerChannel)Mockito.verify((Object)this.scChannel, (VerificationMode)Mockito.times((int)1))).getStorageContainerChannelFuture();
        FutureUtils.complete(serverFuture, (Object)serverChannel);
        rpcFuture.set((Object)response);
        try {
            FutureUtils.result((CompletableFuture)resultFuture);
            Assert.fail((String)"Should throw exception on processing result");
        }
        catch (Exception e) {
            Assert.assertSame((Object)testException, (Object)e);
        }
    }

    @Test
    public void testProcessRpcException() throws Exception {
        String request = "request";
        String response = "response";
        String result = "result";
        StorageServerChannel serverChannel = (StorageServerChannel)Mockito.mock(StorageServerChannel.class);
        CompletableFuture serverFuture = new CompletableFuture();
        Mockito.when((Object)this.scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
        SettableFuture rpcFuture = SettableFuture.create();
        Mockito.when((Object)this.processor.createRequest()).thenReturn((Object)request);
        Mockito.when((Object)this.processor.sendRPC((StorageServerChannel)ArgumentMatchers.same((Object)serverChannel), ArgumentMatchers.eq((Object)request))).thenReturn((Object)rpcFuture);
        Mockito.when((Object)this.processor.processResponse(ArgumentMatchers.eq((Object)response))).thenReturn((Object)result);
        CompletableFuture resultFuture = this.processor.process();
        ((StorageContainerChannel)Mockito.verify((Object)this.scChannel, (VerificationMode)Mockito.times((int)1))).getStorageContainerChannelFuture();
        FutureUtils.complete(serverFuture, (Object)serverChannel);
        rpcFuture.setException((Throwable)new StatusRuntimeException(Status.INTERNAL));
        try {
            FutureUtils.result((CompletableFuture)resultFuture);
            Assert.fail((String)"Should throw fail immediately if rpc request failed");
        }
        catch (Exception e) {
            Assert.assertTrue((boolean)(e instanceof StatusRuntimeException));
            StatusRuntimeException sre = (StatusRuntimeException)e;
            Assert.assertEquals((Object)Status.INTERNAL, (Object)sre.getStatus());
        }
    }

    @Test
    public void testProcessRetryNotFoundRpcException() throws Exception {
        String request = "request";
        String response = "response";
        String result = "result";
        StorageServerChannel serverChannel = (StorageServerChannel)Mockito.mock(StorageServerChannel.class);
        CompletableFuture serverFuture = new CompletableFuture();
        Mockito.when((Object)this.scChannel.getStorageContainerChannelFuture()).thenReturn(serverFuture);
        AtomicInteger numRpcs = new AtomicInteger(0);
        Mockito.when((Object)this.processor.createRequest()).thenReturn((Object)request);
        Mockito.when((Object)this.processor.processResponse(ArgumentMatchers.eq((Object)response))).thenReturn((Object)result);
        Mockito.when((Object)this.processor.sendRPC((StorageServerChannel)ArgumentMatchers.same((Object)serverChannel), ArgumentMatchers.eq((Object)request))).thenAnswer(invocationOnMock -> {
            SettableFuture rpcFuture = SettableFuture.create();
            if (numRpcs.getAndIncrement() > 2) {
                rpcFuture.set((Object)response);
            } else {
                rpcFuture.setException((Throwable)new StatusRuntimeException(Status.NOT_FOUND));
            }
            return rpcFuture;
        });
        CompletableFuture resultFuture = this.processor.process();
        FutureUtils.complete(serverFuture, (Object)serverChannel);
        Assert.assertEquals((Object)result, (Object)FutureUtils.result((CompletableFuture)resultFuture));
        ((StorageContainerChannel)Mockito.verify((Object)this.scChannel, (VerificationMode)Mockito.times((int)4))).getStorageContainerChannelFuture();
    }
}

