package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.test.IntegrationTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/**
 * Integration test checking that records held back by a {@code suppress()} operator survive a
 * shutdown and restart of the {@link KafkaStreams} application.
 *
 * <p>The test is parameterized and runs once with {@code processing.guarantee=at_least_once}
 * and once with {@code processing.guarantee=exactly_once}.
 */
@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
public class SuppressionDurabilityIntegrationTest {

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(3, Utils.mkProperties(Utils.mkMap()), 0L);
    private static final StringDeserializer STRING_DESERIALIZER = new StringDeserializer();
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
    /** Value (in milliseconds) used for both {@code poll.ms} and {@code commit.interval.ms}. */
    private static final int COMMIT_INTERVAL = 100;
    private final boolean eosEnabled;

    /**
     * @param eosEnabled whether the Streams application under test is configured with
     *                   {@code exactly_once} (true) or {@code at_least_once} (false)
     */
    public SuppressionDurabilityIntegrationTest(final boolean eosEnabled) {
        this.eosEnabled = eosEnabled;
    }

    /**
     * Supplies the constructor arguments for the parameterized runner.
     *
     * @return one argument array per run: EOS disabled, then EOS enabled
     */
    @Parameterized.Parameters(name = "{index}: eosEnabled={0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
    }

    /**
     * Builds a table that counts how many input keys currently map to each input value.
     *
     * <p>The input topic is read as a table, re-keyed by its value (the old key becomes the
     * value), grouped, and counted into a store named {@code "counts"}.
     *
     * @param input   name of the topic to read
     * @param builder topology builder the table is added to
     * @return the counts table, keyed by the original record value
     */
    private KTable<String, Long> buildCountsTable(final String input, final StreamsBuilder builder) {
        // NOTE(review): the Materialized.with(...) / Materialized.as(...) calls below look like
        // they lost their explicit type witnesses (e.g. <String, Long, KeyValueStore<Bytes, byte[]>>)
        // during decompilation; restore them (and the matching imports) if inference fails here.
        return builder
            .table(
                input,
                Consumed.with(STRING_SERDE, STRING_SERDE),
                Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled())
            .groupBy((key, value) -> new KeyValue<>(value, key), Grouped.with(STRING_SERDE, STRING_SERDE))
            .count(Materialized.as("counts").withCachingDisabled());
    }

    /**
     * Fills the suppression buffer, forces some early emissions, restarts the application and
     * then verifies that the records still buffered before the restart are emitted afterwards.
     */
    @Test
    public void shouldRecoverBufferAfterShutdown() {
        final String testId = "-shouldRecoverBufferAfterShutdown";
        // Locale.ROOT keeps the application id identical regardless of the JVM's default locale
        // (e.g. a Turkish locale would lower-case 'I' to a non-ASCII dotless 'ı').
        final String appId = getClass().getSimpleName().toLowerCase(Locale.ROOT) + testId;
        final String input = "input" + testId;
        final String outputRaw = "output-raw" + testId;
        final String outputSuppressed = "output-suppressed" + testId;

        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, input, outputRaw, outputSuppressed);

        final StreamsBuilder builder = new StreamsBuilder();
        final KTable<String, Long> valueCounts = buildCountsTable(input, builder);

        // The time limit is effectively infinite, so the only way a record can leave the buffer
        // is the "emit early when full" policy once more than three records are buffered.
        final KStream<String, Long> suppressedCounts = valueCounts
            .suppress(Suppressed.untilTimeLimit(
                Duration.ofMillis(Long.MAX_VALUE),
                Suppressed.BufferConfig.maxRecords(3L).emitEarlyWhenFull()))
            .toStream();

        // Counts every record that makes it past the suppression operator.
        final AtomicInteger eventCount = new AtomicInteger(0);
        suppressedCounts.foreach((key, value) -> eventCount.incrementAndGet());

        suppressedCounts.to(outputSuppressed, Produced.with(STRING_SERDE, Serdes.Long()));
        valueCounts.toStream().to(outputRaw, Produced.with(STRING_SERDE, Serdes.Long()));

        final Properties streamsConfig = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("application.id", appId),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("poll.ms", Integer.toString(COMMIT_INTERVAL)),
            Utils.mkEntry("commit.interval.ms", Integer.toString(COMMIT_INTERVAL)),
            Utils.mkEntry("processing.guarantee", this.eosEnabled ? "exactly_once" : "at_least_once")));

        KafkaStreams driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, true);
        try {
            // Start by putting some records into the buffer; three records fit, so nothing is emitted.
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<>("k1", "v1", scaledTime(1L)),
                new KeyValueTimestamp<>("k2", "v2", scaledTime(2L)),
                new KeyValueTimestamp<>("k3", "v3", scaledTime(3L))));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
                new KeyValueTimestamp<>("v2", 1L, scaledTime(2L)),
                new KeyValueTimestamp<>("v3", 1L, scaledTime(3L))));
            MatcherAssert.assertThat(eventCount.get(), CoreMatchers.is(0));

            // Two more records overflow the buffer, so the two oldest entries are emitted early.
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<>("k4", "v4", scaledTime(4L)),
                new KeyValueTimestamp<>("k5", "v5", scaledTime(5L))));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
                new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))));
            MatcherAssert.assertThat(eventCount.get(), CoreMatchers.is(2));
            verifyOutput(outputSuppressed, Arrays.asList(
                new KeyValueTimestamp<>("v1", 1L, scaledTime(1L)),
                new KeyValueTimestamp<>("v2", 1L, scaledTime(2L))));

            // Bounce the application: v3, v4 and v5 are still sitting in the buffer at this point.
            driver.close();
            MatcherAssert.assertThat(driver.state(), CoreMatchers.is(KafkaStreams.State.NOT_RUNNING));
            // NOTE(review): the boolean is presumably the "clean local state" flag; it is true for
            // the first start and false here -- confirm against IntegrationTestUtils.
            driver = IntegrationTestUtils.getStartedStreams(streamsConfig, builder, false);

            // Three new records should push the three pre-restart entries out of the buffer.
            produceSynchronously(input, Arrays.asList(
                new KeyValueTimestamp<>("k6", "v6", scaledTime(6L)),
                new KeyValueTimestamp<>("k7", "v7", scaledTime(7L)),
                new KeyValueTimestamp<>("k8", "v8", scaledTime(8L))));
            verifyOutput(outputRaw, Arrays.asList(
                new KeyValueTimestamp<>("v6", 1L, scaledTime(6L)),
                new KeyValueTimestamp<>("v7", 1L, scaledTime(7L)),
                new KeyValueTimestamp<>("v8", 1L, scaledTime(8L))));
            MatcherAssert.assertThat(eventCount.get(), CoreMatchers.is(5));
            verifyOutput(outputSuppressed, Arrays.asList(
                new KeyValueTimestamp<>("v3", 1L, scaledTime(3L)),
                new KeyValueTimestamp<>("v4", 1L, scaledTime(4L)),
                new KeyValueTimestamp<>("v5", 1L, scaledTime(5L))));
        } finally {
            // Single clean-up path for both success and failure.
            driver.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, driver);
        }
    }

    /**
     * Verifies the given key/value/timestamp records against a topic via
     * {@link IntegrationTestUtils#verifyKeyValueTimestamps}.
     *
     * @param topic    topic to read from
     * @param expected records expected on the topic
     */
    private void verifyOutput(final String topic, final List<KeyValueTimestamp<String, Long>> expected) {
        final Properties consumerConfig = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("group.id", "test-group"),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()),
            Utils.mkEntry("key.deserializer", STRING_DESERIALIZER.getClass().getName()),
            Utils.mkEntry("value.deserializer", LONG_DESERIALIZER.getClass().getName())));
        IntegrationTestUtils.verifyKeyValueTimestamps(consumerConfig, topic, expected);
    }

    /**
     * Converts a logical time step into a record timestamp spaced at twice the commit interval.
     *
     * @param unscaledTime logical time step
     * @return {@code COMMIT_INTERVAL * 2 * unscaledTime}
     */
    private long scaledTime(final long unscaledTime) {
        return COMMIT_INTERVAL * 2L * unscaledTime;
    }

    /**
     * Writes the given records to a topic via {@link IntegrationTestUtils#produceSynchronously}.
     *
     * @param topic     topic to write to
     * @param toProduce records (with explicit timestamps) to write
     */
    private void produceSynchronously(final String topic, final List<KeyValueTimestamp<String, String>> toProduce) {
        final Properties producerConfig = Utils.mkProperties(Utils.mkMap(
            Utils.mkEntry("client.id", "anything"),
            Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()),
            Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()),
            Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())));
        IntegrationTestUtils.produceSynchronously(producerConfig, false, topic, toProduce);
    }
}
