package org.apache.hive.druid.org.apache.druid.curator.announcement;

import java.util.HashSet;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.framework.api.CuratorEventType;
import org.apache.curator.framework.api.CuratorListener;
import org.apache.curator.framework.api.GetDataWatchBackgroundStatable;
import org.apache.curator.framework.api.transaction.CuratorOp;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.framework.api.transaction.CuratorTransactionResult;
import org.apache.curator.test.KillSession;
import org.apache.curator.utils.ZKPaths;
import org.apache.hive.druid.com.google.common.collect.Sets;
import org.apache.hive.druid.org.apache.druid.curator.CuratorTestBase;
import org.apache.hive.druid.org.apache.druid.java.util.common.StringUtils;
import org.apache.hive.druid.org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.hive.druid.org.apache.druid.java.util.common.logger.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/hive/druid/org/apache/druid/curator/announcement/AnnouncerTest.class */
public class AnnouncerTest extends CuratorTestBase {
    private static final Logger log = new Logger(AnnouncerTest.class);
    private ExecutorService exec;

    @Before
    public void setUp() throws Exception {
        setupServerAndCurator();
        this.exec = Execs.singleThreaded("test-announcer-sanity-%s");
    }

    @After
    public void tearDown() {
        tearDownServerAndCurator();
    }

    @Test(timeout = 60000)
    public void testSanity() throws Exception {
        this.curator.start();
        this.curator.blockUntilConnected();
        Announcer announcer = new Announcer(this.curator, this.exec);
        announcer.initializeAddedChildren();
        byte[] utf8 = StringUtils.toUtf8("billy");
        announcer.announce("/test1", utf8);
        Assert.assertNull("/test1 does not exists", this.curator.checkExists().forPath("/test1"));
        Assert.assertNull("/somewhere/test2 does not exists", this.curator.checkExists().forPath("/somewhere/test2"));
        announcer.start();
        while (!announcer.getAddedChildren().contains("/test1")) {
            Thread.sleep(100L);
        }
        try {
            Assert.assertArrayEquals("/test1 has data", utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/test1"));
            Assert.assertNull("/somewhere/test2 still does not exist", this.curator.checkExists().forPath("/somewhere/test2"));
            announcer.announce("/somewhere/test2", utf8);
            Assert.assertArrayEquals("/test1 still has data", utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/test1"));
            Assert.assertArrayEquals("/somewhere/test2 has data", utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/somewhere/test2"));
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            this.curator.getCuratorListenable().addListener(new CuratorListener() { // from class: org.apache.hive.druid.org.apache.druid.curator.announcement.AnnouncerTest.1
                public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {
                    if (curatorEvent.getType() == CuratorEventType.CREATE && curatorEvent.getPath().equals("/test1")) {
                        countDownLatch.countDown();
                    }
                }
            });
            List forOperations = this.curator.transaction().forOperations(new CuratorOp[]{(CuratorOp) this.curator.transactionOp().delete().forPath("/test1")});
            Assert.assertEquals(1L, forOperations.size());
            Assert.assertEquals(KeeperException.Code.OK.intValue(), ((CuratorTransactionResult) forOperations.iterator().next()).getError());
            Assert.assertTrue("Wait for /test1 to be created", this.timing.forWaiting().awaitLatch(countDownLatch));
            Assert.assertArrayEquals("expect /test1 data is restored", utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/test1"));
            Assert.assertArrayEquals("expect /somewhere/test2 is still there", utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/somewhere/test2"));
            announcer.unannounce("/test1");
            Assert.assertNull("expect /test1 unannounced", this.curator.checkExists().forPath("/test1"));
            Assert.assertArrayEquals("expect /somewhere/test2 is still still there", utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/somewhere/test2"));
            announcer.stop();
            Assert.assertNull("expect /test1 remains unannounced", this.curator.checkExists().forPath("/test1"));
            Assert.assertNull("expect /somewhere/test2 unannounced", this.curator.checkExists().forPath("/somewhere/test2"));
        } catch (Throwable th) {
            announcer.stop();
            throw th;
        }
    }

    @Test(timeout = 60000)
    public void testSessionKilled() throws Exception {
        this.curator.start();
        this.curator.blockUntilConnected();
        Announcer announcer = new Announcer(this.curator, this.exec);
        try {
            ((CuratorTransactionBridge) this.curator.inTransaction().create().forPath("/somewhere")).and().commit();
            announcer.start();
            byte[] utf8 = StringUtils.toUtf8("billy");
            final HashSet newHashSet = Sets.newHashSet(new String[]{"/test1", "/somewhere/test2"});
            announcer.announce("/test1", utf8);
            announcer.announce("/somewhere/test2", utf8);
            Assert.assertArrayEquals(utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/test1"));
            Assert.assertArrayEquals(utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/somewhere/test2"));
            final CountDownLatch countDownLatch = new CountDownLatch(1);
            this.curator.getCuratorListenable().addListener(new CuratorListener() { // from class: org.apache.hive.druid.org.apache.druid.curator.announcement.AnnouncerTest.2
                public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {
                    if (curatorEvent.getType() == CuratorEventType.CREATE) {
                        newHashSet.remove(curatorEvent.getPath());
                        if (newHashSet.isEmpty()) {
                            countDownLatch.countDown();
                        }
                    }
                }
            });
            KillSession.kill(this.curator.getZookeeperClient().getZooKeeper(), this.server.getConnectString());
            Assert.assertTrue(this.timing.forWaiting().awaitLatch(countDownLatch));
            Assert.assertArrayEquals(utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/test1"));
            Assert.assertArrayEquals(utf8, (byte[]) ((GetDataWatchBackgroundStatable) this.curator.getData().decompressed()).forPath("/somewhere/test2"));
            announcer.stop();
            while (true) {
                if (this.curator.checkExists().forPath("/test1") == null && this.curator.checkExists().forPath("/somewhere/test2") == null) {
                    Assert.assertNull(this.curator.checkExists().forPath("/test1"));
                    Assert.assertNull(this.curator.checkExists().forPath("/somewhere/test2"));
                    announcer.stop();
                    return;
                }
                Thread.sleep(100L);
            }
        } catch (Throwable th) {
            announcer.stop();
            throw th;
        }
    }

    @Test
    public void testCleansUpItsLittleTurdlings() throws Exception {
        this.curator.start();
        this.curator.blockUntilConnected();
        Announcer announcer = new Announcer(this.curator, this.exec);
        byte[] utf8 = StringUtils.toUtf8("billy");
        String path = ZKPaths.getPathAndNode("/somewhere/test2").getPath();
        announcer.start();
        try {
            Assert.assertNull(this.curator.checkExists().forPath(path));
            awaitAnnounce(announcer, "/somewhere/test2", utf8, true);
            Assert.assertNotNull(this.curator.checkExists().forPath(path));
            announcer.stop();
            Assert.assertNull(this.curator.checkExists().forPath(path));
        } catch (Throwable th) {
            announcer.stop();
            throw th;
        }
    }

    @Test
    public void testLeavesBehindTurdlingsThatAlreadyExisted() throws Exception {
        this.curator.start();
        this.curator.blockUntilConnected();
        Announcer announcer = new Announcer(this.curator, this.exec);
        byte[] utf8 = StringUtils.toUtf8("billy");
        String path = ZKPaths.getPathAndNode("/somewhere/test2").getPath();
        this.curator.create().forPath(path);
        Stat stat = (Stat) this.curator.checkExists().forPath(path);
        announcer.start();
        try {
            Assert.assertEquals(stat.getMzxid(), ((Stat) this.curator.checkExists().forPath(path)).getMzxid());
            awaitAnnounce(announcer, "/somewhere/test2", utf8, true);
            Assert.assertEquals(stat.getMzxid(), ((Stat) this.curator.checkExists().forPath(path)).getMzxid());
            announcer.stop();
            Assert.assertEquals(stat.getMzxid(), ((Stat) this.curator.checkExists().forPath(path)).getMzxid());
        } catch (Throwable th) {
            announcer.stop();
            throw th;
        }
    }

    @Test
    public void testLeavesBehindTurdlingsWhenToldTo() throws Exception {
        this.curator.start();
        this.curator.blockUntilConnected();
        Announcer announcer = new Announcer(this.curator, this.exec);
        byte[] utf8 = StringUtils.toUtf8("billy");
        String path = ZKPaths.getPathAndNode("/somewhere/test2").getPath();
        announcer.start();
        try {
            Assert.assertNull(this.curator.checkExists().forPath(path));
            awaitAnnounce(announcer, "/somewhere/test2", utf8, false);
            Assert.assertNotNull(this.curator.checkExists().forPath(path));
            announcer.stop();
            Assert.assertNotNull(this.curator.checkExists().forPath(path));
        } catch (Throwable th) {
            announcer.stop();
            throw th;
        }
    }

    private void awaitAnnounce(Announcer announcer, final String str, byte[] bArr, boolean z) throws InterruptedException {
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        this.curator.getCuratorListenable().addListener(new CuratorListener() { // from class: org.apache.hive.druid.org.apache.druid.curator.announcement.AnnouncerTest.3
            public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) {
                if (curatorEvent.getType() == CuratorEventType.CREATE && curatorEvent.getPath().equals(str)) {
                    countDownLatch.countDown();
                }
            }
        });
        announcer.announce(str, bArr, z);
        countDownLatch.await();
    }
}
