package org.apache.kafka.streams.kstream.internals;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.TreeMap;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TestInputTopic;
import org.apache.kafka.streams.TestOutputTopic;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.errors.TopologyException;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;
import org.apache.kafka.streams.test.TestRecord;
import org.apache.kafka.test.StreamsTestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

@RunWith(MockitoJUnitRunner.StrictStubs.class)
/* loaded from: input_file:org/apache/kafka/streams/kstream/internals/KStreamRepartitionTest.class */
public class KStreamRepartitionTest {
    private final String inputTopic = "input-topic";
    private final Properties props = StreamsTestUtils.getStreamsConfig((Serde<?>) Serdes.Integer(), (Serde<?>) Serdes.String());
    private StreamsBuilder builder;

    @Before
    public void setUp() {
        this.builder = new StreamsBuilder();
    }

    @Test
    public void shouldInvokePartitionerWhenSet() {
        int[] iArr = {0, 1};
        StreamPartitioner streamPartitioner = (StreamPartitioner) Mockito.mock(StreamPartitioner.class);
        Mockito.when(streamPartitioner.partitions(ArgumentMatchers.anyString(), Integer.valueOf(ArgumentMatchers.eq(0)), ArgumentMatchers.eq("X0"), ArgumentMatchers.anyInt())).thenReturn(Optional.of(Collections.singleton(1)));
        Mockito.when(streamPartitioner.partitions(ArgumentMatchers.anyString(), Integer.valueOf(ArgumentMatchers.eq(1)), ArgumentMatchers.eq("X1"), ArgumentMatchers.anyInt())).thenReturn(Optional.of(Collections.singleton(1)));
        this.builder.stream("input-topic").repartition(Repartitioned.streamPartitioner(streamPartitioner).withName("test"));
        TopologyTestDriver topologyTestDriver = new TopologyTestDriver(this.builder.build(), this.props);
        Throwable th = null;
        try {
            try {
                TestInputTopic createInputTopic = topologyTestDriver.createInputTopic("input-topic", new IntegerSerializer(), new StringSerializer());
                TestOutputTopic createOutputTopic = topologyTestDriver.createOutputTopic(repartitionOutputTopic(this.props, "test"), new IntegerDeserializer(), new StringDeserializer());
                for (int i = 0; i < 2; i++) {
                    createInputTopic.pipeInput(Integer.valueOf(iArr[i]), "X" + iArr[i], i + 10);
                }
                MatcherAssert.assertThat(createOutputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord(0, "X0", Instant.ofEpochMilli(10L))));
                MatcherAssert.assertThat(createOutputTopic.readRecord(), CoreMatchers.equalTo(new TestRecord(1, "X1", Instant.ofEpochMilli(11L))));
                Assert.assertTrue(createOutputTopic.readRecordsToList().isEmpty());
                if (topologyTestDriver != null) {
                    if (0 != 0) {
                        try {
                            topologyTestDriver.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        topologyTestDriver.close();
                    }
                }
                ((StreamPartitioner) Mockito.verify(streamPartitioner)).partitions(ArgumentMatchers.anyString(), Integer.valueOf(ArgumentMatchers.eq(0)), ArgumentMatchers.eq("X0"), ArgumentMatchers.anyInt());
                ((StreamPartitioner) Mockito.verify(streamPartitioner)).partitions(ArgumentMatchers.anyString(), Integer.valueOf(ArgumentMatchers.eq(1)), ArgumentMatchers.eq("X1"), ArgumentMatchers.anyInt());
            } finally {
            }
        } catch (Throwable th3) {
            if (topologyTestDriver != null) {
                if (th != null) {
                    try {
                        topologyTestDriver.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    topologyTestDriver.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationsDoNotMatchWhenJoining() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input-topic", Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("input-topic-scale-up").withNumberOfPartitions(4)).join(streamsBuilder.stream("topic-b", Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("topic-b-scale-up").withNumberOfPartitions(2)), (str, str2) -> {
            return str2;
        }, JoinWindows.of(Duration.ofSeconds(10L))).to("topic-output");
        Map mkMap = Utils.mkMap(new Map.Entry[]{Utils.mkEntry(toRepartitionTopicName("topic-b-scale-up"), 2), Utils.mkEntry(toRepartitionTopicName("input-topic-scale-up"), 4)});
        TopologyException assertThrows = Assert.assertThrows(TopologyException.class, () -> {
            streamsBuilder.build(this.props);
        });
        String format = String.format("Following topics do not have the same number of partitions: [%s]", new TreeMap(mkMap));
        Assert.assertNotNull(assertThrows);
        Assert.assertTrue(assertThrows.getMessage().contains(format));
    }

    private String toRepartitionTopicName(String str) {
        return str + "-repartition";
    }

    private String repartitionOutputTopic(Properties properties, String str) {
        return properties.getProperty("application.id") + "-" + str + "-repartition";
    }
}
