/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.StreamsTestUtils;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;

@RunWith(value=EasyMockRunner.class)
public class KStreamRepartitionTest {
    private final String inputTopic = "input-topic";
    private final Properties props = StreamsTestUtils.getStreamsConfig(Serdes.Integer(), Serdes.String());
    private StreamsBuilder builder;

    @Before
    public void setUp() {
        this.builder = new StreamsBuilder();
    }

    @Test
    public void shouldInvokePartitionerWhenSet() {
        int[] expectedKeys = new int[]{0, 1};
        StreamPartitioner streamPartitionerMock = (StreamPartitioner)EasyMock.mock(StreamPartitioner.class);
        EasyMock.expect((Object)streamPartitionerMock.partition(EasyMock.anyString(), (Object)EasyMock.eq((int)0), EasyMock.eq((Object)"X0"), EasyMock.anyInt())).andReturn((Object)1).times(1);
        EasyMock.expect((Object)streamPartitionerMock.partition(EasyMock.anyString(), (Object)EasyMock.eq((int)1), EasyMock.eq((Object)"X1"), EasyMock.anyInt())).andReturn((Object)1).times(1);
        EasyMock.replay((Object[])new Object[]{streamPartitionerMock});
        String repartitionOperationName = "test";
        Repartitioned repartitioned = Repartitioned.streamPartitioner((StreamPartitioner)streamPartitionerMock).withName("test");
        this.builder.stream("input-topic").repartition(repartitioned);
        try (TopologyTestDriver driver = new TopologyTestDriver(this.builder.build(), this.props);){
            TestInputTopic testInputTopic = driver.createInputTopic("input-topic", (Serializer)new IntegerSerializer(), (Serializer)new StringSerializer());
            String topicName = this.repartitionOutputTopic(this.props, "test");
            TestOutputTopic testOutputTopic = driver.createOutputTopic(topicName, (Deserializer)new IntegerDeserializer(), (Deserializer)new StringDeserializer());
            for (int i = 0; i < 2; ++i) {
                testInputTopic.pipeInput((Object)expectedKeys[i], (Object)("X" + expectedKeys[i]), (long)(i + 10));
            }
            MatcherAssert.assertThat((Object)testOutputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)0, (Object)"X0", Instant.ofEpochMilli(10L))));
            MatcherAssert.assertThat((Object)testOutputTopic.readRecord(), (Matcher)CoreMatchers.equalTo((Object)new TestRecord((Object)1, (Object)"X1", Instant.ofEpochMilli(11L))));
            Assert.assertTrue((boolean)testOutputTopic.readRecordsToList().isEmpty());
        }
        EasyMock.verify((Object[])new Object[]{streamPartitionerMock});
    }

    @Test
    public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationsDoNotMatchWhenJoining() {
        String topicB = "topic-b";
        String outputTopic = "topic-output";
        String topicBRepartitionedName = "topic-b-scale-up";
        String inputTopicRepartitionedName = "input-topic-scale-up";
        int topicBNumberOfPartitions = 2;
        int inputTopicNumberOfPartitions = 4;
        StreamsBuilder builder = new StreamsBuilder();
        Repartitioned inputTopicRepartitioned = Repartitioned.as((String)"input-topic-scale-up").withNumberOfPartitions(4);
        Repartitioned topicBRepartitioned = Repartitioned.as((String)"topic-b-scale-up").withNumberOfPartitions(2);
        KStream topicBStream = builder.stream("topic-b", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).repartition(topicBRepartitioned);
        builder.stream("input-topic", Consumed.with((Serde)Serdes.Integer(), (Serde)Serdes.String())).repartition(inputTopicRepartitioned).join(topicBStream, (value1, value2) -> value2, JoinWindows.of((Duration)Duration.ofSeconds(10L))).to("topic-output");
        Map repartitionTopicsWithNumOfPartitions = Utils.mkMap((Map.Entry[])new Map.Entry[]{Utils.mkEntry((Object)this.toRepartitionTopicName("topic-b-scale-up"), (Object)2), Utils.mkEntry((Object)this.toRepartitionTopicName("input-topic-scale-up"), (Object)4)});
        TopologyException expected = (TopologyException)Assert.assertThrows(TopologyException.class, () -> builder.build(this.props));
        String expectedErrorMessage = String.format("Following topics do not have the same number of partitions: [%s]", new TreeMap(repartitionTopicsWithNumOfPartitions));
        Assert.assertNotNull((Object)((Object)expected));
        Assert.assertTrue((boolean)expected.getMessage().contains(expectedErrorMessage));
    }

    private String toRepartitionTopicName(String input) {
        return input + "-repartition";
    }

    private String repartitionOutputTopic(Properties props, String repartitionOperationName) {
        return props.getProperty("application.id") + "-" + repartitionOperationName + "-repartition";
    }
}

