package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/SuppressionIntegrationTest.class */
public class SuppressionIntegrationTest {

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1, Utils.mkProperties(Utils.mkMap(new Map.Entry[0])), 0);
    private static final StringSerializer STRING_SERIALIZER = new StringSerializer();
    private static final Serde<String> STRING_SERDE = Serdes.String();
    private static final int COMMIT_INTERVAL = 100;

    private static KTable<String, Long> buildCountsTable(String str, StreamsBuilder streamsBuilder) {
        return streamsBuilder.table(str, Consumed.with(STRING_SERDE, STRING_SERDE), Materialized.with(STRING_SERDE, STRING_SERDE).withCachingDisabled().withLoggingDisabled()).groupBy((str2, str3) -> {
            return new KeyValue(str3, str2);
        }, Grouped.with(STRING_SERDE, STRING_SERDE)).count(Materialized.as("counts").withCachingDisabled());
    }

    @Test
    public void shouldUseDefaultSerdes() {
        String str = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldInheritSerdes";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldInheritSerdes", "output-raw-shouldInheritSerdes", "output-suppressed-shouldInheritSerdes");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable aggregate = streamsBuilder.stream("input-shouldInheritSerdes").groupByKey().aggregate(() -> {
            return "()";
        }, (str2, str3, str4) -> {
            return str4 + ",(" + str2 + ": " + str3 + ")";
        });
        aggregate.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), Suppressed.BufferConfig.maxRecords(1L).emitEarlyWhenFull())).toStream().to("output-suppressed-shouldInheritSerdes");
        aggregate.toStream().to("output-raw-shouldInheritSerdes");
        Properties streamsConfig = getStreamsConfig(str);
        streamsConfig.put("default.key.serde", Serdes.StringSerde.class);
        streamsConfig.put("default.value.serde", Serdes.StringSerde.class);
        KafkaStreams startedStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, streamsBuilder, true);
        try {
            produceSynchronously("input-shouldInheritSerdes", Arrays.asList(new KeyValueTimestamp("k1", "v1", scaledTime(0L)), new KeyValueTimestamp("k1", "v2", scaledTime(1L)), new KeyValueTimestamp("k2", "v1", scaledTime(2L)), new KeyValueTimestamp("x", "x", scaledTime(3L))));
            boolean waitForAnyRecord = waitForAnyRecord("output-raw-shouldInheritSerdes");
            boolean waitForAnyRecord2 = waitForAnyRecord("output-suppressed-shouldInheritSerdes");
            MatcherAssert.assertThat(Boolean.valueOf(waitForAnyRecord), Matchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(waitForAnyRecord2), CoreMatchers.is(true));
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
        } catch (Throwable th) {
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
            throw th;
        }
    }

    @Test
    public void shouldInheritSerdes() {
        String str = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldInheritSerdes";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldInheritSerdes", "output-raw-shouldInheritSerdes", "output-suppressed-shouldInheritSerdes");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable count = streamsBuilder.stream("input-shouldInheritSerdes").groupByKey().count();
        count.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), Suppressed.BufferConfig.maxRecords(1L).emitEarlyWhenFull())).toStream().to("output-suppressed-shouldInheritSerdes");
        count.toStream().to("output-raw-shouldInheritSerdes");
        Properties streamsConfig = getStreamsConfig(str);
        streamsConfig.put("default.key.serde", Serdes.StringSerde.class);
        streamsConfig.put("default.value.serde", Serdes.StringSerde.class);
        KafkaStreams startedStreams = IntegrationTestUtils.getStartedStreams(streamsConfig, streamsBuilder, true);
        try {
            produceSynchronously("input-shouldInheritSerdes", Arrays.asList(new KeyValueTimestamp("k1", "v1", scaledTime(0L)), new KeyValueTimestamp("k1", "v2", scaledTime(1L)), new KeyValueTimestamp("k2", "v1", scaledTime(2L)), new KeyValueTimestamp("x", "x", scaledTime(3L))));
            boolean waitForAnyRecord = waitForAnyRecord("output-raw-shouldInheritSerdes");
            boolean waitForAnyRecord2 = waitForAnyRecord("output-suppressed-shouldInheritSerdes");
            MatcherAssert.assertThat(Boolean.valueOf(waitForAnyRecord), Matchers.is(true));
            MatcherAssert.assertThat(Boolean.valueOf(waitForAnyRecord2), CoreMatchers.is(true));
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
        } catch (Throwable th) {
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
            throw th;
        }
    }

    private static boolean waitForAnyRecord(String str) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("key.deserializer", StringDeserializer.class);
        properties.put("value.deserializer", StringDeserializer.class);
        properties.put("enable.auto.commit", false);
        KafkaConsumer kafkaConsumer = new KafkaConsumer(properties);
        Throwable th = null;
        try {
            try {
                List list = (List) kafkaConsumer.partitionsFor(str).stream().map(partitionInfo -> {
                    return new TopicPartition(partitionInfo.topic(), partitionInfo.partition());
                }).collect(Collectors.toList());
                kafkaConsumer.assign(list);
                kafkaConsumer.seekToBeginning(list);
                long currentTimeMillis = System.currentTimeMillis();
                while (System.currentTimeMillis() - currentTimeMillis < IntegrationTestUtils.DEFAULT_TIMEOUT) {
                    if (!kafkaConsumer.poll(Duration.ofMillis(500L)).isEmpty()) {
                        if (kafkaConsumer != null) {
                            if (0 != 0) {
                                try {
                                    kafkaConsumer.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                kafkaConsumer.close();
                            }
                        }
                        return true;
                    }
                }
                if (kafkaConsumer != null) {
                    if (0 != 0) {
                        try {
                            kafkaConsumer.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        kafkaConsumer.close();
                    }
                }
                return false;
            } finally {
            }
        } catch (Throwable th4) {
            if (kafkaConsumer != null) {
                if (th != null) {
                    try {
                        kafkaConsumer.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    kafkaConsumer.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void shouldShutdownWhenRecordConstraintIsViolated() throws InterruptedException {
        String str = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldShutdownWhenRecordConstraintIsViolated";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldShutdownWhenRecordConstraintIsViolated", "output-raw-shouldShutdownWhenRecordConstraintIsViolated", "output-suppressed-shouldShutdownWhenRecordConstraintIsViolated");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable<String, Long> buildCountsTable = buildCountsTable("input-shouldShutdownWhenRecordConstraintIsViolated", streamsBuilder);
        buildCountsTable.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), Suppressed.BufferConfig.maxRecords(1L).shutDownWhenFull())).toStream().to("output-suppressed-shouldShutdownWhenRecordConstraintIsViolated", Produced.with(STRING_SERDE, Serdes.Long()));
        buildCountsTable.toStream().to("output-raw-shouldShutdownWhenRecordConstraintIsViolated", Produced.with(STRING_SERDE, Serdes.Long()));
        KafkaStreams startedStreams = IntegrationTestUtils.getStartedStreams(getStreamsConfig(str), streamsBuilder, true);
        try {
            produceSynchronously("input-shouldShutdownWhenRecordConstraintIsViolated", Arrays.asList(new KeyValueTimestamp("k1", "v1", scaledTime(0L)), new KeyValueTimestamp("k1", "v2", scaledTime(1L)), new KeyValueTimestamp("k2", "v1", scaledTime(2L)), new KeyValueTimestamp("x", "x", scaledTime(3L))));
            verifyErrorShutdown(startedStreams);
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
        } catch (Throwable th) {
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
            throw th;
        }
    }

    @Test
    public void shouldShutdownWhenBytesConstraintIsViolated() throws InterruptedException {
        String str = getClass().getSimpleName().toLowerCase(Locale.getDefault()) + "-shouldShutdownWhenBytesConstraintIsViolated";
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, "input-shouldShutdownWhenBytesConstraintIsViolated", "output-raw-shouldShutdownWhenBytesConstraintIsViolated", "output-suppressed-shouldShutdownWhenBytesConstraintIsViolated");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        KTable<String, Long> buildCountsTable = buildCountsTable("input-shouldShutdownWhenBytesConstraintIsViolated", streamsBuilder);
        buildCountsTable.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(Long.MAX_VALUE), Suppressed.BufferConfig.maxBytes(200L).shutDownWhenFull())).toStream().to("output-suppressed-shouldShutdownWhenBytesConstraintIsViolated", Produced.with(STRING_SERDE, Serdes.Long()));
        buildCountsTable.toStream().to("output-raw-shouldShutdownWhenBytesConstraintIsViolated", Produced.with(STRING_SERDE, Serdes.Long()));
        KafkaStreams startedStreams = IntegrationTestUtils.getStartedStreams(getStreamsConfig(str), streamsBuilder, true);
        try {
            produceSynchronously("input-shouldShutdownWhenBytesConstraintIsViolated", Arrays.asList(new KeyValueTimestamp("k1", "v1", scaledTime(0L)), new KeyValueTimestamp("k1", "v2", scaledTime(1L)), new KeyValueTimestamp("k2", "v1", scaledTime(2L)), new KeyValueTimestamp("x", "x", scaledTime(3L))));
            verifyErrorShutdown(startedStreams);
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
        } catch (Throwable th) {
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
            throw th;
        }
    }

    private static Properties getStreamsConfig(String str) {
        return Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("application.id", str), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("poll.ms", Integer.toString(COMMIT_INTERVAL)), Utils.mkEntry("commit.interval.ms", Integer.toString(COMMIT_INTERVAL)), Utils.mkEntry("processing.guarantee", "at_least_once"), Utils.mkEntry("state.dir", TestUtils.tempDirectory().getPath())}));
    }

    private static long scaledTime(long j) {
        return 200 * j;
    }

    private static void produceSynchronously(String str, List<KeyValueTimestamp<String, String>> list) {
        IntegrationTestUtils.produceSynchronously(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("client.id", "anything"), Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()), Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())})), false, str, Optional.empty(), list);
    }

    private static void verifyErrorShutdown(KafkaStreams kafkaStreams) throws InterruptedException {
        TestUtils.waitForCondition(() -> {
            return !kafkaStreams.state().isRunningOrRebalancing();
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Streams didn't shut down.");
        MatcherAssert.assertThat(kafkaStreams.state(), CoreMatchers.is(KafkaStreams.State.ERROR));
    }
}
