package org.apache.ignite.internal;

import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryUpdatedListener;
import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.query.ContinuousQuery;
import org.apache.ignite.cache.query.QueryCursor;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.events.Event;
import org.apache.ignite.internal.managers.discovery.IgniteDiscoverySpi;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.lang.IgniteRunnable;
import org.apache.ignite.resources.IgniteInstanceResource;

/* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest.class */
public class IgniteClientReconnectContinuousProcessorTest extends IgniteClientReconnectAbstractTest {
    private static volatile CountDownLatch latch;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest$CacheEventListener.class */
    public static class CacheEventListener implements CacheEntryUpdatedListener<Object, Object> {
        private volatile CountDownLatch latch;

        @IgniteInstanceResource
        private Ignite ignite;

        private CacheEventListener() {
        }

        public void onUpdated(Iterable<CacheEntryEvent<?, ?>> iterable) {
            int i = 0;
            Iterator<CacheEntryEvent<?, ?>> it = iterable.iterator();
            while (it.hasNext()) {
                this.ignite.log().info("Received cache event: " + it.next());
                i++;
            }
            TestCase.assertEquals(1, i);
            if (this.latch != null) {
                this.latch.countDown();
            }
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest$DummyJob.class */
    static class DummyJob implements IgniteRunnable {

        @IgniteInstanceResource
        private Ignite ignite;

        DummyJob() {
        }

        public void run() {
            this.ignite.log().info("Job run.");
        }
    }

    /* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest$EventListener.class */
    private static class EventListener implements P2<UUID, Event> {
        private volatile CountDownLatch latch;

        @IgniteInstanceResource
        private Ignite ignite;

        private EventListener() {
        }

        public boolean apply(UUID uuid, Event event) {
            TestCase.assertTrue(this.ignite.cluster().localNode().isClient());
            this.ignite.log().info("Received event: " + event);
            if (this.latch == null) {
                return true;
            }
            this.latch.countDown();
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest$MessageListener.class */
    public static class MessageListener implements P2<UUID, Object> {
        private volatile CountDownLatch latch;

        @IgniteInstanceResource
        private Ignite ignite;

        private MessageListener() {
        }

        public boolean apply(UUID uuid, Object obj) {
            TestCase.assertTrue(this.ignite.cluster().localNode().isClient());
            this.ignite.log().info("Local listener received message: " + obj);
            if (this.latch == null) {
                return true;
            }
            this.latch.countDown();
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/IgniteClientReconnectContinuousProcessorTest$RemoteMessageListener.class */
    public static class RemoteMessageListener implements P2<UUID, Object> {

        @IgniteInstanceResource
        private Ignite ignite;

        private RemoteMessageListener() {
        }

        public boolean apply(UUID uuid, Object obj) {
            this.ignite.log().info("Remote listener received message: " + obj);
            if (IgniteClientReconnectContinuousProcessorTest.latch == null) {
                return true;
            }
            IgniteClientReconnectContinuousProcessorTest.latch.countDown();
            return true;
        }
    }

    @Override // org.apache.ignite.internal.IgniteClientReconnectAbstractTest
    protected int serverCount() {
        return 3;
    }

    @Override // org.apache.ignite.internal.IgniteClientReconnectAbstractTest
    protected int clientCount() {
        return 1;
    }

    public void testEventListenerReconnect() throws Exception {
        IgniteEx grid = grid(serverCount());
        assertTrue(grid.cluster().localNode().isClient());
        Ignite ignite = ignite(0);
        IgniteDiscoverySpi spi0 = spi0(ignite);
        EventListener eventListener = new EventListener();
        UUID remoteListen = grid.events().remoteListen(eventListener, (IgnitePredicate) null, new int[]{44});
        eventListener.latch = new CountDownLatch(1);
        this.log.info("Created remote listener: " + remoteListen);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        grid.events().localListen(new IgnitePredicate<Event>() { // from class: org.apache.ignite.internal.IgniteClientReconnectContinuousProcessorTest.1
            public boolean apply(Event event) {
                if (event.type() != 17) {
                    return true;
                }
                IgniteClientReconnectContinuousProcessorTest.this.info("Reconnected: " + event);
                countDownLatch.countDown();
                return true;
            }
        }, new int[]{17});
        spi0.failNode(grid.cluster().localNode().id(), (String) null);
        waitReconnectEvent(countDownLatch);
        grid.compute().run(new DummyJob());
        assertTrue(eventListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        eventListener.latch = new CountDownLatch(1);
        ignite.compute().run(new DummyJob());
        assertTrue(eventListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        eventListener.latch = new CountDownLatch(1);
        this.log.info("Stop listen, should not get events anymore.");
        grid.events().stopRemoteListen(remoteListen);
        assertFalse(eventListener.latch.await(3000L, TimeUnit.MILLISECONDS));
    }

    public void testMessageListenerReconnectAndStopFromServer() throws Exception {
        testMessageListenerReconnect(false);
    }

    public void testMessageListenerReconnectAndStopFromClient() throws Exception {
        testMessageListenerReconnect(true);
    }

    private void testMessageListenerReconnect(boolean z) throws Exception {
        Ignite grid = grid(serverCount());
        assertTrue(grid.cluster().localNode().isClient());
        Ignite ignite = ignite(0);
        IgniteDiscoverySpi spi0 = spi0(ignite);
        MessageListener messageListener = new MessageListener();
        UUID remoteListen = grid.message().remoteListen("testTopic", new RemoteMessageListener());
        grid.message().localListen("testTopic", messageListener);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        grid.events().localListen(new IgnitePredicate<Event>() { // from class: org.apache.ignite.internal.IgniteClientReconnectContinuousProcessorTest.2
            public boolean apply(Event event) {
                if (event.type() != 17) {
                    return true;
                }
                IgniteClientReconnectContinuousProcessorTest.this.info("Reconnected: " + event);
                countDownLatch.countDown();
                return true;
            }
        }, new int[]{17});
        spi0.failNode(grid.cluster().localNode().id(), (String) null);
        waitReconnectEvent(countDownLatch);
        messageListener.latch = new CountDownLatch(1);
        latch = new CountDownLatch(2);
        grid.message().send("testTopic", "msg1");
        assertTrue(messageListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        assertTrue(latch.await(5000L, TimeUnit.MILLISECONDS));
        messageListener.latch = new CountDownLatch(1);
        latch = new CountDownLatch(2);
        ignite.message().send("testTopic", "msg2");
        assertTrue(messageListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        assertTrue(latch.await(5000L, TimeUnit.MILLISECONDS));
        Ignite ignite2 = z ? grid : ignite;
        this.log.info("Stop listen, should not get remote messages anymore [from=" + ignite2.name() + ']');
        ignite2.message().stopRemoteListen(remoteListen);
        ignite.message().send("testTopic", "msg3");
        messageListener.latch = new CountDownLatch(1);
        latch = new CountDownLatch(1);
        assertTrue(messageListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        assertFalse(latch.await(3000L, TimeUnit.MILLISECONDS));
        this.log.info("New nodes should not register stopped listeners.");
        startGrid(serverCount() + 1);
        ignite.message().send("testTopic", "msg4");
        messageListener.latch = new CountDownLatch(1);
        latch = new CountDownLatch(1);
        assertTrue(messageListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        assertFalse(latch.await(3000L, TimeUnit.MILLISECONDS));
        stopGrid(serverCount() + 1);
    }

    public void testCacheContinuousQueryReconnect() throws Exception {
        IgniteEx grid = grid(serverCount());
        assertTrue(grid.cluster().localNode().isClient());
        IgniteCache<Object, Object> orCreateCache = grid.getOrCreateCache(new CacheConfiguration("default"));
        CacheEventListener cacheEventListener = new CacheEventListener();
        ContinuousQuery continuousQuery = new ContinuousQuery();
        continuousQuery.setAutoUnsubscribe(true);
        continuousQuery.setLocalListener(cacheEventListener);
        QueryCursor query = orCreateCache.query(continuousQuery);
        for (int i = 0; i < 5; i++) {
            this.log.info("Iteration: " + i);
            continuousQueryReconnect(grid, orCreateCache, cacheEventListener);
        }
        this.log.info("Close cursor, should not get cache events anymore.");
        query.close();
        cacheEventListener.latch = new CountDownLatch(1);
        orCreateCache.put(3, 3);
        assertFalse(cacheEventListener.latch.await(3000L, TimeUnit.MILLISECONDS));
    }

    public void testCacheContinuousQueryReconnectNewServer() throws Exception {
        IgniteEx grid = grid(serverCount());
        assertTrue(grid.cluster().localNode().isClient());
        IgniteCache<Object, Object> orCreateCache = grid.getOrCreateCache(new CacheConfiguration("default"));
        CacheEventListener cacheEventListener = new CacheEventListener();
        ContinuousQuery continuousQuery = new ContinuousQuery();
        continuousQuery.setAutoUnsubscribe(true);
        continuousQuery.setLocalListener(cacheEventListener);
        QueryCursor query = orCreateCache.query(continuousQuery);
        continuousQueryReconnect(grid, orCreateCache, cacheEventListener);
        IgniteEx startGrid = startGrid(serverCount() + 1);
        Throwable th = null;
        try {
            try {
                awaitPartitionMapExchange();
                cacheEventListener.latch = new CountDownLatch(10);
                IgniteCache<?, ?> cache = startGrid.cache("default");
                for (Integer num : primaryKeys(cache, 10)) {
                    cache.put(num, num);
                }
                assertTrue(cacheEventListener.latch.await(5000L, TimeUnit.MILLISECONDS));
                if (startGrid != null) {
                    if (0 != 0) {
                        try {
                            startGrid.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        startGrid.close();
                    }
                }
                query.close();
                IgniteEx startGrid2 = startGrid(serverCount() + 1);
                Throwable th3 = null;
                try {
                    awaitPartitionMapExchange();
                    cacheEventListener.latch = new CountDownLatch(5);
                    IgniteCache<?, ?> cache2 = startGrid2.cache("default");
                    for (Integer num2 : primaryKeys(cache2, 5)) {
                        cache2.put(num2, num2);
                    }
                    assertFalse(cacheEventListener.latch.await(3000L, TimeUnit.MILLISECONDS));
                    if (startGrid2 != null) {
                        if (0 == 0) {
                            startGrid2.close();
                            return;
                        }
                        try {
                            startGrid2.close();
                        } catch (Throwable th4) {
                            th3.addSuppressed(th4);
                        }
                    }
                } catch (Throwable th5) {
                    if (startGrid2 != null) {
                        if (0 != 0) {
                            try {
                                startGrid2.close();
                            } catch (Throwable th6) {
                                th3.addSuppressed(th6);
                            }
                        } else {
                            startGrid2.close();
                        }
                    }
                    throw th5;
                }
            } catch (Throwable th7) {
                th = th7;
                throw th7;
            }
        } catch (Throwable th8) {
            if (startGrid != null) {
                if (th != null) {
                    try {
                        startGrid.close();
                    } catch (Throwable th9) {
                        th.addSuppressed(th9);
                    }
                } else {
                    startGrid.close();
                }
            }
            throw th8;
        }
    }

    private void continuousQueryReconnect(Ignite ignite, IgniteCache<Object, Object> igniteCache, CacheEventListener cacheEventListener) throws Exception {
        Ignite ignite2 = ignite(0);
        IgniteDiscoverySpi spi0 = spi0(ignite2);
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        IgnitePredicate<Event> ignitePredicate = new IgnitePredicate<Event>() { // from class: org.apache.ignite.internal.IgniteClientReconnectContinuousProcessorTest.3
            public boolean apply(Event event) {
                if (event.type() != 17) {
                    return true;
                }
                IgniteClientReconnectContinuousProcessorTest.this.info("Reconnected: " + event);
                countDownLatch.countDown();
                return true;
            }
        };
        ignite.events().localListen(ignitePredicate, new int[]{17});
        spi0.failNode(ignite.cluster().localNode().id(), (String) null);
        waitReconnectEvent(countDownLatch);
        ignite.events().stopLocalListen(ignitePredicate, new int[0]);
        cacheEventListener.latch = new CountDownLatch(1);
        igniteCache.put(1, 1);
        assertTrue(cacheEventListener.latch.await(5000L, TimeUnit.MILLISECONDS));
        cacheEventListener.latch = new CountDownLatch(1);
        ignite2.cache("default").put(2, 2);
        assertTrue(cacheEventListener.latch.await(5000L, TimeUnit.MILLISECONDS));
    }
}
