package org.apache.ignite.internal.processors.streamer;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.ignite.IgniteStreamer;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.cache.GridCacheReloadSelfTest;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.G;
import org.apache.ignite.internal.util.typedef.P1;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.streamer.StreamerConfiguration;
import org.apache.ignite.streamer.StreamerContext;
import org.apache.ignite.streamer.StreamerEventRouterAdapter;
import org.apache.ignite.streamer.StreamerFailureListener;
import org.apache.ignite.streamer.StreamerStage;
import org.apache.ignite.streamer.window.StreamerBoundedSizeWindow;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jdk8.backport.ThreadLocalRandom8;

/* loaded from: input_file:org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest.class */
public class GridStreamerFailoverSelfTest extends GridCommonAbstractTest {
    private static final TcpDiscoveryIpFinder IP_FINDER = new TcpDiscoveryVmIpFinder(true);
    private TestRandomRouter router;
    private int maxConcurrentSess;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/ignite/internal/processors/streamer/GridStreamerFailoverSelfTest$TestRandomRouter.class */
    public static class TestRandomRouter extends StreamerEventRouterAdapter {
        private UUID srcNodeId;
        private UUID destNodeId;

        private TestRandomRouter() {
        }

        public <T> ClusterNode route(StreamerContext streamerContext, String str, T t) {
            ClusterNode clusterNode;
            int i;
            if ("put".equals(str)) {
                return streamerContext.projection().node(this.destNodeId);
            }
            Collection nodes = streamerContext.projection().forPredicate(new P1<ClusterNode>() { // from class: org.apache.ignite.internal.processors.streamer.GridStreamerFailoverSelfTest.TestRandomRouter.1
                public boolean apply(ClusterNode clusterNode2) {
                    return (TestRandomRouter.this.srcNodeId.equals(clusterNode2.id()) || TestRandomRouter.this.destNodeId.equals(clusterNode2.id())) ? false : true;
                }
            }).nodes();
            int nextInt = ThreadLocalRandom8.current().nextInt(nodes.size());
            int i2 = 0;
            Iterator it = nodes.iterator();
            do {
                if (!it.hasNext()) {
                    it = nodes.iterator();
                }
                clusterNode = (ClusterNode) it.next();
                i = i2;
                i2++;
            } while (nextInt != i);
            return clusterNode;
        }

        public void sourceNodeId(UUID uuid) {
            this.srcNodeId = uuid;
        }

        public void destinationNodeId(UUID uuid) {
            this.destNodeId = uuid;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.ignite.testframework.junits.GridAbstractTest
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration configuration = super.getConfiguration(str);
        configuration.setStreamerConfiguration(new StreamerConfiguration[]{streamerConfiguration()});
        TcpDiscoverySpi tcpDiscoverySpi = new TcpDiscoverySpi();
        tcpDiscoverySpi.setIpFinder(IP_FINDER);
        configuration.setDiscoverySpi(tcpDiscoverySpi);
        configuration.setPeerClassLoadingEnabled(false);
        return configuration;
    }

    private StreamerConfiguration streamerConfiguration() {
        StreamerConfiguration streamerConfiguration = new StreamerConfiguration();
        streamerConfiguration.setAtLeastOnce(true);
        streamerConfiguration.setRouter(this.router);
        StreamerBoundedSizeWindow streamerBoundedSizeWindow = new StreamerBoundedSizeWindow();
        streamerBoundedSizeWindow.setMaximumSize(100);
        streamerConfiguration.setWindows(F.asList(streamerBoundedSizeWindow));
        streamerConfiguration.setMaximumConcurrentSessions(this.maxConcurrentSess);
        streamerConfiguration.setStages(F.asList(new StreamerStage[]{new GridTestStage("pass", new SC() { // from class: org.apache.ignite.internal.processors.streamer.GridStreamerFailoverSelfTest.1
            static final /* synthetic */ boolean $assertionsDisabled;

            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                if ($assertionsDisabled || streamerContext.nextStageName() != null) {
                    return F.asMap(streamerContext.nextStageName(), collection);
                }
                throw new AssertionError();
            }

            static {
                $assertionsDisabled = !GridStreamerFailoverSelfTest.class.desiredAssertionStatus();
            }
        }), new GridTestStage("put", new SC() { // from class: org.apache.ignite.internal.processors.streamer.GridStreamerFailoverSelfTest.2
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                ConcurrentMap localSpace = streamerContext.localSpace();
                for (Object obj : collection) {
                    AtomicInteger atomicInteger = (AtomicInteger) localSpace.get(obj);
                    if (atomicInteger == null) {
                        atomicInteger = (AtomicInteger) F.addIfAbsent(localSpace, obj, new AtomicInteger());
                    }
                    atomicInteger.incrementAndGet();
                }
                return null;
            }
        })}));
        return streamerConfiguration;
    }

    public void testEventFailover() throws Exception {
        checkEventFailover(GridCacheReloadSelfTest.MAX_CACHE_ENTRIES);
    }

    private void checkEventFailover(int i) throws Exception {
        this.router = new TestRandomRouter();
        this.maxConcurrentSess = i;
        startGrids(6);
        try {
            this.router.sourceNodeId(grid(0).localNode().id());
            this.router.destinationNodeId(grid(5).localNode().id());
            final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
            IgniteInternalFuture<?> multithreadedAsync = multithreadedAsync(new Callable<Object>() { // from class: org.apache.ignite.internal.processors.streamer.GridStreamerFailoverSelfTest.3
                @Override // java.util.concurrent.Callable
                public Object call() throws Exception {
                    Random random = new Random();
                    while (!atomicBoolean.get()) {
                        int nextInt = random.nextInt(4) + 1;
                        GridStreamerFailoverSelfTest.this.info(">>>>> Stopping grid " + GridStreamerFailoverSelfTest.this.grid(nextInt).localNode().id());
                        GridStreamerFailoverSelfTest.this.stopGrid(nextInt, true);
                        U.sleep(1000L);
                        GridStreamerFailoverSelfTest.this.startGrid(nextInt);
                        GridStreamerFailoverSelfTest.this.info(">>>>>> Started grid " + GridStreamerFailoverSelfTest.this.grid(nextInt).localNode().id());
                        U.sleep(500L);
                    }
                    return null;
                }
            }, 1);
            final ConcurrentLinkedQueue concurrentLinkedQueue = new ConcurrentLinkedQueue();
            IgniteStreamer streamer = grid(0).streamer((String) null);
            streamer.addStreamerFailureListener(new StreamerFailureListener() { // from class: org.apache.ignite.internal.processors.streamer.GridStreamerFailoverSelfTest.4
                public void onFailure(String str, Collection<Object> collection, Throwable th) {
                    GridStreamerFailoverSelfTest.this.info("Unable to failover events [stageName=" + str + ", err=" + th + ']');
                    concurrentLinkedQueue.addAll(collection);
                }
            });
            for (int i2 = 0; i2 < 300000; i2++) {
                if (i2 > 0 && i2 % 10000 == 0) {
                    info("Processed: " + i2);
                }
                streamer.addEvent(Integer.valueOf(i2), new Object[0]);
            }
            atomicBoolean.set(true);
            multithreadedAsync.get();
            G.stop(getTestGridName(0), false);
            ConcurrentMap localSpace = grid(5).streamer((String) null).context().localSpace();
            for (int i3 = 0; i3 < 300000; i3++) {
                AtomicInteger atomicInteger = (AtomicInteger) localSpace.get(Integer.valueOf(i3));
                if (atomicInteger == null) {
                    assertTrue("Missing counter for key both in result map and in failover failed map: " + i3, concurrentLinkedQueue.contains(Integer.valueOf(i3)));
                } else {
                    assertTrue(atomicInteger.get() > 0);
                }
            }
        } finally {
            stopAllGrids();
        }
    }
}
