package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KGroupedStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.StreamJoined;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.WindowedSerdes;
import org.apache.kafka.streams.kstream.Windows;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.MockMapper;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.MatcherAssert;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

/**
 * Integration test that starts a Kafka Streams application against an embedded cluster and
 * checks the {@code retention.ms} setting of the changelog topics created for windowed stores
 * (time-windowed counts, session-windowed counts, and stream-stream joins).
 *
 * <p>Each test builds one topology, waits for the application to reach {@code RUNNING}, and then
 * reads the changelog topic's log config from the cluster.
 */
@Category({IntegrationTest.class})
public class WindowedChangelogRetentionIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String STREAM_ONE_INPUT = "stream-one";
    private static final String STREAM_TWO_INPUT = "stream-two";
    private static final String OUTPUT_TOPIC = "output";

    /** Name given to the materialized store in the time- and session-window tests. */
    private static final String WINDOWED_STORE_NAME = "windowed-store";

    /** Name passed to {@code StreamJoined.as}; the two join store names below are derived from it. */
    private static final String JOIN_NAME = "testjoin";
    private static final String THIS_JOIN_STORE_NAME = JOIN_NAME + "-this-join-store";
    private static final String OTHER_JOIN_STORE_NAME = JOIN_NAME + "-other-join-store";

    /** Baseline retention (one day) the tests expect when the window itself needs less. */
    private static final Duration DEFAULT_RETENTION = Duration.ofDays(1);

    /**
     * Amount added on top of the store retention when computing the expected changelog retention.
     * NOTE(review): presumably mirrors the Kafka Streams default for
     * {@code windowstore.changelog.additional.retention.ms} (one day), which these tests do not
     * override - confirm against the Streams configuration reference.
     */
    private static final Duration CHANGELOG_ADDITIONAL_RETENTION = Duration.ofDays(1);

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);

    @Rule
    public TestName testName = new TestName();

    private StreamsBuilder builder;
    private Properties streamsConfiguration;
    private KafkaStreams kafkaStreams;
    private KGroupedStream<String, String> groupedStream;

    /** Creates the two input topics (3 partitions, replication factor 1) and the output topic. */
    @BeforeClass
    public static void createTopics() throws InterruptedException {
        CLUSTER.createTopic(STREAM_ONE_INPUT, 3, 1);
        CLUSTER.createTopic(STREAM_TWO_INPUT, 3, 1);
        CLUSTER.createTopics(OUTPUT_TOPIC);
    }

    /**
     * Prepares a fresh builder, a per-test Streams configuration (unique application id and
     * temporary state directory), and the grouped stream used by the windowed-count tests.
     */
    @Before
    public void before() {
        builder = new StreamsBuilder();

        streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), testName));
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("auto.offset.reset", "earliest");
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("cache.max.bytes.buffering", 0);
        streamsConfiguration.put("commit.interval.ms", 100);
        streamsConfiguration.put("default.key.serde", Serdes.String().getClass());
        streamsConfiguration.put("default.value.serde", Serdes.Integer().getClass());

        groupedStream = builder
            .stream(STREAM_ONE_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
            .groupBy(MockMapper.selectValueMapper(), Grouped.with(Serdes.String(), Serdes.String()));
    }

    /**
     * Closes the Streams instance (if one was started) and purges the local state directory.
     * The purge runs in a {@code finally} block so a failing {@code close()} cannot leave the
     * temporary state directory behind.
     */
    @After
    public void whenShuttingDown() throws Exception {
        try {
            if (kafkaStreams != null) {
                kafkaStreams.close();
            }
        } finally {
            IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
        }
    }

    @Test
    public void timeWindowedChangelogShouldHaveRetentionOfWindowSizeIfWindowSizeLargerThanDefaultRetention() throws Exception {
        final Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1));
        runAndVerifyTimeWindows(TimeWindows.of(windowSize), null, windowSize);
    }

    @Test
    public void timeWindowedChangelogShouldHaveDefaultRetentionIfWindowSizeLessThanDefaultRetention() throws Exception {
        final Duration windowSize = DEFAULT_RETENTION.minus(Duration.ofHours(1));
        runAndVerifyTimeWindows(TimeWindows.of(windowSize), null, DEFAULT_RETENTION);
    }

    @Test
    public void timeWindowedChangelogShouldHaveRetentionOfWindowSizePlusGraceIfWindowSizePlusGraceLargerThanDefaultRetention() throws Exception {
        final Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1));
        final Duration grace = Duration.ofHours(12);
        runAndVerifyTimeWindows(TimeWindows.of(windowSize).grace(grace), null, windowSize.plus(grace));
    }

    @Test
    public void timeWindowedChangelogShouldHaveDefaultRetentionIfWindowSizePlusGraceLessThanDefaultRetention() throws Exception {
        final Duration grace = Duration.ofMillis(1000);
        // Window size + grace ends up 500 ms short of the default retention.
        final Duration windowSize = DEFAULT_RETENTION.minus(grace).minus(Duration.ofMillis(500));
        runAndVerifyTimeWindows(TimeWindows.of(windowSize).grace(grace), null, DEFAULT_RETENTION);
    }

    @Test
    public void timeWindowedChangelogShouldHaveUserSpecifiedRetentionIfUserSpecifiedRetentionEvenIfLessThanDefaultRetention() throws Exception {
        final Duration grace = Duration.ofHours(6);
        final Duration windowSize = DEFAULT_RETENTION.minus(grace).minus(Duration.ofHours(1));
        // Window size + grace + 1h, which works out to exactly DEFAULT_RETENTION.
        final Duration userSpecifiedRetention = windowSize.plus(grace).plus(Duration.ofHours(1));
        runAndVerifyTimeWindows(TimeWindows.of(windowSize).grace(grace), userSpecifiedRetention, userSpecifiedRetention);
    }

    /**
     * Builds a windowed count over {@link #groupedStream}, starts the application, and verifies
     * the changelog retention of the materialized store.
     *
     * @param windows                window definition to aggregate by
     * @param userSpecifiedRetention retention to set on the store, or {@code null} to leave it unset
     * @param expectedRetention      store retention the changelog is expected to be derived from
     */
    private void runAndVerifyTimeWindows(final Windows<TimeWindow> windows,
                                         final Duration userSpecifiedRetention,
                                         final Duration expectedRetention) throws Exception {
        // NOTE(review): the upstream form of this call likely carried an explicit type witness on
        // Materialized.as(...) that decompilation dropped; confirm this chain still type-checks.
        groupedStream
            .windowedBy(windows)
            .count(userSpecifiedRetention != null
                ? Materialized.as(WINDOWED_STORE_NAME).withRetention(userSpecifiedRetention)
                : Materialized.as(WINDOWED_STORE_NAME))
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(WindowedSerdes.timeWindowedSerdeFrom(String.class, windows.size()), Serdes.Long()));

        startStreams();
        verifyChangelogRetentionOfWindowedStore(WINDOWED_STORE_NAME, expectedRetention);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveRetentionOfGapIfGapLargerThanDefaultRetention() throws Exception {
        final Duration gap = DEFAULT_RETENTION.plus(Duration.ofHours(1));
        runAndVerifySessionWindows(SessionWindows.with(gap), null, gap);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveDefaultRetentionIfGapLessThanDefaultRetention() throws Exception {
        final Duration gap = DEFAULT_RETENTION.minus(Duration.ofHours(1));
        runAndVerifySessionWindows(SessionWindows.with(gap), null, DEFAULT_RETENTION);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveDefaultRetentionIfGapPlusGraceLessThanDefaultRetention() throws Exception {
        final Duration grace = Duration.ofHours(1);
        final Duration gap = DEFAULT_RETENTION.minus(grace).minus(Duration.ofHours(1));
        runAndVerifySessionWindows(SessionWindows.with(gap).grace(grace), null, DEFAULT_RETENTION);
    }

    @Test
    public void sessionWindowedChangelogShouldHaveUserSpecifiedRetentionIfUserSpecifiedRetentionEvenIfLessThanDefaultRetention() throws Exception {
        final Duration grace = Duration.ofHours(6);
        final Duration gap = DEFAULT_RETENTION.minus(grace).minus(Duration.ofHours(1));
        // Gap + grace + 1h, which works out to exactly DEFAULT_RETENTION.
        final Duration userSpecifiedRetention = gap.plus(grace).plus(Duration.ofHours(1));
        runAndVerifySessionWindows(SessionWindows.with(gap).grace(grace), userSpecifiedRetention, userSpecifiedRetention);
    }

    /**
     * Session-window counterpart of {@link #runAndVerifyTimeWindows}: builds a session-windowed
     * count, starts the application, and verifies the store's changelog retention.
     *
     * @param sessionWindows         session window definition to aggregate by
     * @param userSpecifiedRetention retention to set on the store, or {@code null} to leave it unset
     * @param expectedRetention      store retention the changelog is expected to be derived from
     */
    private void runAndVerifySessionWindows(final SessionWindows sessionWindows,
                                            final Duration userSpecifiedRetention,
                                            final Duration expectedRetention) throws Exception {
        // NOTE(review): same caveat as the time-window helper regarding a possibly dropped
        // type witness on Materialized.as(...); confirm when compiling.
        groupedStream
            .windowedBy(sessionWindows)
            .count(userSpecifiedRetention != null
                ? Materialized.as(WINDOWED_STORE_NAME).withRetention(userSpecifiedRetention)
                : Materialized.as(WINDOWED_STORE_NAME))
            .toStream()
            .to(OUTPUT_TOPIC, Produced.with(WindowedSerdes.sessionWindowedSerdeFrom(String.class), Serdes.Long()));

        startStreams();
        verifyChangelogRetentionOfWindowedStore(WINDOWED_STORE_NAME, expectedRetention);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfDoubleWindowSizeIfWindowSizeLargerThanDefaultRetention() throws Exception {
        final Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1));
        runAndVerifyJoinWindows(JoinWindows.of(windowSize), windowSize.multipliedBy(2));
    }

    @Test
    public void joinWindowedChangelogShouldHaveDefaultRetentionIfDoubleWindowSizeLessThanDefaultRetention() throws Exception {
        final Duration windowSize = DEFAULT_RETENTION.dividedBy(2).minus(Duration.ofHours(1));
        runAndVerifyJoinWindows(JoinWindows.of(windowSize), DEFAULT_RETENTION);
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfDoubleWindowSizePlusGraceIfDoubleWindowSizePlusGraceLessThanDefaultRetention() throws Exception {
        final Duration grace = Duration.ofHours(3);
        final Duration windowSize = DEFAULT_RETENTION.dividedBy(2).minus(grace).minus(Duration.ofHours(1));
        runAndVerifyJoinWindows(JoinWindows.of(windowSize).grace(grace), windowSize.multipliedBy(2).plus(grace));
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfDoubleWindowSizePlusGraceIfDoubleWindowSizePlusGraceGreaterThanDefaultRetention() throws Exception {
        final Duration windowSize = DEFAULT_RETENTION.plus(Duration.ofHours(1));
        final Duration grace = Duration.ofHours(3);
        runAndVerifyJoinWindows(JoinWindows.of(windowSize).grace(grace), windowSize.multipliedBy(2).plus(grace));
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfBeforePlusAfterPlusGraceIfBeforePlusAfterPlusGraceGreaterThanDefaultRetention() throws Exception {
        final Duration before = DEFAULT_RETENTION.plus(Duration.ofHours(1));
        final Duration after = DEFAULT_RETENTION.minus(Duration.ofHours(1));
        final Duration grace = Duration.ofHours(3);
        runAndVerifyJoinWindows(JoinWindows.of(before).after(after).grace(grace), before.plus(after).plus(grace));
    }

    @Test
    public void joinWindowedChangelogShouldHaveRetentionOfBeforePlusAfterPlusGraceIfBeforePlusAfterePlusGraceLessThanDefaultRetention() throws Exception {
        final Duration grace = Duration.ofHours(3);
        final Duration before = DEFAULT_RETENTION.dividedBy(2).minus(grace).minus(Duration.ofHours(1));
        final Duration after = DEFAULT_RETENTION.dividedBy(2).minus(grace).minus(Duration.ofHours(4));
        runAndVerifyJoinWindows(JoinWindows.of(before).after(after).grace(grace), before.plus(after).plus(grace));
    }

    /**
     * Joins the two input streams with the given windows, starts the application, and verifies
     * the changelog retention of both join stores.
     *
     * @param joinWindows       join window definition
     * @param expectedRetention store retention both join changelogs are expected to be derived from
     */
    private void runAndVerifyJoinWindows(final JoinWindows joinWindows,
                                         final Duration expectedRetention) throws Exception {
        builder
            .stream(STREAM_ONE_INPUT, Consumed.with(Serdes.Integer(), Serdes.String()))
            .join(
                builder.stream(STREAM_TWO_INPUT, Consumed.with(Serdes.Integer(), Serdes.String())),
                // The joined value is irrelevant to this test; keep the left-hand value.
                (left, right) -> left,
                joinWindows,
                StreamJoined.as(JOIN_NAME))
            .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));

        startStreams();
        verifyChangelogRetentionOfWindowedStore(THIS_JOIN_STORE_NAME, expectedRetention);
        verifyChangelogRetentionOfWindowedStore(OTHER_JOIN_STORE_NAME, expectedRetention);
    }

    /**
     * Builds the topology, prints its description to standard output, starts the Streams
     * application, and waits up to 30 seconds for it to reach {@code RUNNING}.
     */
    private void startStreams() throws Exception {
        final Topology topology = builder.build();
        System.out.println(topology.describe().toString());

        kafkaStreams = new KafkaStreams(topology, streamsConfiguration);
        kafkaStreams.start();
        IntegrationTestUtils.waitForApplicationState(
            Collections.singletonList(kafkaStreams),
            KafkaStreams.State.RUNNING,
            Duration.ofSeconds(30));
    }

    /**
     * Asserts that the changelog topic of the given store has a {@code retention.ms} equal to
     * {@code expectedRetention} plus {@link #CHANGELOG_ADDITIONAL_RETENTION}.
     *
     * @param storeName         store whose changelog topic
     *                          ({@code <application.id>-<storeName>-changelog}) is inspected
     * @param expectedRetention expected store retention, before the additional retention is added
     */
    private void verifyChangelogRetentionOfWindowedStore(final String storeName, final Duration expectedRetention) {
        final String changelogTopic =
            streamsConfiguration.getProperty("application.id") + "-" + storeName + "-changelog";
        final Properties logConfig = CLUSTER.getLogConfig(changelogTopic);

        final long actualRetentionMs = Long.parseLong(logConfig.getProperty("retention.ms"));
        final long expectedRetentionMs = expectedRetention.toMillis() + CHANGELOG_ADDITIONAL_RETENTION.toMillis();
        MatcherAssert.assertThat(actualRetentionMs, Is.is(expectedRetentionMs));
    }
}
