package org.apache.ignite.internal.processors.cache.query.continuous;

import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.cache.event.CacheEntryListenerException;
import javax.cache.event.CacheEntryUpdatedListener;
import junit.framework.TestCase;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.CacheAtomicityMode;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.configuration.NearCacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.util.nio.GridNioServer;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;

/* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest.class */
public class ClientReconnectContinuousQueryTest extends GridCommonAbstractTest {
    private static final int CLIENT_IDX = 1;
    private static final int PUTS_BEFORE_RECONNECT = 50;
    private static final int PUTS_AFTER_RECONNECT = 50;
    private static final CountDownLatch reconLatch = new CountDownLatch(1);
    private static final CountDownLatch disconLatch = new CountDownLatch(1);
    private static final CountDownLatch updaterReceived = new CountDownLatch(50);
    private static final CountDownLatch receiverAfterReconnect = new CountDownLatch(50);

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest$CQListener.class */
    private static class CQListener implements CacheEntryUpdatedListener {
        private CQListener() {
        }

        public void onUpdated(Iterable iterable) throws CacheEntryListenerException {
            if (ClientReconnectContinuousQueryTest.reconLatch.getCount() != 0) {
                for (Object obj : iterable) {
                    ClientReconnectContinuousQueryTest.updaterReceived.countDown();
                }
                return;
            }
            for (Object obj2 : iterable) {
                ClientReconnectContinuousQueryTest.receiverAfterReconnect.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest$DisconnectListener.class */
    private static class DisconnectListener implements IgnitePredicate<Event> {
        private DisconnectListener() {
        }

        public boolean apply(Event event) {
            ClientReconnectContinuousQueryTest.disconLatch.countDown();
            return false;
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/processors/cache/query/continuous/ClientReconnectContinuousQueryTest$ReconnectListener.class */
    private static class ReconnectListener implements IgnitePredicate<Event> {
        private ReconnectListener() {
        }

        public boolean apply(Event event) {
            ClientReconnectContinuousQueryTest.reconLatch.countDown();
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        TcpCommunicationSpi communicationSpi = configuration.getCommunicationSpi();
        communicationSpi.setSlowClientQueueLimit(50);
        communicationSpi.setIdleConnectionTimeout(GridTestUtils.DFLT_TEST_TIMEOUT);
        if (getTestIgniteInstanceName(1).equals(str)) {
            configuration.setClientMode(true);
        } else {
            CacheConfiguration defaultCacheConfiguration = defaultCacheConfiguration();
            defaultCacheConfiguration.setAtomicityMode(atomicityMode());
            if (atomicityMode() == CacheAtomicityMode.TRANSACTIONAL_SNAPSHOT) {
                defaultCacheConfiguration.setNearConfiguration((NearCacheConfiguration) null);
            }
            configuration.setCacheConfiguration(new CacheConfiguration[]{defaultCacheConfiguration});
        }
        return configuration;
    }

    protected CacheAtomicityMode atomicityMode() {
        return CacheAtomicityMode.TRANSACTIONAL;
    }

    public void testClientReconnect() throws Exception {
        try {
            startGrids(2);
            final IgniteEx grid = grid(1);
            grid.events().localListen(new DisconnectListener(), new int[]{16});
            grid.events().localListen(new ReconnectListener(), new int[]{17});
            IgniteCache cache = grid.cache("default");
            ContinuousQuery continuousQuery = new ContinuousQuery();
            continuousQuery.setLocalListener(new CQListener());
            cache.query(continuousQuery);
            putSomeKeys(50);
            info("updaterReceived Count: " + updaterReceived.getCount());
            assertTrue(updaterReceived.await(10000L, TimeUnit.MILLISECONDS));
            skipRead(grid, true);
            IgniteInternalFuture runAsync = GridTestUtils.runAsync(new Callable<Void>() { // from class: org.apache.ignite.internal.processors.cache.query.continuous.ClientReconnectContinuousQueryTest.1
                /* JADX WARN: Can't rename method to resolve collision */
                @Override // java.util.concurrent.Callable
                public Void call() throws Exception {
                    TestCase.assertTrue(ClientReconnectContinuousQueryTest.disconLatch.await(10000L, TimeUnit.MILLISECONDS));
                    ClientReconnectContinuousQueryTest.this.skipRead(grid, false);
                    return null;
                }
            });
            putSomeKeys(1000);
            runAsync.get();
            assertTrue(reconLatch.await(10000L, TimeUnit.MILLISECONDS));
            putSomeKeys(50);
            info("receiverAfterReconnect Count: " + receiverAfterReconnect.getCount());
            assertTrue(receiverAfterReconnect.await(10000L, TimeUnit.MILLISECONDS));
            stopAllGrids();
        } catch (Throwable th) {
            stopAllGrids();
            throw th;
        }
    }

    private void putSomeKeys(int i) {
        IgniteCache cache = grid(0).cache("default");
        for (int i2 = 0; i2 < i; i2++) {
            cache.put(0, Integer.valueOf(i2));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void skipRead(IgniteEx igniteEx, boolean z) {
        GridTestUtils.setFieldValue((GridNioServer) U.field((TcpCommunicationSpi) ((Object[]) U.field(igniteEx.context().io(), "spis"))[0], "nioSrvr"), "skipRead", Boolean.valueOf(z));
    }
}
