package com.twitter.distributedlog.client.routing;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.twitter.distributedlog.client.resolver.DefaultRegionResolver;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.client.routing.ServerSetWatcher;
import com.twitter.distributedlog.service.DLSocketAddress;
import com.twitter.finagle.Address;
import com.twitter.finagle.Addresses;
import com.twitter.finagle.ChannelWriteException;
import com.twitter.finagle.NoBrokersAvailableException;
import com.twitter.finagle.stats.NullStatsReceiver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService.class */
public class TestConsistentHashRoutingService {

    /* loaded from: input_file:com/twitter/distributedlog/client/routing/TestConsistentHashRoutingService$TestServerSetWatcher.class */
    private static class TestServerSetWatcher implements ServerSetWatcher {
        final LinkedBlockingQueue<ImmutableSet<DLSocketAddress>> changeQueue;
        final CopyOnWriteArrayList<ServerSetWatcher.ServerSetMonitor> monitors;

        private TestServerSetWatcher() {
            this.changeQueue = new LinkedBlockingQueue<>();
            this.monitors = new CopyOnWriteArrayList<>();
        }

        public void watch(ServerSetWatcher.ServerSetMonitor serverSetMonitor) throws ServerSetWatcher.MonitorException {
            this.monitors.add(serverSetMonitor);
            while (true) {
                ImmutableSet<DLSocketAddress> poll = this.changeQueue.poll();
                if (poll == null) {
                    return;
                } else {
                    notifyChanges(poll);
                }
            }
        }

        void notifyChanges(ImmutableSet<DLSocketAddress> immutableSet) {
            if (this.monitors.isEmpty()) {
                this.changeQueue.add(immutableSet);
                return;
            }
            Iterator<ServerSetWatcher.ServerSetMonitor> it = this.monitors.iterator();
            while (it.hasNext()) {
                it.next().onChange(immutableSet);
            }
        }
    }

    @Test(timeout = 60000)
    public void testBlackoutHost() throws Exception {
        TestName testName = new TestName();
        RoutingService build = ConsistentHashRoutingService.newBuilder().serverSet(new NameServerSet(testName)).resolveFromName(true).numReplicas(997).blackoutSeconds(2).build();
        InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 3181);
        Address newInetAddress = Addresses.newInetAddress(inetSocketAddress);
        ArrayList arrayList = new ArrayList(1);
        arrayList.add(newInetAddress);
        testName.changeAddrs(arrayList);
        build.startService();
        RoutingService.RoutingContext of = RoutingService.RoutingContext.of(new DefaultRegionResolver());
        Assert.assertEquals(inetSocketAddress, build.getHost("test-blackout-host", of));
        build.removeHost(inetSocketAddress, new ChannelWriteException(new IOException("test exception")));
        try {
            build.getHost("test-blackout-host", of);
            Assert.fail("Should fail to get host since no brokers are available");
        } catch (NoBrokersAvailableException e) {
        }
        TimeUnit.SECONDS.sleep(3L);
        Assert.assertEquals(inetSocketAddress, build.getHost("test-blackout-host", of));
        build.stopService();
    }

    @Test(timeout = 60000)
    public void testPerformServerSetChangeOnName() throws Exception {
        TestName testName = new TestName();
        ConsistentHashRoutingService build = ConsistentHashRoutingService.newBuilder().serverSet(new NameServerSet(testName)).resolveFromName(true).numReplicas(997).build();
        ArrayList newArrayListWithExpectedSize = Lists.newArrayListWithExpectedSize(4);
        ArrayList newArrayListWithExpectedSize2 = Lists.newArrayListWithExpectedSize(4);
        ArrayList newArrayListWithExpectedSize3 = Lists.newArrayListWithExpectedSize(4);
        for (int i = 0; i < 4; i++) {
            newArrayListWithExpectedSize.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3180 + i)));
        }
        for (int i2 = 0; i2 < 4; i2++) {
            newArrayListWithExpectedSize2.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3180 + 2 + i2)));
        }
        for (int i3 = 0; i3 < 4; i3++) {
            newArrayListWithExpectedSize3.add(Addresses.newInetAddress(new InetSocketAddress("127.0.0.1", 3180 + 10 + i3)));
        }
        final ArrayList newArrayList = Lists.newArrayList();
        final ArrayList newArrayList2 = Lists.newArrayList();
        build.registerListener(new RoutingService.RoutingListener() { // from class: com.twitter.distributedlog.client.routing.TestConsistentHashRoutingService.1
            public void onServerLeft(SocketAddress socketAddress) {
                synchronized (newArrayList) {
                    newArrayList.add(socketAddress);
                    newArrayList.notifyAll();
                }
            }

            public void onServerJoin(SocketAddress socketAddress) {
                synchronized (newArrayList2) {
                    newArrayList2.add(socketAddress);
                    newArrayList2.notifyAll();
                }
            }
        });
        testName.changeAddrs(newArrayListWithExpectedSize);
        build.startService();
        synchronized (newArrayList2) {
            while (newArrayList2.size() < 4) {
                newArrayList2.wait();
            }
        }
        synchronized (newArrayList2) {
            Assert.assertEquals(4, newArrayList2.size());
        }
        synchronized (newArrayList) {
            Assert.assertEquals(0L, newArrayList.size());
        }
        Assert.assertEquals(4, build.shardId2Address.size());
        Assert.assertEquals(4, build.address2ShardId.size());
        for (int i4 = 0; i4 < 4; i4++) {
            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 3180 + i4);
            Assert.assertTrue(build.address2ShardId.containsKey(inetSocketAddress));
            SocketAddress socketAddress = (SocketAddress) build.shardId2Address.get(Integer.valueOf(((Integer) build.address2ShardId.get(inetSocketAddress)).intValue()));
            Assert.assertNotNull(socketAddress);
            Assert.assertEquals(inetSocketAddress, socketAddress);
        }
        testName.changeAddrs(newArrayListWithExpectedSize2);
        synchronized (newArrayList2) {
            while (newArrayList2.size() < 4 + 2) {
                newArrayList2.wait();
            }
        }
        synchronized (newArrayList) {
            while (newArrayList.size() < 4 - 2) {
                newArrayList.wait();
            }
        }
        Assert.assertEquals(4, build.shardId2Address.size());
        Assert.assertEquals(4, build.address2ShardId.size());
        for (int i5 = 0; i5 < 2; i5++) {
            Assert.assertFalse(build.address2ShardId.containsKey(new InetSocketAddress("127.0.0.1", 3180 + i5)));
        }
        for (int i6 = 0; i6 < 4; i6++) {
            InetSocketAddress inetSocketAddress2 = new InetSocketAddress("127.0.0.1", 3180 + 2 + i6);
            Assert.assertTrue(build.address2ShardId.containsKey(inetSocketAddress2));
            SocketAddress socketAddress2 = (SocketAddress) build.shardId2Address.get(Integer.valueOf(((Integer) build.address2ShardId.get(inetSocketAddress2)).intValue()));
            Assert.assertNotNull(socketAddress2);
            Assert.assertEquals(inetSocketAddress2, socketAddress2);
        }
        testName.changeAddrs(newArrayListWithExpectedSize3);
        synchronized (newArrayList2) {
            while (newArrayList2.size() < 4 + 2 + 4) {
                newArrayList2.wait();
            }
        }
        synchronized (newArrayList) {
            while (newArrayList.size() < (4 - 2) + 4) {
                newArrayList.wait();
            }
        }
        Assert.assertEquals(4, build.shardId2Address.size());
        Assert.assertEquals(4, build.address2ShardId.size());
        for (int i7 = 0; i7 < 2 + 4; i7++) {
            Assert.assertFalse(build.address2ShardId.containsKey(new InetSocketAddress("127.0.0.1", 3180 + i7)));
        }
        for (int i8 = 0; i8 < 4; i8++) {
            InetSocketAddress inetSocketAddress3 = new InetSocketAddress("127.0.0.1", 3180 + 10 + i8);
            Assert.assertTrue(build.address2ShardId.containsKey(inetSocketAddress3));
            SocketAddress socketAddress3 = (SocketAddress) build.shardId2Address.get(Integer.valueOf(((Integer) build.address2ShardId.get(inetSocketAddress3)).intValue()));
            Assert.assertNotNull(socketAddress3);
            Assert.assertEquals(inetSocketAddress3, socketAddress3);
        }
    }

    @Test(timeout = 60000)
    public void testPerformServerSetChangeOnServerSet() throws Exception {
        TestServerSetWatcher testServerSetWatcher = new TestServerSetWatcher();
        ConsistentHashRoutingService consistentHashRoutingService = new ConsistentHashRoutingService(testServerSetWatcher, 997, Integer.MAX_VALUE, NullStatsReceiver.get());
        Set newConcurrentHashSet = Sets.newConcurrentHashSet();
        Set newConcurrentHashSet2 = Sets.newConcurrentHashSet();
        Set newConcurrentHashSet3 = Sets.newConcurrentHashSet();
        for (int i = 0; i < 4; i++) {
            newConcurrentHashSet.add(new DLSocketAddress(i, new InetSocketAddress("127.0.0.1", 3180 + i)));
        }
        for (int i2 = 0; i2 < 4; i2++) {
            newConcurrentHashSet2.add(new DLSocketAddress(i2 + 2, new InetSocketAddress("127.0.0.1", 3180 + 4 + i2)));
        }
        for (int i3 = 0; i3 < 4; i3++) {
            newConcurrentHashSet3.add(new DLSocketAddress(i3, new InetSocketAddress("127.0.0.1", 3180 + 10 + i3)));
        }
        final ArrayList newArrayList = Lists.newArrayList();
        final ArrayList newArrayList2 = Lists.newArrayList();
        consistentHashRoutingService.registerListener(new RoutingService.RoutingListener() { // from class: com.twitter.distributedlog.client.routing.TestConsistentHashRoutingService.2
            public void onServerLeft(SocketAddress socketAddress) {
                synchronized (newArrayList) {
                    newArrayList.add(socketAddress);
                    newArrayList.notifyAll();
                }
            }

            public void onServerJoin(SocketAddress socketAddress) {
                synchronized (newArrayList2) {
                    newArrayList2.add(socketAddress);
                    newArrayList2.notifyAll();
                }
            }
        });
        testServerSetWatcher.notifyChanges(ImmutableSet.copyOf(newConcurrentHashSet));
        consistentHashRoutingService.startService();
        synchronized (newArrayList2) {
            while (newArrayList2.size() < 4) {
                newArrayList2.wait();
            }
        }
        synchronized (newArrayList2) {
            Assert.assertEquals(4, newArrayList2.size());
        }
        synchronized (newArrayList) {
            Assert.assertEquals(0L, newArrayList.size());
        }
        Assert.assertEquals(4, consistentHashRoutingService.shardId2Address.size());
        Assert.assertEquals(4, consistentHashRoutingService.address2ShardId.size());
        for (int i4 = 0; i4 < 4; i4++) {
            InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 3180 + i4);
            Assert.assertTrue(consistentHashRoutingService.address2ShardId.containsKey(inetSocketAddress));
            int intValue = ((Integer) consistentHashRoutingService.address2ShardId.get(inetSocketAddress)).intValue();
            Assert.assertEquals(i4, intValue);
            SocketAddress socketAddress = (SocketAddress) consistentHashRoutingService.shardId2Address.get(Integer.valueOf(intValue));
            Assert.assertNotNull(socketAddress);
            Assert.assertEquals(inetSocketAddress, socketAddress);
        }
        testServerSetWatcher.notifyChanges(ImmutableSet.copyOf(newConcurrentHashSet2));
        synchronized (newArrayList2) {
            while (newArrayList2.size() < 4 + 2) {
                newArrayList2.wait();
            }
        }
        synchronized (newArrayList) {
            while (newArrayList.size() < 2) {
                newArrayList.wait();
            }
        }
        Assert.assertEquals(4 + 2, consistentHashRoutingService.shardId2Address.size());
        Assert.assertEquals(4 + 2, consistentHashRoutingService.address2ShardId.size());
        for (int i5 = 0; i5 < 2; i5++) {
            InetSocketAddress inetSocketAddress2 = new InetSocketAddress("127.0.0.1", 3180 + i5);
            Assert.assertTrue(consistentHashRoutingService.address2ShardId.containsKey(inetSocketAddress2));
            int intValue2 = ((Integer) consistentHashRoutingService.address2ShardId.get(inetSocketAddress2)).intValue();
            Assert.assertEquals(i5, intValue2);
            SocketAddress socketAddress2 = (SocketAddress) consistentHashRoutingService.shardId2Address.get(Integer.valueOf(intValue2));
            Assert.assertNotNull(socketAddress2);
            Assert.assertEquals(inetSocketAddress2, socketAddress2);
        }
        for (int i6 = 0; i6 < 4; i6++) {
            InetSocketAddress inetSocketAddress3 = new InetSocketAddress("127.0.0.1", 3180 + 4 + i6);
            Assert.assertTrue(consistentHashRoutingService.address2ShardId.containsKey(inetSocketAddress3));
            int intValue3 = ((Integer) consistentHashRoutingService.address2ShardId.get(inetSocketAddress3)).intValue();
            Assert.assertEquals(i6 + 2, intValue3);
            SocketAddress socketAddress3 = (SocketAddress) consistentHashRoutingService.shardId2Address.get(Integer.valueOf(intValue3));
            Assert.assertNotNull(socketAddress3);
            Assert.assertEquals(inetSocketAddress3, socketAddress3);
        }
        testServerSetWatcher.notifyChanges(ImmutableSet.copyOf(newConcurrentHashSet3));
        synchronized (newArrayList2) {
            while (newArrayList2.size() < 4 + 2 + 4) {
                newArrayList2.wait();
            }
        }
        synchronized (newArrayList) {
            while (newArrayList.size() < 2 + 4) {
                newArrayList.wait();
            }
        }
        Assert.assertEquals(4 + 2, consistentHashRoutingService.shardId2Address.size());
        Assert.assertEquals(4 + 2, consistentHashRoutingService.address2ShardId.size());
        for (int i7 = 0; i7 < 4; i7++) {
            InetSocketAddress inetSocketAddress4 = new InetSocketAddress("127.0.0.1", 3180 + 10 + i7);
            Assert.assertTrue(consistentHashRoutingService.address2ShardId.containsKey(inetSocketAddress4));
            int intValue4 = ((Integer) consistentHashRoutingService.address2ShardId.get(inetSocketAddress4)).intValue();
            Assert.assertEquals(i7, intValue4);
            SocketAddress socketAddress4 = (SocketAddress) consistentHashRoutingService.shardId2Address.get(Integer.valueOf(intValue4));
            Assert.assertNotNull(socketAddress4);
            Assert.assertEquals(inetSocketAddress4, socketAddress4);
        }
        for (int i8 = 0; i8 < 2; i8++) {
            InetSocketAddress inetSocketAddress5 = new InetSocketAddress("127.0.0.1", 3180 + 4 + 2 + i8);
            Assert.assertTrue(consistentHashRoutingService.address2ShardId.containsKey(inetSocketAddress5));
            int intValue5 = ((Integer) consistentHashRoutingService.address2ShardId.get(inetSocketAddress5)).intValue();
            Assert.assertEquals(4 + i8, intValue5);
            SocketAddress socketAddress5 = (SocketAddress) consistentHashRoutingService.shardId2Address.get(Integer.valueOf(intValue5));
            Assert.assertNotNull(socketAddress5);
            Assert.assertEquals(inetSocketAddress5, socketAddress5);
        }
    }
}
