package co.cask.cdap.security.zookeeper;

import co.cask.cdap.api.common.Bytes;
import co.cask.cdap.common.conf.CConfiguration;
import co.cask.cdap.common.guice.ConfigModule;
import co.cask.cdap.common.guice.ZKClientModule;
import co.cask.cdap.common.io.Codec;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.SettableFuture;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import java.io.IOException;
import java.util.ArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.twill.zookeeper.ZKClientService;
import org.apache.zookeeper.ZooDefs;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:co/cask/cdap/security/zookeeper/SharedResourceCacheTest.class */
public class SharedResourceCacheTest {
    private static final String ZK_NAMESPACE = "/SharedResourceCacheTest";
    private static final Logger LOG = LoggerFactory.getLogger(SharedResourceCacheTest.class);
    private static MiniZooKeeperCluster zkCluster;
    private static String zkConnectString;
    private static Injector injector1;
    private static Injector injector2;

    /* loaded from: input_file:co/cask/cdap/security/zookeeper/SharedResourceCacheTest$StringCodec.class */
    private static final class StringCodec implements Codec<String> {
        private StringCodec() {
        }

        public byte[] encode(String str) throws IOException {
            return Bytes.toBytes(str);
        }

        /* renamed from: decode, reason: merged with bridge method [inline-methods] */
        public String m5decode(byte[] bArr) throws IOException {
            return Bytes.toString(bArr);
        }
    }

    @BeforeClass
    public static void startUp() throws Exception {
        HBaseTestingUtility hBaseTestingUtility = new HBaseTestingUtility();
        zkCluster = hBaseTestingUtility.startMiniZKCluster();
        zkConnectString = hBaseTestingUtility.getConfiguration().get("hbase.zookeeper.quorum") + ":" + zkCluster.getClientPort();
        LOG.info("Running ZK cluster at " + zkConnectString);
        CConfiguration create = CConfiguration.create();
        create.set("zookeeper.quorum", zkConnectString);
        injector1 = Guice.createInjector(new Module[]{new ConfigModule(create, hBaseTestingUtility.getConfiguration()), new ZKClientModule()});
        injector2 = Guice.createInjector(new Module[]{new ConfigModule(create, hBaseTestingUtility.getConfiguration()), new ZKClientModule()});
    }

    @AfterClass
    public static void tearDown() throws Exception {
        zkCluster.shutdown();
    }

    @Test
    public void testCache() throws Exception {
        ArrayList newArrayList = Lists.newArrayList(ZooDefs.Ids.OPEN_ACL_UNSAFE);
        ZKClientService zKClientService = (ZKClientService) injector1.getInstance(ZKClientService.class);
        zKClientService.startAndWait();
        SharedResourceCache<String> sharedResourceCache = new SharedResourceCache<>(zKClientService, new StringCodec(), "/SharedResourceCacheTest/testCache", newArrayList);
        sharedResourceCache.init();
        sharedResourceCache.put("key1", "value1");
        ZKClientService zKClientService2 = (ZKClientService) injector2.getInstance(ZKClientService.class);
        zKClientService2.startAndWait();
        SharedResourceCache<String> sharedResourceCache2 = new SharedResourceCache<>(zKClientService2, new StringCodec(), "/SharedResourceCacheTest/testCache", newArrayList);
        sharedResourceCache2.init();
        waitForEntry(sharedResourceCache2, "key1", "value1", 10000L);
        Assert.assertEquals(sharedResourceCache.get("key1"), sharedResourceCache2.get("key1"));
        sharedResourceCache.put("key2", "value2");
        waitForEntry(sharedResourceCache2, "key2", "value2", 10000L);
        Assert.assertEquals(sharedResourceCache.get("key2"), sharedResourceCache2.get("key2"));
        sharedResourceCache2.put("key3", "value3");
        waitForEntry(sharedResourceCache, "key3", "value3", 10000L);
        Assert.assertEquals(sharedResourceCache2.get("key3"), sharedResourceCache.get("key3"));
        final SettableFuture create = SettableFuture.create();
        BaseResourceListener<String> baseResourceListener = new BaseResourceListener<String>() { // from class: co.cask.cdap.security.zookeeper.SharedResourceCacheTest.1
            public void onResourceUpdate(String str, String str2) {
                SharedResourceCacheTest.LOG.info("Resource updated: {}={}", str, str2);
                if (str.equals("key2")) {
                    create.set(str2);
                }
            }
        };
        sharedResourceCache2.addListener(baseResourceListener);
        sharedResourceCache.put("key2", "value2.2");
        Assert.assertEquals("value2.2", (String) create.get());
        Assert.assertEquals("value2.2", sharedResourceCache2.get("key2"));
        sharedResourceCache2.removeListener(baseResourceListener);
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        sharedResourceCache.addListener(new BaseResourceListener<String>() { // from class: co.cask.cdap.security.zookeeper.SharedResourceCacheTest.2
            public void onResourceDelete(String str) {
                SharedResourceCacheTest.LOG.info("Resource deleted on cache 1 {}", str);
                if (str.equals("key3")) {
                    countDownLatch.countDown();
                }
            }
        });
        final SettableFuture create2 = SettableFuture.create();
        sharedResourceCache2.addListener(new BaseResourceListener<String>() { // from class: co.cask.cdap.security.zookeeper.SharedResourceCacheTest.3
            public void onResourceDelete(String str) {
                SharedResourceCacheTest.LOG.info("Resource deleted on cache 2 {}", str);
                if (str.equals("key3")) {
                    create2.set(str);
                    countDownLatch.countDown();
                }
            }
        });
        sharedResourceCache.remove("key3");
        Assert.assertEquals("key3", (String) create2.get());
        Assert.assertNull(sharedResourceCache2.get("key3"));
        countDownLatch.await(5L, TimeUnit.SECONDS);
        Assert.assertEquals(sharedResourceCache, sharedResourceCache2);
    }

    private void waitForEntry(SharedResourceCache<String> sharedResourceCache, String str, String str2, long j) throws InterruptedException {
        boolean equals = str2.equals((String) sharedResourceCache.get(str));
        Stopwatch start = new Stopwatch().start();
        while (!equals && start.elapsedTime(TimeUnit.MILLISECONDS) < j) {
            TimeUnit.MILLISECONDS.sleep(200L);
            equals = str2.equals((String) sharedResourceCache.get(str));
        }
        if (!equals) {
            throw new RuntimeException("Timed out waiting for expected value '" + str2 + "' in cache");
        }
    }
}
