/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.integration;

import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.regex.Pattern;
import kafka.utils.MockTime;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.KafkaClientSupplier;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.processor.TaskId;
import org.apache.kafka.streams.processor.TopologyBuilder;
import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier;
import org.apache.kafka.streams.processor.internals.StreamTask;
import org.apache.kafka.streams.processor.internals.StreamThread;
import org.apache.kafka.streams.processor.internals.StreamsMetadataState;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;

public class RegexSourceIntegrationTest {
    /** Number of brokers the embedded cluster is constructed with. */
    private static final int NUM_BROKERS = 1;

    /** Embedded Kafka cluster shared by all tests in this class, registered as a JUnit class rule. */
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    // Topics created once in startKafkaCluster(); the "topic-*" names are matched by the
    // regex subscriptions in testShouldReadFromRegexAndNamedTopics.
    private static final String TOPIC_1 = "topic-1";
    private static final String TOPIC_2 = "topic-2";
    private static final String TOPIC_A = "topic-A";
    private static final String TOPIC_C = "topic-C";
    private static final String TOPIC_Y = "topic-Y";
    private static final String TOPIC_Z = "topic-Z";

    // Topics matched by both overlapping patterns ("foo.*" and "f.*") or only by "f.*".
    private static final String FA_TOPIC = "fa";
    private static final String FOO_TOPIC = "foo";

    // Topics created with an explicit (2, 1) layout in startKafkaCluster().
    private static final String PARTITIONED_TOPIC_1 = "partitioned-1";
    private static final String PARTITIONED_TOPIC_2 = "partitioned-2";

    /** Sink topic every topology in this class writes to. */
    private static final String DEFAULT_OUTPUT_TOPIC = "outputTopic";

    /** Class name of the String serde, passed as both key and value serde when building the config. */
    private static final String STRING_SERDE_CLASSNAME = Serdes.String().getClass().getName();

    /** Failure message used when the expected task assignment is never observed. */
    private static final String STREAM_TASKS_NOT_UPDATED = "Stream tasks not updated";

    /** Clock taken from the embedded cluster; handed to the synchronous producer helper. */
    private final MockTime mockTime;

    /** Streams configuration rebuilt before each test in setUp(). */
    private Properties streamsConfiguration;

    /**
     * Creates the test instance and captures the embedded cluster's clock so the
     * producing helpers can be driven with it.
     */
    public RegexSourceIntegrationTest() {
        mockTime = CLUSTER.time;
    }

    /**
     * Creates, once for the whole class, every topic the tests read from or write to.
     * Topics are created in a fixed order: the single-argument ones first, then the two
     * "partitioned" topics, then the output topic.
     *
     * @throws Exception if the embedded cluster fails to create a topic
     */
    @BeforeClass
    public static void startKafkaCluster() throws Exception {
        final String[] plainTopics = {
            TOPIC_1, TOPIC_2, TOPIC_A, TOPIC_C, TOPIC_Y, TOPIC_Z, FA_TOPIC, FOO_TOPIC
        };
        for (final String topic : plainTopics) {
            CLUSTER.createTopic(topic);
        }

        // NOTE(review): the (2, 1) arguments are presumably partition count and
        // replication factor -- confirm against EmbeddedKafkaCluster#createTopic.
        final String[] partitionedTopics = {PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2};
        for (final String topic : partitionedTopics) {
            CLUSTER.createTopic(topic, 2, 1);
        }

        CLUSTER.createTopic(DEFAULT_OUTPUT_TOPIC);
    }

    /**
     * Builds a fresh streams configuration before each test, pointing at the embedded
     * cluster and using the String serde class name for both keys and values.
     */
    @Before
    public void setUp() {
        final String bootstrapServers = CLUSTER.bootstrapServers();
        streamsConfiguration =
            StreamsTestUtils.getStreamsConfig(bootstrapServers, STRING_SERDE_CLASSNAME, STRING_SERDE_CLASSNAME);
    }

    /**
     * Purges the local streams state belonging to the configuration used by the test
     * that just ran.
     *
     * @throws Exception if purging the local state fails
     */
    @After
    public void tearDown() throws Exception {
        final Properties usedConfiguration = streamsConfiguration;
        IntegrationTestUtils.purgeLocalStreamsState(usedConfiguration);
    }

    /**
     * Checks that a stream subscribed with the pattern {@code TEST-TOPIC-\d} is first
     * assigned {@code TEST-TOPIC-1} and, after {@code TEST-TOPIC-2} is created while the
     * application is running, is assigned both topics.
     *
     * <p>The assignment is observed by replacing the first stream thread of the
     * {@link KafkaStreams} instance (via reflection on its {@code threads} field) with a
     * {@code TestStreamThread} that records the topics handed to {@code createStreamTask}.
     *
     * @throws Exception if topic creation, the reflective thread swap, or waiting for a
     *                   condition fails
     */
    @Test
    public void testRegexMatchesTopicsAWhenCreated() throws Exception {
        final Serde<String> stringSerde = Serdes.String();
        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-1");
        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);

        CLUSTER.createTopic("TEST-TOPIC-1");

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

        // Swap the first stream thread for an instrumented one before the instance is started.
        final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
        streamThreadsField.setAccessible(true);
        final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
        final StreamThread originalThread = streamThreads[0];
        final TestStreamThread testStreamThread = new TestStreamThread(
            builder,
            streamsConfig,
            new DefaultKafkaClientSupplier(),
            originalThread.applicationId,
            originalThread.clientId,
            originalThread.processId,
            new Metrics(),
            new SystemTime());
        streamThreads[0] = testStreamThread;

        final TestCondition oneTopicAdded = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
            }
        };
        final TestCondition secondTopicAdded = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
            }
        };

        streams.start();
        try {
            TestUtils.waitForCondition(oneTopicAdded, STREAM_TASKS_NOT_UPDATED);

            CLUSTER.createTopic("TEST-TOPIC-2");

            TestUtils.waitForCondition(secondTopicAdded, STREAM_TASKS_NOT_UPDATED);
        } finally {
            // Always stop the instance, even when a wait fails, so its threads do not
            // keep running into the following tests.
            streams.close();
        }
    }

    /**
     * Checks that a stream subscribed with the pattern {@code TEST-TOPIC-[A-Z]} is first
     * assigned {@code TEST-TOPIC-A} and {@code TEST-TOPIC-B} and, after
     * {@code TEST-TOPIC-A} is deleted while the application is running, is assigned only
     * {@code TEST-TOPIC-B}.
     *
     * <p>The assignment is observed by replacing the first stream thread of the
     * {@link KafkaStreams} instance (via reflection on its {@code threads} field) with a
     * {@code TestStreamThread} that records the topics handed to {@code createStreamTask}.
     *
     * @throws Exception if topic creation/deletion, the reflective thread swap, or
     *                   waiting for a condition fails
     */
    @Test
    public void testRegexMatchesTopicsAWhenDeleted() throws Exception {
        final Serde<String> stringSerde = Serdes.String();
        final List<String> expectedFirstAssignment = Arrays.asList("TEST-TOPIC-A", "TEST-TOPIC-B");
        final List<String> expectedSecondAssignment = Arrays.asList("TEST-TOPIC-B");
        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);

        CLUSTER.createTopic("TEST-TOPIC-A");
        CLUSTER.createTopic("TEST-TOPIC-B");

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-[A-Z]"));
        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);

        // Swap the first stream thread for an instrumented one before the instance is started.
        final Field streamThreadsField = streams.getClass().getDeclaredField("threads");
        streamThreadsField.setAccessible(true);
        final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams);
        final StreamThread originalThread = streamThreads[0];
        final TestStreamThread testStreamThread = new TestStreamThread(
            builder,
            streamsConfig,
            new DefaultKafkaClientSupplier(),
            originalThread.applicationId,
            originalThread.clientId,
            originalThread.processId,
            new Metrics(),
            new SystemTime());
        streamThreads[0] = testStreamThread;

        final TestCondition bothTopicsAdded = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(expectedFirstAssignment);
            }
        };
        final TestCondition oneTopicRemoved = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return testStreamThread.assignedTopicPartitions.equals(expectedSecondAssignment);
            }
        };

        streams.start();
        try {
            TestUtils.waitForCondition(bothTopicsAdded, STREAM_TASKS_NOT_UPDATED);

            CLUSTER.deleteTopic("TEST-TOPIC-A");

            TestUtils.waitForCondition(oneTopicRemoved, STREAM_TASKS_NOT_UPDATED);
        } finally {
            // Always stop the instance, even when a wait fails, so its threads do not
            // keep running into the following tests.
            streams.close();
        }
    }

    /**
     * Checks that one topology can combine two regex subscriptions ({@code topic-\d} and
     * {@code topic-[A-D]}) with an explicit list of named topics, and that one message
     * produced to each of the six source topics arrives on the output topic.
     *
     * <p>Arrival order is not asserted: actual and expected values are sorted before
     * being compared.
     *
     * @throws Exception if producing or consuming the records fails
     */
    @Test
    public void testShouldReadFromRegexAndNamedTopics() throws Exception {
        final String topic1TestMessage = "topic-1 test";
        final String topic2TestMessage = "topic-2 test";
        final String topicATestMessage = "topic-A test";
        final String topicCTestMessage = "topic-C test";
        final String topicYTestMessage = "topic-Y test";
        final String topicZTestMessage = "topic-Z test";

        final Serde<String> stringSerde = Serdes.String();

        final KStreamBuilder builder = new KStreamBuilder();
        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("topic-\\d"));
        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("topic-[A-D]"));
        final KStream<String, String> namedTopicsStream = builder.stream(TOPIC_Y, TOPIC_Z);

        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        namedTopicsStream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final List<String> expectedReceivedValues = Arrays.asList(
            topicATestMessage, topic1TestMessage, topic2TestMessage,
            topicCTestMessage, topicYTestMessage, topicZTestMessage);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();

        final List<KeyValue<String, String>> receivedKeyValues;
        try {
            final Properties producerConfig =
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);

            IntegrationTestUtils.produceValuesSynchronously(TOPIC_1, Arrays.asList(topic1TestMessage), producerConfig, mockTime);
            IntegrationTestUtils.produceValuesSynchronously(TOPIC_2, Arrays.asList(topic2TestMessage), producerConfig, mockTime);
            IntegrationTestUtils.produceValuesSynchronously(TOPIC_A, Arrays.asList(topicATestMessage), producerConfig, mockTime);
            IntegrationTestUtils.produceValuesSynchronously(TOPIC_C, Arrays.asList(topicCTestMessage), producerConfig, mockTime);
            IntegrationTestUtils.produceValuesSynchronously(TOPIC_Y, Arrays.asList(topicYTestMessage), producerConfig, mockTime);
            IntegrationTestUtils.produceValuesSynchronously(TOPIC_Z, Arrays.asList(topicZTestMessage), producerConfig, mockTime);

            final Properties consumerConfig =
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

            receivedKeyValues = IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, DEFAULT_OUTPUT_TOPIC, expectedReceivedValues.size());
        } finally {
            // Stop the instance whether or not the records arrived, so a failed wait
            // does not leave its threads running.
            streams.close();
        }

        final List<String> actualValues = new ArrayList<String>(receivedKeyValues.size());
        for (final KeyValue<String, String> receivedKeyValue : receivedKeyValues) {
            actualValues.add(receivedKeyValue.value);
        }

        // Records from different source topics may interleave arbitrarily; compare as sorted lists.
        Collections.sort(actualValues);
        Collections.sort(expectedReceivedValues);
        Assert.assertThat(actualValues, CoreMatchers.equalTo(expectedReceivedValues));
    }

    /**
     * Checks that two {@link KafkaStreams} instances built from the same configuration
     * and both subscribed with the pattern {@code partitioned-\d} are each assigned both
     * partitioned topics.
     *
     * <p>The leader is started first and must observe the expected assignment before
     * the follower is started. Each instance has its first stream thread replaced (via
     * reflection on the {@code threads} field) with a {@code TestStreamThread} that
     * records the topics handed to {@code createStreamTask}.
     *
     * @throws Exception if the reflective thread swap or waiting for a condition fails
     */
    @Test
    public void testMultipleConsumersCanReadFromPartitionedTopic() throws Exception {
        final Serde<String> stringSerde = Serdes.String();
        final KStreamBuilder builderLeader = new KStreamBuilder();
        final KStreamBuilder builderFollower = new KStreamBuilder();
        final List<String> expectedAssignment = Arrays.asList(PARTITIONED_TOPIC_1, PARTITIONED_TOPIC_2);

        final KStream<String, String> partitionedStreamLeader = builderLeader.stream(Pattern.compile("partitioned-\\d"));
        final KStream<String, String> partitionedStreamFollower = builderFollower.stream(Pattern.compile("partitioned-\\d"));

        partitionedStreamLeader.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        partitionedStreamFollower.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final KafkaStreams partitionedStreamsLeader = new KafkaStreams(builderLeader, streamsConfiguration);
        final KafkaStreams partitionedStreamsFollower = new KafkaStreams(builderFollower, streamsConfiguration);

        final StreamsConfig streamsConfig = new StreamsConfig(streamsConfiguration);

        // Instrument the leader's first stream thread.
        final Field leaderStreamThreadsField = partitionedStreamsLeader.getClass().getDeclaredField("threads");
        leaderStreamThreadsField.setAccessible(true);
        final StreamThread[] leaderStreamThreads = (StreamThread[]) leaderStreamThreadsField.get(partitionedStreamsLeader);
        final StreamThread originalLeaderThread = leaderStreamThreads[0];
        final TestStreamThread leaderTestStreamThread = new TestStreamThread(
            builderLeader,
            streamsConfig,
            new DefaultKafkaClientSupplier(),
            originalLeaderThread.applicationId,
            originalLeaderThread.clientId,
            originalLeaderThread.processId,
            new Metrics(),
            new SystemTime());
        leaderStreamThreads[0] = leaderTestStreamThread;

        final TestCondition bothTopicsAddedToLeader = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return leaderTestStreamThread.assignedTopicPartitions.equals(expectedAssignment);
            }
        };

        // Instrument the follower's first stream thread.
        final Field followerStreamThreadsField = partitionedStreamsFollower.getClass().getDeclaredField("threads");
        followerStreamThreadsField.setAccessible(true);
        final StreamThread[] followerStreamThreads = (StreamThread[]) followerStreamThreadsField.get(partitionedStreamsFollower);
        final StreamThread originalFollowerThread = followerStreamThreads[0];
        final TestStreamThread followerTestStreamThread = new TestStreamThread(
            builderFollower,
            streamsConfig,
            new DefaultKafkaClientSupplier(),
            originalFollowerThread.applicationId,
            originalFollowerThread.clientId,
            originalFollowerThread.processId,
            new Metrics(),
            new SystemTime());
        followerStreamThreads[0] = followerTestStreamThread;

        final TestCondition bothTopicsAddedToFollower = new TestCondition() {
            @Override
            public boolean conditionMet() {
                return followerTestStreamThread.assignedTopicPartitions.equals(expectedAssignment);
            }
        };

        // Each instance is closed in a finally block scoped to the point where it was
        // started, so a failed wait never leaves a started instance running and an
        // instance that was never started is never closed.
        partitionedStreamsLeader.start();
        try {
            TestUtils.waitForCondition(bothTopicsAddedToLeader, "Topics never assigned to leader stream");

            partitionedStreamsFollower.start();
            try {
                TestUtils.waitForCondition(bothTopicsAddedToFollower, "Topics never assigned to follower stream");
            } finally {
                partitionedStreamsFollower.close();
            }
        } finally {
            partitionedStreamsLeader.close();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Checks that a topology whose two regex subscriptions overlap ({@code foo.*} and
     * {@code f.*}) does not deliver the produced messages to the output topic: waiting
     * for two records is expected to end in an {@link AssertionError}.
     *
     * <p>If the records do arrive, the test must fail. Because the test is declared
     * with {@code expected = AssertionError.class}, that failure cannot be signalled
     * with {@code Assert.fail} -- the resulting {@code AssertionError} would itself
     * satisfy the expectation -- so an {@link IllegalStateException} is thrown instead.
     *
     * @throws Exception if producing fails, or if records are unexpectedly received
     */
    @Test(expected=AssertionError.class)
    public void testNoMessagesSentExceptionFromOverlappingPatterns() throws Exception {
        final String fooMessage = "fooMessage";
        final String fMessage = "fMessage";

        final Serde<String> stringSerde = Serdes.String();
        final KStreamBuilder builder = new KStreamBuilder();

        // Both patterns match FOO_TOPIC ("foo"); only the second matches FA_TOPIC ("fa").
        final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("foo.*"));
        final KStream<String, String> pattern2Stream = builder.stream(Pattern.compile("f.*"));

        pattern1Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);
        pattern2Stream.to(stringSerde, stringSerde, DEFAULT_OUTPUT_TOPIC);

        final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
        streams.start();
        try {
            final Properties producerConfig =
                TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);

            IntegrationTestUtils.produceValuesSynchronously(FA_TOPIC, Arrays.asList(fMessage), producerConfig, mockTime);
            IntegrationTestUtils.produceValuesSynchronously(FOO_TOPIC, Arrays.asList(fooMessage), producerConfig, mockTime);

            final Properties consumerConfig =
                TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class);

            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(consumerConfig, DEFAULT_OUTPUT_TOPIC, 2, 5000L);

            // Deliberately NOT Assert.fail(): an AssertionError here would be accepted by
            // the expected-exception declaration and the test would pass silently.
            throw new IllegalStateException("Should not get here");
        } finally {
            streams.close();
        }
    }

    private class TestStreamThread
    extends StreamThread {
        public volatile List<String> assignedTopicPartitions;

        public TestStreamThread(TopologyBuilder builder, StreamsConfig config, KafkaClientSupplier clientSupplier, String applicationId, String clientId, UUID processId, Metrics metrics, Time time) {
            super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder));
            this.assignedTopicPartitions = new ArrayList<String>();
        }

        public StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitions) {
            ArrayList<String> topicPartitions = new ArrayList<String>();
            for (TopicPartition partition : partitions) {
                topicPartitions.add(partition.topic());
            }
            Collections.sort(topicPartitions);
            this.assignedTopicPartitions = topicPartitions;
            return super.createStreamTask(id, partitions);
        }
    }
}

