package org.apache.distributedlog.client.proxy;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.finagle.stats.NullStatsReceiver;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.distributedlog.client.ClientConfig;
import org.apache.distributedlog.client.proxy.MockDistributedLogServices;
import org.apache.distributedlog.client.proxy.MockProxyClientBuilder;
import org.apache.distributedlog.client.proxy.ProxyClient;
import org.apache.distributedlog.client.resolver.DefaultRegionResolver;
import org.apache.distributedlog.client.stats.ClientStats;
import org.apache.distributedlog.thrift.service.ServerInfo;
import org.jboss.netty.util.HashedWheelTimer;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/distributedlog/client/proxy/TestProxyClientManager.class */
public class TestProxyClientManager {

    @Rule
    public TestName runtime = new TestName();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/distributedlog/client/proxy/TestProxyClientManager$TestHostProvider.class */
    public static class TestHostProvider implements HostProvider {
        Set<SocketAddress> hosts = new HashSet();

        TestHostProvider() {
        }

        synchronized void addHost(SocketAddress socketAddress) {
            this.hosts.add(socketAddress);
        }

        public synchronized Set<SocketAddress> getHosts() {
            return ImmutableSet.copyOf(this.hosts);
        }
    }

    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, long j) {
        return createProxyClientManager(builder, new TestHostProvider(), j);
    }

    private static ProxyClientManager createProxyClientManager(ProxyClient.Builder builder, HostProvider hostProvider, long j) {
        ClientConfig clientConfig = new ClientConfig();
        clientConfig.setPeriodicHandshakeIntervalMs(j);
        clientConfig.setPeriodicOwnershipSyncIntervalMs(-1L);
        return new ProxyClientManager(clientConfig, builder, new HashedWheelTimer(new ThreadFactoryBuilder().setNameFormat("TestProxyClientManager-timer-%d").build(), clientConfig.getRedirectBackoffStartMs(), TimeUnit.MILLISECONDS), hostProvider, new ClientStats(NullStatsReceiver.get(), false, new DefaultRegionResolver()));
    }

    private static SocketAddress createSocketAddress(int i) {
        return new InetSocketAddress("127.0.0.1", i);
    }

    private static MockProxyClientBuilder.MockProxyClient createMockProxyClient(SocketAddress socketAddress) {
        return new MockProxyClientBuilder.MockProxyClient(socketAddress, new MockDistributedLogServices.MockBasicService());
    }

    private static Pair<MockProxyClientBuilder.MockProxyClient, MockDistributedLogServices.MockServerInfoService> createMockProxyClient(SocketAddress socketAddress, ServerInfo serverInfo) {
        MockDistributedLogServices.MockServerInfoService mockServerInfoService = new MockDistributedLogServices.MockServerInfoService();
        MockProxyClientBuilder.MockProxyClient mockProxyClient = new MockProxyClientBuilder.MockProxyClient(socketAddress, mockServerInfoService);
        mockServerInfoService.updateServerInfo(serverInfo);
        return Pair.of(mockProxyClient, mockServerInfoService);
    }

    @Test(timeout = 60000)
    public void testBasicCreateRemove() throws Exception {
        SocketAddress createSocketAddress = createSocketAddress(1000);
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        MockProxyClientBuilder.MockProxyClient createMockProxyClient = createMockProxyClient(createSocketAddress);
        mockProxyClientBuilder.provideProxyClient(createSocketAddress, createMockProxyClient);
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, 0L);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        ProxyClient createClient = createProxyClientManager.createClient(createSocketAddress);
        Assert.assertEquals("Create client should build the proxy client", 1L, createProxyClientManager.getNumProxies());
        Assert.assertTrue("The client returned should be the same client that builder built", createMockProxyClient == createClient);
    }

    @Test(timeout = 60000)
    public void testGetShouldCreateClient() throws Exception {
        SocketAddress createSocketAddress = createSocketAddress(2000);
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        MockProxyClientBuilder.MockProxyClient createMockProxyClient = createMockProxyClient(createSocketAddress);
        mockProxyClientBuilder.provideProxyClient(createSocketAddress, createMockProxyClient);
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, 0L);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        ProxyClient client = createProxyClientManager.getClient(createSocketAddress);
        Assert.assertEquals("Get client should build the proxy client", 1L, createProxyClientManager.getNumProxies());
        Assert.assertTrue("The client returned should be the same client that builder built", createMockProxyClient == client);
    }

    @Test(timeout = 60000)
    public void testConditionalRemoveClient() throws Exception {
        SocketAddress createSocketAddress = createSocketAddress(3000);
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        MockProxyClientBuilder.MockProxyClient createMockProxyClient = createMockProxyClient(createSocketAddress);
        MockProxyClientBuilder.MockProxyClient createMockProxyClient2 = createMockProxyClient(createSocketAddress);
        mockProxyClientBuilder.provideProxyClient(createSocketAddress, createMockProxyClient);
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, 0L);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        createProxyClientManager.createClient(createSocketAddress);
        Assert.assertEquals("Create client should build the proxy client", 1L, createProxyClientManager.getNumProxies());
        createProxyClientManager.removeClient(createSocketAddress, createMockProxyClient2);
        Assert.assertEquals("Conditional remove should not remove proxy client", 1L, createProxyClientManager.getNumProxies());
        createProxyClientManager.removeClient(createSocketAddress, createMockProxyClient);
        Assert.assertEquals("Conditional remove should remove proxy client", 0L, createProxyClientManager.getNumProxies());
    }

    @Test(timeout = 60000)
    public void testRemoveClient() throws Exception {
        SocketAddress createSocketAddress = createSocketAddress(3000);
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        mockProxyClientBuilder.provideProxyClient(createSocketAddress, createMockProxyClient(createSocketAddress));
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, 0L);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        createProxyClientManager.createClient(createSocketAddress);
        Assert.assertEquals("Create client should build the proxy client", 1L, createProxyClientManager.getNumProxies());
        createProxyClientManager.removeClient(createSocketAddress);
        Assert.assertEquals("Remove should remove proxy client", 0L, createProxyClientManager.getNumProxies());
    }

    @Test(timeout = 60000)
    public void testCreateClientShouldHandshake() throws Exception {
        SocketAddress createSocketAddress = createSocketAddress(3000);
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        ServerInfo serverInfo = new ServerInfo();
        serverInfo.putToOwnerships(this.runtime.getMethodName() + "_stream", this.runtime.getMethodName() + "_owner");
        mockProxyClientBuilder.provideProxyClient(createSocketAddress, (MockProxyClientBuilder.MockProxyClient) createMockProxyClient(createSocketAddress, serverInfo).getLeft());
        final AtomicReference atomicReference = new AtomicReference(null);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        ProxyListener proxyListener = new ProxyListener() { // from class: org.apache.distributedlog.client.proxy.TestProxyClientManager.1
            public void onHandshakeSuccess(SocketAddress socketAddress, ProxyClient proxyClient, ServerInfo serverInfo2) {
                atomicReference.set(serverInfo2);
                countDownLatch.countDown();
            }

            public void onHandshakeFailure(SocketAddress socketAddress, ProxyClient proxyClient, Throwable th) {
            }
        };
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, 0L);
        createProxyClientManager.registerProxyListener(proxyListener);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        createProxyClientManager.createClient(createSocketAddress);
        Assert.assertEquals("Create client should build the proxy client", 1L, createProxyClientManager.getNumProxies());
        countDownLatch.await();
        Assert.assertEquals("Handshake should return server info", serverInfo, atomicReference.get());
    }

    @Test(timeout = 60000)
    public void testHandshake() throws Exception {
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        HashMap hashMap = new HashMap();
        for (int i = 0; i < 3; i++) {
            SocketAddress createSocketAddress = createSocketAddress(4000 + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int i2 = 0; i2 < 3; i2++) {
                serverInfo.putToOwnerships(this.runtime.getMethodName() + "_stream_" + i2, createSocketAddress.toString());
            }
            mockProxyClientBuilder.provideProxyClient(createSocketAddress, (MockProxyClientBuilder.MockProxyClient) createMockProxyClient(createSocketAddress, serverInfo).getLeft());
            hashMap.put(createSocketAddress, serverInfo);
        }
        final HashMap hashMap2 = new HashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(6);
        ProxyListener proxyListener = new ProxyListener() { // from class: org.apache.distributedlog.client.proxy.TestProxyClientManager.2
            public void onHandshakeSuccess(SocketAddress socketAddress, ProxyClient proxyClient, ServerInfo serverInfo2) {
                synchronized (hashMap2) {
                    hashMap2.put(socketAddress, serverInfo2);
                }
                countDownLatch.countDown();
            }

            public void onHandshakeFailure(SocketAddress socketAddress, ProxyClient proxyClient, Throwable th) {
            }
        };
        TestHostProvider testHostProvider = new TestHostProvider();
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, testHostProvider, 0L);
        createProxyClientManager.registerProxyListener(proxyListener);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        for (int i3 = 0; i3 < 3; i3++) {
            testHostProvider.addHost(createSocketAddress(4000 + i3));
        }
        createProxyClientManager.handshake();
        countDownLatch.await();
        Assert.assertEquals("Handshake should return server info", 3L, hashMap2.size());
        Assert.assertTrue("Handshake should get all server infos", Maps.difference(hashMap, hashMap2).areEqual());
    }

    @Test(timeout = 60000)
    public void testPeriodicHandshake() throws Exception {
        MockProxyClientBuilder mockProxyClientBuilder = new MockProxyClientBuilder();
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        final HashMap hashMap3 = new HashMap();
        for (int i = 0; i < 3; i++) {
            SocketAddress createSocketAddress = createSocketAddress(5000 + i);
            ServerInfo serverInfo = new ServerInfo();
            for (int i2 = 0; i2 < 3; i2++) {
                serverInfo.putToOwnerships(this.runtime.getMethodName() + "_stream_" + i2, createSocketAddress.toString());
            }
            Pair<MockProxyClientBuilder.MockProxyClient, MockDistributedLogServices.MockServerInfoService> createMockProxyClient = createMockProxyClient(createSocketAddress, serverInfo);
            mockProxyClientBuilder.provideProxyClient(createSocketAddress, (MockProxyClientBuilder.MockProxyClient) createMockProxyClient.getLeft());
            hashMap.put(createSocketAddress, serverInfo);
            hashMap2.put(createSocketAddress, createMockProxyClient.getRight());
            hashMap3.put(createSocketAddress, new CountDownLatch(2));
        }
        final HashMap hashMap4 = new HashMap();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        ProxyListener proxyListener = new ProxyListener() { // from class: org.apache.distributedlog.client.proxy.TestProxyClientManager.3
            public void onHandshakeSuccess(SocketAddress socketAddress, ProxyClient proxyClient, ServerInfo serverInfo2) {
                synchronized (hashMap4) {
                    hashMap4.put(socketAddress, serverInfo2);
                    CountDownLatch countDownLatch2 = (CountDownLatch) hashMap3.get(socketAddress);
                    if (null != countDownLatch2) {
                        countDownLatch2.countDown();
                    }
                }
                countDownLatch.countDown();
            }

            public void onHandshakeFailure(SocketAddress socketAddress, ProxyClient proxyClient, Throwable th) {
            }
        };
        TestHostProvider testHostProvider = new TestHostProvider();
        ProxyClientManager createProxyClientManager = createProxyClientManager(mockProxyClientBuilder, testHostProvider, 50L);
        createProxyClientManager.setPeriodicHandshakeEnabled(false);
        createProxyClientManager.registerProxyListener(proxyListener);
        Assert.assertEquals("There should be no clients in the manager", 0L, createProxyClientManager.getNumProxies());
        for (int i3 = 0; i3 < 3; i3++) {
            SocketAddress createSocketAddress2 = createSocketAddress(5000 + i3);
            testHostProvider.addHost(createSocketAddress2);
            createProxyClientManager.createClient(createSocketAddress2);
        }
        countDownLatch.await();
        Assert.assertEquals("Handshake should return server info", 3L, hashMap4.size());
        Assert.assertTrue("Handshake should get all server infos", Maps.difference(hashMap, hashMap4).areEqual());
        for (int i4 = 0; i4 < 3; i4++) {
            SocketAddress createSocketAddress3 = createSocketAddress(5000 + i4);
            ServerInfo serverInfo2 = new ServerInfo();
            for (int i5 = 0; i5 < 3; i5++) {
                serverInfo2.putToOwnerships(this.runtime.getMethodName() + "_new_stream_" + i5, createSocketAddress3.toString());
            }
            MockDistributedLogServices.MockServerInfoService mockServerInfoService = (MockDistributedLogServices.MockServerInfoService) hashMap2.get(createSocketAddress3);
            hashMap.put(createSocketAddress3, serverInfo2);
            mockServerInfoService.updateServerInfo(serverInfo2);
        }
        createProxyClientManager.setPeriodicHandshakeEnabled(true);
        for (int i6 = 0; i6 < 3; i6++) {
            ((CountDownLatch) hashMap3.get(createSocketAddress(5000 + i6))).await();
        }
        Assert.assertTrue("Periodic handshake should update all server infos", Maps.difference(hashMap, hashMap4).areEqual());
    }
}
