package org.apache.ignite.internal.processors.cache.query.continuous;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import javax.cache.configuration.Factory;
import javax.cache.configuration.FactoryBuilder;
import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
import javax.cache.event.CacheEntryCreatedListener;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryEventFilter;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.CacheMode;
import org.apache.ignite.cache.CacheWriteSynchronizationMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/CacheContinuousQueryOperationP2PTest.class */
/**
 * Exercises a continuous query and a JCache entry listener whose event filter factory is
 * loaded through the external class loader, on nodes that have peer class loading enabled.
 * Each test method creates one cache, attaches both listeners, puts {@link #KEYS} entries and
 * waits for the events to reach the local listener.
 */
public class CacheContinuousQueryOperationP2PTest extends GridCommonAbstractTest {
    /** IP finder installed into the discovery SPI of every node started by this test. */
    private static final TcpDiscoveryIpFinder ipFinder = new TcpDiscoveryVmIpFinder(true);

    /** Total node count: {@code NODES - 1} nodes are started first, then one more with the client flag set. */
    private static final int NODES = 5;

    /** Number of entries put per test; also the number of events the latch waits for. */
    private static final int KEYS = 10;

    /** How long, in seconds, a test waits for the expected events. */
    private static final long EVT_TIMEOUT_SEC = 3L;

    /** Name of the filter factory class that is resolved through the external class loader. */
    private static final String FILTER_FACTORY_CLS =
        "org.apache.ignite.tests.p2p.CacheDeploymentEntryEventFilterFactory";

    /** Client-mode flag copied into every configuration produced by {@link #getConfiguration(String)}. */
    private boolean client;

    /**
     * Listener that funnels both "created" and "updated" notifications into a single
     * {@link #onEvent(Iterable)} callback.
     */
    private static abstract class TestLocalListener implements CacheEntryUpdatedListener<Integer, Integer>, CacheEntryCreatedListener<Integer, Integer> {
        private TestLocalListener() {
        }

        /** Forwards created-entry events to {@link #onEvent(Iterable)}. */
        @Override
        public void onCreated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> iterable) throws CacheEntryListenerException {
            onEvent(iterable);
        }

        /** Forwards updated-entry events to {@link #onEvent(Iterable)}. */
        @Override
        public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> iterable) throws CacheEntryListenerException {
            onEvent(iterable);
        }

        /**
         * Handles a batch of events regardless of whether they are creations or updates.
         *
         * @param iterable Events delivered to this listener.
         */
        protected abstract void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> iterable);
    }

    /**
     * Builds the node configuration: installs the test IP finder, applies the current
     * {@link #client} flag and enables peer class loading.
     *
     * @param str Name passed through to the parent implementation.
     * @return Configuration for the node being started.
     * @throws Exception If the parent implementation fails.
     */
    @Override
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(str);

        // setIpFinder is declared on the TCP discovery SPI, not on the generic SPI type
        // returned by the configuration, hence the cast.
        ((TcpDiscoverySpi) cfg.getDiscoverySpi()).setIpFinder(ipFinder);

        cfg.setClientMode(client);
        cfg.setPeerClassLoadingEnabled(true);

        return cfg;
    }

    /**
     * Stops every grid once all tests of the class have finished.
     *
     * @throws Exception If stopping fails.
     */
    @Override
    public void afterTestsStopped() throws Exception {
        stopAllGrids();
        super.afterTestsStopped();
    }

    /**
     * Starts {@code NODES - 1} nodes with the client flag cleared, then one more node
     * (index {@code NODES - 1}) with the client flag set.
     *
     * @throws Exception If a node fails to start.
     */
    @Override
    public void beforeTest() throws Exception {
        super.beforeTest();

        // Clear the flag explicitly so the first NODES - 1 nodes never pick up client mode
        // left over from an earlier invocation on the same test instance.
        client = false;
        startGridsMultiThreaded(NODES - 1);

        client = true;
        startGrid(NODES - 1);
    }

    /**
     * Stops every grid after each test so the next one starts from scratch.
     *
     * @throws Exception If stopping fails.
     */
    @Override
    public void afterTest() throws Exception {
        super.afterTest();
        stopAllGrids();
    }

    /** Atomic partitioned cache with one backup, operated from the client node. */
    public void testAtomicClient() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.PARTITIONED, 1, CacheAtomicityMode.ATOMIC), true);
    }

    /** Atomic partitioned cache with one backup, operated from a randomly chosen non-client node. */
    public void testAtomic() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.PARTITIONED, 1, CacheAtomicityMode.ATOMIC), false);
    }

    /** Atomic replicated cache, operated from a randomly chosen non-client node. */
    public void testAtomicReplicated() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.REPLICATED, 0, CacheAtomicityMode.ATOMIC), false);
    }

    /** Atomic replicated cache, operated from the client node. */
    public void testAtomicReplicatedClient() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.REPLICATED, 0, CacheAtomicityMode.ATOMIC), true);
    }

    /** Transactional partitioned cache with one backup, operated from a randomly chosen non-client node. */
    public void testTx() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.PARTITIONED, 1, CacheAtomicityMode.TRANSACTIONAL), false);
    }

    /** Transactional partitioned cache with one backup, operated from the client node. */
    public void testTxClient() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.PARTITIONED, 1, CacheAtomicityMode.TRANSACTIONAL), true);
    }

    /** Transactional replicated cache, operated from a randomly chosen non-client node. */
    public void testTxReplicated() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.REPLICATED, 0, CacheAtomicityMode.TRANSACTIONAL), false);
    }

    /** Transactional replicated cache, operated from the client node. */
    public void testTxReplicatedClient() throws Exception {
        testContinuousQuery(cacheConfiguration(CacheMode.REPLICATED, 0, CacheAtomicityMode.TRANSACTIONAL), true);
    }

    /**
     * Creates the cache on node 0, attaches a continuous query and a JCache entry listener
     * (both using a filter factory instantiated from the externally loaded class), puts
     * {@link #KEYS} entries and asserts that at least that many events reach the local
     * listener within {@link #EVT_TIMEOUT_SEC} seconds. The cursor and the listener are
     * always released, whether or not the assertion holds.
     *
     * @param cacheConfiguration Configuration of the cache to create.
     * @param z {@code true} to operate through node {@code NODES - 1} (started with the client
     *      flag set), {@code false} to operate through a random node in {@code [0, NODES - 1)}.
     * @throws Exception If class loading, instantiation or any cache operation fails.
     */
    protected void testContinuousQuery(CacheConfiguration<Object, Object> cacheConfiguration, boolean z) throws Exception {
        ignite(0).createCache(cacheConfiguration);

        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        Class<?> filterFactoryCls = getExternalClassLoader().loadClass(FILTER_FACTORY_CLS);
        final CountDownLatch latch = new CountDownLatch(KEYS);

        TestLocalListener lsnr = new TestLocalListener() {
            @Override
            public void onEvent(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> iterable) throws CacheEntryListenerException {
                for (CacheEntryEvent<? extends Integer, ? extends Integer> evt : iterable) {
                    latch.countDown();
                    log.info("Received event: " + evt);
                }
            }
        };

        // Each consumer gets its own factory instance.
        MutableCacheEntryListenerConfiguration<Integer, Integer> lsnrCfg =
            new MutableCacheEntryListenerConfiguration<Integer, Integer>(
                new FactoryBuilder.SingletonFactory<TestLocalListener>(lsnr),
                newFilterFactory(filterFactoryCls),
                true,
                true);

        ContinuousQuery<Integer, Integer> qry = new ContinuousQuery<>();
        qry.setLocalListener(lsnr);
        qry.setRemoteFilterFactory(newFilterFactory(filterFactoryCls));

        QueryCursor<?> cur = null;
        IgniteCache<Integer, Integer> cache = null;

        try {
            int nodeIdx = z ? NODES - 1 : rnd.nextInt(NODES - 1);

            cache = grid(nodeIdx).cache(cacheConfiguration.getName());
            cur = cache.query(qry);
            cache.registerCacheEntryListener(lsnrCfg);

            for (int key = 0; key < KEYS; key++) {
                cache.put(key, key);
            }

            assertTrue(latch.await(EVT_TIMEOUT_SEC, TimeUnit.SECONDS));
        } finally {
            if (cur != null) {
                cur.close();
            }
            if (cache != null) {
                cache.deregisterCacheEntryListener(lsnrCfg);
            }
        }
    }

    /**
     * Instantiates the filter factory through its no-argument constructor.
     *
     * @param cls Factory class obtained from the external class loader.
     * @return New factory instance.
     * @throws Exception If the class cannot be instantiated.
     */
    @SuppressWarnings("unchecked") // The class is only known by name, so its type arguments cannot be checked here.
    private static Factory<CacheEntryEventFilter<Integer, Integer>> newFilterFactory(Class<?> cls) throws Exception {
        return (Factory<CacheEntryEventFilter<Integer, Integer>>) cls.getDeclaredConstructor().newInstance();
    }

    /**
     * Builds a configuration for the cache named {@code "default"} with {@code FULL_SYNC}
     * write synchronization.
     *
     * @param cacheMode Cache mode.
     * @param i Number of backups; applied only when the mode is {@code PARTITIONED}.
     * @param cacheAtomicityMode Atomicity mode.
     * @return Cache configuration.
     */
    private CacheConfiguration<Object, Object> cacheConfiguration(CacheMode cacheMode, int i, CacheAtomicityMode cacheAtomicityMode) {
        CacheConfiguration<Object, Object> ccfg = new CacheConfiguration<>("default");

        ccfg.setAtomicityMode(cacheAtomicityMode);
        ccfg.setCacheMode(cacheMode);
        ccfg.setWriteSynchronizationMode(CacheWriteSynchronizationMode.FULL_SYNC);

        if (cacheMode == CacheMode.PARTITIONED) {
            ccfg.setBackups(i);
        }

        return ccfg;
    }
}
