package org.apache.ignite.internal.cdc;

import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.binary.BinaryType;
import org.apache.ignite.cache.query.annotations.QuerySqlField;
import org.apache.ignite.cdc.AbstractCdcTest;
import org.apache.ignite.cdc.CdcCacheEvent;
import org.apache.ignite.cdc.CdcConsumer;
import org.apache.ignite.cdc.CdcEvent;
import org.apache.ignite.cdc.TypeMapping;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.DataRegionConfiguration;
import org.apache.ignite.configuration.DataStorageConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.processors.cache.GridCacheContext;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.util.KillCommandsTests;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/cdc/CdcIndexRebuildTest.class */
public class CdcIndexRebuildTest extends AbstractCdcTest {
    public static final int VALS_CNT = 30720;

    /* loaded from: input_file:org/apache/ignite/internal/cdc/CdcIndexRebuildTest$CountingCdcConsumer.class */
    public static class CountingCdcConsumer implements CdcConsumer {
        volatile boolean commited;
        volatile int evtsCnt;

        public void reset() {
            this.commited = false;
            this.evtsCnt = 0;
        }

        public void start(MetricRegistry metricRegistry) {
        }

        public boolean onEvents(Iterator<CdcEvent> it) {
            while (it.hasNext()) {
                it.next();
                this.evtsCnt++;
            }
            this.commited = true;
            return true;
        }

        public void onTypes(Iterator<BinaryType> it) {
            it.forEachRemaining(binaryType -> {
            });
        }

        public void onMappings(Iterator<TypeMapping> it) {
            it.forEachRemaining(typeMapping -> {
            });
        }

        public void onCacheChange(Iterator<CdcCacheEvent> it) {
            it.forEachRemaining(cdcCacheEvent -> {
            });
        }

        public void onCacheDestroy(Iterator<Integer> it) {
            it.forEachRemaining(num -> {
            });
        }

        public void stop() {
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/cdc/CdcIndexRebuildTest$TestVal.class */
    public static class TestVal {

        @QuerySqlField(index = true, inlineSize = 256)
        private final String f0 = UUID.randomUUID().toString();

        @QuerySqlField(index = true, inlineSize = 256)
        private final String f1 = UUID.randomUUID().toString();
    }

    protected IgniteConfiguration getConfiguration(String str) throws Exception {
        return super.getConfiguration(str).setDataStorageConfiguration(new DataStorageConfiguration().setWalForceArchiveTimeout(5000L).setDefaultDataRegionConfiguration(new DataRegionConfiguration().setPersistenceEnabled(true).setCdcEnabled(true)).setWalSegmentSize(16777216));
    }

    @Test
    public void testIndexRebuild() throws Exception {
        IgniteEx startGrid = startGrid(0);
        startGrid.cluster().state(ClusterState.ACTIVE);
        IgniteCache orCreateCache = startGrid.getOrCreateCache(new CacheConfiguration(KillCommandsTests.DEFAULT_CACHE_NAME).setIndexedTypes(new Class[]{Integer.class, TestVal.class}));
        HashMap hashMap = new HashMap(VALS_CNT);
        for (int i = 0; i < 30720; i++) {
            hashMap.put(Integer.valueOf(i), new TestVal());
        }
        orCreateCache.putAll(hashMap);
        CountingCdcConsumer countingCdcConsumer = new CountingCdcConsumer();
        CdcMain createCdc = createCdc(countingCdcConsumer, getConfiguration(startGrid.name()));
        IgniteInternalFuture runAsync = GridTestUtils.runAsync(createCdc);
        assertTrue(GridTestUtils.waitForCondition(() -> {
            return countingCdcConsumer.commited && countingCdcConsumer.evtsCnt == 30720;
        }, getTestTimeout()));
        waitForWalSegmentsHandling(startGrid, createCdc);
        countingCdcConsumer.reset();
        forceRebuildIndexes(startGrid, new GridCacheContext[]{startGrid.cachex(KillCommandsTests.DEFAULT_CACHE_NAME).context()});
        indexRebuildFuture(startGrid, GridCacheUtils.cacheId(KillCommandsTests.DEFAULT_CACHE_NAME)).get(getTestTimeout());
        waitForWalSegmentsHandling(startGrid, createCdc);
        runAsync.cancel();
    }

    private void waitForWalSegmentsHandling(IgniteEx igniteEx, CdcMain cdcMain) throws IgniteInterruptedCheckedException {
        assertTrue("Wal segments was not committed by CdcConsumer", GridTestUtils.waitForCondition(() -> {
            long lastArchivedSegment = igniteEx.context().cache().context().wal().lastArchivedSegment();
            MetricRegistry metricRegistry = (MetricRegistry) GridTestUtils.getFieldValue(cdcMain, new String[]{"mreg"});
            long value = metricRegistry.findMetric("CommittedSegmentIndex").value();
            log.warning(String.format(">>>>>> Information about CDC and WAL: [lastArchivedSeg=%d, committedCdcSegIdx=%d, curCdcSegIdx=%d]", Long.valueOf(lastArchivedSegment), Long.valueOf(value), Long.valueOf(metricRegistry.findMetric("CurrentSegmentIndex").value())));
            return lastArchivedSegment == value;
        }, 20000L, 2500L));
    }
}
