package org.apache.kafka.streams.integration;

import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValueTimestamp;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest.class */
public class ResetPartitionTimeIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final Properties BROKER_CONFIG = new Properties();

    @ClassRule
    public static final EmbeddedKafkaCluster CLUSTER;
    private static final StringDeserializer STRING_DESERIALIZER;
    private static final StringSerializer STRING_SERIALIZER;
    private static final Serde<String> STRING_SERDE;
    private static final int DEFAULT_TIMEOUT = 100;
    private final boolean eosEnabled;
    private static long lastRecordedTimestamp;

    /* loaded from: input_file:org/apache/kafka/streams/integration/ResetPartitionTimeIntegrationTest$MaxTimestampExtractor.class */
    public static final class MaxTimestampExtractor implements TimestampExtractor {
        public long extract(ConsumerRecord<Object, Object> consumerRecord, long j) {
            long unused = ResetPartitionTimeIntegrationTest.lastRecordedTimestamp = j;
            return consumerRecord.timestamp();
        }
    }

    @Parameterized.Parameters(name = "{index}: eosEnabled={0}")
    public static Collection<Object[]> parameters() {
        return Arrays.asList(new Object[]{false}, new Object[]{true});
    }

    public ResetPartitionTimeIntegrationTest(boolean z) {
        this.eosEnabled = z;
    }

    @Test
    public void shouldPreservePartitionTimeOnKafkaStreamRestart() throws Exception {
        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, "input", "output-raw");
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.stream("input", Consumed.with(STRING_SERDE, STRING_SERDE)).to("output-raw");
        Properties properties = new Properties();
        properties.put("default.timestamp.extractor", MaxTimestampExtractor.class);
        properties.put("application.id", "appId");
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("poll.ms", Integer.toString(DEFAULT_TIMEOUT));
        properties.put("commit.interval.ms", Integer.toString(DEFAULT_TIMEOUT));
        properties.put("processing.guarantee", this.eosEnabled ? "exactly_once" : "at_least_once");
        properties.put("state.dir", TestUtils.tempDirectory().getPath());
        KafkaStreams startedStreams = IntegrationTestUtils.getStartedStreams(properties, streamsBuilder, true);
        try {
            produceSynchronouslyToPartitionZero("input", Arrays.asList(new KeyValueTimestamp("k3", "v3", 5000L)));
            verifyOutput("output-raw", Arrays.asList(new KeyValueTimestamp("k3", "v3", 5000L)));
            MatcherAssert.assertThat(Long.valueOf(lastRecordedTimestamp), CoreMatchers.is(-1L));
            lastRecordedTimestamp = -2L;
            startedStreams.close();
            MatcherAssert.assertThat(startedStreams.state(), CoreMatchers.is(KafkaStreams.State.NOT_RUNNING));
            startedStreams = IntegrationTestUtils.getStartedStreams(properties, streamsBuilder, true);
            produceSynchronouslyToPartitionZero("input", Arrays.asList(new KeyValueTimestamp("k5", "v5", 4999L)));
            verifyOutput("output-raw", Arrays.asList(new KeyValueTimestamp("k5", "v5", 4999L)));
            MatcherAssert.assertThat(Long.valueOf(lastRecordedTimestamp), CoreMatchers.is(5000L));
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
        } catch (Throwable th) {
            startedStreams.close();
            IntegrationTestUtils.cleanStateAfterTest(CLUSTER, startedStreams);
            throw th;
        }
    }

    private void verifyOutput(String str, List<KeyValueTimestamp<String, String>> list) {
        IntegrationTestUtils.verifyKeyValueTimestamps(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("group.id", "test-group"), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers()), Utils.mkEntry("key.deserializer", STRING_DESERIALIZER.getClass().getName()), Utils.mkEntry("value.deserializer", STRING_DESERIALIZER.getClass().getName())})), str, list);
    }

    private static void produceSynchronouslyToPartitionZero(String str, List<KeyValueTimestamp<String, String>> list) {
        IntegrationTestUtils.produceSynchronously(Utils.mkProperties(Utils.mkMap(new Map.Entry[]{Utils.mkEntry("client.id", "anything"), Utils.mkEntry("key.serializer", STRING_SERIALIZER.getClass().getName()), Utils.mkEntry("value.serializer", STRING_SERIALIZER.getClass().getName()), Utils.mkEntry("bootstrap.servers", CLUSTER.bootstrapServers())})), false, str, Optional.of(0), list);
    }

    static {
        BROKER_CONFIG.put("transaction.state.log.replication.factor", (short) 1);
        BROKER_CONFIG.put("transaction.state.log.min.isr", Integer.valueOf(NUM_BROKERS));
        CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS, BROKER_CONFIG, 0L);
        STRING_DESERIALIZER = new StringDeserializer();
        STRING_SERIALIZER = new StringSerializer();
        STRING_SERDE = Serdes.String();
        lastRecordedTimestamp = -2L;
    }
}
