package org.apache.ignite.internal.processors.streamer;

import java.net.URL;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteLogger;
import org.apache.ignite.IgniteStreamer;
import org.apache.ignite.cluster.ClusterNode;
import org.apache.ignite.configuration.DeploymentMode;
import org.apache.ignite.configuration.IgniteConfiguration;
import org.apache.ignite.internal.IgniteEx;
import org.apache.ignite.internal.IgniteInterruptedCheckedException;
import org.apache.ignite.internal.util.typedef.C1;
import org.apache.ignite.internal.util.typedef.CA;
import org.apache.ignite.internal.util.typedef.CI1;
import org.apache.ignite.internal.util.typedef.F;
import org.apache.ignite.internal.util.typedef.internal.U;
import org.apache.ignite.lang.IgniteClosure;
import org.apache.ignite.lang.IgnitePredicate;
import org.apache.ignite.marshaller.optimized.OptimizedMarshaller;
import org.apache.ignite.resources.IgniteInstanceResource;
import org.apache.ignite.resources.LoggerResource;
import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
import org.apache.ignite.spi.discovery.tcp.ipfinder.TcpDiscoveryIpFinder;
import org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder;
import org.apache.ignite.streamer.StreamerConfiguration;
import org.apache.ignite.streamer.StreamerContext;
import org.apache.ignite.streamer.StreamerEventRouter;
import org.apache.ignite.streamer.StreamerFailureListener;
import org.apache.ignite.streamer.StreamerMetrics;
import org.apache.ignite.streamer.StreamerStage;
import org.apache.ignite.streamer.StreamerStageMetrics;
import org.apache.ignite.streamer.router.StreamerRandomEventRouter;
import org.apache.ignite.streamer.window.StreamerUnboundedWindow;
import org.apache.ignite.testframework.GridTestExternalClassLoader;
import org.apache.ignite.testframework.GridTestUtils;
import org.apache.ignite.testframework.config.GridTestProperties;
import org.apache.ignite.testframework.junits.common.GridCommonAbstractTest;
import org.jetbrains.annotations.Nullable;

/**
 * Streamer self-test. Each test method assigns the stage list, the event router and the
 * at-least-once / peer-class-loading flags on this instance, starts one or more grids
 * (which read those fields through {@code getConfiguration(String)}), pushes events into
 * the default streamer and checks the outcome.
 */
public class GridStreamerSelfTest extends GridCommonAbstractTest {
    /** IP finder installed into the discovery SPI of every grid configured by this test. */
    private static final TcpDiscoveryIpFinder IP_FINDER;
    /** Value passed to {@code StreamerConfiguration.setAtLeastOnce(...)} for started grids. */
    private boolean atLeastOnce = true;
    /** Stages passed to {@code StreamerConfiguration.setStages(...)}; assigned by each test before grids start. */
    private Collection<StreamerStage> stages;
    /** Router passed to {@code StreamerConfiguration.setRouter(...)}; stays {@code null} unless a test assigns it. */
    private StreamerEventRouter router;
    /** Whether peer class loading (and SHARED deployment mode) is enabled on started grids. */
    private boolean p2pEnabled;
    /** Assertion-status flag (decompiler rendering of the compiler-generated field); assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;

    /**
     * Builds the configuration of a test grid: one streamer produced by
     * {@link #streamerConfiguration()}, TCP discovery over {@link #IP_FINDER},
     * peer class loading as requested by the current test (with SHARED deployment
     * mode when it is enabled) and an {@link OptimizedMarshaller}.
     *
     * @param str Name argument, forwarded unchanged to the parent implementation.
     * @return Configuration to start the grid with.
     * @throws Exception If the parent implementation fails.
     */
    @Override
    public IgniteConfiguration getConfiguration(String str) throws Exception {
        IgniteConfiguration cfg = super.getConfiguration(str);

        cfg.setStreamerConfiguration(new StreamerConfiguration[]{streamerConfiguration()});

        TcpDiscoverySpi discoSpi = new TcpDiscoverySpi();
        discoSpi.setIpFinder(IP_FINDER);
        cfg.setDiscoverySpi(discoSpi);

        boolean p2p = p2pEnabled;
        cfg.setPeerClassLoadingEnabled(p2p);

        if (p2p) {
            cfg.setDeploymentMode(DeploymentMode.SHARED);
        }

        cfg.setMarshaller(new OptimizedMarshaller(false));

        return cfg;
    }

    /**
     * Creates a streamer configuration from the current test fields: the at-least-once
     * flag, the router, the stages, and a single unbounded window.
     *
     * @return Streamer configuration for the grid being started.
     */
    private StreamerConfiguration streamerConfiguration() {
        StreamerConfiguration cfg = new StreamerConfiguration();

        cfg.setAtLeastOnce(atLeastOnce);
        cfg.setRouter(router);
        cfg.setStages(stages);
        cfg.setWindows(F.asList(new StreamerUnboundedWindow()));

        return cfg;
    }

    /**
     * Checks that a stage gets its {@link IgniteInstanceResource} and {@link LoggerResource}
     * fields injected: a single stage verifies both fields on every execution and counts
     * down a latch, and the test waits for one count per added event.
     *
     * @throws Exception If failed.
     */
    public void testInjections() throws Exception {
        final int evtCnt = 100;

        final CountDownLatch latch = new CountDownLatch(evtCnt);

        this.stages = F.asList(new StreamerStage() {
            @IgniteInstanceResource
            private Ignite g;

            @LoggerResource
            private IgniteLogger log;

            public String name() {
                return "name";
            }

            @Nullable
            public Map<String, Collection<?>> run(StreamerContext streamerContext, Collection collection) {
                // Explicit checks instead of `assert`, so the injection is verified even when
                // the JVM runs without -ea. A failure here leaves the latch uncounted and the
                // test fails on the timed wait below.
                if (this.g == null) {
                    throw new AssertionError("Ignite instance was not injected into the stage.");
                }

                if (this.log == null) {
                    throw new AssertionError("Logger was not injected into the stage.");
                }

                this.log.info("Processing events: " + collection);

                latch.countDown();

                return null;
            }
        });

        try {
            IgniteStreamer streamer = startGrid(0).streamer((String) null);

            for (int i = 0; i < evtCnt; i++) {
                streamer.addEvent("event1", new Object[0]);
            }

            // The await must run unconditionally: inside an `assert` it is skipped without -ea.
            assertTrue("Stage was not executed for all events within the timeout.",
                latch.await(10L, TimeUnit.SECONDS));
        } finally {
            stopAllGrids();
        }
    }

    /**
     * Runs 100 events through a three-stage pipeline ("a" -> "b" -> "c") on four grids,
     * with each stage routed to its own node (grids 1, 2 and 3), and checks stage,
     * pipeline, session and query metrics.
     *
     * @throws Exception If failed.
     */
    public void testStreamerMetrics() throws Exception {
        final int evtCnt = 100;

        // Keep a typed reference to the router: the field is declared as StreamerEventRouter,
        // so reading it back would require a downcast to register the stage-to-node mapping.
        GridTestStreamerEventRouter evtRouter = new GridTestStreamerEventRouter();

        this.atLeastOnce = true;
        this.p2pEnabled = false;
        this.router = evtRouter;

        final CountDownLatch finishLatch = new CountDownLatch(evtCnt);

        SC stageClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) throws IgniteCheckedException {
                String nextStageName = streamerContext.nextStageName();

                // Presumably makes every execution take measurable time, since checkMetrics()
                // expects stage execution times to be greater than zero.
                U.sleep(50L);

                if (nextStageName != null) {
                    return F.asMap(nextStageName, collection);
                }

                // Last stage: one event has passed through the whole pipeline.
                finishLatch.countDown();

                return null;
            }
        };

        this.stages = F.asList(new StreamerStage[]{
            new GridTestStage("a", stageClo),
            new GridTestStage("b", stageClo),
            new GridTestStage("c", stageClo)});

        startGrids(4);

        try {
            final IgniteEx grid0 = grid(0);
            IgniteEx grid1 = grid(1);
            IgniteEx grid2 = grid(2);
            IgniteEx grid3 = grid(3);

            System.out.println("Grid 0: " + grid0.cluster().localNode().id());
            System.out.println("Grid 1: " + grid1.cluster().localNode().id());
            System.out.println("Grid 2: " + grid2.cluster().localNode().id());
            System.out.println("Grid 3: " + grid3.cluster().localNode().id());

            evtRouter.put("a", grid1.cluster().localNode().id());
            evtRouter.put("b", grid2.cluster().localNode().id());
            evtRouter.put("c", grid3.cluster().localNode().id());

            IgniteStreamer streamer = grid0.streamer((String) null);

            for (int i = 0; i < evtCnt; i++) {
                streamer.addEvent("event1", new Object[0]);
            }

            finishLatch.await();

            // Stages must not have run anywhere except on the node they were routed to.
            checkZeroMetrics(grid0, "a", "b", "c");
            checkZeroMetrics(grid1, "b", "c");
            checkZeroMetrics(grid2, "a", "c");
            checkZeroMetrics(grid3, "a", "b");

            checkMetrics(grid1, "a", evtCnt, false);
            checkMetrics(grid2, "b", evtCnt, false);
            checkMetrics(grid3, "c", evtCnt, true);

            GridTestUtils.retryAssert(this.log, 100, 50L, new CA() {
                public void apply() {
                    TestCase.assertEquals(0, grid0.streamer((String) null).metrics().currentActiveSessions());
                }
            });

            assertTrue(grid0.streamer((String) null).metrics().maximumActiveSessions() > 0);

            // Run one query that takes a noticeable time on each node so query metrics get populated.
            grid0.streamer((String) null).context().query(new IgniteClosure<StreamerContext, Object>() {
                public Object apply(StreamerContext streamerContext) {
                    try {
                        U.sleep(1000L);
                    } catch (IgniteInterruptedCheckedException ignored) {
                        // Deliberately ignored: the sleep only pads the query duration.
                    }

                    return null;
                }
            });

            StreamerMetrics metrics = grid0.streamer((String) null).metrics();

            // JUnit assertions instead of `assert`, so the checks run even without -ea.
            assertTrue(metrics.queryMaximumExecutionNodes() == 4);
            assertTrue(metrics.queryMinimumExecutionNodes() == 4);
            assertTrue(metrics.queryAverageExecutionNodes() == 4);

            assertTrue(metrics.queryMaximumExecutionTime() > 0);
            assertTrue(metrics.queryMinimumExecutionTime() > 0);
            assertTrue(metrics.queryAverageExecutionTime() > 0);
        } finally {
            stopAllGrids();
        }
    }

    /**
     * Sends the event {@code 0} through five stages named "0".."4", spread over four grids.
     * Every non-final stage increments the event and checks that the incremented value
     * equals the name of the next stage reported by the context. The first mismatch is
     * recorded and rethrown by the test thread once the last stage has run.
     *
     * @throws Exception If failed.
     */
    public void testContextNextStage() throws Exception {
        // Keep a typed reference to the router: the field is declared as StreamerEventRouter,
        // so reading it back would require a downcast to register the stage-to-node mapping.
        GridTestStreamerEventRouter evtRouter = new GridTestStreamerEventRouter();

        this.atLeastOnce = true;
        this.router = evtRouter;
        this.p2pEnabled = false;

        final CountDownLatch finishLatch = new CountDownLatch(1);

        final AtomicReference<IgniteCheckedException> errRef = new AtomicReference<>();

        SC stageClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                String nextStage = streamerContext.nextStageName();

                if (nextStage == null) {
                    // Last stage reached.
                    finishLatch.countDown();

                    return null;
                }

                // Recorded rather than asserted: the check does not depend on -ea and a
                // violation fails the test at the end instead of breaking the pipeline.
                if (collection.size() != 1) {
                    errRef.compareAndSet(null, new IgniteCheckedException(
                        "Unexpected number of events [exp=1, actual=" + collection.size() + ']'));
                }

                Integer next = (Integer) F.first(collection) + 1;

                if (!String.valueOf(next).equals(nextStage)) {
                    errRef.compareAndSet(null, new IgniteCheckedException(
                        "Stage name comparison failed [exp=" + next + ", actual=" + nextStage + ']'));
                }

                return F.asMap(nextStage, F.asList(next));
            }
        };

        this.stages = F.asList(new StreamerStage[]{
            new GridTestStage("0", stageClo),
            new GridTestStage("1", stageClo),
            new GridTestStage("2", stageClo),
            new GridTestStage("3", stageClo),
            new GridTestStage("4", stageClo)});

        startGrids(4);

        try {
            evtRouter.put("0", grid(1).localNode().id());
            evtRouter.put("1", grid(2).localNode().id());
            evtRouter.put("2", grid(3).localNode().id());
            evtRouter.put("3", grid(0).localNode().id());
            evtRouter.put("4", grid(1).localNode().id());

            grid(0).streamer((String) null).addEvent(0, new Object[0]);

            finishLatch.await();

            IgniteCheckedException err = errRef.get();

            if (err != null) {
                throw err;
            }
        } finally {
            stopAllGrids(false);
        }
    }

    /**
     * Checks that {@code addEventToStage} and {@code addEventsToStage} reject a {@code null}
     * stage name with a {@link NullPointerException} whose message names the argument.
     *
     * @throws Exception If failed.
     */
    public void testAddEventWithNullStageName() throws Exception {
        // Keep a typed reference to the router: the field is declared as StreamerEventRouter,
        // so reading it back would require a downcast to register the stage-to-node mapping.
        GridTestStreamerEventRouter evtRouter = new GridTestStreamerEventRouter();

        this.atLeastOnce = true;
        this.router = evtRouter;
        this.p2pEnabled = false;

        SC stageClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                String nextStage = streamerContext.nextStageName();

                if (nextStage == null) {
                    return null;
                }

                Integer next = (Integer) F.first(collection) + 1;

                return F.asMap(nextStage, F.asList(next));
            }
        };

        this.stages = F.asList(new StreamerStage[]{new GridTestStage("0", stageClo), new GridTestStage("1", stageClo)});

        startGrids(2);

        try {
            evtRouter.put("0", grid(0).localNode().id());
            evtRouter.put("1", grid(1).localNode().id());

            try {
                grid(0).streamer((String) null).addEventToStage((String) null, 0, new Object[0]);

                fail("addEventToStage() accepted a null stage name.");
            } catch (NullPointerException e) {
                assertTrue(e.getMessage().contains("Argument cannot be null: stageName"));

                info("Caught expected exception: " + e.getMessage());
            }

            try {
                grid(0).streamer((String) null).addEventsToStage((String) null, Collections.singletonList(0));

                fail("addEventsToStage() accepted a null stage name.");
            } catch (NullPointerException e) {
                assertTrue(e.getMessage().contains("Argument cannot be null: stageName"));

                info("Caught expected exception: " + e.getMessage());
            }
        } finally {
            stopAllGrids(false);
        }
    }

    /**
     * Checks that a stage returning a result map with a {@code null} stage name causes the
     * streamer failure listener to be notified within five seconds.
     *
     * @throws Exception If failed.
     */
    public void testNullStageNameInResultMap() throws Exception {
        // Keep a typed reference to the router: the field is declared as StreamerEventRouter,
        // so reading it back would require a downcast to register the stage-to-node mapping.
        GridTestStreamerEventRouter evtRouter = new GridTestStreamerEventRouter();

        this.atLeastOnce = true;
        this.router = evtRouter;
        this.p2pEnabled = false;

        SC stageClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                if (streamerContext.nextStageName() == null) {
                    return null;
                }

                Integer evt = (Integer) F.first(collection);

                // Deliberately invalid result: the next stage name is null.
                Map<String, Collection<?>> res = new HashMap<>();

                res.put(null, F.asList(Integer.valueOf(evt.intValue() + 1)));

                return res;
            }
        };

        this.stages = F.asList(new StreamerStage[]{new GridTestStage("0", stageClo), new GridTestStage("1", stageClo)});

        startGrids(2);

        try {
            final CountDownLatch failLatch = new CountDownLatch(1);

            grid(0).streamer((String) null).addStreamerFailureListener(new StreamerFailureListener() {
                public void onFailure(String str, Collection<Object> collection, Throwable th) {
                    GridStreamerSelfTest.this.info("Expected failure: " + th.getMessage());

                    failLatch.countDown();
                }
            });

            evtRouter.put("0", grid(0).localNode().id());
            evtRouter.put("1", grid(1).localNode().id());

            grid(0).streamer((String) null).addEvent(0, new Object[0]);

            // The await must run unconditionally: inside an `assert` it is skipped without -ea.
            assertTrue("Failure listener was not notified within the timeout.",
                failLatch.await(5L, TimeUnit.SECONDS));
        } finally {
            stopAllGrids(false);
        }
    }

    /**
     * Streams 100 instances of a class loaded by an external class loader through a
     * three-stage pipeline with peer class loading enabled. Every stage enqueues its events
     * into the window, so each of grids 1-3 is expected to hold 100 events. After grid 0
     * (the node that added the events) is stopped, the windows on the remaining grids are
     * expected to become empty.
     *
     * @throws Exception If failed.
     */
    public void testPeerDeployment() throws Exception {
        final int evtCnt = 100;

        URL[] urls = {new URL(GridTestProperties.getProperty("p2p.uri.cls"))};

        Class<?> evtCls = new GridTestExternalClassLoader(urls, new String[0])
            .loadClass("org.apache.ignite.tests.p2p.CacheDeploymentTestKey");

        assertNotNull("Event class was not loaded.", evtCls);

        final CountDownLatch finishLatch = new CountDownLatch(evtCnt);

        SC stageClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                String nextStageName = streamerContext.nextStageName();

                streamerContext.window().enqueueAll(collection);

                if (nextStageName != null) {
                    return F.asMap(nextStageName, collection);
                }

                // Last stage: one event has passed through the whole pipeline.
                finishLatch.countDown();

                return null;
            }
        };

        this.stages = F.asList(new StreamerStage[]{
            new GridTestStage("a", stageClo),
            new GridTestStage("b", stageClo),
            new GridTestStage("c", stageClo)});

        // Keep a typed reference to the router: the field is declared as StreamerEventRouter,
        // so reading it back would require a downcast to register the stage-to-node mapping.
        GridTestStreamerEventRouter evtRouter = new GridTestStreamerEventRouter();

        this.router = evtRouter;
        this.atLeastOnce = true;
        this.p2pEnabled = true;

        startGrids(4);

        try {
            IgniteEx grid0 = grid(0);
            IgniteEx grid1 = grid(1);
            IgniteEx grid2 = grid(2);
            IgniteEx grid3 = grid(3);

            System.out.println("Grid 0: " + grid0.cluster().localNode().id());
            System.out.println("Grid 1: " + grid1.cluster().localNode().id());
            System.out.println("Grid 2: " + grid2.cluster().localNode().id());
            System.out.println("Grid 3: " + grid3.cluster().localNode().id());

            evtRouter.put("a", grid1.cluster().localNode().id());
            evtRouter.put("b", grid2.cluster().localNode().id());
            evtRouter.put("c", grid3.cluster().localNode().id());

            IgniteStreamer streamer = grid0.streamer((String) null);

            for (int i = 0; i < evtCnt; i++) {
                streamer.addEvent(evtCls.newInstance(), new Object[0]);
            }

            finishLatch.await();

            for (int i = 1; i < 4; i++) {
                assertEquals(evtCnt, grid(i).streamer((String) null).context().window().size());
            }

            // Stop the node that added the events.
            stopGrid(0, false);

            GridTestUtils.retryAssert(this.log, 50, 50L, new CA() {
                public void apply() {
                    for (int i = 1; i < 4; i++) {
                        TestCase.assertEquals(0, GridStreamerSelfTest.this.grid(i).streamer((String) null).context().window().size());
                    }
                }
            });
        } finally {
            // Single cleanup point for both the success and the failure path.
            stopAllGrids();
        }
    }

    /**
     * Streams 1000 random integers through four stages ("a".."d") on four grids using a
     * random router. Each stage adds the events it sees to a per-stage counter kept in the
     * node-local space. The test then checks that, for every stage, the counters summed over
     * all nodes equal the sum of the streamed values, reading them directly from the local
     * spaces and through the context's {@code query}, {@code broadcast} and {@code reduce}.
     *
     * @throws Exception If failed.
     */
    public void testQuery() throws Exception {
        this.p2pEnabled = false;
        this.atLeastOnce = true;
        this.router = new StreamerRandomEventRouter();

        final CountDownLatch finishLatch = new CountDownLatch(1000);

        SC summingClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                ConcurrentMap space = streamerContext.localSpace();

                // Per-stage counter in the node-local space, created on first use.
                AtomicInteger cntr = (AtomicInteger) space.get(str);

                if (cntr == null) {
                    cntr = (AtomicInteger) F.addIfAbsent(space, str, new AtomicInteger());
                }

                for (Object evt : collection) {
                    cntr.addAndGet(((Integer) evt).intValue());
                }

                String next = streamerContext.nextStageName();

                if (next == null) {
                    // Last stage: one event has passed through the whole pipeline.
                    finishLatch.countDown();

                    return null;
                }

                return F.asMap(next, collection);
            }
        };

        this.stages = F.asList(new StreamerStage[]{
            new GridTestStage("a", summingClo),
            new GridTestStage("b", summingClo),
            new GridTestStage("c", summingClo),
            new GridTestStage("d", summingClo)});

        startGrids(4);

        try {
            Random rnd = new Random();

            int expSum = 0;

            for (int evtIdx = 0; evtIdx < 1000; evtIdx++) {
                int val = rnd.nextInt(1000);

                grid(0).streamer((String) null).addEvent(Integer.valueOf(val), new Object[0]);

                expSum += val;
            }

            finishLatch.await();

            final String[] stageNames = {"a", "b", "c", "d"};

            // Stage name -> counter total over all nodes, read straight from the local spaces.
            Map<String, Integer> totals = new HashMap<>(4);

            for (int gridIdx = 0; gridIdx < 4; gridIdx++) {
                IgniteEx node = grid(gridIdx);

                ConcurrentMap space = node.streamer((String) null).context().localSpace();

                for (String stageName : stageNames) {
                    AtomicInteger cntr = (AtomicInteger) space.get(stageName);

                    assertNotNull(cntr);

                    info(">>>>> grid=" + node.cluster().localNode().id() + ", s=" + stageName + ", val=" + cntr.get());

                    Integer prev = totals.get(stageName);

                    totals.put(stageName, prev == null ? cntr.get() : prev + cntr.get());
                }
            }

            for (String stageName : stageNames) {
                assertEquals(Integer.valueOf(expSum), totals.get(stageName));
            }

            StreamerContext ctx = grid(0).streamer((String) null).context();

            // Same totals, collected with a query.
            for (final String stageName : stageNames) {
                Collection<Integer> perNode = ctx.query(new C1<StreamerContext, Integer>() {
                    public Integer apply(StreamerContext streamerContext) {
                        return Integer.valueOf(((AtomicInteger) streamerContext.localSpace().get(stageName)).get());
                    }
                });

                assertEquals(expSum, F.sumInt(perNode));
            }

            // Broadcast stores the sum of all stage counters under the "bcast" key of each local space.
            ctx.broadcast(new CI1<StreamerContext>() {
                public void apply(StreamerContext streamerContext) {
                    ConcurrentMap space = streamerContext.localSpace();

                    int nodeSum = 0;

                    for (String stageName : stageNames) {
                        nodeSum += ((AtomicInteger) space.get(stageName)).get();
                    }

                    space.put("bcast", new AtomicInteger(nodeSum));
                }
            });

            int bcastSum = 0;

            for (int gridIdx = 0; gridIdx < 4; gridIdx++) {
                AtomicInteger nodeSum = (AtomicInteger) grid(gridIdx).streamer((String) null).context().localSpace().get("bcast");

                bcastSum += nodeSum.get();
            }

            assertEquals(expSum * stageNames.length, bcastSum);

            // Same totals, collected with a reduce.
            for (final String stageName : stageNames) {
                Integer reduced = (Integer) ctx.reduce(new C1<StreamerContext, Integer>() {
                    public Integer apply(StreamerContext streamerContext) {
                        return Integer.valueOf(((AtomicInteger) streamerContext.localSpace().get(stageName)).get());
                    }
                }, F.sumIntReducer());

                assertEquals(Integer.valueOf(expSum), reduced);
            }
        } finally {
            stopAllGrids(false);
        }
    }

    /**
     * Configures a random router whose node predicate rejects every node, adds ten events
     * on a single grid and expects the streamer failure listener to be notified ten times
     * within five seconds.
     *
     * @throws Exception If failed.
     */
    public void testRandomRouterWithEmptyTopology() throws Exception {
        final int evtCnt = 10;

        this.atLeastOnce = true;
        this.router = new StreamerRandomEventRouter(new IgnitePredicate[]{new IgnitePredicate<ClusterNode>() {
            public boolean apply(ClusterNode clusterNode) {
                // Reject every node, so the router has nothing to choose from.
                return false;
            }
        }});
        this.p2pEnabled = false;

        SC stageClo = new SC() {
            public Map<String, Collection<?>> applyx(String str, StreamerContext streamerContext, Collection<Object> collection) {
                String nextStage = streamerContext.nextStageName();

                if (nextStage == null) {
                    return null;
                }

                return F.asMap(nextStage, F.asList(0));
            }
        };

        this.stages = F.asList(new StreamerStage[]{
            new GridTestStage("0", stageClo),
            new GridTestStage("1", stageClo),
            new GridTestStage("2", stageClo)});

        startGrids(1);

        try {
            final CountDownLatch failLatch = new CountDownLatch(evtCnt);

            grid(0).streamer((String) null).addStreamerFailureListener(new StreamerFailureListener() {
                public void onFailure(String str, Collection<Object> collection, Throwable th) {
                    GridStreamerSelfTest.this.info("Expected failure: " + th.getMessage());

                    failLatch.countDown();
                }
            });

            for (int i = 0; i < evtCnt; i++) {
                grid(0).streamer((String) null).addEvent(0, new Object[0]);
            }

            // The await must run unconditionally: inside an `assert` it is skipped without -ea.
            assertTrue("Failure listener was not notified for every event within the timeout.",
                failLatch.await(5L, TimeUnit.SECONDS));
        } finally {
            stopAllGrids(false);
        }
    }

    /**
     * Verifies streamer-level and stage-level metrics of the default streamer on a node
     * that executed exactly one stage.
     *
     * @param ignite Node whose default streamer is checked.
     * @param str Name of the stage executed on that node.
     * @param i Expected number of stage executions.
     * @param z If {@code true}, pipeline metrics must report 4 execution nodes and positive
     *      execution times; otherwise all pipeline metrics must be zero.
     */
    private void checkMetrics(Ignite ignite, String str, int i, boolean z) {
        IgniteStreamer streamer = ignite.streamer((String) null);

        StreamerMetrics streamerMetrics = streamer.metrics();

        assertEquals(i, streamerMetrics.stageTotalExecutionCount());
        assertEquals(0, streamerMetrics.stageWaitingExecutionCount());
        assertEquals(0, streamerMetrics.currentActiveSessions());
        assertEquals(0, streamerMetrics.maximumActiveSessions());
        assertEquals(0, streamerMetrics.failuresCount());

        // Pipeline node counts: 4 when pipeline metrics are expected here, 0 otherwise.
        int expPipelineNodes = z ? 4 : 0;

        assertEquals(expPipelineNodes, streamerMetrics.pipelineMaximumExecutionNodes());
        assertEquals(expPipelineNodes, streamerMetrics.pipelineMinimumExecutionNodes());
        assertEquals(expPipelineNodes, streamerMetrics.pipelineAverageExecutionNodes());

        if (!z) {
            assertEquals(0L, streamerMetrics.pipelineMaximumExecutionTime());
            assertEquals(0L, streamerMetrics.pipelineMinimumExecutionTime());
            assertEquals(0L, streamerMetrics.pipelineAverageExecutionTime());
        } else {
            assertTrue(streamerMetrics.pipelineMaximumExecutionTime() > 0);
            assertTrue(streamerMetrics.pipelineMinimumExecutionTime() > 0);
            assertTrue(streamerMetrics.pipelineAverageExecutionTime() > 0);
        }

        // Stage metrics are read from a freshly obtained metrics object.
        StreamerStageMetrics stageStats = streamer.metrics().stageMetrics(str);

        assertNotNull(stageStats);

        assertTrue(stageStats.averageExecutionTime() > 0);
        assertTrue(stageStats.minimumExecutionTime() > 0);
        assertTrue(stageStats.maximumExecutionTime() > 0);
        assertEquals(i, stageStats.totalExecutionCount());
        assertEquals(0, stageStats.failuresCount());
        assertFalse(stageStats.executing());
    }

    /**
     * Verifies that the given stages report no failures, zero execution times and are not
     * executing on the node's default streamer.
     *
     * @param ignite Node whose default streamer is checked.
     * @param strArr Names of the stages to check.
     */
    private void checkZeroMetrics(Ignite ignite, String... strArr) {
        for (int idx = 0; idx < strArr.length; idx++) {
            String stageName = strArr[idx];

            StreamerStageMetrics stageStats = ignite.streamer((String) null).metrics().stageMetrics(stageName);

            assertNotNull(stageStats);

            assertEquals(0, stageStats.failuresCount());
            assertEquals(0L, stageStats.averageExecutionTime());
            assertEquals(0L, stageStats.minimumExecutionTime());
            assertEquals(0L, stageStats.maximumExecutionTime());
            assertFalse(stageStats.executing());
        }
    }

    static {
        // IP finder installed into the discovery SPI of every grid configured by this test.
        IP_FINDER = new TcpDiscoveryVmIpFinder(true);

        // True when assertions are disabled for this class.
        $assertionsDisabled = !GridStreamerSelfTest.class.desiredAssertionStatus();
    }
}
