package org.apache.ignite.internal.processors.continuous;

import java.lang.invoke.SerializedLambda;
import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterTopologyException;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.events.DiscoveryEvent;
import org.apache.ignite.events.Event;
import org.apache.ignite.events.EventType;
import org.apache.ignite.events.JobEvent;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.continuous.GridContinuousProcessor;
import org.apache.ignite.internal.util.GridConcurrentHashSet;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.P2;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteBiPredicate;
import org.apache.ignite.lang.IgniteBiTuple;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.junit.Ignore;
import org.junit.Test;

/* loaded from: input_file:org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest.class */
public class GridEventConsumeSelfTest extends GridCommonAbstractTest {
    private static final String PRJ_PRED_CLS_NAME = "org.apache.ignite.tests.p2p.GridEventConsumeProjectionPredicate";
    private static final String FILTER_CLS_NAME = "org.apache.ignite.tests.p2p.GridEventConsumeFilter";
    private static final int GRID_CNT = 3;
    private static final int CONSUME_CNT = 500;
    private static volatile CountDownLatch consumeLatch;
    private static volatile AtomicInteger consumeCnt;
    private boolean include;
    private boolean noAutoUnsubscribe;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        configuration.setConsistentId(str);
        configuration.setIncludeEventTypes(EventType.EVTS_ALL);
        configuration.getCommunicationSpi().setSharedMemoryPort(-1);
        if (this.include) {
            configuration.setUserAttributes(F.asMap("include", true));
        }
        return configuration;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.common.GridCommonAbstractTest, org.apache.ignite.testframework.junits.GridAbstractTest
    public void beforeTest() throws Exception {
        assertTrue(true);
        this.include = true;
        startGrids(2);
        this.include = false;
        startGrid(2);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public void afterTest() throws Exception {
        try {
            assertEquals(3, grid(0).cluster().nodes().size());
            for (int i = 0; i < 3; i++) {
                GridContinuousProcessor continuous = grid(i).context().continuous();
                try {
                    if (!this.noAutoUnsubscribe) {
                        Map map = (Map) U.field(continuous, "rmtInfos");
                        assertTrue("Unexpected remote infos: " + map, map.isEmpty());
                    }
                    ((Map) U.field(continuous, "rmtInfos")).clear();
                    assertEquals(0, ((Map) U.field(continuous, "rmtInfos")).size());
                    assertEquals(0, ((Map) U.field(continuous, "startFuts")).size());
                    assertEquals(0, ((Map) U.field(continuous, "stopFuts")).size());
                    assertEquals(0, ((Map) U.field(continuous, "bufCheckThreads")).size());
                } finally {
                }
            }
        } finally {
            stopAllGrids();
        }
    }

    private Collection<GridContinuousProcessor.LocalRoutineInfo> localRoutines(GridContinuousProcessor gridContinuousProcessor) {
        return F.view(((Map) U.field(gridContinuousProcessor, "locInfos")).values(), new IgnitePredicate[]{new IgnitePredicate<GridContinuousProcessor.LocalRoutineInfo>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.1
            public boolean apply(GridContinuousProcessor.LocalRoutineInfo localRoutineInfo) {
                return localRoutineInfo.handler().isEvents();
            }
        }});
    }

    @Test
    public void testApi() throws Exception {
        try {
            grid(0).events().stopRemoteListen((UUID) null);
        } catch (NullPointerException e) {
        }
        grid(0).events().stopRemoteListen(UUID.randomUUID());
        UUID uuid = null;
        try {
            uuid = grid(0).events().remoteListen(new P2<UUID, DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.2
                public boolean apply(UUID uuid2, DiscoveryEvent discoveryEvent) {
                    return false;
                }
            }, new P1<DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.3
                public boolean apply(DiscoveryEvent discoveryEvent) {
                    return false;
                }
            }, EventType.EVTS_DISCOVERY);
            assertNotNull(uuid);
            grid(0).events().stopRemoteListen(uuid);
            try {
                uuid = grid(0).events().remoteListen(new P2<UUID, DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.4
                    public boolean apply(UUID uuid2, DiscoveryEvent discoveryEvent) {
                        return false;
                    }
                }, new P1<DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.5
                    public boolean apply(DiscoveryEvent discoveryEvent) {
                        return false;
                    }
                }, new int[0]);
                assertNotNull(uuid);
                grid(0).events().stopRemoteListen(uuid);
                try {
                    uuid = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.6
                        public boolean apply(UUID uuid2, Event event) {
                            return false;
                        }
                    }, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.7
                        public boolean apply(Event event) {
                            return false;
                        }
                    }, new int[0]);
                    assertNotNull(uuid);
                    grid(0).events().stopRemoteListen(uuid);
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testApiAsyncOld() throws Exception {
        IgniteEvents withAsync = grid(0).events().withAsync();
        try {
            withAsync.stopRemoteListen((UUID) null);
            withAsync.future().get();
        } catch (NullPointerException e) {
        }
        withAsync.stopRemoteListen(UUID.randomUUID());
        withAsync.future().get();
        UUID uuid = null;
        try {
            withAsync.remoteListen(new P2<UUID, DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.8
                public boolean apply(UUID uuid2, DiscoveryEvent discoveryEvent) {
                    return false;
                }
            }, new P1<DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.9
                public boolean apply(DiscoveryEvent discoveryEvent) {
                    return false;
                }
            }, EventType.EVTS_DISCOVERY);
            uuid = (UUID) withAsync.future().get();
            assertNotNull(uuid);
            withAsync.stopRemoteListen(uuid);
            withAsync.future().get();
            try {
                withAsync.remoteListen(new P2<UUID, DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.10
                    public boolean apply(UUID uuid2, DiscoveryEvent discoveryEvent) {
                        return false;
                    }
                }, new P1<DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.11
                    public boolean apply(DiscoveryEvent discoveryEvent) {
                        return false;
                    }
                }, new int[0]);
                uuid = (UUID) withAsync.future().get();
                assertNotNull(uuid);
                withAsync.stopRemoteListen(uuid);
                withAsync.future().get();
                try {
                    withAsync.remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.12
                        public boolean apply(UUID uuid2, Event event) {
                            return false;
                        }
                    }, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.13
                        public boolean apply(Event event) {
                            return false;
                        }
                    }, new int[0]);
                    uuid = (UUID) withAsync.future().get();
                    assertNotNull(uuid);
                    withAsync.stopRemoteListen(uuid);
                    withAsync.future().get();
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testApiAsync() throws Exception {
        IgniteEvents events = grid(0).events();
        try {
            events.stopRemoteListenAsync((UUID) null).get();
        } catch (NullPointerException e) {
        }
        events.stopRemoteListenAsync(UUID.randomUUID()).get();
        UUID uuid = null;
        try {
            uuid = (UUID) events.remoteListenAsync(new P2<UUID, DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.14
                public boolean apply(UUID uuid2, DiscoveryEvent discoveryEvent) {
                    return false;
                }
            }, new P1<DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.15
                public boolean apply(DiscoveryEvent discoveryEvent) {
                    return false;
                }
            }, EventType.EVTS_DISCOVERY).get();
            assertNotNull(uuid);
            events.stopRemoteListenAsync(uuid).get();
            try {
                uuid = (UUID) events.remoteListenAsync(new P2<UUID, DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.16
                    public boolean apply(UUID uuid2, DiscoveryEvent discoveryEvent) {
                        return false;
                    }
                }, new P1<DiscoveryEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.17
                    public boolean apply(DiscoveryEvent discoveryEvent) {
                        return false;
                    }
                }, new int[0]).get();
                assertNotNull(uuid);
                events.stopRemoteListenAsync(uuid).get();
                try {
                    uuid = (UUID) events.remoteListenAsync(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.18
                        public boolean apply(UUID uuid2, Event event) {
                            return false;
                        }
                    }, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.19
                        public boolean apply(Event event) {
                            return false;
                        }
                    }, new int[0]).get();
                    assertNotNull(uuid);
                    events.stopRemoteListenAsync(uuid).get();
                } finally {
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testAllEvents() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.20
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                if (event.type() != 44) {
                    return true;
                }
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[0]);
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(3, hashSet.size());
            assertEquals(3, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testEventsByType() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.21
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(3, hashSet.size());
            assertEquals(3, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testEventsByFilter() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.22
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.23
            public boolean apply(Event event) {
                return event.type() == 44;
            }
        }, new int[0]);
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(3, hashSet.size());
            assertEquals(3, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testEventsByTypeAndFilter() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, JobEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.24
            public boolean apply(UUID uuid, JobEvent jobEvent) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + jobEvent.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, jobEvent.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, new P1<JobEvent>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.25
            public boolean apply(JobEvent jobEvent) {
                return !"exclude".equals(jobEvent.taskName());
            }
        }, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            grid(0).compute().withName("exclude").run(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(3, hashSet.size());
            assertEquals(3, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testRemoteProjection() throws Exception {
        final ConcurrentSkipListSet concurrentSkipListSet = new ConcurrentSkipListSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        UUID remoteListen = events(grid(0).cluster().forRemotes()).remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.26
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                concurrentSkipListSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(2, concurrentSkipListSet.size());
            assertEquals(2, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testProjectionWithLocalNode() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(2);
        UUID remoteListen = events(grid(0).cluster().forAttribute("include", (Object) null)).remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.27
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(2, hashSet.size());
            assertEquals(2, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testLocalNodeOnly() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        UUID remoteListen = events(grid(0).cluster().forLocal()).remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.28
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(1, hashSet.size());
            assertEquals(1, atomicInteger.get());
            assertEquals(grid(0).localNode().id(), F.first(hashSet));
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testStopByCallback() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.29
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return false;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(1, hashSet.size());
            assertEquals(1, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testStopRemoteListen() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.30
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().run(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(1, hashSet.size());
            assertEquals(1, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
            grid(0).compute().run(F.noop());
            U.sleep(500L);
            assertEquals(1, hashSet.size());
            assertEquals(1, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testStopLocalListenByCallback() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(1);
        grid(0).events().localListen(new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.31
            public boolean apply(Event event) {
                GridEventConsumeSelfTest.this.info("Local event [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return false;
            }
        }, new int[]{44});
        compute(grid(0).cluster().forLocal()).run(F.noop());
        if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
            throw new AssertionError(countDownLatch);
        }
        assertEquals(1, atomicInteger.get());
        compute(grid(0).cluster().forLocal()).run(F.noop());
        U.sleep(500L);
        assertEquals(1, atomicInteger.get());
    }

    @Test
    public void testNodeJoin() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        UUID remoteListen = grid(0).events().remoteListen((IgniteBiPredicate) notSerializableProxy(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.32
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }), (IgnitePredicate) notSerializableProxy(new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.33
            public boolean apply(Event event) {
                return event.type() == 44;
            }
        }), new int[]{44, 45});
        try {
            assertNotNull(remoteListen);
            this.include = true;
            startGrid("anotherGridNodeJoin");
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(4, hashSet.size());
            assertEquals(4, atomicInteger.get());
            stopGrid("anotherGridNodeJoin");
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            stopGrid("anotherGridNodeJoin");
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testNodeJoinWithProjection() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        UUID remoteListen = events(grid(0).cluster().forAttribute("include", (Object) null)).remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.34
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) null, new int[]{44});
        try {
            assertNotNull(remoteListen);
            this.include = true;
            startGrid("anotherGridNodeJoinWithProjection1");
            this.include = false;
            startGrid("anotherGridNodeJoinWithProjection2");
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(3, hashSet.size());
            assertEquals(3, atomicInteger.get());
            stopGrid("anotherGridNodeJoinWithProjection1");
            stopGrid("anotherGridNodeJoinWithProjection2");
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            stopGrid("anotherGridNodeJoinWithProjection1");
            stopGrid("anotherGridNodeJoinWithProjection2");
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-585")
    public void testNodeJoinWithP2P() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(4);
        ClassLoader externalClassLoader = getExternalClassLoader();
        IgnitePredicate ignitePredicate = (IgnitePredicate) externalClassLoader.loadClass(PRJ_PRED_CLS_NAME).newInstance();
        UUID remoteListen = events(grid(0).cluster().forPredicate(ignitePredicate)).remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.35
            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, (IgnitePredicate) externalClassLoader.loadClass(FILTER_CLS_NAME).newInstance(), new int[]{44});
        try {
            assertNotNull(remoteListen);
            startGrid("anotherGridNodeJoinWithP2P");
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(4, hashSet.size());
            assertEquals(4, atomicInteger.get());
            stopGrid("anotherGridNodeJoinWithP2P");
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            stopGrid("anotherGridNodeJoinWithP2P");
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testResources() throws Exception {
        final HashSet hashSet = new HashSet();
        final AtomicInteger atomicInteger = new AtomicInteger();
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        UUID remoteListen = grid(0).events().remoteListen(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.36

            @IgniteInstanceResource
            private Ignite grid;

            public boolean apply(UUID uuid, Event event) {
                GridEventConsumeSelfTest.this.info("Event from " + uuid + " [" + event.shortDisplay() + ']');
                GridEventConsumeSelfTest.assertEquals(44, event.type());
                GridEventConsumeSelfTest.assertNotNull(this.grid);
                hashSet.add(uuid);
                atomicInteger.incrementAndGet();
                countDownLatch.countDown();
                return true;
            }
        }, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.37

            @IgniteInstanceResource
            private Ignite grid;

            public boolean apply(Event event) {
                GridEventConsumeSelfTest.assertNotNull(this.grid);
                return true;
            }
        }, new int[]{44});
        try {
            assertNotNull(remoteListen);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !countDownLatch.await(10L, TimeUnit.SECONDS)) {
                throw new AssertionError(countDownLatch);
            }
            assertEquals(3, hashSet.size());
            assertEquals(3, atomicInteger.get());
            grid(0).events().stopRemoteListen(remoteListen);
        } catch (Throwable th) {
            grid(0).events().stopRemoteListen(remoteListen);
            throw th;
        }
    }

    @Test
    public void testMasterNodeLeave() throws Exception {
        final CountDownLatch countDownLatch = new CountDownLatch(3);
        IgniteEx startGrid = startGrid("anotherGridMasterNodeLeave");
        try {
            final UUID id = startGrid.cluster().localNode().id();
            for (int i = 0; i < 3; i++) {
                grid(i).events().localListen(new IgnitePredicate<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.38
                    public boolean apply(Event event) {
                        if (!id.equals(((DiscoveryEvent) event).eventNode().id())) {
                            return true;
                        }
                        countDownLatch.countDown();
                        return true;
                    }
                }, new int[]{11, 12});
            }
            startGrid.events().remoteListen((IgniteBiPredicate) null, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.39
                public boolean apply(Event event) {
                    return true;
                }
            }, EventType.EVTS_ALL);
            stopGrid("anotherGridMasterNodeLeave");
            if (!$assertionsDisabled && !countDownLatch.await(3000L, TimeUnit.MILLISECONDS)) {
                throw new AssertionError();
            }
        } catch (Throwable th) {
            stopGrid("anotherGridMasterNodeLeave");
            throw th;
        }
    }

    @Test
    public void testMasterNodeLeaveNoAutoUnsubscribe() throws Exception {
        IgniteEx startGrid = startGrid("anotherGridMasterNodeLeaveNoAutoUnsubscribe");
        try {
            final UUID id = startGrid.cluster().localNode().id();
            final CountDownLatch countDownLatch = new CountDownLatch(3);
            for (int i = 0; i < 3; i++) {
                grid(0).events().localListen(new IgnitePredicate<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.40
                    public boolean apply(Event event) {
                        if (!id.equals(((DiscoveryEvent) event).eventNode().id())) {
                            return true;
                        }
                        countDownLatch.countDown();
                        return true;
                    }
                }, new int[]{11});
            }
            consumeLatch = new CountDownLatch(7);
            consumeCnt = new AtomicInteger();
            this.noAutoUnsubscribe = true;
            startGrid.events().remoteListen(1, 0L, false, (IgniteBiPredicate) null, new P1<Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.41
                public boolean apply(Event event) {
                    GridEventConsumeSelfTest.consumeLatch.countDown();
                    GridEventConsumeSelfTest.consumeCnt.incrementAndGet();
                    return true;
                }
            }, new int[]{44});
            grid(0).compute().broadcast(F.noop());
            stopGrid("anotherGridMasterNodeLeaveNoAutoUnsubscribe");
            countDownLatch.await(3000L, TimeUnit.MILLISECONDS);
            grid(0).compute().broadcast(F.noop());
            if (!$assertionsDisabled && !consumeLatch.await(2L, TimeUnit.SECONDS)) {
                throw new AssertionError();
            }
            assertEquals(7, consumeCnt.get());
        } catch (Throwable th) {
            stopGrid("anotherGridMasterNodeLeaveNoAutoUnsubscribe");
            throw th;
        }
    }

    @Test
    @Ignore("https://issues.apache.org/jira/browse/IGNITE-11861")
    public void testMultithreadedWithNodeRestart() throws Exception {
        final AtomicBoolean atomicBoolean = new AtomicBoolean();
        final LinkedBlockingQueue linkedBlockingQueue = new LinkedBlockingQueue();
        final GridConcurrentHashSet gridConcurrentHashSet = new GridConcurrentHashSet();
        final GridConcurrentHashSet gridConcurrentHashSet2 = new GridConcurrentHashSet();
        final Random random = new Random();
        final int i = tcpDiscovery() ? 500 : 250;
        IgniteInternalFuture<?> multithreadedAsync = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.42
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                for (int i2 = 0; i2 < i && !atomicBoolean.get(); i2++) {
                    int nextInt = random.nextInt(3);
                    try {
                        UUID uuid = (UUID) GridEventConsumeSelfTest.this.grid(nextInt).events().remoteListenAsync(new P2<UUID, Event>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.42.1
                            public boolean apply(UUID uuid2, Event event) {
                                return true;
                            }
                        }, (IgnitePredicate) null, new int[]{44}).get(3000L);
                        gridConcurrentHashSet.add(uuid);
                        linkedBlockingQueue.add(F.t(Integer.valueOf(nextInt), uuid));
                    } catch (ClusterTopologyException e) {
                    }
                    U.sleep(10L);
                }
                atomicBoolean.set(true);
                return null;
            }
        }, 8, "consume-starter");
        multithreadedAsync.listen(igniteInternalFuture -> {
            atomicBoolean.set(true);
        });
        GridTestUtils.waitForAllFutures(multithreadedAsync, multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.43
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                while (!atomicBoolean.get()) {
                    IgniteBiTuple igniteBiTuple = (IgniteBiTuple) linkedBlockingQueue.poll(1L, TimeUnit.SECONDS);
                    if (igniteBiTuple != null) {
                        int intValue = ((Integer) igniteBiTuple.get1()).intValue();
                        UUID uuid = (UUID) igniteBiTuple.get2();
                        try {
                            GridEventConsumeSelfTest.this.grid(intValue).events().stopRemoteListenAsync(uuid).get(3000L);
                            gridConcurrentHashSet2.add(uuid);
                        } catch (ClusterTopologyException e) {
                        }
                    }
                }
                return null;
            }
        }, 4, "consume-stopper"), multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.44
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                while (!atomicBoolean.get()) {
                    GridEventConsumeSelfTest.this.startGrid("anotherGridMultithreadedWithNodeRestart");
                    GridEventConsumeSelfTest.this.stopGrid("anotherGridMultithreadedWithNodeRestart");
                }
                return null;
            }
        }, 1, "node-restarter"), multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.continuous.GridEventConsumeSelfTest.45
            @Override // java.util.concurrent.Callable
            public Object call() throws Exception {
                while (!atomicBoolean.get()) {
                    try {
                        GridEventConsumeSelfTest.this.grid(random.nextInt(3)).compute().runAsync(F.noop()).get(3000L);
                    } catch (IgniteException e) {
                    }
                }
                return null;
            }
        }, 1, "job-runner"));
        while (true) {
            IgniteBiTuple igniteBiTuple = (IgniteBiTuple) linkedBlockingQueue.poll();
            if (igniteBiTuple == null) {
                Collection lose = F.lose(gridConcurrentHashSet, true, gridConcurrentHashSet2);
                assertEquals("Not stopped IDs: " + lose, 0, lose.size());
                return;
            } else {
                int intValue = ((Integer) igniteBiTuple.get1()).intValue();
                UUID uuid = (UUID) igniteBiTuple.get2();
                grid(intValue).events().stopRemoteListenAsync(uuid).get(3000L);
                gridConcurrentHashSet2.add(uuid);
            }
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -486394564:
                if (implMethodName.equals("lambda$testMultithreadedWithNodeRestart$7c54b6ff$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/ignite/lang/IgniteInClosure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("org/apache/ignite/internal/processors/continuous/GridEventConsumeSelfTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicBoolean;Lorg/apache/ignite/internal/IgniteInternalFuture;)V")) {
                    AtomicBoolean atomicBoolean = (AtomicBoolean) serializedLambda.getCapturedArg(0);
                    return igniteInternalFuture -> {
                        atomicBoolean.set(true);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        $assertionsDisabled = !GridEventConsumeSelfTest.class.desiredAssertionStatus();
    }
}
