package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.lang.Thread;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.class */
public class KStreamRepartitionIntegrationTest {
    private static final int NUM_BROKERS = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    private String topicB;
    private String inputTopic;
    private String outputTopic;
    private String applicationId;
    private Properties streamsConfiguration;
    private List<KafkaStreams> kafkaStreamsInstances;

    @Parameterized.Parameter
    public String topologyOptimization;

    @Rule
    public TestName testName = new TestName();

    /* JADX WARN: Multi-variable type inference failed */
    @Parameterized.Parameters(name = "Optimization = {0}")
    public static Collection<?> topologyOptimization() {
        return Arrays.asList(new String[]{"all"}, new String[]{"none"});
    }

    @Before
    public void before() throws InterruptedException {
        this.streamsConfiguration = new Properties();
        this.kafkaStreamsInstances = new ArrayList();
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.topicB = "topic-b-" + safeUniqueTestName;
        this.inputTopic = "input-topic-" + safeUniqueTestName;
        this.outputTopic = "output-topic-" + safeUniqueTestName;
        this.applicationId = "app-" + safeUniqueTestName;
        CLUSTER.createTopic(this.inputTopic, 4, NUM_BROKERS);
        CLUSTER.createTopic(this.outputTopic, NUM_BROKERS, NUM_BROKERS);
        this.streamsConfiguration.put("application.id", this.applicationId);
        this.streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        this.streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        this.streamsConfiguration.put("cache.max.bytes.buffering", 0);
        this.streamsConfiguration.put("commit.interval.ms", 100);
        this.streamsConfiguration.put("default.key.serde", Serdes.Integer().getClass());
        this.streamsConfiguration.put("default.value.serde", Serdes.String().getClass());
        this.streamsConfiguration.put("topology.optimization", this.topologyOptimization);
    }

    @After
    public void whenShuttingDown() throws IOException {
        this.kafkaStreamsInstances.stream().filter((v0) -> {
            return Objects.nonNull(v0);
        }).forEach((v0) -> {
            v0.close();
        });
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    @Test
    public void shouldThrowAnExceptionWhenNumberOfPartitionsOfRepartitionOperationDoNotMatchSourceTopicWhenJoining() throws InterruptedException {
        AtomicReference atomicReference = new AtomicReference();
        CLUSTER.createTopic(this.topicB, 6, NUM_BROKERS);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("join-repartition-test").withNumberOfPartitions(2)).join(streamsBuilder.stream(this.topicB, Consumed.with(Serdes.Integer(), Serdes.String())), (str, str2) -> {
            return str2;
        }, JoinWindows.of(Duration.ofSeconds(10L))).to(this.outputTopic);
        streamsBuilder.build(this.streamsConfiguration);
        startStreams(streamsBuilder, KafkaStreams.State.REBALANCING, KafkaStreams.State.ERROR, (thread, th) -> {
            atomicReference.set(th);
        });
        String format = String.format("Number of partitions [%s] of repartition topic [%s] doesn't match number of partitions [%s] of the source topic.", 2, toRepartitionTopicName("join-repartition-test"), 6);
        Assert.assertNotNull(atomicReference.get());
        Assert.assertTrue(((Throwable) atomicReference.get()).getMessage().contains(format));
    }

    @Test
    public void shouldDeductNumberOfPartitionsFromRepartitionOperation() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        CLUSTER.createTopic(this.topicB, 6, NUM_BROKERS);
        List<KeyValue<Integer, String>> asList = Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B"));
        sendEvents(currentTimeMillis, asList);
        sendEvents(this.topicB, currentTimeMillis, asList);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("join-repartition-test").withNumberOfPartitions(3)).join(streamsBuilder.stream(this.topicB, Consumed.with(Serdes.Integer(), Serdes.String())).map((v1, v2) -> {
            return new KeyValue(v1, v2);
        }, Named.as("topic-b-mapper")), (str, str2) -> {
            return str2;
        }, JoinWindows.of(Duration.ofSeconds(10L))).to(this.outputTopic);
        streamsBuilder.build(this.streamsConfiguration);
        startStreams(streamsBuilder);
        Assert.assertEquals(3L, getNumberOfPartitionsForTopic(toRepartitionTopicName("join-repartition-test")));
        Assert.assertEquals(3L, getNumberOfPartitionsForTopic(toRepartitionTopicName("topic-b-mapper")));
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), asList);
    }

    @Test
    public void shouldDoProperJoiningWhenNumberOfPartitionsAreValidWhenUsingRepartitionOperation() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        CLUSTER.createTopic(this.topicB, NUM_BROKERS, NUM_BROKERS);
        List<KeyValue<Integer, String>> asList = Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B"));
        sendEvents(currentTimeMillis, asList);
        sendEvents(this.topicB, currentTimeMillis, asList);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("input-topic-scale-up").withNumberOfPartitions(4)).join(streamsBuilder.stream(this.topicB, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("topic-b-scale-up").withNumberOfPartitions(4)), (str, str2) -> {
            return str2;
        }, JoinWindows.of(Duration.ofSeconds(10L))).to(this.outputTopic);
        startStreams(streamsBuilder);
        Assert.assertEquals(4L, getNumberOfPartitionsForTopic(toRepartitionTopicName("topic-b-scale-up")));
        Assert.assertEquals(4L, getNumberOfPartitionsForTopic(toRepartitionTopicName("input-topic-scale-up")));
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), asList);
    }

    @Test
    public void shouldUseStreamPartitionerForRepartitionOperation() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        AtomicInteger atomicInteger = new AtomicInteger(0);
        List<KeyValue<Integer, String>> asList = Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B"));
        sendEvents(currentTimeMillis, asList);
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("partitioner-test").withStreamPartitioner((str, num, str2, i) -> {
            atomicInteger.incrementAndGet();
            return Integer.valueOf(NUM_BROKERS);
        })).to(this.outputTopic);
        startStreams(streamsBuilder);
        String repartitionTopicName = toRepartitionTopicName("partitioner-test");
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), asList);
        Assert.assertTrue(topicExists(repartitionTopicName));
        Assert.assertEquals(asList.size(), atomicInteger.get());
    }

    @Test
    public void shouldPerformSelectKeyWithRepartitionOperation() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "10"), new KeyValue(2, "20")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).selectKey((num, str) -> {
            return Integer.valueOf(str);
        }).repartition().to(this.outputTopic);
        startStreams(streamsBuilder);
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), Arrays.asList(new KeyValue(10, "10"), new KeyValue(20, "20")));
        Assert.assertEquals(1L, countOccurrencesInTopology(streamsBuilder.build().describe().toString(), "Sink: .*-repartition.*"));
    }

    @Test
    public void shouldCreateRepartitionTopicIfKeyChangingOperationWasNotPerformed() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("dummy")).to(this.outputTopic);
        startStreams(streamsBuilder);
        validateReceivedMessages(new IntegerDeserializer(), new StringDeserializer(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        String obj = streamsBuilder.build().describe().toString();
        Assert.assertTrue(topicExists(toRepartitionTopicName("dummy")));
        Assert.assertEquals(1L, countOccurrencesInTopology(obj, "Sink: .*dummy-repartition.*"));
    }

    @Test
    public void shouldPerformKeySelectOperationWhenRepartitionOperationIsUsedWithKeySelector() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).selectKey((num, str) -> {
            return num.toString();
        }, Named.as("new-key")).repartition(Repartitioned.as("new-key").withKeySerde(Serdes.String())).groupByKey().count().toStream().to(this.outputTopic);
        startStreams(streamsBuilder);
        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(new KeyValue("1", 1L), new KeyValue("2", 1L)));
        String obj = streamsBuilder.build().describe().toString();
        Assert.assertTrue(topicExists(toRepartitionTopicName("new-key")));
        Assert.assertEquals(1L, countOccurrencesInTopology(obj, "Sink: .*new-key-repartition.*"));
        Assert.assertEquals(1L, countOccurrencesInTopology(obj, "<-- new-key\n"));
    }

    @Test
    public void shouldCreateRepartitionTopicWithSpecifiedNumberOfPartitions() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("new-partitions").withNumberOfPartitions(2)).groupByKey().count().toStream().to(this.outputTopic);
        startStreams(streamsBuilder);
        validateReceivedMessages(new IntegerDeserializer(), new LongDeserializer(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), 1L), new KeyValue(2, 1L)));
        Assert.assertTrue(topicExists(toRepartitionTopicName("new-partitions")));
        Assert.assertEquals(2L, getNumberOfPartitionsForTopic(r0));
    }

    @Test
    public void shouldInheritRepartitionTopicPartitionNumberFromUpstreamTopicWhenNumberOfPartitionsIsNotSpecified() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).repartition(Repartitioned.as("new-topic")).groupByKey().count().toStream().to(this.outputTopic);
        startStreams(streamsBuilder);
        validateReceivedMessages(new IntegerDeserializer(), new LongDeserializer(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), 1L), new KeyValue(2, 1L)));
        Assert.assertTrue(topicExists(toRepartitionTopicName("new-topic")));
        Assert.assertEquals(4L, getNumberOfPartitionsForTopic(r0));
    }

    @Test
    public void shouldCreateOnlyOneRepartitionTopicWhenRepartitionIsFollowedByGroupByKey() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).selectKey((num, str) -> {
            return num.toString();
        }).repartition(Repartitioned.as("new-partitions").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withNumberOfPartitions(NUM_BROKERS)).groupByKey().count().toStream().to(this.outputTopic);
        startStreams(streamsBuilder);
        String obj = streamsBuilder.build().describe().toString();
        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(new KeyValue("1", 1L), new KeyValue("2", 1L)));
        Assert.assertTrue(topicExists(toRepartitionTopicName("new-partitions")));
        Assert.assertEquals(1L, countOccurrencesInTopology(obj, "Sink: .*-repartition"));
    }

    @Test
    public void shouldGenerateRepartitionTopicWhenNameIsNotSpecified() throws Exception {
        sendEvents(System.currentTimeMillis(), Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).selectKey((num, str) -> {
            return num.toString();
        }).repartition(Repartitioned.with(Serdes.String(), Serdes.String())).to(this.outputTopic);
        startStreams(streamsBuilder);
        validateReceivedMessages(new StringDeserializer(), new StringDeserializer(), Arrays.asList(new KeyValue("1", "A"), new KeyValue("2", "B")));
        Assert.assertEquals(1L, countOccurrencesInTopology(streamsBuilder.build().describe().toString(), "Sink: .*-repartition"));
    }

    @Test
    public void shouldGoThroughRebalancingCorrectly() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        sendEvents(currentTimeMillis, Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "A"), new KeyValue(2, "B")));
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream(this.inputTopic, Consumed.with(Serdes.Integer(), Serdes.String())).selectKey((num, str) -> {
            return num.toString();
        }).repartition(Repartitioned.as("rebalancing-test").withKeySerde(Serdes.String()).withValueSerde(Serdes.String()).withNumberOfPartitions(2)).groupByKey().count().toStream().to(this.outputTopic);
        startStreams(streamsBuilder);
        KafkaStreams startStreams = startStreams(streamsBuilder);
        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(new KeyValue("1", 1L), new KeyValue("2", 1L)));
        startStreams.close();
        sendEvents(currentTimeMillis, Arrays.asList(new KeyValue(Integer.valueOf(NUM_BROKERS), "C"), new KeyValue(2, "D")));
        validateReceivedMessages(new StringDeserializer(), new LongDeserializer(), Arrays.asList(new KeyValue("1", 2L), new KeyValue("2", 2L)));
        Assert.assertTrue(topicExists(toRepartitionTopicName("rebalancing-test")));
        Assert.assertEquals(2L, getNumberOfPartitionsForTopic(r0));
    }

    private int getNumberOfPartitionsForTopic(String str) throws Exception {
        AdminClient createAdminClient = createAdminClient();
        Throwable th = null;
        try {
            try {
                int size = ((TopicDescription) ((KafkaFuture) createAdminClient.describeTopics(Collections.singleton(str)).values().get(str)).get()).partitions().size();
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                return size;
            } finally {
            }
        } catch (Throwable th3) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th3;
        }
    }

    private boolean topicExists(String str) throws Exception {
        AdminClient createAdminClient = createAdminClient();
        Throwable th = null;
        try {
            try {
                boolean contains = ((Set) createAdminClient.listTopics().names().get()).contains(str);
                if (createAdminClient != null) {
                    if (0 != 0) {
                        try {
                            createAdminClient.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        createAdminClient.close();
                    }
                }
                return contains;
            } finally {
            }
        } catch (Throwable th3) {
            if (createAdminClient != null) {
                if (th != null) {
                    try {
                        createAdminClient.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    createAdminClient.close();
                }
            }
            throw th3;
        }
    }

    private String toRepartitionTopicName(String str) {
        return this.applicationId + "-" + str + "-repartition";
    }

    private static AdminClient createAdminClient() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        return AdminClient.create(properties);
    }

    private static int countOccurrencesInTopology(String str, String str2) {
        Matcher matcher = Pattern.compile(str2).matcher(str);
        ArrayList arrayList = new ArrayList();
        while (matcher.find()) {
            arrayList.add(matcher.group());
        }
        return arrayList.size();
    }

    private void sendEvents(long j, List<KeyValue<Integer, String>> list) throws Exception {
        sendEvents(this.inputTopic, j, list);
    }

    private void sendEvents(String str, long j, List<KeyValue<Integer, String>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(str, list, TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()), Long.valueOf(j));
    }

    private KafkaStreams startStreams(StreamsBuilder streamsBuilder) throws InterruptedException {
        return startStreams(streamsBuilder, KafkaStreams.State.REBALANCING, KafkaStreams.State.RUNNING, null);
    }

    private KafkaStreams startStreams(StreamsBuilder streamsBuilder, KafkaStreams.State state, KafkaStreams.State state2, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws InterruptedException {
        CountDownLatch countDownLatch;
        KafkaStreams kafkaStreams = new KafkaStreams(streamsBuilder.build(this.streamsConfiguration), this.streamsConfiguration);
        if (uncaughtExceptionHandler == null) {
            countDownLatch = new CountDownLatch(NUM_BROKERS);
        } else {
            countDownLatch = new CountDownLatch(2);
            kafkaStreams.setUncaughtExceptionHandler((thread, th) -> {
                uncaughtExceptionHandler.uncaughtException(thread, th);
                countDownLatch.countDown();
            });
        }
        CountDownLatch countDownLatch2 = countDownLatch;
        kafkaStreams.setStateListener((state3, state4) -> {
            if (state == state4 && state2 == state3) {
                countDownLatch2.countDown();
            }
        });
        kafkaStreams.start();
        countDownLatch.await(IntegrationTestUtils.DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
        this.kafkaStreamsInstances.add(kafkaStreams);
        return kafkaStreams;
    }

    private <K, V> void validateReceivedMessages(Deserializer<K> deserializer, Deserializer<V> deserializer2, List<KeyValue<K, V>> list) throws Exception {
        String safeUniqueTestName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.setProperty("group.id", "group-" + safeUniqueTestName);
        properties.setProperty("auto.offset.reset", "earliest");
        properties.setProperty("key.deserializer", deserializer.getClass().getName());
        properties.setProperty("value.deserializer", deserializer2.getClass().getName());
        IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(properties, this.outputTopic, list);
    }
}
