package org.apache.kafka.streams.state.internals;

import java.io.File;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;
import org.apache.kafka.test.StreamsTestUtils;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.class */
public class RocksDBWindowStoreTest extends AbstractWindowBytesStoreTest {
    private static final String STORE_NAME = "rocksDB window store";
    private static final String METRICS_SCOPE = "test-state-id";
    private final KeyValueSegments segments = new KeyValueSegments(STORE_NAME, METRICS_SCOPE, 120000, IntegrationTestUtils.DEFAULT_TIMEOUT);

    @Override // org.apache.kafka.streams.state.internals.AbstractWindowBytesStoreTest
    <K, V> WindowStore<K, V> buildWindowStore(long j, long j2, boolean z, Serde<K> serde, Serde<V> serde2) {
        return Stores.windowStoreBuilder(Stores.persistentWindowStore(STORE_NAME, Duration.ofMillis(j), Duration.ofMillis(j2), z), serde, serde2).build();
    }

    @Test
    public void shouldOnlyIterateOpenSegments() {
        this.windowStore.put(1, "one", 0L);
        long j = 0 + IntegrationTestUtils.DEFAULT_TIMEOUT;
        this.windowStore.put(1, "two", j);
        long j2 = j + IntegrationTestUtils.DEFAULT_TIMEOUT;
        this.windowStore.put(1, "three", j2);
        WindowStoreIterator fetch = this.windowStore.fetch(1, 0L, j2);
        Throwable th = null;
        try {
            this.windowStore.put(1, "four", j2 + IntegrationTestUtils.DEFAULT_TIMEOUT);
            Assert.assertEquals(new KeyValue(Long.valueOf(IntegrationTestUtils.DEFAULT_TIMEOUT), "two"), fetch.next());
            Assert.assertEquals(new KeyValue(120000L, "three"), fetch.next());
            Assert.assertFalse(fetch.hasNext());
            if (fetch != null) {
                if (0 == 0) {
                    fetch.close();
                    return;
                }
                try {
                    fetch.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (fetch != null) {
                if (0 != 0) {
                    try {
                        fetch.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    fetch.close();
                }
            }
            throw th3;
        }
    }

    @Test
    public void testRolling() {
        this.windowStore.put(0, "zero", 120000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(2L)}), segmentDirs(this.baseDir));
        this.windowStore.put(1, "one", 150000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(2L)}), segmentDirs(this.baseDir));
        this.windowStore.put(2, "two", 180000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(2L), this.segments.segmentName(3L)}), segmentDirs(this.baseDir));
        this.windowStore.put(4, "four", 240000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(2L), this.segments.segmentName(3L), this.segments.segmentName(4L)}), segmentDirs(this.baseDir));
        this.windowStore.put(5, "five", 270000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(2L), this.segments.segmentName(3L), this.segments.segmentName(4L)}), segmentDirs(this.baseDir));
        Assert.assertEquals(new HashSet(Collections.singletonList("zero")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("one")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("two")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        this.windowStore.put(6, "six", 300000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(3L), this.segments.segmentName(4L), this.segments.segmentName(5L)}), segmentDirs(this.baseDir));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("two")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        this.windowStore.put(7, "seven", 330000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(3L), this.segments.segmentName(4L), this.segments.segmentName(5L)}), segmentDirs(this.baseDir));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("two")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("seven")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        this.windowStore.put(8, "eight", 360000L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)}), segmentDirs(this.baseDir));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("seven")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("eight")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        this.windowStore.flush();
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)}), segmentDirs(this.baseDir));
    }

    @Test
    public void testSegmentMaintenance() {
        this.windowStore.close();
        this.windowStore = buildWindowStore(120000L, 3L, true, Serdes.Integer(), Serdes.String());
        this.windowStore.init(this.context, this.windowStore);
        this.context.setTime(0L);
        this.windowStore.put(0, "v", 0L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(0L)}), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 59999L);
        this.windowStore.put(0, "v", 59999L);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(0L)}), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", IntegrationTestUtils.DEFAULT_TIMEOUT);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(0L), this.segments.segmentName(1L)}), segmentDirs(this.baseDir));
        WindowStoreIterator fetch = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(240000L));
        int i = 0;
        while (fetch.hasNext()) {
            fetch.next();
            i++;
        }
        Assert.assertEquals(4L, i);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(0L), this.segments.segmentName(1L)}), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 180000L);
        WindowStoreIterator fetch2 = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(240000L));
        int i2 = 0;
        while (fetch2.hasNext()) {
            fetch2.next();
            i2++;
        }
        Assert.assertEquals(2L, i2);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(1L), this.segments.segmentName(3L)}), segmentDirs(this.baseDir));
        this.windowStore.put(0, "v", 300000L);
        WindowStoreIterator fetch3 = this.windowStore.fetch(0, Instant.ofEpochMilli(240000L), Instant.ofEpochMilli(600000L));
        int i3 = 0;
        while (fetch3.hasNext()) {
            fetch3.next();
            i3++;
        }
        Assert.assertEquals(1L, i3);
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(3L), this.segments.segmentName(5L)}), segmentDirs(this.baseDir));
    }

    @Test
    public void testInitialLoading() {
        File file = new File(this.baseDir, STORE_NAME);
        new File(file, this.segments.segmentName(0L)).mkdir();
        new File(file, this.segments.segmentName(1L)).mkdir();
        new File(file, this.segments.segmentName(2L)).mkdir();
        new File(file, this.segments.segmentName(3L)).mkdir();
        new File(file, this.segments.segmentName(4L)).mkdir();
        new File(file, this.segments.segmentName(5L)).mkdir();
        new File(file, this.segments.segmentName(6L)).mkdir();
        this.windowStore.close();
        this.windowStore = buildWindowStore(120000L, 3L, false, Serdes.Integer(), Serdes.String());
        this.windowStore.init(this.context, this.windowStore);
        this.windowStore.put(1, "v", 360000L);
        List asList = Arrays.asList(this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L));
        asList.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        List list = Utils.toList(segmentDirs(this.baseDir).iterator());
        list.sort((v0, v1) -> {
            return v0.compareTo(v1);
        });
        Assert.assertEquals(asList, list);
        WindowStoreIterator fetch = this.windowStore.fetch(0, Instant.ofEpochMilli(0L), Instant.ofEpochMilli(1000000L));
        Throwable th = null;
        while (fetch.hasNext()) {
            try {
                try {
                    fetch.next();
                } finally {
                }
            } catch (Throwable th2) {
                if (fetch != null) {
                    if (th != null) {
                        try {
                            fetch.close();
                        } catch (Throwable th3) {
                            th.addSuppressed(th3);
                        }
                    } else {
                        fetch.close();
                    }
                }
                throw th2;
            }
        }
        if (fetch != null) {
            if (0 != 0) {
                try {
                    fetch.close();
                } catch (Throwable th4) {
                    th.addSuppressed(th4);
                }
            } else {
                fetch.close();
            }
        }
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)}), segmentDirs(this.baseDir));
    }

    @Test
    public void testRestore() throws Exception {
        this.windowStore.put(0, "zero", 120000L);
        this.windowStore.put(1, "one", 150000L);
        this.windowStore.put(2, "two", 180000L);
        this.windowStore.put(3, "three", 210000L);
        this.windowStore.put(4, "four", 240000L);
        this.windowStore.put(5, "five", 270000L);
        this.windowStore.put(6, "six", 300000L);
        this.windowStore.put(7, "seven", 330000L);
        this.windowStore.put(8, "eight", 360000L);
        this.windowStore.flush();
        this.windowStore.close();
        Utils.delete(this.baseDir);
        this.windowStore = buildWindowStore(120000L, 3L, false, Serdes.Integer(), Serdes.String());
        this.windowStore.init(this.context, this.windowStore);
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        ArrayList arrayList = new ArrayList();
        for (ProducerRecord<Object, Object> producerRecord : this.recordCollector.collected()) {
            arrayList.add(new KeyValue(((Bytes) producerRecord.key()).get(), (byte[]) producerRecord.value()));
        }
        this.context.restore(STORE_NAME, arrayList);
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(0, Instant.ofEpochMilli(119997L), Instant.ofEpochMilli(120003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(1, Instant.ofEpochMilli(149997L), Instant.ofEpochMilli(150003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(2, Instant.ofEpochMilli(179997L), Instant.ofEpochMilli(180003L))));
        Assert.assertEquals(new HashSet(Collections.emptyList()), StreamsTestUtils.valuesToSet(this.windowStore.fetch(3, Instant.ofEpochMilli(209997L), Instant.ofEpochMilli(210003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("four")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(4, Instant.ofEpochMilli(239997L), Instant.ofEpochMilli(240003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("five")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(5, Instant.ofEpochMilli(269997L), Instant.ofEpochMilli(270003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("six")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(6, Instant.ofEpochMilli(299997L), Instant.ofEpochMilli(300003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("seven")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(7, Instant.ofEpochMilli(329997L), Instant.ofEpochMilli(330003L))));
        Assert.assertEquals(new HashSet(Collections.singletonList("eight")), StreamsTestUtils.valuesToSet(this.windowStore.fetch(8, Instant.ofEpochMilli(359997L), Instant.ofEpochMilli(360003L))));
        this.windowStore.flush();
        Assert.assertEquals(Utils.mkSet(new String[]{this.segments.segmentName(4L), this.segments.segmentName(5L), this.segments.segmentName(6L)}), segmentDirs(this.baseDir));
    }

    private Set<String> segmentDirs(File file) {
        return new HashSet(Arrays.asList((Object[]) Objects.requireNonNull(new File(file, this.windowStore.name()).list())));
    }
}
