package org.apache.kafka.streams.integration;

import java.io.IOException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.ReadOnlyWindowStore;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;

@Category({IntegrationTest.class})
/* loaded from: input_file:org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.class */
public class StoreUpgradeIntegrationTest {
    private static final String STORE_NAME = "store";
    private String inputStream;
    private KafkaStreams kafkaStreams;
    public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(1);

    @Rule
    public TestName testName = new TestName();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest$KeyValueProcessor.class */
    public static class KeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
        private KeyValueStore<Integer, Long> store;

        private KeyValueProcessor() {
        }

        public void init(ProcessorContext<Void, Void> processorContext) {
            this.store = processorContext.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, Integer> record) {
            Long l = (Long) this.store.get(record.key());
            this.store.put(record.key(), Long.valueOf(l != null ? l.longValue() + 1 : 1L));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest$TimestampedKeyValueProcessor.class */
    public static class TimestampedKeyValueProcessor implements Processor<Integer, Integer, Void, Void> {
        private TimestampedKeyValueStore<Integer, Long> store;

        private TimestampedKeyValueProcessor() {
        }

        public void init(ProcessorContext<Void, Void> processorContext) {
            this.store = processorContext.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, Integer> record) {
            long longValue;
            long max;
            ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp) this.store.get(record.key());
            if (valueAndTimestamp == null) {
                longValue = 1;
                max = record.timestamp();
            } else {
                longValue = ((Long) valueAndTimestamp.value()).longValue() + 1;
                max = Math.max(valueAndTimestamp.timestamp(), record.timestamp());
            }
            this.store.put(record.key(), ValueAndTimestamp.make(Long.valueOf(longValue), max));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest$TimestampedWindowedProcessor.class */
    public static class TimestampedWindowedProcessor implements Processor<Integer, Integer, Void, Void> {
        private TimestampedWindowStore<Integer, Long> store;

        private TimestampedWindowedProcessor() {
        }

        public void init(ProcessorContext<Void, Void> processorContext) {
            this.store = processorContext.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, Integer> record) {
            long longValue;
            long max;
            ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp) this.store.fetch(record.key(), ((Integer) record.key()).intValue() < 10 ? 0L : 100000L);
            if (valueAndTimestamp == null) {
                longValue = 1;
                max = record.timestamp();
            } else {
                longValue = ((Long) valueAndTimestamp.value()).longValue() + 1;
                max = Math.max(valueAndTimestamp.timestamp(), record.timestamp());
            }
            this.store.put(record.key(), ValueAndTimestamp.make(Long.valueOf(longValue), max), ((Integer) record.key()).intValue() < 10 ? 0L : 100000L);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest$WindowedProcessor.class */
    public static class WindowedProcessor implements Processor<Integer, Integer, Void, Void> {
        private WindowStore<Integer, Long> store;

        private WindowedProcessor() {
        }

        public void init(ProcessorContext<Void, Void> processorContext) {
            this.store = processorContext.getStateStore(StoreUpgradeIntegrationTest.STORE_NAME);
        }

        public void process(Record<Integer, Integer> record) {
            Long l = (Long) this.store.fetch(record.key(), ((Integer) record.key()).intValue() < 10 ? 0L : 100000L);
            this.store.put(record.key(), Long.valueOf(l != null ? l.longValue() + 1 : 1L), ((Integer) record.key()).intValue() < 10 ? 0L : 100000L);
        }
    }

    @BeforeClass
    public static void startCluster() throws IOException {
        CLUSTER.start();
    }

    @AfterClass
    public static void closeCluster() {
        CLUSTER.stop();
    }

    @Before
    public void createTopics() throws Exception {
        this.inputStream = "input-stream-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName);
        CLUSTER.createTopic(this.inputStream);
    }

    private Properties props() {
        Properties properties = new Properties();
        properties.put("application.id", "app-" + IntegrationTestUtils.safeUniqueTestName(getClass(), this.testName));
        properties.put("bootstrap.servers", CLUSTER.bootstrapServers());
        properties.put("statestore.cache.max.bytes", 0);
        properties.put("state.dir", TestUtils.tempDirectory().getPath());
        properties.put("default.key.serde", Serdes.Integer().getClass());
        properties.put("default.value.serde", Serdes.Integer().getClass());
        properties.put("commit.interval.ms", 1000L);
        properties.put("auto.offset.reset", "earliest");
        return properties;
    }

    @After
    public void shutdown() {
        if (this.kafkaStreams != null) {
            this.kafkaStreams.close(Duration.ofSeconds(30L));
            this.kafkaStreams.cleanUp();
        }
    }

    @Test
    public void shouldMigrateInMemoryKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
        shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(false);
    }

    @Test
    public void shouldMigratePersistentKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
        shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(true);
    }

    private void shouldMigrateKeyValueStoreToTimestampedKeyValueStoreUsingPapi(boolean z) throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(z ? Stores.persistentKeyValueStore(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new KeyValueProcessor();
        }, new String[]{STORE_NAME});
        Properties props = props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(1, 1L)));
        processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(1, 2L)));
        long milliseconds = z ? -1L : CLUSTER.time.milliseconds() - 1;
        processKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L)));
        long milliseconds2 = z ? -1L : CLUSTER.time.milliseconds() - 1;
        processKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L)));
        long milliseconds3 = z ? -1L : CLUSTER.time.milliseconds() - 1;
        processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L), KeyValue.pair(4, 1L)));
        processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L), KeyValue.pair(4, 2L)));
        processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L), KeyValue.pair(4, 3L)));
        long milliseconds4 = z ? -1L : CLUSTER.time.milliseconds() - 1;
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        StreamsBuilder streamsBuilder2 = new StreamsBuilder();
        streamsBuilder2.addStateStore(Stores.timestampedKeyValueStoreBuilder(z ? Stores.persistentTimestampedKeyValueStore(STORE_NAME) : Stores.inMemoryKeyValueStore(STORE_NAME), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new TimestampedKeyValueProcessor();
        }, new String[]{STORE_NAME});
        this.kafkaStreams = new KafkaStreams(streamsBuilder2.build(), props);
        this.kafkaStreams.start();
        verifyCountWithTimestamp(1, 2L, milliseconds);
        verifyCountWithTimestamp(2, 1L, milliseconds2);
        verifyCountWithTimestamp(3, 1L, milliseconds3);
        verifyCountWithTimestamp(4, 3L, milliseconds4);
        long milliseconds5 = CLUSTER.time.milliseconds();
        processKeyValueAndVerifyCountWithTimestamp(1, milliseconds5 + 42, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(2, ValueAndTimestamp.make(1L, milliseconds2)), KeyValue.pair(3, ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(4, ValueAndTimestamp.make(3L, milliseconds4))));
        processKeyValueAndVerifyCountWithTimestamp(2, milliseconds5 + 45, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(2, ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(3, ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(4, ValueAndTimestamp.make(3L, milliseconds4))));
        processKeyValueAndVerifyCountWithTimestamp(4, milliseconds5 + 21, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(2, ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(3, ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(4, ValueAndTimestamp.make(4L, milliseconds5 + 21))));
        processKeyValueAndVerifyCountWithTimestamp(4, milliseconds5 + 42, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(2, ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(3, ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(4, ValueAndTimestamp.make(5L, milliseconds5 + 42))));
        processKeyValueAndVerifyCountWithTimestamp(4, milliseconds5 + 10, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(2, ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(3, ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(4, ValueAndTimestamp.make(6L, milliseconds5 + 42))));
        this.kafkaStreams.close();
    }

    @Test
    public void shouldProxyKeyValueStoreToTimestampedKeyValueStoreUsingPapi() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_NAME), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new KeyValueProcessor();
        }, new String[]{STORE_NAME});
        Properties props = props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(1, 1L)));
        processKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(1, 2L)));
        processKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L)));
        processKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L)));
        processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L), KeyValue.pair(4, 1L)));
        processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L), KeyValue.pair(4, 2L)));
        processKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(1, 2L), KeyValue.pair(2, 1L), KeyValue.pair(3, 1L), KeyValue.pair(4, 3L)));
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        StreamsBuilder streamsBuilder2 = new StreamsBuilder();
        streamsBuilder2.addStateStore(Stores.timestampedKeyValueStoreBuilder(Stores.persistentKeyValueStore(STORE_NAME), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new TimestampedKeyValueProcessor();
        }, new String[]{STORE_NAME});
        this.kafkaStreams = new KafkaStreams(streamsBuilder2.build(), props);
        this.kafkaStreams.start();
        verifyCountWithSurrogateTimestamp(1, 2L);
        verifyCountWithSurrogateTimestamp(2, 1L);
        verifyCountWithSurrogateTimestamp(3, 1L);
        verifyCountWithSurrogateTimestamp(4, 3L);
        processKeyValueAndVerifyCount(1, 42L, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(2, ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(4, ValueAndTimestamp.make(3L, -1L))));
        processKeyValueAndVerifyCount(2, 45L, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(4, ValueAndTimestamp.make(3L, -1L))));
        processKeyValueAndVerifyCount(4, 21L, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(4, ValueAndTimestamp.make(4L, -1L))));
        processKeyValueAndVerifyCount(4, 42L, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(4, ValueAndTimestamp.make(5L, -1L))));
        processKeyValueAndVerifyCount(4, 10L, Arrays.asList(KeyValue.pair(1, ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(2, ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(3, ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(4, ValueAndTimestamp.make(6L, -1L))));
        this.kafkaStreams.close();
    }

    private <K, V> void processKeyValueAndVerifyPlainCount(K k, List<KeyValue<Integer, Object>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.inputStream, Collections.singletonList(KeyValue.pair(k, 0)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), CLUSTER.time);
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.keyValueStore());
                if (readOnlyKeyValueStore == null) {
                    return false;
                }
                KeyValueIterator all = readOnlyKeyValueStore.all();
                Throwable th = null;
                try {
                    try {
                        LinkedList linkedList = new LinkedList();
                        while (all.hasNext()) {
                            linkedList.add(all.next());
                        }
                        boolean equals = linkedList.equals(list);
                        if (all != null) {
                            if (0 != 0) {
                                try {
                                    all.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                all.close();
                            }
                        }
                        return equals;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K> void verifyCountWithTimestamp(K k, long j, long j2) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (readOnlyKeyValueStore == null) {
                    return false;
                }
                ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp) readOnlyKeyValueStore.get(k);
                if (((Long) valueAndTimestamp.value()).longValue() == j) {
                    if (valueAndTimestamp.timestamp() == j2) {
                        return true;
                    }
                }
                return false;
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K> void verifyCountWithSurrogateTimestamp(K k, long j) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (readOnlyKeyValueStore == null) {
                    return false;
                }
                ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp) readOnlyKeyValueStore.get(k);
                if (((Long) valueAndTimestamp.value()).longValue() == j) {
                    if (valueAndTimestamp.timestamp() == -1) {
                        return true;
                    }
                }
                return false;
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K, V> void processKeyValueAndVerifyCount(K k, long j, List<KeyValue<Integer, Object>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputStream, Collections.singletonList(KeyValue.pair(k, 0)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), Long.valueOf(j));
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (readOnlyKeyValueStore == null) {
                    return false;
                }
                KeyValueIterator all = readOnlyKeyValueStore.all();
                Throwable th = null;
                try {
                    try {
                        LinkedList linkedList = new LinkedList();
                        while (all.hasNext()) {
                            linkedList.add(all.next());
                        }
                        boolean equals = linkedList.equals(list);
                        if (all != null) {
                            if (0 != 0) {
                                try {
                                    all.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                all.close();
                            }
                        }
                        return equals;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K, V> void processKeyValueAndVerifyCountWithTimestamp(K k, long j, List<KeyValue<Integer, Object>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputStream, Collections.singletonList(KeyValue.pair(k, 0)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), Long.valueOf(j));
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyKeyValueStore readOnlyKeyValueStore = (ReadOnlyKeyValueStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
                if (readOnlyKeyValueStore == null) {
                    return false;
                }
                KeyValueIterator all = readOnlyKeyValueStore.all();
                Throwable th = null;
                try {
                    try {
                        LinkedList linkedList = new LinkedList();
                        while (all.hasNext()) {
                            linkedList.add(all.next());
                        }
                        boolean equals = linkedList.equals(list);
                        if (all != null) {
                            if (0 != 0) {
                                try {
                                    all.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                all.close();
                            }
                        }
                        return equals;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    @Test
    public void shouldMigrateInMemoryWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.windowStoreBuilder(Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(1000L), Duration.ofMillis(1000L), false), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new WindowedProcessor();
        }, new String[]{STORE_NAME});
        StreamsBuilder streamsBuilder2 = new StreamsBuilder();
        streamsBuilder2.addStateStore(Stores.timestampedWindowStoreBuilder(Stores.inMemoryWindowStore(STORE_NAME, Duration.ofMillis(1000L), Duration.ofMillis(1000L), false), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new TimestampedWindowedProcessor();
        }, new String[]{STORE_NAME});
        shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(streamsBuilder, streamsBuilder2, false);
    }

    @Test
    public void shouldMigratePersistentWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(1000L), Duration.ofMillis(1000L), false), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new WindowedProcessor();
        }, new String[]{STORE_NAME});
        StreamsBuilder streamsBuilder2 = new StreamsBuilder();
        streamsBuilder2.addStateStore(Stores.timestampedWindowStoreBuilder(Stores.persistentTimestampedWindowStore(STORE_NAME, Duration.ofMillis(1000L), Duration.ofMillis(1000L), false), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new TimestampedWindowedProcessor();
        }, new String[]{STORE_NAME});
        shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(streamsBuilder, streamsBuilder2, true);
    }

    private void shouldMigrateWindowStoreToTimestampedWindowStoreUsingPapi(StreamsBuilder streamsBuilder, StreamsBuilder streamsBuilder2, boolean z) throws Exception {
        Properties props = props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 1L)));
        processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L)));
        long milliseconds = z ? -1L : CLUSTER.time.milliseconds() - 1;
        processWindowedKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L)));
        long milliseconds2 = z ? -1L : CLUSTER.time.milliseconds() - 1;
        processWindowedKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L)));
        long milliseconds3 = z ? -1L : CLUSTER.time.milliseconds() - 1;
        processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), 1L)));
        processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), 2L)));
        processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), 3L)));
        long milliseconds4 = z ? -1L : CLUSTER.time.milliseconds() - 1;
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        this.kafkaStreams = new KafkaStreams(streamsBuilder2.build(), props);
        this.kafkaStreams.start();
        verifyWindowedCountWithTimestamp(new Windowed(1, new TimeWindow(0L, 1000L)), 2L, milliseconds);
        verifyWindowedCountWithTimestamp(new Windowed(2, new TimeWindow(0L, 1000L)), 1L, milliseconds2);
        verifyWindowedCountWithTimestamp(new Windowed(3, new TimeWindow(0L, 1000L)), 1L, milliseconds3);
        verifyWindowedCountWithTimestamp(new Windowed(4, new TimeWindow(0L, 1000L)), 3L, milliseconds4);
        long milliseconds5 = CLUSTER.time.milliseconds();
        processKeyValueAndVerifyWindowedCountWithTimestamp(1, milliseconds5 + 42, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, milliseconds2)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds4))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(2, milliseconds5 + 45, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds4))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(4, milliseconds5 + 21, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(4L, milliseconds5 + 21))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(4, milliseconds5 + 42, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(5L, milliseconds5 + 42))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(4, milliseconds5 + 10, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, milliseconds5 + 42)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, milliseconds5 + 45)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, milliseconds3)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(6L, milliseconds5 + 42))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(10, milliseconds5 + 100001, Collections.singletonList(KeyValue.pair(new Windowed(10, new TimeWindow(100000L, 101000L)), ValueAndTimestamp.make(1L, milliseconds5 + 100001))));
        this.kafkaStreams.close();
    }

    @Test
    public void shouldProxyWindowStoreToTimestampedWindowStoreUsingPapi() throws Exception {
        StreamsBuilder streamsBuilder = new StreamsBuilder();
        streamsBuilder.addStateStore(Stores.windowStoreBuilder(Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(1000L), Duration.ofMillis(1000L), false), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new WindowedProcessor();
        }, new String[]{STORE_NAME});
        Properties props = props();
        this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), props);
        this.kafkaStreams.start();
        processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 1L)));
        processWindowedKeyValueAndVerifyPlainCount(1, Collections.singletonList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L)));
        processWindowedKeyValueAndVerifyPlainCount(2, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L)));
        processWindowedKeyValueAndVerifyPlainCount(3, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L)));
        processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), 1L)));
        processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), 2L)));
        processWindowedKeyValueAndVerifyPlainCount(4, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), 2L), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), 1L), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), 3L)));
        this.kafkaStreams.close();
        this.kafkaStreams = null;
        StreamsBuilder streamsBuilder2 = new StreamsBuilder();
        streamsBuilder2.addStateStore(Stores.timestampedWindowStoreBuilder(Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(1000L), Duration.ofMillis(1000L), false), Serdes.Integer(), Serdes.Long())).stream(this.inputStream).process(() -> {
            return new TimestampedWindowedProcessor();
        }, new String[]{STORE_NAME});
        this.kafkaStreams = new KafkaStreams(streamsBuilder2.build(), props);
        this.kafkaStreams.start();
        verifyWindowedCountWithSurrogateTimestamp(new Windowed(1, new TimeWindow(0L, 1000L)), 2L);
        verifyWindowedCountWithSurrogateTimestamp(new Windowed(2, new TimeWindow(0L, 1000L)), 1L);
        verifyWindowedCountWithSurrogateTimestamp(new Windowed(3, new TimeWindow(0L, 1000L)), 1L);
        verifyWindowedCountWithSurrogateTimestamp(new Windowed(4, new TimeWindow(0L, 1000L)), 3L);
        processKeyValueAndVerifyWindowedCountWithTimestamp(1, 42L, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(2, 45L, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(4, 21L, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(4L, -1L))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(4, 42L, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(5L, -1L))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(4, 10L, Arrays.asList(KeyValue.pair(new Windowed(1, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(3L, -1L)), KeyValue.pair(new Windowed(2, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(2L, -1L)), KeyValue.pair(new Windowed(3, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(1L, -1L)), KeyValue.pair(new Windowed(4, new TimeWindow(0L, 1000L)), ValueAndTimestamp.make(6L, -1L))));
        processKeyValueAndVerifyWindowedCountWithTimestamp(10, 100001L, Collections.singletonList(KeyValue.pair(new Windowed(10, new TimeWindow(100000L, 101000L)), ValueAndTimestamp.make(1L, -1L))));
        this.kafkaStreams.close();
    }

    private <K, V> void processWindowedKeyValueAndVerifyPlainCount(K k, List<KeyValue<Windowed<Integer>, Object>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronously(this.inputStream, Collections.singletonList(KeyValue.pair(k, 0)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), CLUSTER.time);
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore readOnlyWindowStore = (ReadOnlyWindowStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.windowStore());
                if (readOnlyWindowStore == null) {
                    return false;
                }
                KeyValueIterator all = readOnlyWindowStore.all();
                Throwable th = null;
                try {
                    try {
                        LinkedList linkedList = new LinkedList();
                        while (all.hasNext()) {
                            linkedList.add(all.next());
                        }
                        boolean equals = linkedList.equals(list);
                        if (all != null) {
                            if (0 != 0) {
                                try {
                                    all.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                all.close();
                            }
                        }
                        return equals;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K> void verifyWindowedCountWithSurrogateTimestamp(Windowed<K> windowed, long j) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore readOnlyWindowStore = (ReadOnlyWindowStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
                if (readOnlyWindowStore == null) {
                    return false;
                }
                ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp) readOnlyWindowStore.fetch(windowed.key(), windowed.window().start());
                if (((Long) valueAndTimestamp.value()).longValue() == j) {
                    if (valueAndTimestamp.timestamp() == -1) {
                        return true;
                    }
                }
                return false;
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K> void verifyWindowedCountWithTimestamp(Windowed<K> windowed, long j, long j2) throws Exception {
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore readOnlyWindowStore = (ReadOnlyWindowStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
                if (readOnlyWindowStore == null) {
                    return false;
                }
                ValueAndTimestamp valueAndTimestamp = (ValueAndTimestamp) readOnlyWindowStore.fetch(windowed.key(), windowed.window().start());
                if (((Long) valueAndTimestamp.value()).longValue() == j) {
                    if (valueAndTimestamp.timestamp() == j2) {
                        return true;
                    }
                }
                return false;
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }

    private <K, V> void processKeyValueAndVerifyWindowedCountWithTimestamp(K k, long j, List<KeyValue<Windowed<Integer>, Object>> list) throws Exception {
        IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(this.inputStream, Collections.singletonList(KeyValue.pair(k, 0)), TestUtils.producerConfig(CLUSTER.bootstrapServers(), IntegerSerializer.class, IntegerSerializer.class), Long.valueOf(j));
        TestUtils.waitForCondition(() -> {
            try {
                ReadOnlyWindowStore readOnlyWindowStore = (ReadOnlyWindowStore) IntegrationTestUtils.getStore(STORE_NAME, this.kafkaStreams, QueryableStoreTypes.timestampedWindowStore());
                if (readOnlyWindowStore == null) {
                    return false;
                }
                KeyValueIterator all = readOnlyWindowStore.all();
                Throwable th = null;
                try {
                    try {
                        LinkedList linkedList = new LinkedList();
                        while (all.hasNext()) {
                            linkedList.add(all.next());
                        }
                        boolean equals = linkedList.equals(list);
                        if (all != null) {
                            if (0 != 0) {
                                try {
                                    all.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                all.close();
                            }
                        }
                        return equals;
                    } finally {
                    }
                } finally {
                }
            } catch (Exception e) {
                e.printStackTrace();
                System.err.println(e.getMessage());
                return false;
            }
        }, IntegrationTestUtils.DEFAULT_TIMEOUT, "Could not get expected result in time.");
    }
}
