package io.confluent.connect.replicator.util;

import com.google.common.base.Predicate;
import com.google.common.io.Files;
import io.confluent.connect.replicator.util.ReplicatorAdminClient;
import java.io.File;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import kafka.admin.AdminOperationException;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServerStartable;
import kafka.utils.ZkUtils;
import org.apache.curator.test.TestingServer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.SystemTime;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:io/confluent/connect/replicator/util/ReplicatorAdminClientWithZkTest.class */
/**
 * Integration tests for {@link ReplicatorAdminClientWithZk}.
 *
 * <p>Every test runs against a fresh embedded ZooKeeper {@link TestingServer} and a single
 * Kafka broker started in {@link #startup()} and torn down in {@link #shutdown()}, so topics
 * created by one test are not visible to another.
 */
public class ReplicatorAdminClientWithZkTest {
    /** Topic the client under test is told to watch. */
    private static final String TOPIC = "foo";
    /** Topic the client is never told to watch. */
    private static final String OTHER_TOPIC = "bar";
    /** Pause between two evaluations of a condition in {@link #waitUntilTrue}. */
    private static final long POLL_INTERVAL_MS = 50L;

    private File kafkaLogDir;
    private TestingServer zkServer;
    private KafkaServerStartable kafkaServer;
    private ReplicatorAdminClient client;

    /**
     * Starts ZooKeeper, a single Kafka broker that stores its logs in a fresh temp directory,
     * and the ZooKeeper-backed admin client under test.
     *
     * @throws Exception if any of the embedded services fails to start
     */
    @Before
    public void startup() throws Exception {
        this.zkServer = new TestingServer();
        this.kafkaLogDir = Files.createTempDir();
        // Fallback only: deleteOnExit cannot remove a non-empty directory, so shutdown()
        // deletes the directory tree explicitly.
        this.kafkaLogDir.deleteOnExit();

        Properties brokerProps = new Properties();
        brokerProps.put(KafkaConfig.ZkConnectProp(), this.zkServer.getConnectString());
        // Port 0 — presumably lets the broker bind to any free port; the tests only talk
        // to ZooKeeper, never to the broker listener directly.
        brokerProps.put(KafkaConfig.ListenersProp(), "PLAINTEXT://localhost:0");
        brokerProps.put(KafkaConfig.LogDirProp(), this.kafkaLogDir.getAbsolutePath());
        this.kafkaServer = new KafkaServerStartable(KafkaConfig.fromProps(brokerProps));
        this.kafkaServer.startup();

        // NOTE(review): 6000/10000 look like the ZooKeeper session/connection timeouts in ms
        // and 60000L like a client-side timeout in ms — confirm against the constructors.
        this.client = new ReplicatorAdminClientWithZk(
                ZkUtils.apply(this.zkServer.getConnectString(), 6000, 10000, false),
                new SystemTime(),
                60000L);
    }

    /**
     * Releases everything {@link #startup()} created. Each step runs in its own
     * {@code finally} so that a failure while closing one resource does not leak the others.
     *
     * @throws Exception the first failure raised while closing a resource
     */
    @After
    public void shutdown() throws Exception {
        try {
            if (this.client != null) {
                this.client.close();
            }
        } finally {
            try {
                if (this.kafkaServer != null) {
                    this.kafkaServer.shutdown();
                    this.kafkaServer.awaitShutdown();
                }
            } finally {
                try {
                    if (this.zkServer != null) {
                        this.zkServer.close();
                    }
                } finally {
                    deleteRecursively(this.kafkaLogDir);
                }
            }
        }
    }

    @Test
    public void testClusterId() throws Exception {
        Assert.assertNotNull(this.client.clusterId());
    }

    @Test
    public void testQueryBrokerMetadata() throws Exception {
        Assert.assertEquals(1L, this.client.aliveBrokers());
    }

    @Test
    public void testQueryNonExistingTopic() {
        watchTopic(noOpListener());
        Assert.assertNull(this.client.topicMetadata(TOPIC));
    }

    @Test
    public void testQueryNonExistingPartition() throws Exception {
        Assert.assertTrue(this.client.createTopic(TOPIC, 2, 1, new Properties()));
        watchTopic(noOpListener());
        // The topic only has partitions 0 and 1.
        Assert.assertFalse(this.client.partitionExists(new TopicPartition(TOPIC, 4)));
    }

    @Test
    public void testCreateTopicAndPartitionAndTopicExists() throws Exception {
        Assert.assertTrue(this.client.createTopic(TOPIC, 4, 1, new Properties()));
        watchTopic(noOpListener());
        assertPartitionCount(TOPIC, 4);
        Assert.assertTrue(this.client.partitionExists(new TopicPartition(TOPIC, 3)));
        Assert.assertTrue(this.client.topicExists(TOPIC));
    }

    @Test
    public void testCreateAlreadyExistingTopic() throws Exception {
        Assert.assertTrue(this.client.createTopic(TOPIC, 4, 1, new Properties()));
        watchTopic(noOpListener());
        // A second creation attempt must report that nothing was created.
        Assert.assertFalse(this.client.createTopic(TOPIC, 4, 1, new Properties()));
    }

    @Test
    public void testAddPartitionsOnExistingTopic() throws Exception {
        final AtomicBoolean notified = new AtomicBoolean(false);
        watchTopic(flagSettingListener(notified));
        Assert.assertTrue(this.client.createTopic(TOPIC, 2, 1, new Properties()));
        waitUntilTrue("Failed to get notification of topic creation", 12000L, isSet(notified));
        Assert.assertNotNull(this.client.topicMetadata(TOPIC));
        this.client.addPartitions(TOPIC, 4);
        assertPartitionCount(TOPIC, 4);
    }

    @Test(expected = AdminOperationException.class)
    public void testAddPartitionsOnNonExistingTopic() throws Exception {
        Assert.assertTrue(this.client.createTopic(TOPIC, 2, 1, new Properties()));
        watchTopic(noOpListener());
        Assert.assertNotNull(this.client.topicMetadata(TOPIC));
        // OTHER_TOPIC was never created, so this call is the one expected to throw.
        this.client.addPartitions(OTHER_TOPIC, 4);
    }

    @Test
    public void testCreatedTopicNotification() throws Exception {
        final AtomicBoolean notified = new AtomicBoolean(false);
        watchTopic(flagSettingListener(notified));
        Assert.assertTrue(this.client.createTopic(TOPIC, 1, 1, new Properties()));
        waitUntilTrue("Failed to get notification of topic creation", 5000L, isSet(notified));
    }

    @Test
    public void testNoNotificationForUninterestedTopicCreation() throws Exception {
        final AtomicBoolean notified = new AtomicBoolean(false);
        watchTopic(flagSettingListener(notified));
        Assert.assertTrue(this.client.createTopic(OTHER_TOPIC, 1, 1, new Properties()));
        // Absence of a callback cannot be awaited; give a stray notification time to arrive.
        Thread.sleep(1000L);
        Assert.assertFalse(notified.get());
    }

    /** Registers {@link #TOPIC} as the client's only interested topic with the given listener. */
    private void watchTopic(ReplicatorAdminClient.TopicMetadataListener listener) {
        this.client.setInterestedTopics(Collections.singleton(TOPIC), listener);
    }

    /**
     * Asserts that the client has metadata for {@code topic} and that it reports
     * {@code expected} partitions.
     */
    private void assertPartitionCount(String topic, int expected) {
        Assert.assertNotNull(this.client.topicMetadata(topic));
        Assert.assertEquals(expected, this.client.topicMetadata(topic).numPartitions());
    }

    /** Returns a listener that ignores metadata refreshes. */
    private static ReplicatorAdminClient.TopicMetadataListener noOpListener() {
        return new ReplicatorAdminClient.TopicMetadataListener() {
            @Override
            public void onTopicMetadataRefresh() {
                // Intentionally empty: these tests do not care about notifications.
            }
        };
    }

    /** Returns a listener that sets {@code flag} to {@code true} on every metadata refresh. */
    private static ReplicatorAdminClient.TopicMetadataListener flagSettingListener(
            final AtomicBoolean flag) {
        return new ReplicatorAdminClient.TopicMetadataListener() {
            @Override
            public void onTopicMetadataRefresh() {
                flag.set(true);
            }
        };
    }

    /** Returns a predicate that ignores its argument and reports the current value of {@code flag}. */
    private static Predicate<Void> isSet(final AtomicBoolean flag) {
        return new Predicate<Void>() {
            @Override
            public boolean apply(Void ignored) {
                return flag.get();
            }
        };
    }

    /**
     * Polls {@code predicate} until it returns {@code true} or the timeout elapses, in which
     * case the current test is failed with {@code str}.
     *
     * <p>The condition is evaluated before the deadline is checked, so it is always tested at
     * least once and once more after the final sleep.
     *
     * @param str failure message used when the timeout elapses
     * @param j maximum time to wait, in milliseconds
     * @param predicate condition to poll; it is always invoked with {@code null}
     * @throws InterruptedException if the thread is interrupted while sleeping between polls
     */
    private static void waitUntilTrue(String str, long j, Predicate<Void> predicate) throws InterruptedException {
        long deadline = System.currentTimeMillis() + j;
        while (!predicate.apply(null)) {
            if (System.currentTimeMillis() >= deadline) {
                Assert.fail(str);
            }
            Thread.sleep(POLL_INTERVAL_MS);
        }
    }

    /**
     * Deletes {@code file} and, if it is a directory, everything below it. Best effort:
     * entries that cannot be deleted are skipped silently because a leftover temp file must
     * not fail a test.
     *
     * @param file file or directory to delete; {@code null} and missing paths are ignored
     */
    private static void deleteRecursively(File file) {
        if (file == null || !file.exists()) {
            return;
        }
        File[] children = file.listFiles(); // null for plain files or on I/O error
        if (children != null) {
            for (File child : children) {
                deleteRecursively(child);
            }
        }
        file.delete();
    }
}
