package org.apache.beam.it.kafka;

import com.google.common.truth.Truth;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.admin.AdminClient;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Answers;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnit;
import org.mockito.junit.MockitoRule;
import org.testcontainers.containers.KafkaContainer;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/it/kafka/KafkaResourceManagerTest.class */
public final class KafkaResourceManagerTest {

    @Rule
    public final MockitoRule mockito = MockitoJUnit.rule();

    @Mock
    private KafkaContainer container;

    @Mock(answer = Answers.RETURNS_DEEP_STUBS)
    private AdminClient kafkaClient;
    private static final String TEST_ID = "test-id";
    private static final String HOST = "localhost";
    private static final String TOPIC_NAME = "topic-name";
    private static final int KAFKA_PORT = 9093;
    private static final int MAPPED_PORT = 10000;
    private KafkaResourceManager testManager;

    @Before
    public void setUp() throws IOException {
        ((KafkaContainer) Mockito.doReturn(this.container).when(this.container)).withLogConsumer((Consumer) ArgumentMatchers.any());
        this.testManager = new KafkaResourceManager(this.kafkaClient, this.container, KafkaResourceManager.builder(TEST_ID));
    }

    @Test
    public void testCreateResourceManagerBuilderReturnsKafkaResourceManager() throws IOException {
        Truth.assertThat(KafkaResourceManager.builder(TEST_ID).useStaticContainer().setHost(HOST).setPort(KAFKA_PORT).build()).isInstanceOf(KafkaResourceManager.class);
    }

    @Test
    public void testGetBootstrapServersShouldReturnCorrectValue() throws IOException {
        Mockito.when(this.container.getHost()).thenReturn(HOST);
        Mockito.when(this.container.getMappedPort(KAFKA_PORT)).thenReturn(Integer.valueOf(MAPPED_PORT));
        Truth.assertThat(new KafkaResourceManager(this.kafkaClient, this.container, KafkaResourceManager.builder(TEST_ID)).getBootstrapServers()).matches("PLAINTEXT://localhost:10000");
    }

    @Test
    public void testGetTopicNameShouldReturnCorrectValue() {
        Iterator it = this.testManager.getTopicNames().iterator();
        while (it.hasNext()) {
            Truth.assertThat((String) it.next()).matches("test-id-\\d-\\d{8}-\\d{6}-\\d{6}");
        }
    }

    @Test
    public void testSetTopicNamesAndSetNumTopicsExclusive() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            KafkaResourceManager.builder(TEST_ID).setTopicNames(ImmutableSet.of(TOPIC_NAME)).setNumTopics(1);
        });
    }

    @Test
    public void testCreateTopicZeroPartitionsThrowErrors() {
        Assert.assertThrows(IllegalArgumentException.class, () -> {
            this.testManager.createTopic(TOPIC_NAME, 0);
        });
    }

    @Test
    public void testCreateTopicShouldThrowErrorWhenKafkaFailsToListTopics() throws ExecutionException, InterruptedException {
        Mockito.when((Set) this.kafkaClient.listTopics().names().get()).thenThrow(new Throwable[]{new ExecutionException(new RuntimeException("list topic future fails"))});
        Assert.assertThrows(KafkaResourceManagerException.class, () -> {
            this.testManager.createTopic(TOPIC_NAME, 1);
        });
    }

    @Test
    public void testCreateTopicShouldThrowErrorWhenKafkaFailsToCreateTopic() throws ExecutionException, InterruptedException {
        Mockito.when((Void) this.kafkaClient.createTopics((Collection) ArgumentMatchers.any(Collection.class)).all().get()).thenThrow(new Throwable[]{new ExecutionException(new RuntimeException("create topic future fails"))});
        Assert.assertThrows(KafkaResourceManagerException.class, () -> {
            this.testManager.createTopic(TOPIC_NAME, 1);
        });
    }

    @Test
    public void testCreateTopicShouldReturnTopicIfTopicExists() throws ExecutionException, InterruptedException {
        Mockito.when((Set) this.kafkaClient.listTopics().names().get()).thenReturn(Collections.singleton(TOPIC_NAME));
        Assert.assertNotNull(this.testManager.createTopic(TOPIC_NAME, 1));
    }

    @Test
    public void testCreateTopicShouldWork() throws ExecutionException, InterruptedException {
        Mockito.when((Void) this.kafkaClient.createTopics(ArgumentMatchers.anyCollection()).all().get()).thenReturn((Object) null);
        Assert.assertNotNull(this.testManager.createTopic(TOPIC_NAME, 1));
    }

    @Test
    public void testCleanupAllShouldNotDropStaticTopic() throws IOException {
        new KafkaResourceManager(this.kafkaClient, this.container, KafkaResourceManager.builder(TEST_ID).setTopicNames(Collections.singleton(TOPIC_NAME))).cleanupAll();
        ((AdminClient) Mockito.verify(this.kafkaClient, Mockito.never())).deleteTopics(ArgumentMatchers.anyCollection());
    }

    @Test
    public void testCleanupShouldDropNonStaticTopic() throws IOException {
        new KafkaResourceManager(this.kafkaClient, this.container, KafkaResourceManager.builder(TEST_ID).setNumTopics(3)).cleanupAll();
        ((AdminClient) Mockito.verify(this.kafkaClient)).deleteTopics((Collection) ArgumentMatchers.argThat(collection -> {
            return collection.size() == 3;
        }));
    }

    @Test
    public void testCleanupAllShouldThrowErrorWhenKafkaClientFailsToDeleteTopic() throws ExecutionException, InterruptedException {
        Mockito.when((Void) this.kafkaClient.deleteTopics(ArgumentMatchers.anyCollection()).all().get()).thenThrow(new Throwable[]{new ExecutionException(new RuntimeException("delete topic future fails"))});
        Assert.assertThrows(KafkaResourceManagerException.class, () -> {
            this.testManager.cleanupAll();
        });
    }
}
