package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.MockKeyValueMapper;
import org.apache.kafka.test.MockValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test that runs windowed stream-stream joins on streams whose key was changed
 * upstream ({@code map}, {@code selectKey}, {@code flatMap}) and checks the joined values that
 * arrive on each scenario's output topic.
 *
 * <p>All join topologies are registered on a single {@link KStreamBuilder}, one {@link KafkaStreams}
 * instance is started, and then each output topic is verified in turn. The test is parameterized
 * over the value written to {@code cache.max.bytes.buffering}.
 */
@RunWith(Parameterized.class)
public class KStreamRepartitionJoinTest {
    private static final int NUM_BROKERS = 1;
    /** Partition count passed to {@code createTopic} for the three input topics. */
    private static final int INPUT_TOPIC_PARTITIONS = 2;
    /** Replication argument passed to {@code createTopic} for the three input topics. */
    private static final int INPUT_TOPIC_REPLICATION = 1;

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    public static final ValueJoiner<Object, Object, String> TOSTRING_JOINER = MockValueJoiner.instance(":");
    private static final long WINDOW_SIZE = TimeUnit.MILLISECONDS.convert(1, TimeUnit.DAYS);
    /**
     * Suffix that keeps application ids and topic names distinct between test runs.
     * NOTE(review): the increment in {@link #before()} is not atomic; this assumes the tests of
     * this class are never executed concurrently.
     */
    private static volatile int testNo = 0;

    private final MockTime mockTime = CLUSTER.time;
    /** Joined values expected from every "stream one joined with stream two" scenario. */
    private final List<String> expectedStreamOneTwoJoin = Arrays.asList("1:A", "2:B", "3:C", "4:D", "5:E");

    private KStreamBuilder builder;
    private Properties streamsConfiguration;
    private KStream<Long, Integer> streamOne;
    private KStream<Integer, String> streamTwo;
    private KStream<Integer, String> streamFour;
    private KeyValueMapper<Long, Integer, KeyValue<Integer, Integer>> keyMapper;
    private KafkaStreams kafkaStreams;
    private String streamOneInput;
    private String streamTwoInput;
    private String streamFourInput;

    /** Value written to {@code cache.max.bytes.buffering}; injected by the Parameterized runner. */
    @Parameterized.Parameter
    public long cacheSizeBytes;

    /** Pairs an output topic with the (sorted) values expected to be read from it. */
    private static final class ExpectedOutputOnTopic {
        private final List<String> expectedOutput;
        private final String outputTopic;

        ExpectedOutputOnTopic(final List<String> expectedOutput, final String outputTopic) {
            this.expectedOutput = expectedOutput;
            this.outputTopic = outputTopic;
        }
    }

    /**
     * Supplies the cache sizes to run with: 0 bytes and 10 MiB.
     *
     * @return one element per test run; both are {@code Long} so they match {@link #cacheSizeBytes}
     */
    @Parameterized.Parameters
    public static Object[] data() {
        return new Object[]{0L, 10L * 1024 * 1024};
    }

    /**
     * Creates fresh input topics, builds the streams configuration and registers the three source
     * streams on a new builder.
     *
     * @throws InterruptedException if topic creation is interrupted
     */
    @Before
    public void before() throws InterruptedException {
        testNo++;
        final String applicationId = "kstream-repartition-join-test-" + testNo;
        builder = new KStreamBuilder();
        createTopics();

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", applicationId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("num.stream.threads", 3);
        streamsConfiguration.put("cache.max.bytes.buffering", cacheSizeBytes);

        streamOne = builder.stream(Serdes.Long(), Serdes.Integer(), streamOneInput);
        streamTwo = builder.stream(Serdes.Integer(), Serdes.String(), streamTwoInput);
        streamFour = builder.stream(Serdes.Integer(), Serdes.String(), streamFourInput);

        keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper();
    }

    /**
     * Closes the streams instance (if one was started) and purges its local state.
     *
     * @throws IOException if purging the local state fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
        IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
    }

    /**
     * Produces the input records, registers every join scenario, starts the streams instance and
     * verifies each scenario's output topic.
     */
    @Test
    public void shouldCorrectlyRepartitionOnJoinOperations() throws Exception {
        produceMessages();

        // All topologies must be registered on the builder before the streams instance starts.
        final ExpectedOutputOnTopic mapOne = mapStreamOneAndJoin();
        final ExpectedOutputOnTopic mapBoth = mapBothStreamsAndJoin();
        final ExpectedOutputOnTopic mapMapJoin = mapMapJoin();
        final ExpectedOutputOnTopic selectKeyJoin = selectKeyAndJoin();
        final ExpectedOutputOnTopic flatMapJoin = flatMapJoin();
        final ExpectedOutputOnTopic mapRhs = joinMappedRhsStream();
        final ExpectedOutputOnTopic mapJoinJoin = joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined();
        final ExpectedOutputOnTopic leftJoin = mapBothStreamsAndLeftJoin();

        startStreams();

        verifyCorrectOutput(mapOne);
        verifyCorrectOutput(mapBoth);
        verifyCorrectOutput(mapMapJoin);
        verifyCorrectOutput(selectKeyJoin);
        verifyCorrectOutput(flatMapJoin);
        verifyCorrectOutput(mapRhs);
        verifyCorrectOutput(mapJoinJoin);
        verifyLeftJoin(leftJoin);
    }

    /** Re-keys stream one with {@code keyMapper} and joins it with the unmodified stream two. */
    private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException {
        final String output = "map-one-join-output-" + testNo;
        doJoin(streamOne.map(keyMapper), streamTwo, output);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, output);
    }

    /** Maps both streams (stream two through a no-op mapper) before joining them. */
    private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws Exception {
        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());
        final String output = "map-both-streams-and-join-" + testNo;
        doJoin(map1, map2, output);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, output);
    }

    /** Applies two consecutive maps to stream one before joining it with stream two. */
    private ExpectedOutputOnTopic mapMapJoin() throws Exception {
        final KStream<Integer, Integer> mapMapStream = streamOne.map(
            new KeyValueMapper<Long, Integer, KeyValue<Long, Integer>>() {
                @Override
                public KeyValue<Long, Integer> apply(final Long key, final Integer value) {
                    // The input contains one record with a null value; map it to a null pair.
                    if (value == null) {
                        return new KeyValue<Long, Integer>(null, null);
                    }
                    return new KeyValue<Long, Integer>(key + value, value);
                }
            }).map(keyMapper);

        final String output = "map-map-join-" + testNo;
        doJoin(mapMapStream, streamTwo, output);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, output);
    }

    /** Uses {@code selectKey} to make the value of stream one its key, then joins with stream two. */
    private ExpectedOutputOnTopic selectKeyAndJoin() throws ExecutionException, InterruptedException {
        final KStream<Integer, Integer> keySelected =
            streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper());

        final String output = "select-key-join-" + testNo;
        doJoin(keySelected, streamTwo, output);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, output);
    }

    /** Flat-maps each stream-one record to a single (value, value) pair, then joins with stream two. */
    private ExpectedOutputOnTopic flatMapJoin() throws Exception {
        final KStream<Integer, Integer> flatMapped = streamOne.flatMap(
            new KeyValueMapper<Long, Integer, Iterable<KeyValue<Integer, Integer>>>() {
                @Override
                public Iterable<KeyValue<Integer, Integer>> apply(final Long key, final Integer value) {
                    return Collections.singletonList(new KeyValue<>(value, value));
                }
            });

        final String output = "flat-map-join-" + testNo;
        doJoin(flatMapped, streamTwo, output);
        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, output);
    }

    /**
     * Joins with the mapped stream on the right-hand side, so the joined values read
     * "letter:number" instead of "number:letter".
     */
    private ExpectedOutputOnTopic joinMappedRhsStream() throws Exception {
        final String output = "join-rhs-stream-mapped-" + testNo;
        CLUSTER.createTopic(output);

        final KStream<Integer, Integer> mappedRhs = streamOne.map(keyMapper);
        streamTwo
            .join(mappedRhs, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.String(), Serdes.Integer())
            .to(Serdes.Integer(), Serdes.String(), output);

        return new ExpectedOutputOnTopic(Arrays.asList("A:1", "B:2", "C:3", "D:4", "E:5"), output);
    }

    /** Maps both streams and left-joins them; verified separately by {@link #verifyLeftJoin}. */
    private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws Exception {
        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);
        final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper());

        final String output = "left-join-" + testNo;
        CLUSTER.createTopic(output);
        map1
            .leftJoin(map2, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.Integer(), Serdes.String())
            .to(Serdes.Integer(), Serdes.String(), output);

        return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, output);
    }

    /**
     * Joins mapped stream one with mapped stream two, maps the join result again and joins it
     * with mapped stream four.
     */
    private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined() throws Exception {
        final KStream<Integer, Integer> map1 = streamOne.map(keyMapper);

        final KeyValueMapper<Integer, String, KeyValue<Integer, String>> kvMapper =
            MockKeyValueMapper.NoOpKeyValueMapper();
        final KStream<Integer, String> map2 = streamTwo.map(kvMapper);

        final KStream<Integer, String> join = map1.join(
            map2, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.Integer(), Serdes.String());

        final String output = "map-join-join-" + testNo;
        CLUSTER.createTopic(output);
        join.map(kvMapper)
            .join(streamFour.map(kvMapper), TOSTRING_JOINER, getJoinWindow(),
                Serdes.Integer(), Serdes.String(), Serdes.String())
            .to(Serdes.Integer(), Serdes.String(), output);

        return new ExpectedOutputOnTopic(Arrays.asList("1:A:A", "2:B:B", "3:C:C", "4:D:D", "5:E:E"), output);
    }

    /** Builds the join window: size {@code WINDOW_SIZE}, with {@code until} set to three times that. */
    private JoinWindows getJoinWindow() {
        return JoinWindows.of(WINDOW_SIZE).until(3 * WINDOW_SIZE);
    }

    /** Asserts that the scenario's output topic yields exactly its expected values. */
    private void verifyCorrectOutput(final ExpectedOutputOnTopic expectedOutputOnTopic) throws InterruptedException {
        verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
    }

    /**
     * Checks the left-join output. If the first read does not match the expectation, the
     * stream-one records are produced a second time and the topic is verified again.
     * NOTE(review): presumably the retry covers left-join results emitted before the matching
     * stream-two records were processed - confirm.
     */
    private void verifyLeftJoin(final ExpectedOutputOnTopic expectedOutputOnTopic)
        throws InterruptedException, ExecutionException {
        final List<String> received = receiveMessages(
            new StringDeserializer(), expectedOutputOnTopic.expectedOutput.size(), expectedOutputOnTopic.outputTopic);
        if (!received.equals(expectedOutputOnTopic.expectedOutput)) {
            produceToStreamOne();
            verifyCorrectOutput(expectedOutputOnTopic.expectedOutput, expectedOutputOnTopic.outputTopic);
        }
    }

    /** Produces the records for stream one and the same letter records for streams two and four. */
    private void produceMessages() throws ExecutionException, InterruptedException {
        produceToStreamOne();
        produceStreamTwoInputTo(streamTwoInput);
        produceStreamTwoInputTo(streamFourInput);
    }

    /**
     * Synchronously produces the records (1,"A") .. (5,"E") to the given topic.
     *
     * @param topic topic to write to
     */
    private void produceStreamTwoInputTo(final String topic) throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            topic,
            Arrays.asList(
                new KeyValue<>(1, "A"),
                new KeyValue<>(2, "B"),
                new KeyValue<>(3, "C"),
                new KeyValue<>(4, "D"),
                new KeyValue<>(5, "E")),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class, new Properties()),
            mockTime);
    }

    /**
     * Synchronously produces the stream-one records: five values 1..5 under unrelated long keys,
     * plus one record with a null value for which no expected output exists.
     */
    private void produceToStreamOne() throws ExecutionException, InterruptedException {
        IntegrationTestUtils.produceKeyValuesSynchronously(
            streamOneInput,
            Arrays.asList(
                new KeyValue<>(10L, 1),
                new KeyValue<>(5L, 2),
                new KeyValue<>(12L, 3),
                new KeyValue<>(15L, 4),
                new KeyValue<>(20L, 5),
                new KeyValue<Long, Integer>(70L, null)),
            TestUtils.producerConfig(
                CLUSTER.bootstrapServers(), LongSerializer.class, IntegerSerializer.class, new Properties()),
            mockTime);
    }

    /** Names the three input topics after the current {@code testNo} and creates them. */
    private void createTopics() throws InterruptedException {
        streamOneInput = "stream-one-" + testNo;
        streamTwoInput = "stream-two-" + testNo;
        streamFourInput = "stream-four-" + testNo;
        CLUSTER.createTopic(streamOneInput, INPUT_TOPIC_PARTITIONS, INPUT_TOPIC_REPLICATION);
        CLUSTER.createTopic(streamTwoInput, INPUT_TOPIC_PARTITIONS, INPUT_TOPIC_REPLICATION);
        CLUSTER.createTopic(streamFourInput, INPUT_TOPIC_PARTITIONS, INPUT_TOPIC_REPLICATION);
    }

    /** Creates the {@link KafkaStreams} instance from the builder and configuration and starts it. */
    private void startStreams() {
        kafkaStreams = new KafkaStreams(builder, streamsConfiguration);
        kafkaStreams.start();
    }

    /**
     * Reads at least {@code numMessages} values from {@code topic} (waiting up to 60 seconds) and
     * returns them sorted.
     *
     * @param valueDeserializer deserializer whose class is configured as the consumer's value deserializer
     * @param numMessages       minimum number of values to wait for
     * @param topic             topic to read from
     * @return the received values in natural order
     */
    private List<String> receiveMessages(final Deserializer<?> valueDeserializer,
                                         final int numMessages,
                                         final String topic) throws InterruptedException {
        final Properties config = new Properties();
        config.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        config.setProperty("group.id", "kstream-test");
        config.setProperty("auto.offset.reset", "earliest");
        config.setProperty("key.deserializer", IntegerDeserializer.class.getName());
        config.setProperty("value.deserializer", valueDeserializer.getClass().getName());

        final List<String> received =
            IntegrationTestUtils.waitUntilMinValuesRecordsReceived(config, topic, numMessages, 60000L);
        // Sort so the comparison does not depend on the order in which records were read.
        Collections.sort(received);
        return received;
    }

    /**
     * Asserts that the sorted values read from {@code topic} equal {@code expectedMessages}.
     *
     * @param expectedMessages expected values, in sorted order
     * @param topic            topic to read from
     */
    private void verifyCorrectOutput(final List<String> expectedMessages, final String topic)
        throws InterruptedException {
        MatcherAssert.assertThat(
            receiveMessages(new StringDeserializer(), expectedMessages.size(), topic),
            Is.is(expectedMessages));
    }

    /**
     * Creates {@code outputTopic} and registers an inner join of {@code lhs} with {@code rhs}
     * that writes its result there.
     *
     * @param lhs         left-hand stream of the join
     * @param rhs         right-hand stream of the join
     * @param outputTopic topic the joined values are written to
     */
    private void doJoin(final KStream<Integer, Integer> lhs,
                        final KStream<Integer, String> rhs,
                        final String outputTopic) throws InterruptedException {
        CLUSTER.createTopic(outputTopic);
        lhs.join(rhs, TOSTRING_JOINER, getJoinWindow(), Serdes.Integer(), Serdes.Integer(), Serdes.String())
            .to(Serdes.Integer(), Serdes.String(), outputTopic);
    }
}
