/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.connect.replicator.util;

import com.google.common.base.Predicate;
import com.google.common.io.Files;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import io.confluent.connect.replicator.util.ReplicatorAdminClientWithZk;
import io.confluent.connect.replicator.util.TopicMetadata;
import java.io.File;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminOperationException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class ReplicatorAdminClientWithZkTest {
    private File kafkaLogDir;
    private TestingServer zkServer;
    private KafkaServerStartable kafkaServer;
    private ReplicatorAdminClient client;

    @Before
    public void startup() throws Exception {
        this.zkServer = new TestingServer();
        this.kafkaLogDir = Files.createTempDir();
        this.kafkaLogDir.deleteOnExit();
        Properties config = new Properties();
        config.put(KafkaConfig.ZkConnectProp(), this.zkServer.getConnectString());
        config.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:0");
        config.put(KafkaConfig.LogDirProp(), this.kafkaLogDir.getAbsolutePath());
        this.kafkaServer = new KafkaServerStartable(KafkaConfig.fromProps((Properties)config));
        this.kafkaServer.startup();
        ZkUtils zkUtils = ZkUtils.apply((String)this.zkServer.getConnectString(), (int)6000, (int)10000, (boolean)false);
        this.client = new ReplicatorAdminClientWithZk(zkUtils, (Time)new SystemTime(), 60000L);
    }

    @After
    public void shutdown() throws Exception {
        if (this.client != null) {
            this.client.close();
        }
        if (this.kafkaServer != null) {
            this.kafkaServer.shutdown();
            this.kafkaServer.awaitShutdown();
        }
        if (this.zkServer != null) {
            this.zkServer.close();
        }
    }

    @Test
    public void testClusterId() throws Exception {
        Assert.assertNotNull((Object)this.client.clusterId());
    }

    @Test
    public void testQueryBrokerMetadata() throws Exception {
        Assert.assertEquals((long)1L, (long)this.client.aliveBrokers());
    }

    @Test
    public void testQueryNonExistingTopic() {
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
            }
        });
        Assert.assertNull((Object)this.client.topicMetadata("foo"));
    }

    @Test
    public void testQueryNonExistingPartition() throws Exception {
        int numPartitions = 2;
        Assert.assertTrue((boolean)this.client.createTopic("foo", numPartitions, 1, new Properties()));
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
            }
        });
        Assert.assertFalse((boolean)this.client.partitionExists(new TopicPartition("foo", 4)));
    }

    @Test
    public void testCreateTopicAndPartitionAndTopicExists() throws Exception {
        int numPartitions = 4;
        Assert.assertTrue((boolean)this.client.createTopic("foo", numPartitions, 1, new Properties()));
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
            }
        });
        TopicMetadata topicMetadata = this.client.topicMetadata("foo");
        Assert.assertNotNull((Object)topicMetadata);
        Assert.assertEquals((long)numPartitions, (long)topicMetadata.numPartitions());
        Assert.assertTrue((boolean)this.client.partitionExists(new TopicPartition("foo", 3)));
        Assert.assertTrue((boolean)this.client.topicExists("foo"));
    }

    @Test
    public void testCreateAlreadyExistingTopic() throws Exception {
        int numPartitions = 4;
        Assert.assertTrue((boolean)this.client.createTopic("foo", numPartitions, 1, new Properties()));
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
            }
        });
        Assert.assertFalse((boolean)this.client.createTopic("foo", numPartitions, 1, new Properties()));
    }

    @Test
    public void testAddPartitionsOnExistingTopic() throws Exception {
        final AtomicBoolean notified = new AtomicBoolean(false);
        int initPartitions = 2;
        int totalPartitions = 4;
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
                notified.set(true);
            }
        });
        Assert.assertTrue((boolean)this.client.createTopic("foo", initPartitions, 1, new Properties()));
        ReplicatorAdminClientWithZkTest.waitUntilTrue("Failed to get notification of topic creation", 12000L, new Predicate<Void>(){

            public boolean apply(Void aVoid) {
                return notified.get();
            }
        });
        TopicMetadata topicMetadata = this.client.topicMetadata("foo");
        Assert.assertNotNull((Object)topicMetadata);
        this.client.addPartitions("foo", totalPartitions);
        topicMetadata = this.client.topicMetadata("foo");
        Assert.assertNotNull((Object)topicMetadata);
        Assert.assertEquals((long)totalPartitions, (long)topicMetadata.numPartitions());
    }

    @Test(expected=AdminOperationException.class)
    public void testAddPartitionsOnNonExistingTopic() throws Exception {
        int initPartitions = 2;
        int totalPartitions = 4;
        Assert.assertTrue((boolean)this.client.createTopic("foo", initPartitions, 1, new Properties()));
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
            }
        });
        TopicMetadata topicMetadata = this.client.topicMetadata("foo");
        Assert.assertNotNull((Object)topicMetadata);
        this.client.addPartitions("bar", totalPartitions);
    }

    @Test
    public void testCreatedTopicNotification() throws Exception {
        final AtomicBoolean notified = new AtomicBoolean(false);
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
                notified.set(true);
            }
        });
        Assert.assertTrue((boolean)this.client.createTopic("foo", 1, 1, new Properties()));
        ReplicatorAdminClientWithZkTest.waitUntilTrue("Failed to get notification of topic creation", 5000L, new Predicate<Void>(){

            public boolean apply(Void aVoid) {
                return notified.get();
            }
        });
    }

    @Test
    public void testNoNotificationForUninterestedTopicCreation() throws Exception {
        final AtomicBoolean notified = new AtomicBoolean(false);
        this.client.setInterestedTopics(Collections.singleton("foo"), new ReplicatorAdminClient.TopicMetadataListener(){

            public void onTopicMetadataRefresh() {
                notified.set(true);
            }
        });
        Assert.assertTrue((boolean)this.client.createTopic("bar", 1, 1, new Properties()));
        Thread.sleep(1000L);
        Assert.assertFalse((boolean)notified.get());
    }

    private static void waitUntilTrue(String message, long timeout, Predicate<Void> predicate) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeout;
        while (System.currentTimeMillis() < deadline) {
            if (predicate.apply(null)) {
                return;
            }
            Thread.sleep(50L);
        }
        Assert.fail((String)message);
    }
}

