package org.apache.kafka.streams.integration;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.ChronoField;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.VersionedKeyQuery;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.VersionedRecordIterator;
import org.apache.kafka.test.IntegrationTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

/**
 * Integration test for IQv2 queries ({@link VersionedKeyQuery} and {@link MultiVersionedKeyQuery})
 * against a persistent versioned key-value store that materializes {@code input-topic}.
 *
 * <p>The fixture produces {@link #RECORD_NUMBER} versions of a single key ({@link #RECORD_KEY}) to
 * partition 0 before any test runs; every query helper then checks results against
 * {@link #RECORD_VALUES} / {@link #RECORD_TIMESTAMPS}, indexed from oldest (0) to newest
 * ({@link #LAST_INDEX}).
 */
@Category({IntegrationTest.class})
public class IQv2VersionedStoreIntegrationTest {
    private static final int NUM_BROKERS = 1;
    private static final String INPUT_TOPIC_NAME = "input-topic";
    private static final String STORE_NAME = "versioned-store";
    /** A key that is never produced to the input topic. */
    private static final int NON_EXISTING_KEY = 3;
    private KafkaStreams kafkaStreams;
    private static final Duration HISTORY_RETENTION = Duration.ofDays(1);
    private static final Duration SEGMENT_INTERVAL = Duration.ofHours(1);
    private static final Instant BASE_TIMESTAMP = Instant.parse("2023-01-01T10:00:00.00Z");
    // NOTE: this is the epoch-*second* value of BASE_TIMESTAMP, yet it is used verbatim as the record
    // timestamp and fed to Instant.ofEpochMilli(...) when building queries. The fixture is consistent
    // with itself, so the unit mismatch only affects how the absolute value should be read.
    private static final Long BASE_TIMESTAMP_LONG = BASE_TIMESTAMP.getLong(ChronoField.INSTANT_SECONDS);
    /** The only key written to the input topic. */
    private static final int RECORD_KEY = 2;
    /** Values of the produced versions, oldest first. */
    private static final Integer[] RECORD_VALUES = {2, 20, 200, 2000};
    /** Timestamps of the produced versions, oldest first; parallel to {@link #RECORD_VALUES}. */
    private static final Long[] RECORD_TIMESTAMPS = {
        BASE_TIMESTAMP_LONG,
        BASE_TIMESTAMP_LONG + 10,
        BASE_TIMESTAMP_LONG + 20,
        BASE_TIMESTAMP_LONG + 30
    };
    private static final int RECORD_NUMBER = RECORD_VALUES.length;
    private static final int LAST_INDEX = RECORD_NUMBER - 1;
    /** Position bound attached to every query; updated in place as records are produced. */
    private static final Position INPUT_POSITION = Position.emptyPosition();
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(
        NUM_BROKERS,
        Utils.mkProperties(Collections.singletonMap("auto.create.topics.enable", "true")));

    /**
     * Starts the embedded cluster and synchronously produces every fixture record to partition 0 of
     * the input topic, then records offset 3 for that partition in {@link #INPUT_POSITION}.
     *
     * @throws Exception if the cluster cannot be started or a send fails
     */
    @BeforeClass
    public static void before() throws Exception {
        CLUSTER.start();
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerConfig())) {
            for (int i = 0; i < RECORD_NUMBER; i++) {
                // get() blocks until the send completes, keeping the records in index order.
                producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[i], RECORD_KEY, RECORD_VALUES[i])).get();
            }
        }
        // Return value intentionally ignored: the shared INPUT_POSITION instance is what the queries use.
        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 3L);
    }

    /** Builds a topology that materializes the input topic into the versioned store and starts it. */
    @Before
    public void beforeTest() {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.table(INPUT_TOPIC_NAME, Materialized.as(Stores.persistentVersionedKeyValueStore(STORE_NAME, HISTORY_RETENTION, SEGMENT_INTERVAL)));
        Properties properties = new Properties();
        properties.put("application.id", "app");
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("default.key.serde", Serdes.IntegerSerde.class.getName());
        properties.put("default.value.serde", Serdes.IntegerSerde.class.getName());
        this.kafkaStreams = IntegrationTestUtils.getStartedStreams(properties, streamsBuilder, true);
    }

    /** Closes the Streams instance created by {@link #beforeTest()} and cleans up its local state. */
    @After
    public void afterTest() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close();
            this.kafkaStreams.cleanUp();
        }
    }

    /** Stops the embedded cluster. */
    @AfterClass
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * Runs every query scenario in sequence against one started Streams instance. The race-condition
     * scenario goes last because it produces an additional record and advances {@link #INPUT_POSITION}.
     */
    @Test
    public void verifyStore() {
        // Single-version lookups: latest record (no asOf, asOf=now, asOf=its own timestamp) and the oldest one.
        shouldHandleVersionedKeyQuery(Optional.empty(), RECORD_VALUES[LAST_INDEX], RECORD_TIMESTAMPS[LAST_INDEX], Optional.empty());
        shouldHandleVersionedKeyQuery(Optional.of(Instant.now()), RECORD_VALUES[LAST_INDEX], RECORD_TIMESTAMPS[LAST_INDEX], Optional.empty());
        shouldHandleVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[LAST_INDEX])), RECORD_VALUES[LAST_INDEX], RECORD_TIMESTAMPS[LAST_INDEX], Optional.empty());
        shouldHandleVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0])), RECORD_VALUES[0], RECORD_TIMESTAMPS[0], Optional.of(RECORD_TIMESTAMPS[1]));
        // Single-version lookups expected to find nothing: before the first version, and an unknown key.
        shouldVerifyGetNullForVersionedKeyQuery(RECORD_KEY, Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 50));
        shouldVerifyGetNullForVersionedKeyQuery(NON_EXISTING_KEY, Instant.now());
        // Multi-version lookups: full range in both orders, then a range starting after version 1's timestamp.
        shouldHandleMultiVersionedKeyQuery(Optional.empty(), Optional.empty(), ResultOrder.ANY, 0, LAST_INDEX);
        shouldHandleMultiVersionedKeyQuery(Optional.empty(), Optional.empty(), ResultOrder.ASCENDING, 0, LAST_INDEX);
        shouldHandleMultiVersionedKeyQuery(Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[1] + 5)), Optional.of(Instant.now()), ResultOrder.ANY, 1, LAST_INDEX);
        // Multi-version lookups expected to be empty: a range entirely before the first version, and an unknown key.
        Optional<Instant> beforeFirstFrom = Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 100));
        Optional<Instant> beforeFirstTo = Optional.of(Instant.ofEpochMilli(RECORD_TIMESTAMPS[0] - 50));
        shouldVerifyGetNullForMultiVersionedKeyQuery(RECORD_KEY, beforeFirstFrom, beforeFirstTo, ResultOrder.ANY);
        shouldVerifyGetNullForMultiVersionedKeyQuery(RECORD_KEY, beforeFirstFrom, beforeFirstTo, ResultOrder.ASCENDING);
        shouldVerifyGetNullForMultiVersionedKeyQuery(NON_EXISTING_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY);
        shouldVerifyGetNullForMultiVersionedKeyQuery(NON_EXISTING_KEY, Optional.empty(), Optional.empty(), ResultOrder.ASCENDING);
        shouldHandleRaceCondition();
    }

    /**
     * Issues a {@link VersionedKeyQuery} for {@link #RECORD_KEY} and asserts the single returned version.
     *
     * @param asOf             optional "as of" time for the query; empty means no {@code asOf} is set
     * @param expectedValue    value the returned record must carry
     * @param expectedTimestamp timestamp the returned record must carry
     * @param expectedValidTo  expected {@code validTo} of the returned record
     */
    private void shouldHandleVersionedKeyQuery(Optional<Instant> asOf, Integer expectedValue, Long expectedTimestamp, Optional<Long> expectedValidTo) {
        QueryResult<VersionedRecord<Integer>> queryResult = sendRequestAndReceiveResults(defineQuery(RECORD_KEY, asOf), this.kafkaStreams);
        if (queryResult == null) {
            throw new AssertionError("The query returned null.");
        }
        if (queryResult.isFailure()) {
            throw new AssertionError(queryResult.toString());
        }
        if (queryResult.getResult() == null) {
            throw new AssertionError("The query returned null.");
        }
        MatcherAssert.assertThat(queryResult.isSuccess(), Matchers.is(true));
        VersionedRecord<Integer> record = queryResult.getResult();
        MatcherAssert.assertThat(record.value(), Matchers.is(expectedValue));
        MatcherAssert.assertThat(record.timestamp(), Matchers.is(expectedTimestamp));
        MatcherAssert.assertThat(record.validTo(), Matchers.is(expectedValidTo));
        MatcherAssert.assertThat(queryResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
    }

    /**
     * Asserts that a {@link VersionedKeyQuery} for the given key and "as of" time yields a
     * {@code null} partition result.
     */
    private void shouldVerifyGetNullForVersionedKeyQuery(Integer key, Instant asOf) {
        MatcherAssert.assertThat(sendRequestAndReceiveResults(defineQuery(key, Optional.of(asOf)), this.kafkaStreams), CoreMatchers.nullValue());
    }

    /**
     * Issues a {@link MultiVersionedKeyQuery} for {@link #RECORD_KEY} and asserts that every partition
     * returns exactly the fixture records with indices in {@code [expectedArrayLowerBound,
     * expectedArrayUpperBound]}, in the order implied by {@code order}.
     *
     * @param fromTime                optional lower time bound of the query
     * @param toTime                  optional upper time bound of the query
     * @param order                   {@link ResultOrder#ASCENDING} expects oldest-first results; any other
     *                                value expects newest-first
     * @param expectedArrayLowerBound index of the oldest fixture record expected in the result
     * @param expectedArrayUpperBound index of the newest fixture record expected in the result
     */
    private void shouldHandleMultiVersionedKeyQuery(Optional<Instant> fromTime, Optional<Instant> toTime, ResultOrder order, int expectedArrayLowerBound, int expectedArrayUpperBound) {
        Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults =
            sendRequestAndReceiveResults(defineQuery(RECORD_KEY, fromTime, toTime, order), this.kafkaStreams);
        boolean ascending = order.equals(ResultOrder.ASCENDING);
        for (QueryResult<VersionedRecordIterator<Integer>> partitionResult : partitionResults.values()) {
            verifyPartitionResult(partitionResult);
            try (VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                // Ascending results begin at the lower bound (previously hard-coded to 0, which only
                // worked because every ascending caller passes a lower bound of 0).
                int index = ascending ? expectedArrayLowerBound : expectedArrayUpperBound;
                int recordCount = 0;
                while (iterator.hasNext()) {
                    VersionedRecord<Integer> record = iterator.next();
                    // Every version but the newest expected one is closed by its successor's timestamp.
                    Optional<Long> expectedValidTo = index < expectedArrayUpperBound
                        ? Optional.of(RECORD_TIMESTAMPS[index + 1])
                        : Optional.empty();
                    assertRecordAt(record, index, expectedValidTo);
                    index += ascending ? 1 : -1;
                    recordCount++;
                }
                MatcherAssert.assertThat(recordCount, Matchers.equalTo((expectedArrayUpperBound - expectedArrayLowerBound) + 1));
            }
        }
    }

    /**
     * Asserts that a {@link MultiVersionedKeyQuery} with the given key, time bounds and order yields
     * an empty iterator on every partition.
     */
    private void shouldVerifyGetNullForMultiVersionedKeyQuery(Integer key, Optional<Instant> fromTime, Optional<Instant> toTime, ResultOrder order) {
        Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults =
            sendRequestAndReceiveResults(defineQuery(key, fromTime, toTime, order), this.kafkaStreams);
        for (QueryResult<VersionedRecordIterator<Integer>> partitionResult : partitionResults.values()) {
            try (VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                Assert.assertFalse(iterator.hasNext());
            }
        }
    }

    /**
     * Checks that an iterator opened before a write keeps returning the records as they were when the
     * query ran: the newest record is read, then {@link #updateRecordValue()} overwrites the oldest
     * version, and the remaining records read through the same iterator must still match the fixture.
     */
    private void shouldHandleRaceCondition() {
        // Index at which iteration pauses so that the update happens with the iterator still open.
        final int pauseIndex = 2;
        Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> partitionResults =
            sendRequestAndReceiveResults(defineQuery(RECORD_KEY, Optional.empty(), Optional.empty(), ResultOrder.ANY), this.kafkaStreams);
        for (QueryResult<VersionedRecordIterator<Integer>> partitionResult : partitionResults.values()) {
            try (VersionedRecordIterator<Integer> iterator = partitionResult.getResult()) {
                int index = LAST_INDEX;
                int recordCount = 0;

                // Step 1: read newest-first until the pause index is reached.
                while (iterator.hasNext()) {
                    VersionedRecord<Integer> record = iterator.next();
                    Optional<Long> expectedValidTo = index < LAST_INDEX
                        ? Optional.of(RECORD_TIMESTAMPS[index + 1])
                        : Optional.empty();
                    assertRecordAt(record, index, expectedValidTo);
                    index--;
                    recordCount++;
                    if (index == pauseIndex) {
                        break;
                    }
                }

                // Overwrite the oldest version while the iterator is still open.
                updateRecordValue();

                // Step 2: the rest of the iteration must still match the original fixture values.
                while (iterator.hasNext()) {
                    VersionedRecord<Integer> record = iterator.next();
                    Optional<Long> expectedValidTo = Optional.of(RECORD_TIMESTAMPS[index + 1]);
                    assertRecordAt(record, index, expectedValidTo);
                    index--;
                    recordCount++;
                }
                MatcherAssert.assertThat(recordCount, Matchers.equalTo(RECORD_NUMBER));
            }
        }
    }

    /** Asserts that {@code record} matches the fixture value and timestamp at {@code index} and the given {@code validTo}. */
    private static void assertRecordAt(VersionedRecord<Integer> record, int index, Optional<Long> expectedValidTo) {
        MatcherAssert.assertThat(record.value(), Matchers.is(RECORD_VALUES[index]));
        MatcherAssert.assertThat(record.timestamp(), Matchers.is(RECORD_TIMESTAMPS[index]));
        MatcherAssert.assertThat(record.validTo(), Matchers.is(expectedValidTo));
    }

    /** Builds a {@link VersionedKeyQuery} for {@code key}, applying {@code asOf} only when present. */
    private static VersionedKeyQuery<Integer, Integer> defineQuery(Integer key, Optional<Instant> asOf) {
        VersionedKeyQuery<Integer, Integer> query = VersionedKeyQuery.withKey(key);
        if (asOf.isPresent()) {
            query = query.asOf(asOf.get());
        }
        return query;
    }

    /**
     * Builds a {@link MultiVersionedKeyQuery} for {@code key}, applying each time bound only when
     * present and requesting ascending timestamps only for {@link ResultOrder#ASCENDING}.
     */
    private static MultiVersionedKeyQuery<Integer, Integer> defineQuery(Integer key, Optional<Instant> fromTime, Optional<Instant> toTime, ResultOrder order) {
        MultiVersionedKeyQuery<Integer, Integer> query = MultiVersionedKeyQuery.withKey(key);
        if (fromTime.isPresent()) {
            query = query.fromTime(fromTime.get());
        }
        if (toTime.isPresent()) {
            query = query.toTime(toTime.get());
        }
        if (order.equals(ResultOrder.ASCENDING)) {
            query = query.withAscendingTimestamps();
        }
        return query;
    }

    /** Runs a multi-version query against the store, bounded by {@link #INPUT_POSITION}, and returns the per-partition results. */
    private static Map<Integer, QueryResult<VersionedRecordIterator<Integer>>> sendRequestAndReceiveResults(MultiVersionedKeyQuery<Integer, Integer> multiVersionedKeyQuery, KafkaStreams kafkaStreams) {
        StateQueryRequest<VersionedRecordIterator<Integer>> request = StateQueryRequest.inStore(STORE_NAME)
            .withQuery(multiVersionedKeyQuery)
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        return IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request).getPartitionResults();
    }

    /** Runs a single-version query against the store, bounded by {@link #INPUT_POSITION}, and returns the only partition result. */
    private static QueryResult<VersionedRecord<Integer>> sendRequestAndReceiveResults(VersionedKeyQuery<Integer, Integer> versionedKeyQuery, KafkaStreams kafkaStreams) {
        StateQueryRequest<VersionedRecord<Integer>> request = StateQueryRequest.inStore(STORE_NAME)
            .withQuery(versionedKeyQuery)
            .withPositionBound(PositionBound.at(INPUT_POSITION));
        return IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request).getOnlyPartitionResult();
    }

    /**
     * Asserts that a partition result is a success with no execution info, and that asking it for a
     * failure reason or message throws {@link IllegalArgumentException}.
     */
    private static void verifyPartitionResult(QueryResult<VersionedRecordIterator<Integer>> queryResult) {
        MatcherAssert.assertThat(queryResult.getExecutionInfo(), Matchers.is(Matchers.empty()));
        if (queryResult.isFailure()) {
            throw new AssertionError(queryResult.toString());
        }
        MatcherAssert.assertThat(queryResult.isSuccess(), Matchers.is(true));
        Assert.assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
        Assert.assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
    }

    /**
     * Produces a new value (999999) for {@link #RECORD_KEY} at the oldest fixture timestamp, advances
     * {@link #INPUT_POSITION} to offset 4, and blocks until a consumer has seen all
     * {@code RECORD_NUMBER + 1} records on the input topic.
     *
     * @throws RuntimeException wrapping any exception raised while waiting for the records
     */
    private static void updateRecordValue() {
        try (KafkaProducer<Integer, Integer> producer = new KafkaProducer<>(producerConfig())) {
            // Fire-and-forget send, as before; arrival is confirmed by the consumer wait below.
            producer.send(new ProducerRecord<>(INPUT_TOPIC_NAME, 0, RECORD_TIMESTAMPS[0], RECORD_KEY, 999999));
        }
        INPUT_POSITION.withComponent(INPUT_TOPIC_NAME, 0, 4L);
        // Confirms the call above updated the shared instance rather than only returning a new Position.
        MatcherAssert.assertThat(INPUT_POSITION, Matchers.equalTo(Position.emptyPosition().withComponent(INPUT_TOPIC_NAME, 0, 4L)));

        Properties consumerConfig = new Properties();
        consumerConfig.setProperty("bootstrap.servers", CLUSTER.bootstrapServers());
        consumerConfig.setProperty("key.deserializer", IntegerDeserializer.class.getName());
        consumerConfig.setProperty("value.deserializer", IntegerDeserializer.class.getName());
        consumerConfig.setProperty("group.id", "foo");
        consumerConfig.setProperty("auto.offset.reset", "earliest");
        try {
            IntegrationTestUtils.waitUntilMinRecordsReceived(consumerConfig, INPUT_TOPIC_NAME, RECORD_NUMBER + 1);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns producer settings pointing at the embedded cluster with integer key/value serializers. */
    private static Properties producerConfig() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("key.serializer", IntegerSerializer.class);
        properties.put("value.serializer", IntegerSerializer.class);
        return properties;
    }
}
