package org.apache.ignite.cdc;

import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cdc.AbstractCdcTest;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.GridKernalContext;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.pagemem.wal.IgniteWriteAheadLogManager;
import org.apache.ignite.internal.pagemem.wal.record.DataEntry;
import org.apache.ignite.internal.pagemem.wal.record.DataRecord;
import org.apache.ignite.internal.pagemem.wal.record.WALRecord;
import org.apache.ignite.internal.processors.cache.CacheConflictResolutionManager;
import org.apache.ignite.internal.processors.cache.CacheObjectImpl;
import org.apache.ignite.internal.processors.cache.CacheObjectValueContext;
import org.apache.ignite.internal.processors.cache.GridCacheManagerAdapter;
import org.apache.ignite.internal.processors.cache.IgniteInternalCache;
import org.apache.ignite.internal.processors.cache.KeyCacheObjectImpl;
import org.apache.ignite.internal.processors.cache.dr.GridCacheDrInfo;
import org.apache.ignite.internal.processors.cache.persistence.standbycluster.AbstractNodeJoinTemplate;
import org.apache.ignite.internal.processors.cache.persistence.wal.FileWriteAheadLogManager;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.cache.version.CacheVersionConflictResolver;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersion;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionConflictContext;
import org.apache.ignite.internal.processors.cache.version.GridCacheVersionedEntryEx;
import org.apache.ignite.internal.util.typedef.internal.CU;
import org.apache.ignite.plugin.AbstractCachePluginProvider;
import org.apache.ignite.plugin.AbstractTestPluginProvider;
import org.apache.ignite.plugin.CachePluginContext;
import org.apache.ignite.plugin.CachePluginProvider;
import org.apache.ignite.plugin.PluginContext;
import org.apache.ignite.plugin.PluginProvider;
import org.apache.ignite.spi.systemview.view.CacheView;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.transactions.Transaction;
import org.apache.ignite.transactions.TransactionConcurrency;
import org.apache.ignite.transactions.TransactionIsolation;
import org.jetbrains.annotations.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/ignite/cdc/CdcCacheVersionTest.class */
public class CdcCacheVersionTest extends AbstractCdcTest {
    public static final byte DFLT_CLUSTER_ID = 1;
    public static final byte OTHER_CLUSTER_ID = 2;
    public static final int KEY_TO_UPD = 42;
    public static final String NOT_CDC = "not-cdc";
    public static final String CDC = "cdc";

    @Parameterized.Parameter
    public CacheAtomicityMode atomicityMode;

    @Parameterized.Parameter(1)
    public CacheMode cacheMode;

    @Parameterized.Parameter(2)
    public int gridCnt;

    @Parameterized.Parameter(3)
    public boolean persistenceEnabled;
    private final AtomicLong walRecCheckedCntr = new AtomicLong();
    private final AtomicLong conflictCheckedCntr = new AtomicLong();
    private volatile Function<GridKernalContext, IgniteWriteAheadLogManager> walProvider;
    private volatile Supplier<CacheVersionConflictResolver> conflictResolutionMgrSupplier;

    /* loaded from: input_file:org/apache/ignite/cdc/CdcCacheVersionTest$TestCacheConflictResolutionManager.class */
    public class TestCacheConflictResolutionManager<K, V> extends GridCacheManagerAdapter<K, V> implements CacheConflictResolutionManager<K, V> {
        public TestCacheConflictResolutionManager() {
        }

        public CacheVersionConflictResolver conflictResolver() {
            return (CacheVersionConflictResolver) CdcCacheVersionTest.this.conflictResolutionMgrSupplier.get();
        }
    }

    @Parameterized.Parameters(name = "atomicity={0}, mode={1}, gridCnt={2}, persistenceEnabled={3}")
    public static Collection<?> parameters() {
        ArrayList arrayList = new ArrayList();
        Iterator it = EnumSet.of(CacheAtomicityMode.ATOMIC, CacheAtomicityMode.TRANSACTIONAL).iterator();
        while (it.hasNext()) {
            CacheAtomicityMode cacheAtomicityMode = (CacheAtomicityMode) it.next();
            Iterator it2 = EnumSet.of(CacheMode.PARTITIONED, CacheMode.REPLICATED).iterator();
            while (it2.hasNext()) {
                CacheMode cacheMode = (CacheMode) it2.next();
                for (int i : new int[]{1, 3}) {
                    for (boolean z : new boolean[]{false, true}) {
                        arrayList.add(new Object[]{cacheAtomicityMode, cacheMode, Integer.valueOf(i), Boolean.valueOf(z)});
                    }
                }
            }
        }
        return arrayList;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        configuration.setDataStorageConfiguration(new DataStorageConfiguration().setWalForceArchiveTimeout(5000L).setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(this.persistenceEnabled)).setDataRegionConfigurations(new DataRegionConfiguration[]{new DataRegionConfiguration().setName(CDC).setPersistenceEnabled(this.persistenceEnabled).setCdcEnabled(true), new DataRegionConfiguration().setName(NOT_CDC).setPersistenceEnabled(false).setCdcEnabled(false)}));
        configuration.setPluginProviders(new PluginProvider[]{new AbstractTestPluginProvider() { // from class: org.apache.ignite.cdc.CdcCacheVersionTest.1
            public String name() {
                return "ConflictResolverProvider";
            }

            @Override // org.apache.ignite.plugin.AbstractTestPluginProvider
            public CachePluginProvider createCacheProvider(CachePluginContext cachePluginContext) {
                if (cachePluginContext.igniteCacheConfiguration().getName().equals("default")) {
                    return new AbstractCachePluginProvider() { // from class: org.apache.ignite.cdc.CdcCacheVersionTest.1.1
                        @Override // org.apache.ignite.plugin.AbstractCachePluginProvider
                        @Nullable
                        public Object createComponent(Class cls) {
                            if (cls != CacheConflictResolutionManager.class || CdcCacheVersionTest.this.conflictResolutionMgrSupplier == null) {
                                return null;
                            }
                            return new TestCacheConflictResolutionManager();
                        }
                    };
                }
                return null;
            }

            @Override // org.apache.ignite.plugin.AbstractTestPluginProvider
            @Nullable
            public <T> T createComponent(PluginContext pluginContext, Class<T> cls) {
                if (IgniteWriteAheadLogManager.class.equals(cls)) {
                    return (T) CdcCacheVersionTest.this.walProvider.apply(pluginContext.grid().context());
                }
                return null;
            }
        }});
        return configuration;
    }

    @Test
    public void testConflictVersionWritten() throws Exception {
        this.walProvider = gridKernalContext -> {
            return new FileWriteAheadLogManager(gridKernalContext) { // from class: org.apache.ignite.cdc.CdcCacheVersionTest.2
                public WALPointer log(WALRecord wALRecord) throws IgniteCheckedException {
                    if (wALRecord.type() != WALRecord.RecordType.DATA_RECORD_V2) {
                        return super.log(wALRecord);
                    }
                    DataRecord dataRecord = (DataRecord) wALRecord;
                    for (int i = 0; i < dataRecord.entryCount(); i++) {
                        DataEntry dataEntry = (DataEntry) dataRecord.writeEntries().get(i);
                        CdcCacheVersionTest.assertEquals(CU.cacheId("default"), dataEntry.cacheId());
                        CdcCacheVersionTest.assertEquals((byte) 1, dataEntry.writeVersion().dataCenterId());
                        CdcCacheVersionTest.assertNotNull(dataEntry.writeVersion().conflictVersion());
                        CdcCacheVersionTest.assertEquals((byte) 2, dataEntry.writeVersion().conflictVersion().dataCenterId());
                        CdcCacheVersionTest.this.walRecCheckedCntr.incrementAndGet();
                    }
                    return super.log(wALRecord);
                }
            };
        };
        this.conflictResolutionMgrSupplier = () -> {
            return new CacheVersionConflictResolver() { // from class: org.apache.ignite.cdc.CdcCacheVersionTest.3
                public <K1, V1> GridCacheVersionConflictContext<K1, V1> resolve(CacheObjectValueContext cacheObjectValueContext, GridCacheVersionedEntryEx<K1, V1> gridCacheVersionedEntryEx, GridCacheVersionedEntryEx<K1, V1> gridCacheVersionedEntryEx2, boolean z) {
                    GridCacheVersionConflictContext<K1, V1> gridCacheVersionConflictContext = new GridCacheVersionConflictContext<>(cacheObjectValueContext, gridCacheVersionedEntryEx, gridCacheVersionedEntryEx2);
                    gridCacheVersionConflictContext.useNew();
                    CdcCacheVersionTest.assertEquals((byte) 2, gridCacheVersionedEntryEx2.version().dataCenterId());
                    if (!gridCacheVersionedEntryEx.isStartVersion()) {
                        CdcCacheVersionTest.assertEquals((byte) 2, gridCacheVersionedEntryEx.version().dataCenterId());
                    }
                    CdcCacheVersionTest.this.conflictCheckedCntr.incrementAndGet();
                    return gridCacheVersionConflictContext;
                }

                public String toString() {
                    return "TestCacheConflictResolutionManager";
                }
            };
        };
        startGrids(this.gridCnt);
        IgniteEx startClientGrid = startClientGrid(this.gridCnt);
        for (int i = 0; i < this.gridCnt; i++) {
            grid(i).context().cache().context().versions().dataCenterId((byte) 1);
            assertEquals(1, grid(i).context().metric().registry("cache").findMetric("DataVersionClusterId").value());
        }
        startClientGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache<Integer, AbstractCdcTest.User> orCreateCache = startClientGrid.getOrCreateCache(new CacheConfiguration("default").setCacheMode(this.cacheMode).setAtomicityMode(this.atomicityMode).setDataRegionName(CDC).setBackups(Integer.MAX_VALUE));
        if (this.atomicityMode == CacheAtomicityMode.ATOMIC) {
            putRemoveCheck(startClientGrid, orCreateCache, null, null);
        } else {
            putRemoveCheck(startClientGrid, orCreateCache, null, null);
            for (TransactionConcurrency transactionConcurrency : TransactionConcurrency.values()) {
                for (TransactionIsolation transactionIsolation : TransactionIsolation.values()) {
                    putRemoveCheck(startClientGrid, orCreateCache, transactionConcurrency, transactionIsolation);
                }
            }
        }
        for (int i2 = 0; i2 < this.gridCnt; i2++) {
            boolean z = false;
            assertFalse(grid(i2).context().clientNode());
            for (CacheView cacheView : grid(i2).context().systemView().view(AbstractNodeJoinTemplate.CACHES)) {
                if (cacheView.cacheName().equals("default")) {
                    assertEquals(cacheView.conflictResolver(), "TestCacheConflictResolutionManager");
                    z = true;
                } else {
                    assertNull(cacheView.conflictResolver());
                }
            }
            assertTrue(z);
        }
    }

    private void putRemoveCheck(IgniteEx igniteEx, IgniteCache<Integer, AbstractCdcTest.User> igniteCache, TransactionConcurrency transactionConcurrency, TransactionIsolation transactionIsolation) throws Exception {
        this.conflictCheckedCntr.set(0L);
        this.walRecCheckedCntr.set(0L);
        addConflictData(igniteEx, igniteCache, 0, 50, transactionConcurrency, transactionIsolation);
        checkResolverAndWal();
        addConflictData(igniteEx, igniteCache, 0, 50, transactionConcurrency, transactionIsolation);
        checkResolverAndWal();
        removeConflictData(igniteEx, igniteCache, 0, 50, transactionConcurrency, transactionIsolation);
        checkResolverAndWal();
    }

    private void checkResolverAndWal() throws IgniteInterruptedCheckedException {
        long j = this.atomicityMode == CacheAtomicityMode.ATOMIC ? 50L : 50 * this.gridCnt;
        if (!GridTestUtils.waitForCondition(() -> {
            return this.conflictCheckedCntr.get() == j;
        }, 5000L)) {
            fail("Expected " + j + " but was " + this.conflictCheckedCntr.get());
        }
        long j2 = 50 * this.gridCnt;
        if (!GridTestUtils.waitForCondition(() -> {
            return this.walRecCheckedCntr.get() == j2;
        }, 5000L)) {
            fail("Expected " + j2 + " but was " + this.walRecCheckedCntr.get());
        }
        this.conflictCheckedCntr.set(0L);
        this.walRecCheckedCntr.set(0L);
    }

    @Test
    public void testOrderIncrease() throws Exception {
        this.walProvider = gridKernalContext -> {
            return new FileWriteAheadLogManager(gridKernalContext) { // from class: org.apache.ignite.cdc.CdcCacheVersionTest.4
                private long prevOrder = -1;

                public WALPointer log(WALRecord wALRecord) throws IgniteCheckedException {
                    if (wALRecord.type() != WALRecord.RecordType.DATA_RECORD_V2) {
                        return super.log(wALRecord);
                    }
                    DataRecord dataRecord = (DataRecord) wALRecord;
                    for (int i = 0; i < dataRecord.entryCount(); i++) {
                        CdcCacheVersionTest.assertEquals(CU.cacheId("default"), dataRecord.get(i).cacheId());
                        CdcCacheVersionTest.assertEquals(42, ((Integer) dataRecord.get(i).key().value((CacheObjectValueContext) null, false)).intValue());
                        CdcCacheVersionTest.assertTrue(dataRecord.get(i).writeVersion().order() > this.prevOrder);
                        this.prevOrder = dataRecord.get(i).writeVersion().order();
                        CdcCacheVersionTest.this.walRecCheckedCntr.incrementAndGet();
                    }
                    return super.log(wALRecord);
                }
            };
        };
        IgniteEx startGrid = startGrid(getConfiguration("ignite-0"));
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache orCreateCache = startGrid.getOrCreateCache(new CacheConfiguration("default").setAtomicityMode(this.atomicityMode).setDataRegionName(CDC).setCacheMode(this.cacheMode));
        IgniteCache orCreateCache2 = startGrid.getOrCreateCache(new CacheConfiguration(NOT_CDC).setDataRegionName(NOT_CDC));
        this.walRecCheckedCntr.set(0L);
        for (int i = 0; i < 50; i++) {
            orCreateCache.put(42, createUser(i));
            orCreateCache2.put(42, createUser(i));
        }
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return this.walRecCheckedCntr.get() == 50;
        }, getTestTimeout()));
    }

    private void addConflictData(IgniteEx igniteEx, IgniteCache<Integer, AbstractCdcTest.User> igniteCache, int i, int i2, TransactionConcurrency transactionConcurrency, TransactionIsolation transactionIsolation) {
        try {
            IgniteInternalCache cachex = igniteEx.cachex(igniteCache.getName());
            HashMap hashMap = new HashMap();
            for (int i3 = i; i3 < i2; i3++) {
                KeyCacheObjectImpl keyCacheObjectImpl = new KeyCacheObjectImpl(Integer.valueOf(i3), (byte[]) null, cachex.affinity().partition(Integer.valueOf(i3)));
                CacheObjectImpl cacheObjectImpl = new CacheObjectImpl(createUser(i3), (byte[]) null);
                cacheObjectImpl.prepareMarshal(cachex.context().cacheObjectContext());
                hashMap.put(keyCacheObjectImpl, new GridCacheDrInfo(cacheObjectImpl, new GridCacheVersion(1, i3, 1, 2)));
            }
            if (transactionConcurrency != null) {
                Transaction txStart = igniteEx.transactions().txStart(transactionConcurrency, transactionIsolation);
                Throwable th = null;
                try {
                    try {
                        cachex.putAllConflict(hashMap);
                        txStart.commit();
                        if (txStart != null) {
                            if (0 != 0) {
                                try {
                                    txStart.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                txStart.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } else {
                cachex.putAllConflict(hashMap);
            }
        } catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }

    private void removeConflictData(IgniteEx igniteEx, IgniteCache<Integer, AbstractCdcTest.User> igniteCache, int i, int i2, TransactionConcurrency transactionConcurrency, TransactionIsolation transactionIsolation) {
        try {
            IgniteInternalCache cachex = igniteEx.cachex(igniteCache.getName());
            HashMap hashMap = new HashMap();
            for (int i3 = i; i3 < i2; i3++) {
                hashMap.put(new KeyCacheObjectImpl(Integer.valueOf(i3), (byte[]) null, cachex.affinity().partition(Integer.valueOf(i3))), new GridCacheVersion(1, i3, 1, 2));
            }
            if (transactionConcurrency != null) {
                Transaction txStart = igniteEx.transactions().txStart(transactionConcurrency, transactionIsolation);
                Throwable th = null;
                try {
                    try {
                        cachex.removeAllConflict(hashMap);
                        txStart.commit();
                        if (txStart != null) {
                            if (0 != 0) {
                                try {
                                    txStart.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                txStart.close();
                            }
                        }
                    } finally {
                    }
                } finally {
                }
            } else {
                cachex.removeAllConflict(hashMap);
            }
        } catch (IgniteCheckedException e) {
            throw new IgniteException(e);
        }
    }
}
