package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import kafka.utils.MockTime;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.LongSerializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Reducer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest.class */
public class KStreamKTableJoinIntegrationTest {
    // Clock object taken from the embedded cluster; passed to the producer helper calls in the test.
    private final MockTime mockTime = CLUSTER.time;
    // Per-test names; assigned in before() with the current test number as a suffix.
    private String userClicksTopic;
    private String userRegionsTopic;
    private String userRegionsStoreName;
    private String outputTopic;
    // Streams application under test; closed in whenShuttingDown() when it is non-null.
    private KafkaStreams kafkaStreams;
    // Streams settings; a fresh Properties object is built in before().
    private Properties streamsConfiguration;

    // Populated by the Parameterized runner from data(); written to "cache.max.bytes.buffering" in before().
    @Parameterized.Parameter
    public long cacheSizeBytes;
    // Broker count handed to the EmbeddedKafkaCluster constructor.
    private static final int NUM_BROKERS = 1;

    // Embedded Kafka cluster, registered as a JUnit class rule.
    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
    // Counter advanced in before(); its value is appended to topic, store and application names.
    private static volatile int testNo = 0;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/KStreamKTableJoinIntegrationTest$RegionWithClicks.class */
    /**
     * Immutable value pairing a region name with a click count.
     */
    public static final class RegionWithClicks {
        private final String region;
        private final long clicks;

        /**
         * Creates a validated region/click pair. The region is checked before the click count.
         *
         * @param region region name; must be neither {@code null} nor empty
         * @param clicks click count; must be zero or greater
         * @throws IllegalArgumentException if either argument violates its constraint
         */
        public RegionWithClicks(String region, long clicks) {
            this.region = checkedRegion(region);
            this.clicks = checkedClicks(clicks);
        }

        /** Returns {@code candidate} unchanged, or throws when it is null or empty. */
        private static String checkedRegion(String candidate) {
            final boolean missing = candidate == null || candidate.isEmpty();
            if (missing) {
                throw new IllegalArgumentException("region must be set");
            }
            return candidate;
        }

        /** Returns {@code candidate} unchanged, or throws when it is negative. */
        private static long checkedClicks(long candidate) {
            if (candidate < 0) {
                throw new IllegalArgumentException("clicks must not be negative");
            }
            return candidate;
        }

        /** @return the region name supplied at construction */
        public String getRegion() {
            return region;
        }

        /** @return the click count supplied at construction */
        public long getClicks() {
            return clicks;
        }
    }

    /**
     * Prepares an isolated environment for one test run: advances the test counter, derives
     * topic/store/application names from it, creates the three topics on the embedded cluster
     * and builds the streams configuration.
     *
     * @throws InterruptedException if setup is interrupted (presumably during topic creation
     *                              on the embedded cluster; confirm against that API)
     */
    @Before
    public void before() throws InterruptedException {
        // Advance by exactly one per test; the counter has nothing to do with the broker count.
        testNo++;
        // Read the volatile counter once so every derived name carries the same suffix.
        final int currentTestNo = testNo;

        this.userClicksTopic = "user-clicks-" + currentTestNo;
        this.userRegionsTopic = "user-regions-" + currentTestNo;
        this.userRegionsStoreName = "user-regions-store-name-" + currentTestNo;
        this.outputTopic = "output-topic-" + currentTestNo;

        CLUSTER.createTopic(this.userClicksTopic);
        CLUSTER.createTopic(this.userRegionsTopic);
        CLUSTER.createTopic(this.outputTopic);

        final Properties config = new Properties();
        config.put("application.id", "join-integration-test-" + currentTestNo);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("key.serde", Serdes.String().getClass().getName());
        config.put("value.serde", Serdes.String().getClass().getName());
        config.put("auto.offset.reset", "earliest");
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        // The cache size is the test parameter supplied by the Parameterized runner.
        config.put("cache.max.bytes.buffering", Long.valueOf(this.cacheSizeBytes));
        this.streamsConfiguration = config;
    }

    /**
     * Stops the streams application (if one was started) and removes its local state.
     *
     * @throws IOException if purging the local streams state fails
     */
    @After
    public void whenShuttingDown() throws IOException {
        try {
            if (this.kafkaStreams != null) {
                this.kafkaStreams.close();
            }
        } finally {
            // Purge even when close() throws, so state does not leak into later runs.
            // JUnit runs @After methods even if @Before failed, in which case the
            // configuration may never have been created.
            if (this.streamsConfiguration != null) {
                IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
            }
        }
    }

    /**
     * Supplies the values injected into {@code cacheSizeBytes}, one test run per value:
     * {@code 0} and 10 MiB.
     *
     * @return the single-parameter value set for the Parameterized runner
     */
    @Parameterized.Parameters
    public static Object[] data() {
        // Both entries are long literals so they box to Long, matching the long field directly.
        return new Object[]{0L, 10 * 1024 * 1024L};
    }

    /**
     * End-to-end check of a stream-table left join followed by an aggregation: user click
     * events are joined against the user-to-region table, re-keyed by region, summed per
     * region and written to the output topic, which is then read back and compared with
     * the expected records.
     *
     * <p>With {@code cacheSizeBytes == 0} the test expects one output record per input click
     * (eight running totals); otherwise it expects only the three final totals.
     *
     * @throws Exception if producing, consuming or waiting for records fails
     */
    @Test
    public void shouldCountClicksPerRegion() throws Exception {
        // Input 1: clicks per user, in production order.
        final List<KeyValue<String, Long>> userClicks = Arrays.asList(
            new KeyValue<>("alice", 13L),
            new KeyValue<>("bob", 4L),
            new KeyValue<>("chao", 25L),
            new KeyValue<>("bob", 19L),
            new KeyValue<>("dave", 56L),
            new KeyValue<>("eve", 78L),
            new KeyValue<>("alice", 40L),
            new KeyValue<>("fang", 99L)
        );

        // Input 2: region per user. "alice" appears twice; the expected output below
        // attributes her clicks to "europe", the later entry.
        final List<KeyValue<String, String>> userRegions = Arrays.asList(
            new KeyValue<>("alice", "asia"),
            new KeyValue<>("bob", "americas"),
            new KeyValue<>("chao", "asia"),
            new KeyValue<>("dave", "europe"),
            new KeyValue<>("alice", "europe"),
            new KeyValue<>("eve", "americas"),
            new KeyValue<>("fang", "asia")
        );

        final List<KeyValue<String, Long>> expectedClicksPerRegion = this.cacheSizeBytes == 0
            ? Arrays.asList(
                new KeyValue<>("europe", 13L),
                new KeyValue<>("americas", 4L),
                new KeyValue<>("asia", 25L),
                new KeyValue<>("americas", 23L),
                new KeyValue<>("europe", 69L),
                new KeyValue<>("americas", 101L),
                new KeyValue<>("europe", 109L),
                new KeyValue<>("asia", 124L))
            : Arrays.asList(
                new KeyValue<>("americas", 101L),
                new KeyValue<>("europe", 109L),
                new KeyValue<>("asia", 124L));

        // Do not name these locals "String"/"Long": a local variable obscures the type of the
        // same name, which breaks expressions such as Long.valueOf(...) in the nested classes.
        final Serde<String> stringSerde = Serdes.String();
        final Serde<Long> longSerde = Serdes.Long();

        final KStreamBuilder builder = new KStreamBuilder();
        builder.stream(stringSerde, longSerde, this.userClicksTopic)
            .leftJoin(
                builder.table(stringSerde, stringSerde, this.userRegionsTopic, this.userRegionsStoreName),
                new ValueJoiner<Long, String, RegionWithClicks>() {
                    @Override
                    public RegionWithClicks apply(final Long clicks, final String region) {
                        // Left join: users without a region entry are counted under "UNKNOWN".
                        return new RegionWithClicks(region == null ? "UNKNOWN" : region, clicks);
                    }
                })
            // Re-key from user to region so the aggregation groups by region.
            .map(new KeyValueMapper<String, RegionWithClicks, KeyValue<String, Long>>() {
                @Override
                public KeyValue<String, Long> apply(final String user, final RegionWithClicks regionWithClicks) {
                    return new KeyValue<>(regionWithClicks.getRegion(), regionWithClicks.getClicks());
                }
            })
            .groupByKey(stringSerde, longSerde)
            .reduce(new Reducer<Long>() {
                @Override
                public Long apply(final Long value1, final Long value2) {
                    return value1 + value2;
                }
            }, "ClicksPerRegionUnwindowed")
            .to(stringSerde, longSerde, this.outputTopic);

        this.kafkaStreams = new KafkaStreams(builder, this.streamsConfiguration);
        this.kafkaStreams.start();

        // Publish the region table first, then the click events.
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.userRegionsTopic, userRegions, producerConfig(StringSerializer.class), this.mockTime);
        IntegrationTestUtils.produceKeyValuesSynchronously(
            this.userClicksTopic, userClicks, producerConfig(LongSerializer.class), this.mockTime);

        final Properties consumerConfig = new Properties();
        consumerConfig.put("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.put("group.id", "join-integration-test-standard-consumer");
        consumerConfig.put("auto.offset.reset", "earliest");
        consumerConfig.put("key.deserializer", StringDeserializer.class);
        consumerConfig.put("value.deserializer", LongDeserializer.class);

        final List<KeyValue<String, Long>> actualClicksPerRegion =
            IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(
                consumerConfig, this.outputTopic, expectedClicksPerRegion.size());
        Assert.assertThat(actualClicksPerRegion, CoreMatchers.equalTo(expectedClicksPerRegion));
    }

    /**
     * Builds producer settings for the embedded cluster with String keys, {@code acks=all}
     * and no retries.
     *
     * @param valueSerializerClass serializer class to register under {@code value.serializer}
     * @return a new, independent {@link Properties} instance
     */
    private static Properties producerConfig(final Class<?> valueSerializerClass) {
        final Properties config = new Properties();
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("acks", "all");
        config.put("retries", 0);
        config.put("key.serializer", StringSerializer.class);
        config.put("value.serializer", valueSerializerClass);
        return config;
    }
}
