package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.internals.VersionedKeyValueToBytesStoreAdapter;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.jupiter.api.Tag;
import org.junit.rules.TestName;

@Tag("bazel:shard_count:2")
@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest.class */
public class VersionedKeyValueStoreIntegrationTest {
    /** Name under which every topology in this test registers its state store. */
    private static final String STORE_NAME = "versioned-store";
    /** History retention in milliseconds (3,600,000 ms = 1 hour); passed to the store suppliers via {@code Duration.ofMillis}. */
    private static final long HISTORY_RETENTION = 3600000;
    /** Per-test input topic name; assigned and created in {@code beforeTest()}. */
    private String inputStream;
    /** Per-test topic name assigned in {@code beforeTest()}; no topic is created for it and no visible test reads it. */
    private String globalTableTopic;
    /** Per-test output topic name; assigned and created in {@code beforeTest()}. */
    private String outputStream;
    /** Cluster clock reading taken in {@code beforeTest()}; record timestamps are expressed as offsets from it. */
    private long baseTimestamp;
    /** Streams instance under test; closed and cleaned up in {@code afterTest()} when non-null. */
    private KafkaStreams kafkaStreams;
    /** Embedded cluster shared by all tests in this class (the argument 1 is presumably the broker count; confirm). */
    private static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    /** JUnit rule exposing the running test, used to derive unique topic and application names. */
    @Rule
    public TestName testName = new TestName();

    /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$CustomIQv2VersionedStoreSupplier.class */
    private static class CustomIQv2VersionedStoreSupplier implements VersionedBytesStoreSupplier {

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$CustomIQv2VersionedStoreSupplier$CustomIQv2VersionedStore.class */
        public static class CustomIQv2VersionedStore implements VersionedKeyValueStore<Bytes, byte[]> {
            private CustomIQv2VersionedStore() {
            }

            public <R> QueryResult<R> query(Query<R> query, PositionBound positionBound, QueryConfig queryConfig) {
                if (query instanceof TestQuery) {
                    return QueryResult.forResult("success");
                }
                throw new UnsupportedOperationException();
            }

            public long put(Bytes bytes, byte[] bArr, long j) {
                throw new UnsupportedOperationException();
            }

            public VersionedRecord<byte[]> delete(Bytes bytes, long j) {
                throw new UnsupportedOperationException();
            }

            public VersionedRecord<byte[]> get(Bytes bytes) {
                throw new UnsupportedOperationException();
            }

            public VersionedRecord<byte[]> get(Bytes bytes, long j) {
                throw new UnsupportedOperationException();
            }

            public String name() {
                return VersionedKeyValueStoreIntegrationTest.STORE_NAME;
            }

            @Deprecated
            public void init(ProcessorContext processorContext, StateStore stateStore) {
                throw new UnsupportedOperationException();
            }

            public void init(StateStoreContext stateStoreContext, StateStore stateStore) {
                stateStoreContext.register(stateStore, (bArr, bArr2) -> {
                });
            }

            public void flush() {
            }

            public void close() {
            }

            public boolean persistent() {
                return false;
            }

            public boolean isOpen() {
                return true;
            }
        }

        private CustomIQv2VersionedStoreSupplier() {
        }

        public String name() {
            return VersionedKeyValueStoreIntegrationTest.STORE_NAME;
        }

        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public KeyValueStore<Bytes, byte[]> m88get() {
            return new VersionedKeyValueToBytesStoreAdapter(new CustomIQv2VersionedStore());
        }

        public String metricsScope() {
            return "metrics-scope";
        }

        public long historyRetentionMs() {
            return VersionedKeyValueStoreIntegrationTest.HISTORY_RETENTION;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$DataTracker.class */
    /**
     * Records, per key and per timestamp, the value a store is expected to hold.
     * An empty {@link Optional} marks a deletion or a null value.
     */
    public static class DataTracker {
        /** Value that is tracked as a deletion instead of as a literal string. */
        static final String DELETE_VALUE_KEYWORD = "delete";
        /** key -> (timestamp -> expected value, empty when deleted/null). */
        final Map<Integer, Map<Long, Optional<String>>> data;

        private DataTracker() {
            this.data = new HashMap<>();
        }

        /**
         * Tracks the expected value for {@code key} at {@code timestamp}, replacing any
         * earlier entry for the same key and timestamp.
         *
         * @param key       record key
         * @param timestamp record timestamp
         * @param value     record value; {@code null} or {@link #DELETE_VALUE_KEYWORD} is tracked as empty
         */
        void add(Integer key, long timestamp, String value) {
            final Optional<String> expected =
                DELETE_VALUE_KEYWORD.equals(value) ? Optional.empty() : Optional.ofNullable(value);
            // computeIfAbsent returns the per-key map, so no second lookup is needed.
            this.data.computeIfAbsent(key, ignored -> new HashMap<>()).put(timestamp, expected);
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$KeyValueStoreContentCheckerProcessor.class */
    private static class KeyValueStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<Integer, Integer> context;
        private KeyValueStore<Integer, String> store;
        private final Map<Integer, Optional<String>> data = new HashMap();

        KeyValueStoreContentCheckerProcessor() {
        }

        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Integer, Integer> processorContext) {
            this.context = processorContext;
            this.store = processorContext.getStateStore(VersionedKeyValueStoreIntegrationTest.STORE_NAME);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void process(Record<Integer, String> record) {
            if ("delete".equals(record.value())) {
                throw new IllegalArgumentException("Using 'delete' keyword for KeyValueStoreContentCheckerProcessor will result in the record timestamp being ignored. Use regular put with null value instead.");
            }
            this.store.put(record.key(), record.value());
            this.data.put(record.key(), Optional.ofNullable(record.value()));
            this.context.forward(record.withValue(Integer.valueOf(checkStoreContents())));
        }

        private int checkStoreContents() {
            int i = 0;
            for (Map.Entry<Integer, Optional<String>> entry : this.data.entrySet()) {
                if (!Objects.equals((String) this.store.get(entry.getKey()), entry.getValue().orElse(null))) {
                    i++;
                }
            }
            return i;
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$TestQuery.class */
    /**
     * Marker query type with a {@code String} result. It carries no state; the custom
     * store in this test recognises it with {@code instanceof}.
     */
    private static class TestQuery implements Query<String> {
        private TestQuery() {
        }
    }

    /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$TimestampedStoreContentCheckerProcessor.class */
    private static class TimestampedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<Integer, Integer> context;
        private TimestampedKeyValueStore<Integer, String> store;
        private final Map<Integer, Optional<ValueAndTimestamp<String>>> data = new HashMap();

        TimestampedStoreContentCheckerProcessor() {
        }

        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Integer, Integer> processorContext) {
            this.context = processorContext;
            this.store = processorContext.getStateStore(VersionedKeyValueStoreIntegrationTest.STORE_NAME);
        }

        /* JADX WARN: Multi-variable type inference failed */
        public void process(Record<Integer, String> record) {
            if ("delete".equals(record.value())) {
                throw new IllegalArgumentException("Using 'delete' keyword for TimestampedStoreContentCheckerProcessor will result in the record timestamp being ignored. Use regular put with null value instead.");
            }
            ValueAndTimestamp make = ValueAndTimestamp.make(record.value(), record.timestamp());
            this.store.put(record.key(), make);
            this.data.put(record.key(), Optional.ofNullable(make));
            this.context.forward(record.withValue(Integer.valueOf(checkStoreContents())));
        }

        private int checkStoreContents() {
            int i = 0;
            for (Map.Entry<Integer, Optional<ValueAndTimestamp<String>>> entry : this.data.entrySet()) {
                if (!Objects.equals((ValueAndTimestamp) this.store.get(entry.getKey()), entry.getValue().orElse(null))) {
                    i++;
                }
            }
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/VersionedKeyValueStoreIntegrationTest$VersionedStoreContentCheckerProcessor.class */
    public static class VersionedStoreContentCheckerProcessor implements Processor<Integer, String, Integer, Integer> {
        private org.apache.kafka.streams.processor.api.ProcessorContext<Integer, Integer> context;
        private VersionedKeyValueStore<Integer, String> store;
        private final boolean writeToStore;
        private final DataTracker data;

        VersionedStoreContentCheckerProcessor(boolean z) {
            this(z, new DataTracker());
        }

        VersionedStoreContentCheckerProcessor(boolean z, DataTracker dataTracker) {
            this.writeToStore = z;
            this.data = dataTracker;
        }

        public void init(org.apache.kafka.streams.processor.api.ProcessorContext<Integer, Integer> processorContext) {
            this.context = processorContext;
            this.store = processorContext.getStateStore(VersionedKeyValueStoreIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, String> record) {
            if (this.writeToStore) {
                if ("delete".equals(record.value())) {
                    this.store.delete(record.key(), record.timestamp());
                } else {
                    this.store.put(record.key(), record.value(), record.timestamp());
                }
                this.data.add((Integer) record.key(), record.timestamp(), (String) record.value());
            }
            this.context.forward(record.withValue(Integer.valueOf(checkStoreContents())));
        }

        private int checkStoreContents() {
            int i = 0;
            for (Map.Entry<Integer, Map<Long, Optional<String>>> entry : this.data.data.entrySet()) {
                Integer key = entry.getKey();
                long j = -1;
                String str = null;
                for (Map.Entry<Long, Optional<String>> entry2 : entry.getValue().entrySet()) {
                    Long key2 = entry2.getKey();
                    String orElse = entry2.getValue().orElse(null);
                    if (key2.longValue() > j) {
                        j = key2.longValue();
                        str = orElse;
                    }
                    if (!contentsMatch(this.store.get(key, key2.longValue()), orElse, key2.longValue())) {
                        i++;
                    }
                }
                if (!contentsMatch(this.store.get(key), str, j)) {
                    i++;
                }
            }
            return i;
        }

        private static boolean contentsMatch(VersionedRecord<String> versionedRecord, String str, long j) {
            return str == null ? versionedRecord == null : versionedRecord != null && str.equals(versionedRecord.value()) && j == versionedRecord.timestamp();
        }
    }

    /**
     * Starts the shared embedded cluster; annotated to run once before the tests of this class.
     *
     * @throws IOException propagated from {@code CLUSTER.start()}
     */
    @BeforeClass
    public static void before() throws IOException {
        CLUSTER.start();
    }

    /** Stops the shared embedded cluster; annotated to run once after the tests of this class. */
    @AfterClass
    public static void after() {
        CLUSTER.stop();
    }

    /**
     * Derives per-test topic names, creates the input and output topics, and captures
     * the base timestamp used for produced records.
     */
    @Before
    public void beforeTest() throws InterruptedException {
        final String uniqueSuffix = IntegrationTestUtils.safeUniqueTestName(testName);

        inputStream = "input-stream-" + uniqueSuffix;
        // Only the name is assigned here; no topic is created for it.
        globalTableTopic = "global-table-" + uniqueSuffix;
        outputStream = "output-stream-" + uniqueSuffix;

        CLUSTER.createTopic(inputStream);
        CLUSTER.createTopic(outputStream);

        baseTimestamp = CLUSTER.time.milliseconds();
    }

    /** Closes the Streams instance (waiting up to 30 seconds) and wipes its local state, if one was created. */
    @After
    public void afterTest() {
        if (kafkaStreams == null) {
            return;
        }
        kafkaStreams.close(Duration.ofSeconds(30L));
        kafkaStreams.cleanUp();
    }

    /**
     * Writes puts, null-value puts, out-of-order updates and deletes through the checker
     * processor and asserts that every forwarded mismatch count is zero.
     */
    @Test
    public void shouldPutGetAndDelete() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .addStateStore(Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMillis(HISTORY_RETENTION)),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new VersionedStoreContentCheckerProcessor(true), STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props());
        this.kafkaStreams.start();

        final Properties consumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class);

        // Null values are typed as String so each pair is a KeyValue<Integer, String>.
        int numRecordsProduced = 0;
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp,
            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, (String) null));
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp + 5,
            KeyValue.pair(1, "a5"), KeyValue.pair(2, (String) null), KeyValue.pair(3, "c5"));
        // Out-of-order write: older than the batch above.
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp + 2,
            KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, (String) null));
        // Same timestamp as an earlier batch: replaces those versions.
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp + 5,
            KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, (String) null));
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp + 7,
            KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete"));

        for (final KeyValue<Integer, Integer> received
            : IntegrationTestUtils.<Integer, Integer>waitUntilMinKeyValueRecordsReceived(consumerConfig, this.outputStream, numRecordsProduced)) {
            // The forwarded value is the number of failed store checks.
            MatcherAssert.assertThat(received.value, CoreMatchers.equalTo(0));
        }
    }

    /**
     * Starts a topology with a versioned store and asserts the changelog topic is
     * compacted with the expected minimum compaction lag.
     */
    @Test
    public void shouldSetChangelogTopicProperties() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .addStateStore(Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMillis(HISTORY_RETENTION)),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new VersionedStoreContentCheckerProcessor(false), STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Properties props = props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();

        // Push one record through and wait for it so the application is known to be processing.
        produceDataToTopic(this.inputStream, this.baseTimestamp, KeyValue.pair(0, "foo"));
        IntegrationTestUtils.waitUntilMinRecordsReceived(
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class),
            this.outputStream,
            1);

        final String changelogTopic = props.getProperty("application.id") + "-versioned-store-changelog";
        final Properties logConfig = CLUSTER.getLogConfig(changelogTopic);

        // 90,000,000 ms = HISTORY_RETENTION + 24 h. NOTE(review): the extra 24 h is presumably
        // Streams' default additional changelog retention; confirm against the Streams config.
        final long expectedMinCompactionLagMs = HISTORY_RETENTION + Duration.ofHours(24).toMillis();

        MatcherAssert.assertThat(logConfig.getProperty("cleanup.policy"), CoreMatchers.equalTo("compact"));
        MatcherAssert.assertThat(logConfig.getProperty("min.compaction.lag.ms"), CoreMatchers.equalTo(Long.toString(expectedMinCompactionLagMs)));
    }

    /**
     * Populates a versioned store, wipes local state, restarts with an identical topology
     * and asserts the store matches everything tracked before the restart.
     */
    @Test
    public void shouldRestore() throws Exception {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .addStateStore(Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMillis(HISTORY_RETENTION)),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new VersionedStoreContentCheckerProcessor(true), STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Properties props = props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();

        // Test-side record of everything produced; handed to the second topology's processor below.
        final DataTracker dataTracker = new DataTracker();
        int numRecordsProduced = 0;
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp,
            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, (String) null));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 5,
            KeyValue.pair(1, "a5"), KeyValue.pair(2, (String) null), KeyValue.pair(3, "c5"));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 2,
            KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, (String) null));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 5,
            KeyValue.pair(1, "a5_new"), KeyValue.pair(2, "b5"), KeyValue.pair(3, (String) null));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 7,
            KeyValue.pair(1, "delete"), KeyValue.pair(2, "delete"), KeyValue.pair(3, "delete"));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 10,
            KeyValue.pair(1, "a10"), KeyValue.pair(2, "b10"), KeyValue.pair(3, "c10"));

        // Wait until every record has been processed before shutting down.
        IntegrationTestUtils.waitUntilMinRecordsReceived(
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class),
            this.outputStream,
            numRecordsProduced);

        // Wipe local state before restarting with the same application properties.
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();

        final StreamsBuilder restartedBuilder = new StreamsBuilder();
        restartedBuilder
            .addStateStore(Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMillis(HISTORY_RETENTION)),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new VersionedStoreContentCheckerProcessor(true, dataTracker), STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        this.kafkaStreams = new KafkaStreams(restartedBuilder.build(), props);
        this.kafkaStreams.start();

        final Properties consumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class);

        // Produce further records so the restarted processor runs its checks.
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp + 12,
            KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));

        for (final KeyValue<Integer, Integer> received
            : IntegrationTestUtils.<Integer, Integer>waitUntilMinKeyValueRecordsReceived(consumerConfig, this.outputStream, numRecordsProduced)) {
            // The forwarded value is the number of failed store checks.
            MatcherAssert.assertThat(received.value, CoreMatchers.equalTo(0));
        }
    }

    /**
     * Runs a topology backed by the custom store supplier and asserts that a
     * {@link TestQuery} issued through IQv2 returns {@code "success"}.
     */
    @Test
    public void shouldAllowCustomIQv2ForCustomStoreImplementations() {
        final StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder
            .addStateStore(Stores.versionedKeyValueStoreBuilder(
                new CustomIQv2VersionedStoreSupplier(),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new VersionedStoreContentCheckerProcessor(false), STORE_NAME);

        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props());
        this.kafkaStreams.start();

        // Query partition 0 of the store with the custom query type.
        final StateQueryRequest<String> request = StateQueryRequest
            .inStore(STORE_NAME)
            .withQuery(new TestQuery())
            .withPartitions(Collections.singleton(0));
        final String queryResult = IntegrationTestUtils.iqv2WaitForResult(this.kafkaStreams, request)
            .getOnlyPartitionResult()
            .getResult();

        MatcherAssert.assertThat(queryResult, CoreMatchers.equalTo("success"));
    }

    /** Runs the manual-upgrade scenario starting from a timestamped key-value store. */
    @Test
    public void shouldManualUpgradeFromNonVersionedTimestampedToVersioned() throws Exception {
        final StreamsBuilder timestampedBuilder = new StreamsBuilder();
        timestampedBuilder
            .addStateStore(Stores.timestampedKeyValueStoreBuilder(
                Stores.persistentTimestampedKeyValueStore(STORE_NAME),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(TimestampedStoreContentCheckerProcessor::new, STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Topology timestampedTopology = timestampedBuilder.build();
        shouldManualUpgradeFromNonVersionedToVersioned(timestampedTopology);
    }

    /** Runs the manual-upgrade scenario starting from a plain (non-timestamped) key-value store. */
    @Test
    public void shouldManualUpgradeFromNonVersionedNonTimestampedToVersioned() throws Exception {
        final StreamsBuilder plainBuilder = new StreamsBuilder();
        plainBuilder
            .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore(STORE_NAME),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(KeyValueStoreContentCheckerProcessor::new, STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        final Topology plainTopology = plainBuilder.build();
        shouldManualUpgradeFromNonVersionedToVersioned(plainTopology);
    }

    /**
     * Shared upgrade scenario: runs the given non-versioned topology, verifies it, wipes
     * local state, then restarts with a versioned store under the same application
     * properties and verifies the versioned store against everything produced earlier.
     *
     * @param topology topology using a non-versioned store registered as {@code STORE_NAME}
     */
    private void shouldManualUpgradeFromNonVersionedToVersioned(Topology topology) throws Exception {
        final Properties props = props();
        // NOTE(review): a timeout constant is used here as a compaction-lag value. It is
        // presumably meant to keep older record versions from being compacted away while the
        // non-versioned store is in use; confirm the intended value.
        props.put("min.compaction.lag.ms", Long.valueOf(IntegrationTestUtils.DEFAULT_TIMEOUT));
        this.kafkaStreams = new KafkaStreams(topology, props);
        this.kafkaStreams.start();

        // Test-side record of everything produced; handed to the versioned processor below.
        final DataTracker dataTracker = new DataTracker();
        int numRecordsProduced = 0;
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp,
            KeyValue.pair(1, "a0"), KeyValue.pair(2, "b0"), KeyValue.pair(3, (String) null));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 5,
            KeyValue.pair(1, "a5"), KeyValue.pair(2, (String) null), KeyValue.pair(3, "c5"));
        numRecordsProduced += produceDataToTopic(this.inputStream, dataTracker, this.baseTimestamp + 2,
            KeyValue.pair(1, "a2"), KeyValue.pair(2, "b2"), KeyValue.pair(3, (String) null));

        final Properties firstConsumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class);
        for (final KeyValue<Integer, Integer> received
            : IntegrationTestUtils.<Integer, Integer>waitUntilMinKeyValueRecordsReceived(firstConsumerConfig, this.outputStream, numRecordsProduced)) {
            // The forwarded value is the number of failed store checks.
            MatcherAssert.assertThat(received.value, CoreMatchers.equalTo(0));
        }

        // Wipe local state before switching to the versioned store.
        this.kafkaStreams.close();
        this.kafkaStreams.cleanUp();

        final StreamsBuilder versionedBuilder = new StreamsBuilder();
        versionedBuilder
            .addStateStore(Stores.versionedKeyValueStoreBuilder(
                Stores.persistentVersionedKeyValueStore(STORE_NAME, Duration.ofMillis(HISTORY_RETENTION)),
                Serdes.Integer(),
                Serdes.String()))
            .stream(this.inputStream, Consumed.with(Serdes.Integer(), Serdes.String()))
            .process(() -> new VersionedStoreContentCheckerProcessor(true, dataTracker), STORE_NAME)
            .to(this.outputStream, Produced.with(Serdes.Integer(), Serdes.Integer()));

        this.kafkaStreams = new KafkaStreams(versionedBuilder.build(), props);
        this.kafkaStreams.start();

        final Properties secondConsumerConfig =
            TestUtils.consumerConfig(CLUSTER.bootstrapServers(), IntegerDeserializer.class, IntegerDeserializer.class);

        // Produce further records so the versioned processor runs its checks.
        numRecordsProduced += produceDataToTopic(this.inputStream, this.baseTimestamp + 12,
            KeyValue.pair(1, "a12"), KeyValue.pair(2, "b12"), KeyValue.pair(3, "c12"));

        for (final KeyValue<Integer, Integer> received
            : IntegrationTestUtils.<Integer, Integer>waitUntilMinKeyValueRecordsReceived(secondConsumerConfig, this.outputStream, numRecordsProduced)) {
            MatcherAssert.assertThat(received.value, CoreMatchers.equalTo(0));
        }
    }

    /**
     * Builds the Streams configuration for the current test: an application id derived
     * from the test name, the embedded cluster's bootstrap servers, a fresh temporary
     * state directory, a 1000 ms commit interval and {@code earliest} offset reset.
     */
    private Properties props() {
        final String applicationId = "app-" + IntegrationTestUtils.safeUniqueTestName(this.testName);

        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put("application.id", applicationId);
        streamsConfiguration.put("bootstrap.servers", CLUSTER.bootstrapServers());
        streamsConfiguration.put("state.dir", TestUtils.tempDirectory().getPath());
        streamsConfiguration.put("commit.interval.ms", 1000L);
        streamsConfiguration.put("auto.offset.reset", "earliest");
        return streamsConfiguration;
    }

    /**
     * Synchronously produces the given records to a topic, all with the same timestamp.
     *
     * @param str         destination topic
     * @param j           timestamp applied to every record
     * @param keyValueArr records to produce
     * @return number of records passed in
     */
    @SafeVarargs
    private final int produceDataToTopic(String str, long j, KeyValue<Integer, String>... keyValueArr) {
        final Properties producerConfig =
            TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, StringSerializer.class);
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
            str,
            Arrays.asList(keyValueArr),
            producerConfig,
            Long.valueOf(j));
        return keyValueArr.length;
    }

    /**
     * Produces the given records and then records each of them in the tracker under the
     * same timestamp.
     *
     * @param str         destination topic
     * @param dataTracker tracker that receives every produced record
     * @param j           timestamp applied to every record
     * @param keyValueArr records to produce
     * @return number of records passed in
     */
    @SafeVarargs
    private final int produceDataToTopic(String str, DataTracker dataTracker, long j, KeyValue<Integer, String>... keyValueArr) {
        // The three-argument overload returns keyValueArr.length.
        final int producedCount = produceDataToTopic(str, j, keyValueArr);
        Arrays.asList(keyValueArr).forEach(produced -> dataTracker.add(produced.key, j, produced.value));
        return producedCount;
    }
}
