package org.apache.ignite.internal.storage;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.storage.BaseMvStoragesTest;
import org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher;
import org.apache.ignite.internal.tx.Timestamp;
import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.lang.IgniteBiTuple;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.class */
public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest {
    private static final UUID COMMIT_TABLE_ID = UUID.randomUUID();
    protected static final int PARTITION_ID = 1;
    protected MvPartitionStorage storage;
    protected final UUID txId = newTransactionId();
    protected final HybridClock clock = new HybridClockImpl();
    protected final BaseMvStoragesTest.TestKey key = new BaseMvStoragesTest.TestKey(10, "foo");
    private final BaseMvStoragesTest.TestValue value = new BaseMvStoragesTest.TestValue(20, "bar");
    protected final BinaryRow binaryRow = binaryRow(this.key, this.value);
    private final BaseMvStoragesTest.TestValue value2 = new BaseMvStoragesTest.TestValue(21, "bar2");
    protected final BinaryRow binaryRow2 = binaryRow(this.key, this.value2);
    private final BinaryRow binaryRow3 = binaryRow(this.key, new BaseMvStoragesTest.TestValue(22, "bar3"));

    /* loaded from: input_file:org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest$ScanTimestampProvider.class */
    private enum ScanTimestampProvider {
        NOW { // from class: org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest.ScanTimestampProvider.1
            @Override // org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest.ScanTimestampProvider
            HybridTimestamp scanTimestamp(HybridClock hybridClock) {
                return hybridClock.now();
            }
        },
        MAX_VALUE { // from class: org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest.ScanTimestampProvider.2
            @Override // org.apache.ignite.internal.storage.AbstractMvPartitionStorageTest.ScanTimestampProvider
            HybridTimestamp scanTimestamp(HybridClock hybridClock) {
                return HybridTimestamp.MAX_VALUE;
            }
        };

        abstract HybridTimestamp scanTimestamp(HybridClock hybridClock);
    }

    protected BinaryRow read(RowId rowId, HybridTimestamp hybridTimestamp) {
        return this.storage.read(rowId, hybridTimestamp).binaryRow();
    }

    protected PartitionTimestampCursor scan(HybridTimestamp hybridTimestamp) {
        return this.storage.scan(hybridTimestamp);
    }

    protected RowId insert(BinaryRow binaryRow, UUID uuid) {
        RowId rowId = new RowId(PARTITION_ID);
        this.storage.runConsistently(() -> {
            return this.storage.addWrite(rowId, binaryRow, uuid, UUID.randomUUID(), 0);
        });
        return rowId;
    }

    protected BinaryRow addWrite(RowId rowId, BinaryRow binaryRow, UUID uuid) {
        return (BinaryRow) this.storage.runConsistently(() -> {
            return this.storage.addWrite(rowId, binaryRow, uuid, COMMIT_TABLE_ID, PARTITION_ID);
        });
    }

    protected void commitWrite(RowId rowId, HybridTimestamp hybridTimestamp) {
        this.storage.runConsistently(() -> {
            this.storage.commitWrite(rowId, hybridTimestamp);
            return null;
        });
    }

    protected void addWriteCommitted(RowId rowId, BinaryRow binaryRow, HybridTimestamp hybridTimestamp) {
        this.storage.runConsistently(() -> {
            this.storage.addWriteCommitted(rowId, binaryRow, hybridTimestamp);
            return null;
        });
    }

    protected BinaryRow abortWrite(RowId rowId) {
        return (BinaryRow) this.storage.runConsistently(() -> {
            return this.storage.abortWrite(rowId);
        });
    }

    protected UUID newTransactionId() {
        return Timestamp.nextVersion().toUuid();
    }

    @Test
    public void testReadsFromEmpty() {
        RowId rowId = new RowId(PARTITION_ID);
        Assertions.assertEquals(PARTITION_ID, rowId.partitionId());
        Assertions.assertNull(read(rowId, this.clock.now()));
    }

    @EnumSource
    @ParameterizedTest
    public void testScanOverEmpty(ScanTimestampProvider scanTimestampProvider) throws Exception {
        Assertions.assertEquals(List.of(), convert(scan(scanTimestampProvider.scanTimestamp(this.clock))));
    }

    @Test
    public void testAddWrite() {
        RowId insert = insert(this.binaryRow, this.txId);
        Assertions.assertThrows(TxIdMismatchException.class, () -> {
            addWrite(insert, this.binaryRow, newTransactionId());
        });
        addWrite(insert, this.binaryRow, this.txId);
        assertRowMatches(read(insert, this.clock.now()), this.binaryRow);
    }

    @Test
    public void testAbortWrite() {
        RowId insert = insert(binaryRow(this.key, this.value), this.txId);
        abortWrite(insert);
        Assertions.assertNull(read(insert, HybridTimestamp.MAX_VALUE));
    }

    @Test
    public void testCommitWrite() {
        RowId insert = insert(this.binaryRow, this.txId);
        HybridTimestamp now = this.clock.now();
        HybridTimestamp now2 = this.clock.now();
        commitWrite(insert, now2);
        HybridTimestamp now3 = this.clock.now();
        Assertions.assertNull(read(insert, now));
        assertRowMatches(read(insert, now2), this.binaryRow);
        assertRowMatches(read(insert, now3), this.binaryRow);
        BaseMvStoragesTest.TestValue testValue = new BaseMvStoragesTest.TestValue(30, "duh");
        UUID newTransactionId = newTransactionId();
        BinaryRow binaryRow = binaryRow(this.key, testValue);
        addWrite(insert, binaryRow, newTransactionId);
        Assertions.assertNull(read(insert, now));
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), binaryRow);
        assertRowMatches(read(insert, now2), this.binaryRow);
        assertRowMatches(read(insert, now3), binaryRow);
        assertRowMatches(read(insert, this.clock.now()), binaryRow);
        HybridTimestamp now4 = this.clock.now();
        commitWrite(insert, now4);
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), binaryRow);
        assertRowMatches(read(insert, now2), this.binaryRow);
        assertRowMatches(read(insert, now3), this.binaryRow);
        assertRowMatches(read(insert, this.clock.now()), binaryRow);
        addWrite(insert, null, newTransactionId());
        Assertions.assertNull(read(insert, now));
        Assertions.assertNull(read(insert, HybridTimestamp.MAX_VALUE));
        assertRowMatches(read(insert, now2), this.binaryRow);
        assertRowMatches(read(insert, now3), this.binaryRow);
        assertRowMatches(read(insert, now4), binaryRow);
        Assertions.assertNull(read(insert, this.clock.now()));
        HybridTimestamp now5 = this.clock.now();
        commitWrite(insert, now5);
        Assertions.assertNull(read(insert, now));
        Assertions.assertNull(read(insert, HybridTimestamp.MAX_VALUE));
        Assertions.assertNull(read(insert, now5));
        Assertions.assertNull(read(insert, this.clock.now()));
        assertRowMatches(read(insert, now2), this.binaryRow);
        assertRowMatches(read(insert, now3), this.binaryRow);
    }

    @Test
    public void testScan() throws Exception {
        BaseMvStoragesTest.TestKey testKey = new BaseMvStoragesTest.TestKey(PARTITION_ID, "1");
        BaseMvStoragesTest.TestValue testValue = new BaseMvStoragesTest.TestValue(10, "xxx");
        BaseMvStoragesTest.TestKey testKey2 = new BaseMvStoragesTest.TestKey(2, "2");
        BaseMvStoragesTest.TestValue testValue2 = new BaseMvStoragesTest.TestValue(20, "yyy");
        UUID newTransactionId = newTransactionId();
        RowId insert = insert(binaryRow(testKey, testValue), newTransactionId);
        RowId insert2 = insert(binaryRow(testKey2, testValue2), newTransactionId);
        HybridTimestamp now = this.clock.now();
        HybridTimestamp now2 = this.clock.now();
        commitWrite(insert, now2);
        HybridTimestamp now3 = this.clock.now();
        HybridTimestamp now4 = this.clock.now();
        commitWrite(insert2, now4);
        HybridTimestamp now5 = this.clock.now();
        Assertions.assertEquals(List.of(), convert(scan(now)));
        Assertions.assertEquals(List.of(testValue), convert(scan(now2)));
        Assertions.assertEquals(List.of(testValue), convert(scan(now3)));
        Assertions.assertEquals(List.of(testValue, testValue2), convert(scan(now4)));
        Assertions.assertEquals(List.of(testValue, testValue2), convert(scan(now5)));
        Assertions.assertEquals(List.of(testValue, testValue2), convert(scan(HybridTimestamp.MAX_VALUE)));
    }

    @Test
    public void testTransactionScanCursorInvariants() throws Exception {
        BaseMvStoragesTest.TestValue testValue = new BaseMvStoragesTest.TestValue(10, "xxx");
        BaseMvStoragesTest.TestValue testValue2 = new BaseMvStoragesTest.TestValue(20, "yyy");
        commitWrite(insert(binaryRow(new BaseMvStoragesTest.TestKey(PARTITION_ID, "1"), testValue), this.txId), this.clock.now());
        commitWrite(insert(binaryRow(new BaseMvStoragesTest.TestKey(2, "2"), testValue2), this.txId), this.clock.now());
        PartitionTimestampCursor scan = scan(HybridTimestamp.MAX_VALUE);
        try {
            Assertions.assertTrue(scan.hasNext());
            Assertions.assertTrue(scan.hasNext());
            ArrayList arrayList = new ArrayList();
            arrayList.add(value(((ReadResult) scan.next()).binaryRow()));
            Assertions.assertTrue(scan.hasNext());
            Assertions.assertTrue(scan.hasNext());
            arrayList.add(value(((ReadResult) scan.next()).binaryRow()));
            Assertions.assertFalse(scan.hasNext());
            Assertions.assertFalse(scan.hasNext());
            Assertions.assertThrows(NoSuchElementException.class, () -> {
                scan.next();
            });
            MatcherAssert.assertThat(arrayList, Matchers.hasItems(new BaseMvStoragesTest.TestValue[]{testValue, testValue2}));
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void testTimestampScanCursorInvariants() throws Exception {
        BinaryRow binaryRow;
        BinaryRow binaryRow2;
        HybridTimestamp hybridTimestamp;
        BaseMvStoragesTest.TestValue testValue = new BaseMvStoragesTest.TestValue(10, "xxx");
        BaseMvStoragesTest.TestValue testValue2 = new BaseMvStoragesTest.TestValue(11, "xxx");
        BaseMvStoragesTest.TestValue testValue3 = new BaseMvStoragesTest.TestValue(20, "yyy");
        BaseMvStoragesTest.TestValue testValue4 = new BaseMvStoragesTest.TestValue(21, "yyy");
        RowId rowId = new RowId(PARTITION_ID, 10L, 10L);
        RowId rowId2 = new RowId(PARTITION_ID, 10L, 20L);
        BaseMvStoragesTest.TestKey testKey = new BaseMvStoragesTest.TestKey(PARTITION_ID, "1");
        BinaryRow binaryRow3 = binaryRow(testKey, testValue);
        BinaryRow binaryRow4 = binaryRow(testKey, testValue2);
        addWrite(rowId, binaryRow3, this.txId);
        HybridTimestamp now = this.clock.now();
        commitWrite(rowId, now);
        addWrite(rowId, binaryRow4, newTransactionId());
        BaseMvStoragesTest.TestKey testKey2 = new BaseMvStoragesTest.TestKey(2, "2");
        BinaryRow binaryRow5 = binaryRow(testKey2, testValue3);
        BinaryRow binaryRow6 = binaryRow(testKey2, testValue4);
        addWrite(rowId2, binaryRow5, this.txId);
        HybridTimestamp now2 = this.clock.now();
        commitWrite(rowId2, now2);
        addWrite(rowId2, binaryRow6, newTransactionId());
        PartitionTimestampCursor scan = scan(this.clock.now());
        try {
            Assertions.assertThrows(IllegalStateException.class, () -> {
                scan.committed(now);
            });
            Assertions.assertTrue(scan.hasNext());
            while (scan.hasNext()) {
                Assertions.assertTrue(scan.hasNext());
                ReadResult readResult = (ReadResult) scan.next();
                Assertions.assertNotNull(readResult);
                Assertions.assertTrue(readResult.isWriteIntent());
                Assertions.assertFalse(readResult.isEmpty());
                if (key(readResult.binaryRow()).equals(testKey)) {
                    binaryRow = binaryRow3;
                    binaryRow2 = binaryRow4;
                    hybridTimestamp = now;
                } else {
                    binaryRow = binaryRow5;
                    binaryRow2 = binaryRow6;
                    hybridTimestamp = now2;
                }
                assertRowMatches(readResult.binaryRow(), binaryRow2);
                BinaryRow committed = scan.committed(hybridTimestamp);
                Assertions.assertNotNull(committed);
                assertRowMatches(committed, binaryRow);
            }
            Assertions.assertFalse(scan.hasNext());
            Assertions.assertFalse(scan.hasNext());
            Assertions.assertThrows(NoSuchElementException.class, () -> {
                scan.next();
            });
            Assertions.assertThrows(IllegalStateException.class, () -> {
                scan.committed(now);
            });
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private List<BaseMvStoragesTest.TestValue> convert(PartitionTimestampCursor partitionTimestampCursor) throws Exception {
        try {
            List<BaseMvStoragesTest.TestValue> list = (List) partitionTimestampCursor.stream().map(readResult -> {
                return BaseMvStoragesTest.value(readResult.binaryRow());
            }).sorted(Comparator.nullsFirst(Comparator.naturalOrder())).collect(Collectors.toList());
            if (partitionTimestampCursor != null) {
                partitionTimestampCursor.close();
            }
            return list;
        } catch (Throwable th) {
            if (partitionTimestampCursor != null) {
                try {
                    partitionTimestampCursor.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() {
        assertRowMatches(read(insert(this.binaryRow, this.txId), HybridTimestamp.MAX_VALUE), this.binaryRow);
    }

    protected final void assertRowMatches(BinaryRow binaryRow, BinaryRow binaryRow2) {
        MatcherAssert.assertThat(binaryRow, Matchers.is(Matchers.notNullValue()));
        MatcherAssert.assertThat(binaryRow.bytes(), Matchers.is(Matchers.equalTo(binaryRow2.bytes())));
    }

    @Test
    void readOfCommittedRowWithAnyTransactionIdReturnsTheRow() {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), this.binaryRow);
    }

    @Test
    void readsUncommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
        commitWrite(insert(this.binaryRow, this.txId), this.clock.now());
        assertRowMatches(read(insert(this.binaryRow2, this.txId), HybridTimestamp.MAX_VALUE), this.binaryRow2);
    }

    @Test
    void readsCommittedVersionEvenWhenThereIsCommittedVersionBeforeIt() {
        commitWrite(insert(this.binaryRow, this.txId), this.clock.now());
        RowId insert = insert(this.binaryRow2, this.txId);
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), this.binaryRow2);
    }

    @Test
    void readByExactlyCommitTimestampFindsRow() {
        RowId insert = insert(this.binaryRow, this.txId);
        HybridTimestamp now = this.clock.now();
        commitWrite(insert, now);
        assertRowMatches(read(insert, now), this.binaryRow);
    }

    @Test
    void readByTimestampAfterCommitTimestampFindsRow() {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, this.clock.now()), this.binaryRow);
    }

    @Test
    void readByTimestampBeforeFirstVersionCommitTimestampFindsNothing() {
        HybridTimestamp now = this.clock.now();
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        MatcherAssert.assertThat(read(insert, now), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void readByTimestampOfLastVersionFindsLastVersion() {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        addWrite(insert, this.binaryRow2, newTransactionId());
        HybridTimestamp now = this.clock.now();
        commitWrite(insert, now);
        assertRowMatches(read(insert, now), this.binaryRow2);
    }

    @Test
    void readByTimestampOfPreviousVersionFindsPreviousVersion() {
        RowId insert = insert(this.binaryRow, this.txId);
        HybridTimestamp now = this.clock.now();
        commitWrite(insert, now);
        addWrite(insert, this.binaryRow2, newTransactionId());
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, now), this.binaryRow);
    }

    @Test
    void readByTimestampBetweenVersionsFindsPreviousVersion() {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        HybridTimestamp now = this.clock.now();
        addWrite(insert, this.binaryRow2, newTransactionId());
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, now), this.binaryRow);
    }

    @Test
    void readByTimestampAfterWriteFindsUncommittedVersion() {
        RowId rowId = new RowId(PARTITION_ID);
        addWrite(rowId, this.binaryRow, newTransactionId());
        assertRowMatches(read(rowId, this.clock.now()), this.binaryRow);
    }

    @Test
    void readByTimestampAfterCommitAndWriteFindsUncommittedVersion() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        addWrite(insert, this.binaryRow2, newTransactionId());
        assertRowMatches(read(insert, this.clock.now()), this.binaryRow2);
    }

    @Test
    void addWriteWithDifferentTxIdThrows() {
        RowId insert = insert(this.binaryRow, this.txId);
        Assertions.assertThrows(TxIdMismatchException.class, () -> {
            addWrite(insert, this.binaryRow2, newTransactionId());
        });
    }

    @Test
    void secondUncommittedWriteWithSameTxIdReplacesExistingUncommittedWrite() {
        RowId insert = insert(this.binaryRow, this.txId);
        addWrite(insert, this.binaryRow2, this.txId);
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), this.binaryRow2);
    }

    @Test
    void addWriteReturnsUncommittedVersionIfItExists() {
        assertRowMatches(addWrite(insert(this.binaryRow, this.txId), this.binaryRow2, this.txId), this.binaryRow);
    }

    @Test
    void addWriteReturnsNullIfNoUncommittedVersionExists() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        MatcherAssert.assertThat(addWrite(insert, this.binaryRow2, this.txId), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void addWriteCreatesUncommittedVersion() {
        Assertions.assertTrue(this.storage.read(insert(this.binaryRow, this.txId), this.clock.now()).isWriteIntent());
    }

    @Test
    void afterRemovalReadWithTxIdFindsNothing() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        addWrite(insert, null, this.txId);
        MatcherAssert.assertThat(read(insert, HybridTimestamp.MAX_VALUE), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void afterRemovalReadByLatestTimestampFindsNothing() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        addWrite(insert, null, newTransactionId());
        commitWrite(insert, this.clock.now());
        MatcherAssert.assertThat(read(insert, this.clock.now()), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void afterRemovalPreviousVersionRemainsAccessibleByTimestamp() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        HybridTimestamp now = this.clock.now();
        commitWrite(insert, now);
        addWrite(insert, null, newTransactionId());
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, now), this.binaryRow);
    }

    @Test
    void removalReturnsUncommittedRowVersionIfItExists() {
        assertRowMatches(addWrite(insert(this.binaryRow, this.txId), null, this.txId), this.binaryRow);
    }

    @Test
    void removalReturnsNullIfNoUncommittedVersionExists() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        MatcherAssert.assertThat(addWrite(insert, null, newTransactionId()), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void commitWriteCommitsWriteIntentVersion() {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        Assertions.assertFalse(this.storage.read(insert, this.clock.now()).isWriteIntent());
    }

    @Test
    void commitWriteMakesVersionAvailableToReadByTimestamp() {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, this.clock.now()), this.binaryRow);
    }

    @Test
    void commitAndAbortWriteNoOpIfNoUncommittedVersionExists() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        abortWrite(insert);
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), this.binaryRow);
        commitWrite(insert, this.clock.now());
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), this.binaryRow);
    }

    @Test
    void abortWriteRemovesUncommittedVersion() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        commitWrite(insert, this.clock.now());
        addWrite(insert, this.binaryRow2, this.txId);
        abortWrite(insert);
        assertRowMatches(read(insert, HybridTimestamp.MAX_VALUE), this.binaryRow);
    }

    @Test
    void abortOfInsertMakesRowNonExistentForReadByTimestamp() {
        RowId insert = insert(this.binaryRow, newTransactionId());
        abortWrite(insert);
        MatcherAssert.assertThat(read(insert, this.clock.now()), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void abortOfInsertMakesRowNonExistentForReadWithTxId() {
        MatcherAssert.assertThat(read(new RowId(PARTITION_ID), HybridTimestamp.MAX_VALUE), Matchers.is(Matchers.nullValue()));
    }

    @Test
    void abortWriteReturnsTheRemovedVersion() {
        assertRowMatches(abortWrite(insert(this.binaryRow, this.txId)), this.binaryRow);
    }

    @Test
    void readByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() {
        assertRowMatches(this.storage.read(commitAbortAndAddUncommitted(), this.clock.now()).binaryRow(), this.binaryRow3);
    }

    @Test
    void readByTimestampBeforeAndAfterUncommittedWrite() {
        RowId rowId = new RowId(PARTITION_ID);
        HybridTimestamp now = this.clock.now();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow, this.txId);
            commitWrite(rowId, now);
            return null;
        });
        UUID randomUUID = UUID.randomUUID();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow2, randomUUID);
            return null;
        });
        ReadResult read = this.storage.read(rowId, now);
        Assertions.assertNotNull(read);
        Assertions.assertNull(read.transactionId());
        Assertions.assertNull(read.commitTableId());
        Assertions.assertEquals(-1, read.commitPartitionId());
        assertRowMatches(read.binaryRow(), this.binaryRow);
        ReadResult read2 = this.storage.read(rowId, this.clock.now());
        Assertions.assertNotNull(read2);
        Assertions.assertEquals(randomUUID, read2.transactionId());
        Assertions.assertEquals(COMMIT_TABLE_ID, read2.commitTableId());
        Assertions.assertEquals(PARTITION_ID, read2.commitPartitionId());
        assertRowMatches(read2.binaryRow(), this.binaryRow2);
    }

    private RowId commitAbortAndAddUncommitted() {
        return (RowId) this.storage.runConsistently(() -> {
            RowId rowId = new RowId(PARTITION_ID);
            this.storage.addWrite(rowId, this.binaryRow, this.txId, UUID.randomUUID(), 0);
            commitWrite(rowId, this.clock.now());
            addWrite(rowId, this.binaryRow2, newTransactionId());
            this.storage.abortWrite(rowId);
            addWrite(rowId, this.binaryRow3, newTransactionId());
            return rowId;
        });
    }

    @EnumSource(ScanTimestampProvider.class)
    @ParameterizedTest
    void scanWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite(ScanTimestampProvider scanTimestampProvider) throws Exception {
        commitAbortAndAddUncommitted();
        PartitionTimestampCursor scan = this.storage.scan(scanTimestampProvider.scanTimestamp(this.clock));
        try {
            assertRowMatches(((ReadResult) scan.next()).binaryRow(), this.binaryRow3);
            Assertions.assertFalse(scan.hasNext());
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    void readByTimestampWorksCorrectlyIfNoUncommittedValueExists() {
        assertRowMatches(read(insert(this.binaryRow, this.txId), this.clock.now()), this.binaryRow);
    }

    @Test
    void testAppliedIndex() {
        this.storage.runConsistently(() -> {
            Assertions.assertEquals(0L, this.storage.lastAppliedIndex());
            Assertions.assertEquals(0L, this.storage.persistedIndex());
            this.storage.lastAppliedIndex(1L);
            Assertions.assertEquals(1L, this.storage.lastAppliedIndex());
            MatcherAssert.assertThat(Long.valueOf(this.storage.persistedIndex()), Matchers.is(Matchers.lessThanOrEqualTo(1L)));
            return null;
        });
        MatcherAssert.assertThat(this.storage.flush(), CompletableFutureMatcher.willCompleteSuccessfully());
        Assertions.assertEquals(1L, this.storage.persistedIndex());
    }

    @Test
    void testReadWithinBeforeAndAfterTwoCommits() {
        HybridTimestamp now = this.clock.now();
        RowId rowId = new RowId(PARTITION_ID);
        HybridTimestamp now2 = this.clock.now();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow, newTransactionId());
            commitWrite(rowId, now2);
            return null;
        });
        HybridTimestamp now3 = this.clock.now();
        HybridTimestamp now4 = this.clock.now();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow2, newTransactionId());
            commitWrite(rowId, now4);
            return null;
        });
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow3, newTransactionId());
            return null;
        });
        HybridTimestamp now5 = this.clock.now();
        Assertions.assertNull(this.storage.read(rowId, now).binaryRow());
        ReadResult read = this.storage.read(rowId, now2);
        Assertions.assertNotNull(read);
        Assertions.assertNull(read.newestCommitTimestamp());
        assertRowMatches(read.binaryRow(), this.binaryRow);
        ReadResult read2 = this.storage.read(rowId, now3);
        Assertions.assertNotNull(read2);
        Assertions.assertNull(read2.newestCommitTimestamp());
        assertRowMatches(read2.binaryRow(), this.binaryRow);
        ReadResult read3 = this.storage.read(rowId, now4);
        Assertions.assertNotNull(read3);
        Assertions.assertNull(read3.newestCommitTimestamp());
        assertRowMatches(read3.binaryRow(), this.binaryRow2);
        ReadResult read4 = this.storage.read(rowId, now5);
        Assertions.assertNotNull(read4);
        Assertions.assertNotNull(read4.newestCommitTimestamp());
        Assertions.assertEquals(now4, read4.newestCommitTimestamp());
        assertRowMatches(read4.binaryRow(), this.binaryRow3);
    }

    @Test
    void testWrongPartition() {
        RowId commitAbortAndAddUncommitted = commitAbortAndAddUncommitted();
        RowId rowId = new RowId(commitAbortAndAddUncommitted.partitionId() + PARTITION_ID, commitAbortAndAddUncommitted.mostSignificantBits(), commitAbortAndAddUncommitted.leastSignificantBits());
        Assertions.assertThrows(IllegalArgumentException.class, () -> {
            read(rowId, this.clock.now());
        });
    }

    @Test
    void testReadingNothingWithLowerRowIdIfHigherRowIdWritesExist() {
        Assertions.assertNull(read(decrement(commitAbortAndAddUncommitted()), this.clock.now()));
    }

    @Test
    void testReadingNothingByTxIdWithLowerRowId() {
        RowId rowId = new RowId(PARTITION_ID);
        RowId decrement = decrement(rowId);
        UUID randomUUID = UUID.randomUUID();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow, randomUUID);
            return null;
        });
        Assertions.assertNull(read(decrement, HybridTimestamp.MAX_VALUE));
    }

    @Test
    void testReadingCorrectWriteIntentByTimestampIfLowerRowIdWriteIntentExists() {
        RowId rowId = new RowId(PARTITION_ID);
        RowId decrement = decrement(rowId);
        this.storage.runConsistently(() -> {
            addWrite(decrement, this.binaryRow2, newTransactionId());
            addWrite(rowId, this.binaryRow, newTransactionId());
            commitWrite(rowId, this.clock.now());
            return null;
        });
        assertRowMatches(read(rowId, this.clock.now()), this.binaryRow);
    }

    @Test
    void testReadingCorrectWriteIntentByTimestampIfHigherRowIdWriteIntentExists() {
        RowId rowId = new RowId(PARTITION_ID);
        RowId decrement = decrement(rowId);
        this.storage.runConsistently(() -> {
            addWrite(decrement, this.binaryRow, newTransactionId());
            addWrite(rowId, this.binaryRow2, newTransactionId());
            return null;
        });
        assertRowMatches(read(decrement, this.clock.now()), this.binaryRow);
    }

    @Test
    void testReadingTombstoneIfPreviousCommitExists() {
        RowId rowId = new RowId(PARTITION_ID);
        HybridTimestamp now = this.clock.now();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow, newTransactionId());
            commitWrite(rowId, now);
            addWrite(rowId, null, newTransactionId());
            return null;
        });
        ReadResult read = this.storage.read(rowId, this.clock.now());
        Assertions.assertNotNull(read);
        Assertions.assertNull(read.binaryRow());
        Assertions.assertEquals(now, read.newestCommitTimestamp());
    }

    @Test
    void testReadingTombstoneIfPreviousCommitNotExists() {
        RowId rowId = new RowId(PARTITION_ID);
        this.storage.runConsistently(() -> {
            addWrite(rowId, null, newTransactionId());
            return null;
        });
        ReadResult read = this.storage.read(rowId, this.clock.now());
        Assertions.assertNotNull(read);
        Assertions.assertNull(read.binaryRow());
        Assertions.assertNull(read.newestCommitTimestamp());
    }

    @Test
    void testScanVersions() throws Exception {
        RowId rowId = new RowId(PARTITION_ID, 100L, 0L);
        ArrayList arrayList = new ArrayList(List.of(this.value, this.value2));
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            addWrite(rowId, binaryRow(this.key, (BaseMvStoragesTest.TestValue) it.next()), newTransactionId());
            commitWrite(rowId, this.clock.now());
        }
        List.of(new RowId(PARTITION_ID, 99L, 0L), new RowId(PARTITION_ID, 101L, 0L)).forEach(rowId2 -> {
            addWrite(rowId2, binaryRow(this.key, this.value), newTransactionId());
            commitWrite(rowId2, this.clock.now());
        });
        Collections.reverse(arrayList);
        List<IgniteBiTuple<BaseMvStoragesTest.TestKey, BaseMvStoragesTest.TestValue>> list = toList(this.storage.scanVersions(rowId));
        Assertions.assertEquals(arrayList.size(), list.size());
        for (int i = 0; i < list.size(); i += PARTITION_ID) {
            IgniteBiTuple<BaseMvStoragesTest.TestKey, BaseMvStoragesTest.TestValue> igniteBiTuple = list.get(i);
            Assertions.assertEquals(this.key, igniteBiTuple.getKey());
            Assertions.assertEquals(arrayList.get(i), igniteBiTuple.getValue());
        }
    }

    @EnumSource(ScanTimestampProvider.class)
    @ParameterizedTest
    void testScanWithWriteIntent(ScanTimestampProvider scanTimestampProvider) throws Exception {
        HybridTimestamp addCommittedVersionAndWriteIntent = addCommittedVersionAndWriteIntent();
        PartitionTimestampCursor scan = this.storage.scan(scanTimestampProvider.scanTimestamp(this.clock));
        try {
            Assertions.assertTrue(scan.hasNext());
            ReadResult readResult = (ReadResult) scan.next();
            Assertions.assertTrue(readResult.isWriteIntent());
            assertRowMatches(readResult.binaryRow(), this.binaryRow2);
            assertRowMatches(scan.committed(addCommittedVersionAndWriteIntent), this.binaryRow);
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private HybridTimestamp addCommittedVersionAndWriteIntent() {
        RowId rowId = new RowId(PARTITION_ID);
        HybridTimestamp now = this.clock.now();
        this.storage.runConsistently(() -> {
            addWrite(rowId, this.binaryRow, newTransactionId());
            commitWrite(rowId, now);
            addWrite(rowId, this.binaryRow2, newTransactionId());
            return null;
        });
        return now;
    }

    @Test
    void testScanVersionsWithWriteIntent() throws Exception {
        RowId rowId = new RowId(PARTITION_ID, 100L, 0L);
        addWrite(rowId, binaryRow(this.key, this.value), newTransactionId());
        commitWrite(rowId, this.clock.now());
        addWrite(rowId, binaryRow(this.key, this.value2), newTransactionId());
        List.of(new RowId(PARTITION_ID, 99L, 0L), new RowId(PARTITION_ID, 101L, 0L)).forEach(rowId2 -> {
            addWrite(rowId2, binaryRow(this.key, this.value), newTransactionId());
            commitWrite(rowId2, this.clock.now());
        });
        List<IgniteBiTuple<BaseMvStoragesTest.TestKey, BaseMvStoragesTest.TestValue>> list = toList(this.storage.scanVersions(rowId));
        Assertions.assertEquals(2, list.size());
        Iterator<IgniteBiTuple<BaseMvStoragesTest.TestKey, BaseMvStoragesTest.TestValue>> it = list.iterator();
        while (it.hasNext()) {
            Assertions.assertEquals(this.key, it.next().getKey());
        }
        Assertions.assertEquals(this.value2, list.get(0).getValue());
        Assertions.assertEquals(this.value, list.get(PARTITION_ID).getValue());
    }

    @Test
    void testClosestRowId() {
        RowId rowId = new RowId(PARTITION_ID, 1L, -1L);
        RowId rowId2 = new RowId(PARTITION_ID, 1L, 0L);
        RowId rowId3 = new RowId(PARTITION_ID, 1L, 1L);
        addWrite(rowId2, this.binaryRow, this.txId);
        addWrite(rowId3, this.binaryRow2, this.txId);
        Assertions.assertEquals(rowId2, this.storage.closestRowId(rowId));
        Assertions.assertEquals(rowId2, this.storage.closestRowId(rowId.increment()));
        Assertions.assertEquals(rowId2, this.storage.closestRowId(rowId2));
        Assertions.assertEquals(rowId3, this.storage.closestRowId(rowId3));
        Assertions.assertNull(this.storage.closestRowId(rowId3.increment()));
    }

    @Test
    public void addWriteCommittedAddsCommittedVersion() {
        RowId rowId = new RowId(PARTITION_ID);
        addWriteCommitted(rowId, this.binaryRow, this.clock.now());
        assertRowMatches(this.storage.read(rowId, this.clock.now()).binaryRow(), this.binaryRow);
    }

    @Test
    public void addWriteCommittedLeavesExistingCommittedVersionsUntouched() {
        RowId rowId = new RowId(PARTITION_ID);
        HybridTimestamp now = this.clock.now();
        addWriteCommitted(rowId, this.binaryRow, now);
        addWriteCommitted(rowId, this.binaryRow2, this.clock.now());
        assertRowMatches(this.storage.read(rowId, this.clock.now()).binaryRow(), this.binaryRow2);
        assertRowMatches(this.storage.read(rowId, now).binaryRow(), this.binaryRow);
    }

    @Test
    public void addWriteCommittedThrowsIfUncommittedVersionExists() {
        RowId insert = insert(this.binaryRow, this.txId);
        MatcherAssert.assertThat(Assertions.assertThrows(StorageException.class, () -> {
            addWriteCommitted(insert, this.binaryRow2, this.clock.now());
        }).getMessage(), Matchers.is("Write intent exists for " + insert));
    }

    @Test
    public void scanVersionsReturnsUncommittedVersionsAsUncommitted() throws Exception {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        addWrite(insert, this.binaryRow2, newTransactionId());
        Cursor scanVersions = this.storage.scanVersions(insert);
        try {
            ReadResult readResult = (ReadResult) scanVersions.next();
            Assertions.assertTrue(readResult.isWriteIntent());
            MatcherAssert.assertThat(Integer.valueOf(readResult.commitPartitionId()), Matchers.is(Matchers.not(-1)));
            MatcherAssert.assertThat(readResult.commitTableId(), Matchers.is(Matchers.notNullValue()));
            MatcherAssert.assertThat(readResult.transactionId(), Matchers.is(Matchers.notNullValue()));
            MatcherAssert.assertThat(readResult.commitTimestamp(), Matchers.is(Matchers.nullValue()));
            MatcherAssert.assertThat(readResult.newestCommitTimestamp(), Matchers.is(Matchers.nullValue()));
            if (scanVersions != null) {
                scanVersions.close();
            }
        } catch (Throwable th) {
            if (scanVersions != null) {
                try {
                    scanVersions.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @Test
    public void scanVersionsReturnsCommittedVersionsAsCommitted() throws Exception {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        Cursor scanVersions = this.storage.scanVersions(insert);
        try {
            ReadResult readResult = (ReadResult) scanVersions.next();
            Assertions.assertFalse(readResult.isWriteIntent());
            MatcherAssert.assertThat(Integer.valueOf(readResult.commitPartitionId()), Matchers.is(-1));
            MatcherAssert.assertThat(readResult.commitTableId(), Matchers.is(Matchers.nullValue()));
            MatcherAssert.assertThat(readResult.transactionId(), Matchers.is(Matchers.nullValue()));
            MatcherAssert.assertThat(readResult.commitTimestamp(), Matchers.is(Matchers.notNullValue()));
            MatcherAssert.assertThat(readResult.newestCommitTimestamp(), Matchers.is(Matchers.nullValue()));
            if (scanVersions != null) {
                scanVersions.close();
            }
        } catch (Throwable th) {
            if (scanVersions != null) {
                try {
                    scanVersions.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @EnumSource(ScanTimestampProvider.class)
    @ParameterizedTest
    public void scanCursorHasNextReturnsFalseEachTimeAfterExhaustion(ScanTimestampProvider scanTimestampProvider) throws Exception {
        commitWrite(insert(this.binaryRow, this.txId), this.clock.now());
        PartitionTimestampCursor scan = scan(scanTimestampProvider.scanTimestamp(this.clock));
        try {
            scan.next();
            Assertions.assertFalse(scan.hasNext());
            Assertions.assertFalse(scan.hasNext());
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @EnumSource(ScanTimestampProvider.class)
    @ParameterizedTest
    public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider scanTimestampProvider) throws Exception {
        RowId insert = insert(this.binaryRow, this.txId);
        HybridTimestamp now = this.clock.now();
        commitWrite(insert, now);
        addWrite(insert, null, newTransactionId());
        PartitionTimestampCursor scan = scan(scanTimestampProvider.scanTimestamp(this.clock));
        try {
            Assertions.assertTrue(scan.hasNext());
            ReadResult readResult = (ReadResult) scan.next();
            Assertions.assertNull(readResult.binaryRow());
            Assertions.assertEquals(now, readResult.newestCommitTimestamp());
            Assertions.assertFalse(scan.hasNext());
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @EnumSource(ScanTimestampProvider.class)
    @ParameterizedTest
    public void scanDoesNotSeeTombstonesWhenTombstoneIsCommitted(ScanTimestampProvider scanTimestampProvider) throws Exception {
        RowId insert = insert(this.binaryRow, this.txId);
        commitWrite(insert, this.clock.now());
        addWrite(insert, null, newTransactionId());
        commitWrite(insert, this.clock.now());
        PartitionTimestampCursor scan = scan(scanTimestampProvider.scanTimestamp(this.clock));
        try {
            Assertions.assertFalse(scan.hasNext());
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    @EnumSource(ScanTimestampProvider.class)
    @ParameterizedTest
    void committedMethodCallDoesNotInterfereWithIteratingOverScanCursor(ScanTimestampProvider scanTimestampProvider) throws Exception {
        RowId insert = insert(this.binaryRow, this.txId);
        HybridTimestamp now = this.clock.now();
        commitWrite(insert, now);
        insert(this.binaryRow2, this.txId);
        PartitionTimestampCursor scan = scan(scanTimestampProvider.scanTimestamp(this.clock));
        try {
            scan.next();
            scan.committed(now);
            assertRowMatches(((ReadResult) scan.next()).binaryRow(), this.binaryRow2);
            Assertions.assertFalse(scan.hasNext());
            if (scan != null) {
                scan.close();
            }
        } catch (Throwable th) {
            if (scan != null) {
                try {
                    scan.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private RowId decrement(RowId rowId) {
        long mostSignificantBits = rowId.mostSignificantBits();
        long leastSignificantBits = rowId.leastSignificantBits() - 1;
        if (leastSignificantBits == Long.MAX_VALUE) {
            long j = mostSignificantBits - 1;
            mostSignificantBits = j;
            if (j == Long.MAX_VALUE) {
                throw new IllegalArgumentException();
            }
        }
        return new RowId(rowId.partitionId(), mostSignificantBits, leastSignificantBits);
    }
}
