package org.apache.ignite.cdc;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import javax.management.DynamicMBean;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.cdc.CdcConsumerState;
import org.apache.ignite.internal.cdc.CdcMain;
import org.apache.ignite.internal.processors.cache.GridCacheUtils;
import org.apache.ignite.internal.processors.cache.persistence.wal.WALPointer;
import org.apache.ignite.internal.processors.metric.MetricRegistry;
import org.apache.ignite.internal.util.lang.GridAbsPredicate;
import org.apache.ignite.internal.util.typedef.CI3;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.T2;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.metric.MetricExporterSpi;
import org.apache.ignite.spi.metric.jmx.JmxMetricExporterSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/* loaded from: input_file:org/apache/ignite/cdc/AbstractCdcTest.class */
public abstract class AbstractCdcTest extends GridCommonAbstractTest {
    public static final String JOHN = "John Connor";
    public static final int WAL_ARCHIVE_TIMEOUT = 5000;
    public static final int KEYS_CNT = 50;

    /* loaded from: input_file:org/apache/ignite/cdc/AbstractCdcTest$ChangeEventType.class */
    public enum ChangeEventType {
        UPDATE,
        DELETE
    }

    /* loaded from: input_file:org/apache/ignite/cdc/AbstractCdcTest$TestCdcConsumer.class */
    public static abstract class TestCdcConsumer<T> implements CdcConsumer {
        final ConcurrentMap<IgniteBiTuple<ChangeEventType, Integer>, List<T>> data = new ConcurrentHashMap();
        private volatile boolean stopped;

        public void start(MetricRegistry metricRegistry) {
            this.stopped = false;
        }

        public void stop() {
            this.stopped = true;
        }

        public boolean onEvents(Iterator<CdcEvent> it) {
            it.forEachRemaining(cdcEvent -> {
                if (cdcEvent.primary()) {
                    this.data.computeIfAbsent(F.t(cdcEvent.value() == null ? ChangeEventType.DELETE : ChangeEventType.UPDATE, Integer.valueOf(cdcEvent.cacheId())), igniteBiTuple -> {
                        return new ArrayList();
                    }).add(extract(cdcEvent));
                    checkEvent(cdcEvent);
                }
            });
            return commit();
        }

        public abstract void checkEvent(CdcEvent cdcEvent);

        public abstract T extract(CdcEvent cdcEvent);

        protected boolean commit() {
            return true;
        }

        public List<T> data(ChangeEventType changeEventType, int i) {
            return this.data.get(F.t(changeEventType, Integer.valueOf(i)));
        }

        public boolean stopped() {
            return this.stopped;
        }
    }

    /* loaded from: input_file:org/apache/ignite/cdc/AbstractCdcTest$User.class */
    public static class User {
        private final String name;
        private final int age;
        private final byte[] payload;

        public User(String str, int i, byte[] bArr) {
            this.name = str;
            this.age = i;
            this.payload = bArr;
        }

        public String getName() {
            return this.name;
        }

        public int getAge() {
            return this.age;
        }

        public byte[] getPayload() {
            return this.payload;
        }
    }

    /* loaded from: input_file:org/apache/ignite/cdc/AbstractCdcTest$UserCdcConsumer.class */
    public static class UserCdcConsumer extends TestCdcConsumer<Integer> {
        @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
        public void checkEvent(CdcEvent cdcEvent) {
            AbstractCdcTest.assertNull(cdcEvent.version().otherClusterVersion());
            if (cdcEvent.value() == null) {
                return;
            }
            User user = (User) cdcEvent.value();
            AbstractCdcTest.assertTrue(user.getName().startsWith(AbstractCdcTest.JOHN));
            AbstractCdcTest.assertTrue(user.getAge() >= 42);
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.ignite.cdc.AbstractCdcTest.TestCdcConsumer
        public Integer extract(CdcEvent cdcEvent) {
            return (Integer) cdcEvent.key();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.common.GridCommonAbstractTest, org.apache.ignite.testframework.junits.GridAbstractTest
    public void beforeTest() throws Exception {
        stopAllGrids();
        cleanPersistenceDir();
        super.beforeTest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTestsStopped() throws Exception {
        super.afterTestsStopped();
        stopAllGrids();
        cleanPersistenceDir();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CdcMain createCdc(CdcConsumer cdcConsumer, IgniteConfiguration igniteConfiguration) {
        return createCdc(cdcConsumer, igniteConfiguration, null, new GridAbsPredicate[0]);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CdcMain createCdc(CdcConsumer cdcConsumer, IgniteConfiguration igniteConfiguration, final CountDownLatch countDownLatch, final GridAbsPredicate... gridAbsPredicateArr) {
        CdcConfiguration cdcConfiguration = new CdcConfiguration();
        cdcConfiguration.setConsumer(cdcConsumer);
        cdcConfiguration.setKeepBinary(keepBinary());
        cdcConfiguration.setMetricExporterSpi(new MetricExporterSpi[]{new JmxMetricExporterSpi()});
        return new CdcMain(igniteConfiguration, null, cdcConfiguration) { // from class: org.apache.ignite.cdc.AbstractCdcTest.1
            protected CdcConsumerState createState(Path path) {
                return new CdcConsumerState(path) { // from class: org.apache.ignite.cdc.AbstractCdcTest.1.1
                    public void save(T2<WALPointer, Integer> t2) throws IOException {
                        super.save(t2);
                        if (F.isEmpty(gridAbsPredicateArr)) {
                            return;
                        }
                        for (GridAbsPredicate gridAbsPredicate : gridAbsPredicateArr) {
                            if (!gridAbsPredicate.apply()) {
                                return;
                            }
                        }
                        countDownLatch.countDown();
                    }
                };
            }
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void addAndWaitForConsumption(UserCdcConsumer userCdcConsumer, IgniteConfiguration igniteConfiguration, IgniteCache<Integer, User> igniteCache, IgniteCache<Integer, User> igniteCache2, CI3<IgniteCache<Integer, User>, Integer, Integer> ci3, int i, int i2, boolean z) throws Exception {
        GridAbsPredicate sizePredicate = sizePredicate(i2 - i, igniteCache.getName(), ChangeEventType.UPDATE, userCdcConsumer);
        GridAbsPredicate sizePredicate2 = igniteCache2 == null ? null : sizePredicate(i2 - i, igniteCache2.getName(), ChangeEventType.UPDATE, userCdcConsumer);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        CdcMain createCdc = z ? igniteCache2 == null ? createCdc(userCdcConsumer, igniteConfiguration, countDownLatch, sizePredicate) : createCdc(userCdcConsumer, igniteConfiguration, countDownLatch, sizePredicate, sizePredicate2) : createCdc(userCdcConsumer, igniteConfiguration);
        IgniteInternalFuture runAsync = GridTestUtils.runAsync((Runnable) createCdc);
        ci3.apply(igniteCache, Integer.valueOf(i), Integer.valueOf(i2));
        if (igniteCache2 != null) {
            ci3.apply(igniteCache2, Integer.valueOf(i), Integer.valueOf(i2));
        }
        if (z) {
            countDownLatch.await(getTestTimeout(), TimeUnit.MILLISECONDS);
        } else {
            assertTrue(GridTestUtils.waitForCondition(sizePredicate, getTestTimeout()));
            if (igniteCache2 != null) {
                assertTrue(GridTestUtils.waitForCondition(sizePredicate2, getTestTimeout()));
            }
        }
        checkMetrics(createCdc, igniteCache2 == null ? i2 : i2 * 2);
        runAsync.cancel();
        List<Integer> data = userCdcConsumer.data(ChangeEventType.UPDATE, GridCacheUtils.cacheId(igniteCache.getName()));
        assertEquals(i2 - i, data.size());
        for (int i3 = i; i3 < i2; i3++) {
            assertTrue(Integer.toString(i3), data.contains(Integer.valueOf(i3)));
        }
        assertTrue(userCdcConsumer.stopped());
    }

    public void waitForSize(int i, String str, ChangeEventType changeEventType, TestCdcConsumer<?>... testCdcConsumerArr) throws IgniteInterruptedCheckedException {
        assertTrue(GridTestUtils.waitForCondition(sizePredicate(i, str, changeEventType, testCdcConsumerArr), getTestTimeout()));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public GridAbsPredicate sizePredicate(int i, String str, ChangeEventType changeEventType, TestCdcConsumer<?>... testCdcConsumerArr) {
        return () -> {
            return Arrays.stream(testCdcConsumerArr).mapToInt(testCdcConsumer -> {
                return F.size(testCdcConsumer.data(changeEventType, GridCacheUtils.cacheId(str)), new IgnitePredicate[0]);
            }).sum() == i;
        };
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void checkMetrics(CdcMain cdcMain, int i) throws Exception {
        DynamicMBean metricRegistry = metricRegistry(CdcMain.cdcInstanceName(((IgniteConfiguration) GridTestUtils.getFieldValue(cdcMain, "igniteCfg")).getIgniteInstanceName()), null, CdcCacheVersionTest.CDC);
        Function<String, Long> function = str -> {
            try {
                return metricRegistry.getAttribute(str);
            } catch (Exception e) {
                throw new IgniteException(e);
            }
        };
        checkMetrics(i, function, function);
        MetricRegistry metricRegistry2 = (MetricRegistry) GridTestUtils.getFieldValue(cdcMain, "mreg");
        assertNotNull(metricRegistry2);
        checkMetrics(i, str2 -> {
            return Long.valueOf(metricRegistry2.findMetric(str2).value());
        }, str3 -> {
            return (String) metricRegistry2.findMetric(str3).value();
        });
    }

    private void checkMetrics(long j, Function<String, Long> function, Function<String, String> function2) {
        assertTrue(function.apply("CommittedSegmentIndex").longValue() <= function.apply("CurrentSegmentIndex").longValue());
        assertTrue(function.apply("CommittedSegmentOffset").longValue() >= 0);
        assertTrue(function.apply("LastSegmentConsumptionTime").longValue() > 0);
        assertTrue(function.apply("LastEventTime").longValue() > 0);
        for (String str : new String[]{"BinaryMetaDir", "MarshallerDir", "CdcDir"}) {
            assertTrue(new File(function2.apply(str)).exists());
        }
        assertEquals(j, function.apply("EventsCount").longValue());
    }

    protected boolean keepBinary() {
        return false;
    }

    protected MetricExporterSpi[] metricExporters() {
        return null;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static User createUser(int i) {
        byte[] bArr = new byte[1024];
        ThreadLocalRandom.current().nextBytes(bArr);
        return new User("John Connor " + i, 42 + i, bArr);
    }
}
