package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.EmitStrategy;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
import org.apache.kafka.streams.kstream.TimeWindowedKStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.UnlimitedWindows;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockAggregator;
import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/TimeWindowedKStreamIntegrationTest.class */
public class TimeWindowedKStreamIntegrationTest {
    /** Number of brokers in the embedded cluster shared by every test in this class. */
    private static final int NUM_BROKERS = 1;

    /**
     * Embedded Kafka cluster shared by all parameterized runs. Both broker log-retention
     * settings are set to "-1" (presumably so the brokers never delete test data by age or
     * size; confirm against the broker configuration reference).
     */
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
        NUM_BROKERS,
        Utils.mkProperties(
            Utils.mkMap(
                Utils.mkEntry("log.retention.hours", "-1"),
                Utils.mkEntry("log.retention.bytes", "-1")
            )
        )
    );
    /** Topology builder; a fresh instance is assigned in {@code before()} for every test. */
    private StreamsBuilder builder;
    /** Streams configuration populated in {@code before()} and handed to {@code KafkaStreams}. */
    private Properties streamsConfiguration;
    /** Streams instance created by {@code startStreams()}; stays null if a test never starts one. */
    private KafkaStreams kafkaStreams;
    /** Name of the first input topic, derived from the test name in {@code createTopics()}. */
    private String streamOneInput;
    /** Name of the second input topic (only used by the join test in this class). */
    private String streamTwoInput;
    /** Name of the topic the topology writes its windowed results to. */
    private String outputTopic;

    /**
     * First test parameter: the strategy type. In the visible code it is not read by any
     * test; it only shows up as {@code {0}} in the parameterized test name.
     */
    @Parameterized.Parameter
    public EmitStrategy.StrategyType type;

    /** Second test parameter: selects caching enabled/disabled in {@code getMaterialized()}. */
    @Parameterized.Parameter(1)
    public boolean withCache;

    /** Third test parameter: the emit strategy applied to the windowed stream under test. */
    @Parameterized.Parameter(2)
    public EmitStrategy emitStrategy;
    /** Set in {@code before()}: true when {@code emitStrategy} is of type ON_WINDOW_CLOSE. */
    private boolean emitFinal;

    /** JUnit timeout rule constructed with a 600 second limit. */
    @Rule
    public Timeout globalTimeout = Timeout.seconds(600);

    /** Supplies the running test's name, used to derive unique application/topic/group names. */
    @Rule
    public TestName testName = new TestName();

    /**
     * Starts the shared embedded Kafka cluster; annotated to run once before the tests
     * of this class.
     *
     * @throws IOException if the cluster fails to start
     */
    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    /**
     * Stops the shared embedded Kafka cluster; annotated to run once after all tests
     * of this class.
     */
    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    /**
     * Supplies the parameter rows for the runner: each row is
     * {strategy type, caching flag, emit strategy}, covering both strategies with caching
     * on and off.
     *
     * @return the four parameter combinations, in a fixed order
     */
    @Parameterized.Parameters(name = "{0}_{1}")
    public static Collection<Object[]> getEmitStrategy() {
        final Object[] updateWithCache = {
            EmitStrategy.StrategyType.ON_WINDOW_UPDATE, true, EmitStrategy.onWindowUpdate()
        };
        final Object[] updateWithoutCache = {
            EmitStrategy.StrategyType.ON_WINDOW_UPDATE, false, EmitStrategy.onWindowUpdate()
        };
        final Object[] closeWithCache = {
            EmitStrategy.StrategyType.ON_WINDOW_CLOSE, true, EmitStrategy.onWindowClose()
        };
        final Object[] closeWithoutCache = {
            EmitStrategy.StrategyType.ON_WINDOW_CLOSE, false, EmitStrategy.onWindowClose()
        };
        return Arrays.asList(updateWithCache, updateWithoutCache, closeWithCache, closeWithoutCache);
    }

    /**
     * Per-test setup: creates a fresh builder and the test's topics, assembles the Streams
     * configuration, and derives {@code emitFinal} from the parameterized emit strategy.
     *
     * @throws InterruptedException propagated from topic creation
     */
    @Before
    public void before() throws InterruptedException {
        this.builder = new StreamsBuilder();
        createTopics();

        // Publish the Properties object to the field first so it is visible even if a later
        // step in this method throws.
        final Properties config = new Properties();
        this.streamsConfiguration = config;

        final String uniqueName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        config.put("application.id", "app-" + uniqueName);
        config.put("bootstrap.servers", CLUSTER.bootstrapServers());
        config.put("auto.offset.reset", "earliest");
        config.put("state.dir", TestUtils.tempDirectory().getPath());
        config.put("cache.max.bytes.buffering", 0);
        config.put("commit.interval.ms", 100L);

        final Class<?> stringSerdeClass = Serdes.String().getClass();
        config.put("default.key.serde", stringSerdeClass);
        config.put("default.value.serde", stringSerdeClass);

        config.put("__emit.interval.ms.kstreams.windowed.aggregation__", 0);
        config.put("windowstore.changelog.additional.retention.ms", Long.MAX_VALUE);

        this.emitFinal = EmitStrategy.StrategyType.ON_WINDOW_CLOSE.equals(this.emitStrategy.type());
    }

    /**
     * Per-test teardown: closes and cleans up the Streams instance if one was started, then
     * purges the local state for this test's configuration.
     *
     * @throws IOException propagated from purging the local state
     */
    @After
    public void whenShuttingDown() throws IOException {
        final KafkaStreams streams = this.kafkaStreams;
        if (streams != null) {
            streams.close();
            streams.cleanUp();
        }
        IntegrationTestUtils.purgeLocalStreamsState(this.streamsConfiguration);
    }

    /**
     * Aggregates a hopping window (size 10 ms, advance 5 ms, no grace period) over the first
     * input topic and checks the records written to the output topic. With ON_WINDOW_CLOSE six
     * records are expected; otherwise twelve (one per window update).
     */
    @Test
    public void shouldAggregateWindowedWithNoGrace() throws Exception {
        produceMessages(
            this.streamOneInput,
            new KeyValueTimestamp<>("A", "1", 0L),
            new KeyValueTimestamp<>("A", "2", 5L),
            new KeyValueTimestamp<>("A", "3", 10L),
            new KeyValueTimestamp<>("B", "4", 6L),
            new KeyValueTimestamp<>("B", "5", 11L),
            new KeyValueTimestamp<>("B", "6", 15L),
            new KeyValueTimestamp<>("C", "7", 25L)
        );

        final TimeWindows windows =
            TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L));
        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        this.builder
            .stream(this.streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(windows)
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, getMaterialized())
            .toStream()
            .to(this.outputTopic, Produced.with(windowedSerde, new Serdes.StringSerde()));

        startStreams();

        // Read the output first (this waits for the records), then build the expectation.
        final int expectedCount = this.emitFinal ? 6 : 12;
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            expectedCount);

        final List<KeyValueTimestamp<Windowed<String>, String>> expected;
        if (this.emitFinal) {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15L)
            );
        } else {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+7", 25L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+7", 25L)
            );
        }

        MatcherAssert.assertThat(actual, Is.is(expected));
    }

    /**
     * Aggregates a hopping window (size 10 ms, advance 5 ms, grace 5 ms) over the first input
     * topic and checks the records written to the output topic. With ON_WINDOW_CLOSE six
     * records are expected; otherwise thirteen (one per window update).
     */
    @Test
    public void shouldAggregateWindowedWithGrace() throws Exception {
        produceMessages(
            this.streamOneInput,
            new KeyValueTimestamp<>("A", "1", 0L),
            new KeyValueTimestamp<>("A", "2", 5L),
            new KeyValueTimestamp<>("A", "3", 10L),
            new KeyValueTimestamp<>("B", "4", 6L),
            new KeyValueTimestamp<>("B", "5", 11L),
            new KeyValueTimestamp<>("B", "6", 15L),
            new KeyValueTimestamp<>("C", "7", 25L)
        );

        final TimeWindows windows = TimeWindows
            .ofSizeAndGrace(Duration.ofMillis(10L), Duration.ofMillis(5L))
            .advanceBy(Duration.ofMillis(5L));
        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        this.builder
            .stream(this.streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(windows)
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, getMaterialized())
            .toStream()
            .to(this.outputTopic, Produced.with(windowedSerde, new Serdes.StringSerde()));

        startStreams();

        // Read the output first (this waits for the records), then build the expectation.
        final int expectedCount = this.emitFinal ? 6 : 13;
        final List<KeyValueTimestamp<Windowed<String>, String>> actual = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            expectedCount);

        final List<KeyValueTimestamp<Windowed<String>, String>> expected;
        if (this.emitFinal) {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15L)
            );
        } else {
            expected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+1+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+2+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(10L, 20L)), "0+3", 10L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(0L, 10L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4", 6L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+4+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+5+6", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+6", 15L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+7", 25L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+7", 25L)
            );
        }

        MatcherAssert.assertThat(actual, Is.is(expected));
    }

    /**
     * Joins the two input topics (time difference 2 ms, no grace), aggregates the joined stream
     * in hopping windows (size 10 ms, advance 5 ms, no grace), and verifies the output. It then
     * closes and cleans up the Streams instance, produces one more record per input, restarts,
     * and verifies the two records that follow.
     */
    @Test
    public void shouldRestoreAfterJoinRestart() throws Exception {
        produceMessages(
            this.streamOneInput,
            new KeyValueTimestamp<>("A", "L1", 0L),
            new KeyValueTimestamp<>("A", "L1", 5L),
            new KeyValueTimestamp<>("B", "L2", 11L),
            new KeyValueTimestamp<>("B", "L2", 15L),
            new KeyValueTimestamp<>("C", "L3", 25L)
        );
        produceMessages(
            this.streamTwoInput,
            new KeyValueTimestamp<>("A", "R1", 0L),
            new KeyValueTimestamp<>("A", "R1", 5L),
            new KeyValueTimestamp<>("B", "R2", 11L),
            new KeyValueTimestamp<>("B", "R2", 15L),
            new KeyValueTimestamp<>("C", "R3", 25L)
        );

        final JoinWindows joinWindows = JoinWindows.ofTimeDifferenceWithNoGrace(Duration.ofMillis(2L));
        final TimeWindows windows =
            TimeWindows.ofSizeWithNoGrace(Duration.ofMillis(10L)).advanceBy(Duration.ofMillis(5L));
        final Serde<Windowed<String>> windowedSerde = WindowedSerdes.timeWindowedSerdeFrom(String.class, 10L);
        // The first input stream is registered with the builder before the second one.
        this.builder
            .stream(this.streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .join(
                this.builder.stream(this.streamTwoInput, Consumed.with(Serdes.String(), Serdes.String())),
                (left, right) -> left + "," + right,
                joinWindows)
            .groupByKey()
            .windowedBy(windows)
            .emitStrategy(this.emitStrategy)
            .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, getMaterialized())
            .toStream()
            .to(this.outputTopic, Produced.with(windowedSerde, new Serdes.StringSerde()));

        startStreams();

        // Phase 1: output of the first run.
        final int firstRunCount = this.emitFinal ? 5 : 9;
        final List<KeyValueTimestamp<Windowed<String>, String>> firstRunActual = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            firstRunCount);

        final List<KeyValueTimestamp<Windowed<String>, String>> firstRunExpected;
        if (this.emitFinal) {
            firstRunExpected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1+L1,R1", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+L1,R1", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+L2,R2+L2,R2", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+L2,R2", 15L)
            );
        } else {
            firstRunExpected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1", 0L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(0L, 10L)), "0+L1,R1+L1,R1", 5L),
                new KeyValueTimestamp<>(new Windowed<>("A", new TimeWindow(5L, 15L)), "0+L1,R1", 5L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(5L, 15L)), "0+L2,R2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+L2,R2", 11L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(10L, 20L)), "0+L2,R2+L2,R2", 15L),
                new KeyValueTimestamp<>(new Windowed<>("B", new TimeWindow(15L, 25L)), "0+L2,R2", 15L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+L3,R3", 25L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+L3,R3", 25L)
            );
        }
        MatcherAssert.assertThat(firstRunActual, Is.is(firstRunExpected));

        // Phase 2: shut down, wipe local state, feed one more record per side, and restart.
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();
        produceMessages(this.streamOneInput, new KeyValueTimestamp<>("C", "L3", 35L));
        produceMessages(this.streamTwoInput, new KeyValueTimestamp<>("C", "R3", 35L));
        startStreams();

        final List<KeyValueTimestamp<Windowed<String>, String>> secondRunActual = receiveMessagesWithTimestamp(
            new TimeWindowedDeserializer<>(new StringDeserializer(), 10L),
            new StringDeserializer(),
            10L,
            String.class,
            2);

        final List<KeyValueTimestamp<Windowed<String>, String>> secondRunExpected;
        if (this.emitFinal) {
            secondRunExpected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(20L, 30L)), "0+L3,R3", 25L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(25L, 35L)), "0+L3,R3", 25L)
            );
        } else {
            secondRunExpected = Arrays.asList(
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(30L, 40L)), "0+L3,R3", 35L),
                new KeyValueTimestamp<>(new Windowed<>("C", new TimeWindow(35L, 45L)), "0+L3,R3", 35L)
            );
        }
        MatcherAssert.assertThat(secondRunActual, Is.is(secondRunExpected));
    }

    /**
     * Applies the parameterized emit strategy to a stream windowed by {@code UnlimitedWindows}:
     * with ON_WINDOW_CLOSE an {@code IllegalArgumentException} is expected, otherwise the call
     * must simply complete.
     */
    @Test
    public void shouldThrowUnlimitedWindows() {
        final TimeWindowedKStream<String, String> windowedStream = this.builder
            .stream(this.streamOneInput, Consumed.with(Serdes.String(), Serdes.String()))
            .groupByKey()
            .windowedBy(UnlimitedWindows.of().startOn(Instant.ofEpochMilli(0L)));

        if (!this.emitFinal) {
            windowedStream.emitStrategy(this.emitStrategy);
            return;
        }
        Assert.assertThrows(
            IllegalArgumentException.class,
            () -> windowedStream.emitStrategy(this.emitStrategy));
    }

    /**
     * Synchronously produces the given records to a topic with String key/value serializers.
     *
     * @param str                  name of the topic to write to
     * @param keyValueTimestampArr records (key, value, timestamp) to produce, in order
     */
    private void produceMessages(String str, KeyValueTimestamp<String, String>... keyValueTimestampArr) {
        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class);
        final List<KeyValueTimestamp<String, String>> records = Arrays.asList(keyValueTimestampArr);
        IntegrationTestUtils.produceSynchronously(producerConfig, false, str, Optional.empty(), records);
    }

    /**
     * Builds the materialization used by the aggregations: null key serde, String value serde,
     * with caching switched on or off according to the {@code withCache} parameter.
     *
     * @return the configured (raw-typed) {@code Materialized}
     */
    private Materialized getMaterialized() {
        final Materialized base = Materialized.with((Serde) null, new Serdes.StringSerde());
        return this.withCache ? base.withCachingEnabled() : base.withCachingDisabled();
    }

    /**
     * Derives the per-test topic names from the unique test name and creates the topics on the
     * embedded cluster: both inputs with explicit arguments (1, 1), the output with the
     * cluster's single-argument overload.
     *
     * @throws InterruptedException propagated from topic creation
     */
    private void createTopics() throws InterruptedException {
        final String suffix = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        this.streamOneInput = "stream-one-" + suffix;
        this.streamTwoInput = "stream-two-" + suffix;
        this.outputTopic = "output-" + suffix;

        for (final String inputTopic : Arrays.asList(this.streamOneInput, this.streamTwoInput)) {
            CLUSTER.createTopic(inputTopic, 1, 1);
        }
        CLUSTER.createTopic(this.outputTopic);
    }

    /**
     * Builds the topology from the current builder, creates a new Streams instance with the
     * test configuration, stores it in {@code kafkaStreams}, and starts it.
     */
    private void startStreams() {
        final KafkaStreams streams = new KafkaStreams(this.builder.build(), this.streamsConfiguration);
        // Store the instance before starting so teardown can still close it if start() throws.
        this.kafkaStreams = streams;
        streams.start();
    }

    /**
     * Waits until at least {@code i} records are available on the output topic and returns them
     * with their timestamps.
     *
     * @param deserializer  key deserializer; only its class name is placed in the consumer config
     * @param deserializer2 value deserializer; only its class name is placed in the consumer config
     * @param j             window size in milliseconds, set as "window.size.ms"
     * @param cls           inner key class; its serde class name is configured when the key
     *                      deserializer is time- or session-windowed
     * @param i             minimum number of records to wait for
     * @return the records received from the output topic
     * @throws Exception propagated from the wait helper
     */
    private <K, V> List<KeyValueTimestamp<K, V>> receiveMessagesWithTimestamp(Deserializer<K> deserializer, Deserializer<V> deserializer2, long j, Class cls, int i) throws Exception {
        final String uniqueName = IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);

        final Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("group.id", "group-" + uniqueName);
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        consumerConfig.setProperty("key.deserializer", deserializer.getClass().getName());
        consumerConfig.setProperty("value.deserializer", deserializer2.getClass().getName());
        consumerConfig.put("window.size.ms", Long.valueOf(j));

        final boolean windowedKey =
            deserializer instanceof TimeWindowedDeserializer || deserializer instanceof SessionWindowedDeserializer;
        if (windowedKey) {
            final String innerSerdeClassName = Serdes.serdeFrom(cls).getClass().getName();
            consumerConfig.setProperty("windowed.inner.class.serde", innerSerdeClassName);
        }

        return IntegrationTestUtils.waitUntilMinKeyValueWithTimestampRecordsReceived(
            consumerConfig,
            this.outputTopic,
            i,
            IntegrationTestUtils.DEFAULT_TIMEOUT);
    }
}
