package co.cask.cdap.common.zookeeper.coordination;

import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.discovery.ResolvingDiscoverable;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import org.apache.twill.common.Cancellable;
import org.apache.twill.discovery.Discoverable;
import org.apache.twill.discovery.DiscoveryService;
import org.apache.twill.discovery.DiscoveryServiceClient;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.zookeeper.ZKClientService;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/common/zookeeper/coordination/ResourceCoordinatorTest.class */
public class ResourceCoordinatorTest {
    private static final Logger LOG = LoggerFactory.getLogger(ResourceCoordinatorTest.class);

    @ClassRule
    public static final TemporaryFolder TMP_FOLDER = new TemporaryFolder();
    private static InMemoryZKServer zkServer;

    /* JADX WARN: Finally extract failed */
    @Test
    public void testAssignment() throws InterruptedException, ExecutionException {
        CConfiguration create = CConfiguration.create();
        create.set("zookeeper.quorum", zkServer.getConnectionStr());
        Injector createInjector = Guice.createInjector(new Module[]{new ConfigModule(create), new ZKClientModule(), new DiscoveryRuntimeModule().getDistributedModules()});
        ZKClientService zKClientService = (ZKClientService) createInjector.getInstance(ZKClientService.class);
        zKClientService.startAndWait();
        DiscoveryService discoveryService = (DiscoveryService) createInjector.getInstance(DiscoveryService.class);
        try {
            ResourceCoordinator resourceCoordinator = new ResourceCoordinator(zKClientService, (DiscoveryServiceClient) createInjector.getInstance(DiscoveryServiceClient.class), new BalancedAssignmentStrategy());
            resourceCoordinator.startAndWait();
            try {
                ResourceCoordinatorClient resourceCoordinatorClient = new ResourceCoordinatorClient(zKClientService);
                resourceCoordinatorClient.startAndWait();
                try {
                    ResourceRequirement build = ResourceRequirement.builder("test-assignment").addPartitions("p", 5, 1).build();
                    resourceCoordinatorClient.submitRequirement(build).get();
                    Assert.assertEquals(build, resourceCoordinatorClient.fetchRequirement(build.getName()).get());
                    Discoverable createDiscoverable = createDiscoverable("test-assignment", 10000);
                    Cancellable register = discoveryService.register(ResolvingDiscoverable.of(createDiscoverable));
                    SynchronousQueue synchronousQueue = new SynchronousQueue();
                    Semaphore semaphore = new Semaphore(0);
                    Cancellable subscribe = subscribe(resourceCoordinatorClient, createDiscoverable, synchronousQueue, semaphore);
                    Assert.assertNotNull(synchronousQueue.poll(30L, TimeUnit.SECONDS));
                    Assert.assertEquals(5L, r0.size());
                    register.cancel();
                    Assert.assertTrue(synchronousQueue.poll(30L, TimeUnit.SECONDS).isEmpty());
                    Cancellable register2 = discoveryService.register(ResolvingDiscoverable.of(createDiscoverable));
                    Assert.assertNotNull(synchronousQueue.poll(30L, TimeUnit.SECONDS));
                    Assert.assertEquals(5L, r0.size());
                    Discoverable createDiscoverable2 = createDiscoverable("test-assignment", 10001);
                    Cancellable register3 = discoveryService.register(ResolvingDiscoverable.of(createDiscoverable2));
                    Assert.assertNotNull(synchronousQueue.poll(30L, TimeUnit.SECONDS));
                    Assert.assertEquals(3L, r0.size());
                    register2.cancel();
                    Assert.assertTrue(synchronousQueue.poll(30L, TimeUnit.SECONDS).isEmpty());
                    subscribe.cancel();
                    Assert.assertTrue(semaphore.tryAcquire(2L, TimeUnit.SECONDS));
                    Cancellable subscribe2 = subscribe(resourceCoordinatorClient, createDiscoverable2, synchronousQueue, semaphore);
                    Assert.assertNotNull(synchronousQueue.poll(30L, TimeUnit.SECONDS));
                    Assert.assertEquals(5L, r0.size());
                    resourceCoordinatorClient.submitRequirement(ResourceRequirement.builder("test-assignment").build());
                    Assert.assertTrue(synchronousQueue.poll(30L, TimeUnit.SECONDS).isEmpty());
                    resourceCoordinatorClient.submitRequirement(ResourceRequirement.builder("test-assignment").addPartitions("p", 1, 1).build());
                    Assert.assertNotNull(synchronousQueue.poll(30L, TimeUnit.SECONDS));
                    Assert.assertEquals(1L, r0.size());
                    resourceCoordinatorClient.deleteRequirement(build.getName());
                    Assert.assertTrue(synchronousQueue.poll(30L, TimeUnit.SECONDS).isEmpty());
                    subscribe2.cancel();
                    Assert.assertTrue(semaphore.tryAcquire(2L, TimeUnit.SECONDS));
                    register3.cancel();
                    resourceCoordinatorClient.stopAndWait();
                    resourceCoordinator.stopAndWait();
                } catch (Throwable th) {
                    resourceCoordinatorClient.stopAndWait();
                    throw th;
                }
            } catch (Throwable th2) {
                resourceCoordinator.stopAndWait();
                throw th2;
            }
        } finally {
            zKClientService.stopAndWait();
        }
    }

    @BeforeClass
    public static void init() throws IOException {
        zkServer = InMemoryZKServer.builder().setDataDir(TMP_FOLDER.newFolder()).build();
        zkServer.startAndWait();
    }

    @AfterClass
    public static void finish() {
        zkServer.stopAndWait();
    }

    private Cancellable subscribe(ResourceCoordinatorClient resourceCoordinatorClient, final Discoverable discoverable, final BlockingQueue<Collection<PartitionReplica>> blockingQueue, final Semaphore semaphore) {
        return resourceCoordinatorClient.subscribe(discoverable.getName(), new ResourceHandler(discoverable) { // from class: co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorTest.1
            public void onChange(Collection<PartitionReplica> collection) {
                try {
                    ResourceCoordinatorTest.LOG.debug("Discoverable {} Received: {}", Integer.valueOf(discoverable.getSocketAddress().getPort()), collection);
                    blockingQueue.put(collection);
                } catch (InterruptedException e) {
                    ResourceCoordinatorTest.LOG.error("Interrupted.", e);
                }
            }

            public void finished(Throwable th) {
                ResourceCoordinatorTest.LOG.debug("Finished on {}", Integer.valueOf(discoverable.getSocketAddress().getPort()));
                if (th == null) {
                    semaphore.release();
                } else {
                    ResourceCoordinatorTest.LOG.error("Finished with failure for {}", Integer.valueOf(discoverable.getSocketAddress().getPort()), th);
                }
            }
        });
    }

    private Discoverable createDiscoverable(final String str, int i) {
        InetSocketAddress inetSocketAddress;
        try {
            inetSocketAddress = new InetSocketAddress(InetAddress.getLocalHost(), i);
        } catch (UnknownHostException e) {
            inetSocketAddress = new InetSocketAddress(i);
        }
        final InetSocketAddress inetSocketAddress2 = inetSocketAddress;
        return new Discoverable() { // from class: co.cask.cdap.common.zookeeper.coordination.ResourceCoordinatorTest.2
            public String getName() {
                return str;
            }

            public InetSocketAddress getSocketAddress() {
                return inetSocketAddress2;
            }
        };
    }
}
