package co.cask.cdap.messaging.distributed;

import co.cask.cdap.api.dataset.lib.CloseableIterator;
import co.cask.cdap.api.metrics.MetricsCollectionService;
import co.cask.cdap.common.ServiceUnavailableException;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.DiscoveryRuntimeModule;
import co.cask.cdap.common.guice.LocationRuntimeModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.metrics.NoOpMetricsCollectionService;
import co.cask.cdap.common.namespace.InMemoryNamespaceClient;
import co.cask.cdap.common.namespace.NamespaceQueryAdmin;
import co.cask.cdap.common.service.Retries;
import co.cask.cdap.common.service.RetryStrategies;
import co.cask.cdap.common.utils.Tasks;
import co.cask.cdap.messaging.MessagingService;
import co.cask.cdap.messaging.client.StoreRequestBuilder;
import co.cask.cdap.messaging.data.RawMessage;
import co.cask.cdap.messaging.guice.MessagingServerRuntimeModule;
import co.cask.cdap.messaging.store.TableFactory;
import co.cask.cdap.messaging.store.cache.CachingTableFactory;
import co.cask.cdap.messaging.store.cache.DefaultMessageTableCacheProvider;
import co.cask.cdap.messaging.store.cache.MessageTableCacheProvider;
import co.cask.cdap.messaging.store.leveldb.LevelDBTableFactory;
import co.cask.cdap.proto.id.NamespaceId;
import co.cask.cdap.proto.id.TopicId;
import com.google.common.util.concurrent.Service;
import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import com.google.inject.PrivateModule;
import com.google.inject.Scopes;
import com.google.inject.name.Names;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.twill.internal.zookeeper.InMemoryZKServer;
import org.apache.twill.internal.zookeeper.KillZKSession;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.ZooKeeper;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for the {@link MessagingService} implementation bound to {@link LeaderElectionMessagingService}
 * in {@link #createInjector(int)}.
 *
 * <p>Every test runs against one shared in-memory ZooKeeper server and one shared
 * {@link LevelDBTableFactory}, so messages published through one service instance are expected to be
 * readable through another instance created by this class.
 */
public class LeaderElectionMessagingServiceTest {

    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /** Configuration key of the fencing delay; the default used by this class is 0 (see {@link #init()}). */
    private static final String FENCING_DELAY_KEY = "messaging.ha.fencing.delay.seconds";

    /** Name of the single system topic configured in {@link #init()} and used by all tests. */
    private static final String TOPIC_NAME = "topic";

    private static InMemoryZKServer zkServer;
    private static CConfiguration cConf;
    private static NamespaceQueryAdmin namespaceQueryAdmin;
    private static LevelDBTableFactory levelDBTableFactory;

    /**
     * Starts the in-memory ZooKeeper server and builds the base configuration that
     * {@link #createInjector(int)} copies for every service instance.
     *
     * @throws IOException if a temporary folder cannot be created or the local host cannot be resolved
     */
    @BeforeClass
    public static void init() throws IOException {
        zkServer = InMemoryZKServer.builder().setDataDir(TEMP_FOLDER.newFolder()).build();
        zkServer.startAndWait();

        cConf = CConfiguration.create();
        cConf.set("zookeeper.quorum", zkServer.getConnectionStr());
        cConf.setInt("zookeeper.session.timeout.millis", 2000);
        cConf.set("local.data.dir", TEMP_FOLDER.newFolder().getAbsolutePath());
        cConf.set("messaging.http.server.bind.address", InetAddress.getLocalHost().getHostName());
        cConf.set("messaging.system.topics", TOPIC_NAME);
        cConf.setLong(FENCING_DELAY_KEY, 0L);

        namespaceQueryAdmin = new InMemoryNamespaceClient();
        levelDBTableFactory = new LevelDBTableFactory(cConf);
    }

    /** Stops the in-memory ZooKeeper server started by {@link #init()}. */
    @AfterClass
    public static void finish() {
        zkServer.stopAndWait();
    }

    /**
     * Verifies the hand-over between two service instances.
     *
     * <ol>
     *   <li>The first instance is started and accepts a publish.</li>
     *   <li>The second instance is started and must reject requests with
     *       {@link ServiceUnavailableException}.</li>
     *   <li>The ZooKeeper session of the first instance is killed; the second instance must then
     *       accept a publish and return both messages (retried for up to 10 seconds).</li>
     *   <li>The second instance is stopped; the first instance must then return both messages
     *       (retried for up to 10 seconds).</li>
     * </ol>
     */
    @Test
    public void testTransition() throws Throwable {
        final TopicId topicId = NamespaceId.SYSTEM.topic(TOPIC_NAME);

        Injector firstInjector = createInjector(0);
        Injector secondInjector = createInjector(1);

        // Start the first instance and publish through it.
        ZKClientService firstZkClient = firstInjector.getInstance(ZKClientService.class);
        firstZkClient.startAndWait();
        final MessagingService firstService = firstInjector.getInstance(MessagingService.class);
        startIfService(firstService);
        firstService.publish(StoreRequestBuilder.of(topicId).addPayload("Testing1").build());

        // Start the second instance while the first one is still running.
        ZKClientService secondZkClient = secondInjector.getInstance(ZKClientService.class);
        secondZkClient.startAndWait();
        final MessagingService secondService = secondInjector.getInstance(MessagingService.class);
        startIfService(secondService);

        try {
            secondService.listTopics(NamespaceId.SYSTEM);
            Assert.fail("Expected service unavailable");
        } catch (ServiceUnavailableException expected) {
            // Expected: the second instance must not serve requests while the first one is active.
        }

        // Kill the ZooKeeper session of the first instance so that the second one can take over.
        KillZKSession.kill(firstZkClient.getZooKeeperSupplier().get(), firstZkClient.getConnectString(), 10000);

        List<String> payloadsFromSecond = Retries.callWithRetries(new Retries.Callable<List<String>, Throwable>() {
            @Override
            public List<String> call() throws Throwable {
                secondService.publish(StoreRequestBuilder.of(topicId).addPayload("Testing2").build());
                return fetchPayloads(secondService, topicId);
            }
        }, RetryStrategies.timeLimit(10L, TimeUnit.SECONDS, RetryStrategies.fixDelay(1L, TimeUnit.SECONDS)));
        Assert.assertEquals(Arrays.asList("Testing1", "Testing2"), payloadsFromSecond);

        // Stop the second instance; the first one must become able to serve the same messages again.
        stopIfService(secondService);

        List<String> payloadsFromFirst = Retries.callWithRetries(new Retries.Callable<List<String>, Throwable>() {
            @Override
            public List<String> call() throws Throwable {
                return fetchPayloads(firstService, topicId);
            }
        }, RetryStrategies.timeLimit(10L, TimeUnit.SECONDS, RetryStrategies.fixDelay(1L, TimeUnit.SECONDS)));
        Assert.assertEquals(Arrays.asList("Testing1", "Testing2"), payloadsFromFirst);

        firstZkClient.stopAndWait();
        secondZkClient.stopAndWait();
    }

    /**
     * Verifies that, with the fencing delay set to 3 seconds, a freshly started instance first rejects
     * requests with {@link ServiceUnavailableException} and starts serving the topic within 10 seconds.
     *
     * <p>The shared configuration is restored to its previous fencing delay whether or not the test passes.
     */
    @Test
    public void testFencing() throws IOException, InterruptedException, ExecutionException, TimeoutException {
        final TopicId topicId = NamespaceId.SYSTEM.topic(TOPIC_NAME);

        long previousFencingDelay = cConf.getLong(FENCING_DELAY_KEY);
        cConf.setLong(FENCING_DELAY_KEY, 3L);
        try {
            Injector injector = createInjector(0);
            ZKClientService zkClient = injector.getInstance(ZKClientService.class);
            zkClient.startAndWait();

            final MessagingService messagingService = injector.getInstance(MessagingService.class);
            startIfService(messagingService);

            try {
                messagingService.listTopics(NamespaceId.SYSTEM);
                Assert.fail("Expected service unavailable exception");
            } catch (ServiceUnavailableException expected) {
                // Expected: requests must be rejected while the fencing delay has not elapsed.
            }

            // Poll every 200 ms, for up to 10 seconds, until the topic can be read.
            Tasks.waitFor(topicId, new Callable<TopicId>() {
                @Override
                public TopicId call() throws Exception {
                    try {
                        return messagingService.getTopic(topicId).getTopicId();
                    } catch (ServiceUnavailableException e) {
                        // Not serving yet; returning null makes the caller poll again.
                        return null;
                    }
                }
            }, 10L, TimeUnit.SECONDS, 200L, TimeUnit.MILLISECONDS);

            stopIfService(messagingService);
            zkClient.stopAndWait();
        } finally {
            cConf.setLong(FENCING_DELAY_KEY, previousFencingDelay);
        }
    }

    /**
     * Starts the given messaging service and waits for it, if it is also a Guava {@link Service}.
     *
     * @param messagingService the messaging service to start
     */
    private static void startIfService(MessagingService messagingService) {
        if (messagingService instanceof Service) {
            ((Service) messagingService).startAndWait();
        }
    }

    /**
     * Stops the given messaging service and waits for it, if it is also a Guava {@link Service}.
     *
     * @param messagingService the messaging service to stop
     */
    private static void stopIfService(MessagingService messagingService) {
        if (messagingService instanceof Service) {
            ((Service) messagingService).stopAndWait();
        }
    }

    /**
     * Fetches all messages of a topic and decodes each payload as a UTF-8 string.
     *
     * @param messagingService the service to fetch from
     * @param topicId the topic to read
     * @return the decoded payloads, in the order returned by the fetch iterator
     * @throws Exception if preparing or running the fetch fails
     */
    private static List<String> fetchPayloads(MessagingService messagingService, TopicId topicId) throws Exception {
        List<String> payloads = new ArrayList<>();
        // try-with-resources guarantees the iterator is closed even when iteration fails part-way.
        try (CloseableIterator<RawMessage> iterator = messagingService.prepareFetch(topicId).fetch()) {
            while (iterator.hasNext()) {
                payloads.add(new String(iterator.next().getPayload(), "UTF-8"));
            }
        }
        return payloads;
    }

    /**
     * Creates an injector for one messaging service instance.
     *
     * <p>Each injector gets its own copy of the shared configuration with the given instance id, while
     * the namespace admin and the LevelDB table factory instances are shared by all injectors.
     *
     * @param instanceId value stored under {@code messaging.container.instance.id}
     * @return an injector exposing {@link MessagingService} bound to {@link LeaderElectionMessagingService}
     */
    private Injector createInjector(int instanceId) {
        CConfiguration instanceConf = CConfiguration.copy(cConf);
        instanceConf.setInt("messaging.container.instance.id", instanceId);

        return Guice.createInjector(
            new ConfigModule(instanceConf),
            new ZKClientModule(),
            new DiscoveryRuntimeModule().getDistributedModules(),
            new LocationRuntimeModule().getDistributedModules(),
            new AbstractModule() {
                @Override
                protected void configure() {
                    bind(MetricsCollectionService.class).to(NoOpMetricsCollectionService.class);
                    bind(NamespaceQueryAdmin.class).toInstance(namespaceQueryAdmin);
                }
            },
            new PrivateModule() {
                @Override
                protected void configure() {
                    bind(TableFactory.class)
                        .annotatedWith(Names.named("delegate.table.factory"))
                        .toInstance(levelDBTableFactory);
                    bind(MessageTableCacheProvider.class).to(DefaultMessageTableCacheProvider.class).in(Scopes.SINGLETON);
                    bind(TableFactory.class).to(CachingTableFactory.class);
                    MessagingServerRuntimeModule.bindHandlers(binder(), "messaging.http.handler");
                    bind(MessagingService.class).to(LeaderElectionMessagingService.class).in(Scopes.SINGLETON);
                    // Only the messaging service itself is visible outside this private module.
                    expose(MessagingService.class);
                }
            });
    }
}
