package org.apache.kafka.streams.integration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StateDirectory;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.MockStateStoreSupplier;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/RegexSourceIntegrationTest.class */
public class RegexSourceIntegrationTest {
    private final MockTime mockTime = CLUSTER.time;
    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";
    private static final String FA_TOPIC = "fa";
    private static final String FOO_TOPIC = "foo";
    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";
    private Properties streamsConfiguration;
    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();

    /* loaded from: input_file:org/apache/kafka/streams/integration/RegexSourceIntegrationTest$TestStreamThread.class */
    private class TestStreamThread extends StreamThread {
        public volatile List<String> assignedTopicPartitions;

        public TestStreamThread(TopologyBuilder topologyBuilder, StreamsConfig streamsConfig, KafkaClientSupplier kafkaClientSupplier, String str, String str2, UUID uuid, Metrics metrics, Time time) {
            super(topologyBuilder, streamsConfig, kafkaClientSupplier, str, str2, uuid, metrics, time, new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0L, new StateDirectory(str, streamsConfig.getString("state.dir"), time));
            this.assignedTopicPartitions = new ArrayList();
        }

        public StreamTask createStreamTask(TaskId taskId, Collection<TopicPartition> collection) {
            ArrayList arrayList = new ArrayList();
            Iterator<TopicPartition> it = collection.iterator();
            while (it.hasNext()) {
                arrayList.add(it.next().topic());
            }
            Collections.sort(arrayList);
            this.assignedTopicPartitions = arrayList;
            return super.createStreamTask(taskId, collection);
        }
    }

    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        CLUSTER.createTopics(TOPIC_1, TOPIC_2, TOPIC_A, TOPIC_C, TOPIC_Y, TOPIC_Z, FA_TOPIC, FOO_TOPIC, DEFAULT_OUTPUT_TOPIC);
        CLUSTER.createTopic(PARTITIONED_TOPIC_1, 2, NUM_BROKERS);
        CLUSTER.createTopic(PARTITIONED_TOPIC_2, 2, NUM_BROKERS);
    }

    @Before
    public void setUp() {
        Properties properties = new Properties();
        properties.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true);
        this.streamsConfiguration = StreamsTestUtils.getStreamsConfig("regex-source-integration-test", CLUSTER.bootstrapServers(), STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME, properties);
    }

    @After
    public void tearDown() throws Exception {
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
        Serde String = Serdes.String();
        final List asList = Arrays.asList("TEST-TOPIC-1");
        final List asList2 = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
        StreamsConfig streamsConfig = new StreamsConfig(this.streamsConfiguration);
        CLUSTER.createTopic("TEST-TOPIC-1");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(Pattern.compile("TEST-TOPIC-\\d")).to(String, String, DEFAULT_OUTPUT_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, this.streamsConfiguration);
        Field declaredField = kafkaStreams.getClass().getDeclaredField("threads");
        declaredField.setAccessible(true);
        StreamThread[] streamThreadArr = (StreamThread[]) declaredField.get(kafkaStreams);
        StreamThread streamThread = streamThreadArr[0];
        final TestStreamThread testStreamThread = new TestStreamThread(kStreamBuilder, streamsConfig, new DefaultKafkaClientSupplier(), streamThread.applicationId, streamThread.clientId, streamThread.processId, new Metrics(), Time.SYSTEM);
        TestCondition testCondition = new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.1
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(asList);
            }
        };
        streamThreadArr[0] = testStreamThread;
        kafkaStreams.start();
        TestUtils.waitForCondition(testCondition, STREAM_TASKS_NOT_UPDATED);
        CLUSTER.createTopic("TEST-TOPIC-2");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.2
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(asList2);
            }
        }, STREAM_TASKS_NOT_UPDATED);
        kafkaStreams.close();
    }

    @Test
    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
        Serde String = Serdes.String();
        final List asList = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
        final List asList2 = Arrays.asList("TEST-TOPIC-B");
        StreamsConfig streamsConfig = new StreamsConfig(this.streamsConfiguration);
        CLUSTER.createTopics("TEST-TOPIC-A", "TEST-TOPIC-B");
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        kStreamBuilder.stream(Pattern.compile("TEST-TOPIC-[A-Z]")).to(String, String, DEFAULT_OUTPUT_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, this.streamsConfiguration);
        Field declaredField = kafkaStreams.getClass().getDeclaredField("threads");
        declaredField.setAccessible(true);
        StreamThread[] streamThreadArr = (StreamThread[]) declaredField.get(kafkaStreams);
        StreamThread streamThread = streamThreadArr[0];
        final TestStreamThread testStreamThread = new TestStreamThread(kStreamBuilder, streamsConfig, new DefaultKafkaClientSupplier(), streamThread.applicationId, streamThread.clientId, streamThread.processId, new Metrics(), Time.SYSTEM);
        streamThreadArr[0] = testStreamThread;
        TestCondition testCondition = new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.3
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(asList);
            }
        };
        kafkaStreams.start();
        TestUtils.waitForCondition(testCondition, STREAM_TASKS_NOT_UPDATED);
        CLUSTER.deleteTopic("TEST-TOPIC-A");
        TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.4
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(asList2);
            }
        }, STREAM_TASKS_NOT_UPDATED);
        kafkaStreams.close();
    }

    @Test
    public void shouldAddStateStoreToRegexDefinedSource() throws Exception {
        MockProcessorSupplier mockProcessorSupplier = new MockProcessorSupplier();
        final TopologyBuilder addStateStore = new TopologyBuilder().addSource("ingest", Pattern.compile("topic-\\d+")).addProcessor("my-processor", mockProcessorSupplier, new String[]{"ingest"}).addStateStore(new MockStateStoreSupplier("testStateStore", false), new String[]{"my-processor"});
        KafkaStreams kafkaStreams = new KafkaStreams(addStateStore, this.streamsConfiguration);
        try {
            kafkaStreams.start();
            TestUtils.waitForCondition(new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.5
                public boolean conditionMet() {
                    List list = (List) addStateStore.stateStoreNameToSourceTopics().get("testStateStore");
                    return (list == null || list.isEmpty() || !((String) list.get(0)).equals(RegexSourceIntegrationTest.TOPIC_1)) ? false : true;
                }
            }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Did not find topic: [topic-1] connected to state store: [testStateStore]");
            kafkaStreams.close();
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }

    @Test
    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
        Serde String = Serdes.String();
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream stream = kStreamBuilder.stream(Pattern.compile("topic-\\d"));
        KStream stream2 = kStreamBuilder.stream(Pattern.compile("topic-[A-D]"));
        KStream stream3 = kStreamBuilder.stream(new String[]{TOPIC_Y, TOPIC_Z});
        stream.to(String, String, DEFAULT_OUTPUT_TOPIC);
        stream2.to(String, String, DEFAULT_OUTPUT_TOPIC);
        stream3.to(String, String, DEFAULT_OUTPUT_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, this.streamsConfiguration);
        kafkaStreams.start();
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList("topic-1 test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList("topic-2 test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList("topic-A test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList("topic-C test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList("topic-Y test"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList("topic-Z test"), producerConfig, this.mockTime);
        Properties consumerConfig = TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);
        List asList = Arrays.asList("topic-A test", "topic-1 test", "topic-2 test", "topic-C test", "topic-Y test", "topic-Z test");
        List waitUntilMinKeyValueRecordsReceived = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 6);
        ArrayList arrayList = new ArrayList(6);
        Iterator it = waitUntilMinKeyValueRecordsReceived.iterator();
        while (it.hasNext()) {
            arrayList.add(((KeyValue) it.next()).value);
        }
        kafkaStreams.close();
        Collections.sort(arrayList);
        Collections.sort(asList);
        Assert.assertThat(arrayList, CoreMatchers.equalTo(asList));
    }

    @Test
    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
        Serde String = Serdes.String();
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStreamBuilder kStreamBuilder2 = new KStreamBuilder();
        final List asList = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);
        KStream stream = kStreamBuilder.stream(Pattern.compile("partitioned-\\d"));
        KStream stream2 = kStreamBuilder2.stream(Pattern.compile("partitioned-\\d"));
        stream.to(String, String, DEFAULT_OUTPUT_TOPIC);
        stream2.to(String, String, DEFAULT_OUTPUT_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, this.streamsConfiguration);
        KafkaStreams kafkaStreams2 = new KafkaStreams(kStreamBuilder2, this.streamsConfiguration);
        StreamsConfig streamsConfig = new StreamsConfig(this.streamsConfiguration);
        Field declaredField = kafkaStreams.getClass().getDeclaredField("threads");
        declaredField.setAccessible(true);
        StreamThread[] streamThreadArr = (StreamThread[]) declaredField.get(kafkaStreams);
        StreamThread streamThread = streamThreadArr[0];
        final TestStreamThread testStreamThread = new TestStreamThread(kStreamBuilder, streamsConfig, new DefaultKafkaClientSupplier(), streamThread.applicationId, streamThread.clientId, streamThread.processId, new Metrics(), Time.SYSTEM);
        streamThreadArr[0] = testStreamThread;
        TestCondition testCondition = new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.6
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(asList);
            }
        };
        Field declaredField2 = kafkaStreams2.getClass().getDeclaredField("threads");
        declaredField2.setAccessible(true);
        StreamThread[] streamThreadArr2 = (StreamThread[]) declaredField2.get(kafkaStreams2);
        StreamThread streamThread2 = streamThreadArr2[0];
        final TestStreamThread testStreamThread2 = new TestStreamThread(kStreamBuilder2, streamsConfig, new DefaultKafkaClientSupplier(), streamThread2.applicationId, streamThread2.clientId, streamThread2.processId, new Metrics(), Time.SYSTEM);
        streamThreadArr2[0] = testStreamThread2;
        TestCondition testCondition2 = new TestCondition() { // from class: org.apache.kafka.streams.integration.RegexSourceIntegrationTest.7
            public boolean conditionMet() {
                return testStreamThread2.assignedTopicPartitions.equals(asList);
            }
        };
        kafkaStreams.start();
        TestUtils.waitForCondition(testCondition, "Topics never assigned to leader stream");
        kafkaStreams2.start();
        TestUtils.waitForCondition(testCondition2, "Topics never assigned to follower stream");
        kafkaStreams.close();
        kafkaStreams2.close();
    }

    @Test(expected = AssertionError.class)
    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
        Serde String = Serdes.String();
        KStreamBuilder kStreamBuilder = new KStreamBuilder();
        KStream stream = kStreamBuilder.stream(Pattern.compile("foo.*"));
        KStream stream2 = kStreamBuilder.stream(Pattern.compile("f.*"));
        stream.to(String, String, DEFAULT_OUTPUT_TOPIC);
        stream2.to(String, String, DEFAULT_OUTPUT_TOPIC);
        KafkaStreams kafkaStreams = new KafkaStreams(kStreamBuilder, this.streamsConfiguration);
        kafkaStreams.start();
        Properties producerConfig = TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList("fMessage"), producerConfig, this.mockTime);
        IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList("fooMessage"), producerConfig, this.mockTime);
        try {
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class), DEFAULT_OUTPUT_TOPIC, 2, 5000L);
            Assert.fail("Should not get here");
            kafkaStreams.close();
        } catch (Throwable th) {
            kafkaStreams.close();
            throw th;
        }
    }
}
