package com.datatorrent.stram.engine;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.support.StramTestSupport;
import java.io.File;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/StatsTest.class */
public class StatsTest {
    private static final Logger LOG = LoggerFactory.getLogger(StatsTest.class);

    @StatsListener.DataQueueSize
    /* loaded from: input_file:com/datatorrent/stram/engine/StatsTest$TestCollector.class */
    public static class TestCollector extends GenericTestOperator implements StatsListener {
        transient long windowId;
        List<Stats.OperatorStats> collectorOperatorStats = new ArrayList();

        @StatsListener.DataQueueSize
        /* loaded from: input_file:com/datatorrent/stram/engine/StatsTest$TestCollector$QueueAwareTestCollectorStatsListener.class */
        public static class QueueAwareTestCollectorStatsListener extends TestCollectorStatsListener {
            private static final long serialVersionUID = 2;

            @Override // com.datatorrent.stram.engine.StatsTest.TestCollector.TestCollectorStatsListener
            public void validateStats() {
                Iterator<Stats.OperatorStats> it = this.collectorOperatorStats.iterator();
                while (it.hasNext()) {
                    Iterator it2 = it.next().inputPorts.iterator();
                    while (it2.hasNext()) {
                        Stats.OperatorStats.PortStats portStats = (Stats.OperatorStats.PortStats) it2.next();
                        Assert.assertTrue("Validate input port queue size " + portStats.queueSize, portStats.queueSize == 0);
                    }
                }
            }
        }

        /* loaded from: input_file:com/datatorrent/stram/engine/StatsTest$TestCollector$TestCollectorStatsListener.class */
        public static class TestCollectorStatsListener implements StatsListener, Serializable {
            private static final long serialVersionUID = 1;
            List<Stats.OperatorStats> collectorOperatorStats = new ArrayList();

            public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
                this.collectorOperatorStats.addAll(batchedOperatorStats.getLastWindowedStats());
                return new StatsListener.Response();
            }

            public void validateStats() {
                Iterator<Stats.OperatorStats> it = this.collectorOperatorStats.iterator();
                while (it.hasNext()) {
                    Iterator it2 = it.next().inputPorts.iterator();
                    while (it2.hasNext()) {
                        Stats.OperatorStats.PortStats portStats = (Stats.OperatorStats.PortStats) it2.next();
                        Assert.assertTrue("Validate input port queue size " + portStats.queueSize, portStats.queueSize >= 0);
                    }
                }
            }
        }

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            this.collectorOperatorStats.addAll(batchedOperatorStats.getLastWindowedStats());
            return new StatsListener.Response();
        }

        public void validateStats() {
            Iterator<Stats.OperatorStats> it = this.collectorOperatorStats.iterator();
            while (it.hasNext()) {
                Iterator it2 = it.next().inputPorts.iterator();
                while (it2.hasNext()) {
                    Stats.OperatorStats.PortStats portStats = (Stats.OperatorStats.PortStats) it2.next();
                    Assert.assertTrue("Validate input port queue size " + portStats.queueSize, portStats.queueSize == 0);
                }
            }
        }

        public void beginWindow(long j) {
            this.windowId = j;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/StatsTest$TestOperator.class */
    public static class TestOperator extends TestGeneratorInputOperator {
        transient long windowId;
        boolean shutdown = false;

        /* loaded from: input_file:com/datatorrent/stram/engine/StatsTest$TestOperator$TestInputStatsListener.class */
        public static class TestInputStatsListener implements StatsListener, Serializable {
            private static final long serialVersionUID = 1;
            private List<Stats.OperatorStats> inputOperatorStats = new ArrayList();

            public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
                this.inputOperatorStats.addAll(batchedOperatorStats.getLastWindowedStats());
                return new StatsListener.Response();
            }
        }

        @Override // com.datatorrent.stram.engine.TestGeneratorInputOperator
        public void beginWindow(long j) {
            if (this.shutdown) {
                BaseOperator.shutdown();
            }
            this.windowId = j;
        }

        @Override // com.datatorrent.stram.engine.TestGeneratorInputOperator
        public void emitTuples() {
            try {
                if (!this.shutdown) {
                    super.emitTuples();
                }
            } catch (Operator.ShutdownException e) {
                this.shutdown = true;
            }
        }
    }

    @Test
    public void testPortStatsPropagation() throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File("target").getAbsolutePath(), (Configuration) null));
        TestOperator addOperator = logicalPlan.addOperator("TestOperator", TestOperator.class);
        TestOperator.TestInputStatsListener testInputStatsListener = new TestOperator.TestInputStatsListener();
        logicalPlan.setOperatorAttribute(addOperator, OperatorContext.STATS_LISTENERS, Arrays.asList(testInputStatsListener));
        addOperator.setMaxTuples(10);
        addOperator.setEmitInterval(0);
        TestCollector addOperator2 = logicalPlan.addOperator("Collector", new TestCollector());
        TestCollector.TestCollectorStatsListener testCollectorStatsListener = new TestCollector.TestCollectorStatsListener();
        logicalPlan.setOperatorAttribute(addOperator2, OperatorContext.STATS_LISTENERS, Arrays.asList(testCollectorStatsListener));
        logicalPlan.addStream("TestTuples", addOperator.outport, addOperator2.inport1).setLocality((DAG.Locality) null);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        stramLocalCluster.run();
        Assert.assertFalse("input operator stats", testInputStatsListener.inputOperatorStats.isEmpty());
        Assert.assertFalse("collector operator stats", testCollectorStatsListener.collectorOperatorStats.isEmpty());
        try {
            int i = 0;
            long j = 0;
            Iterator it = testInputStatsListener.inputOperatorStats.iterator();
            while (it.hasNext()) {
                Iterator it2 = ((Stats.OperatorStats) it.next()).outputPorts.iterator();
                while (it2.hasNext()) {
                    Stats.OperatorStats.PortStats portStats = (Stats.OperatorStats.PortStats) it2.next();
                    i += portStats.tupleCount;
                    j += portStats.bufferServerBytes;
                }
            }
            int i2 = 0;
            long j2 = 0;
            Iterator<Stats.OperatorStats> it3 = testCollectorStatsListener.collectorOperatorStats.iterator();
            while (it3.hasNext()) {
                Iterator it4 = it3.next().inputPorts.iterator();
                while (it4.hasNext()) {
                    Stats.OperatorStats.PortStats portStats2 = (Stats.OperatorStats.PortStats) it4.next();
                    i2 += portStats2.tupleCount;
                    j2 += portStats2.bufferServerBytes;
                }
            }
            Assert.assertEquals("Tuple Count emitted", 10, i);
            Assert.assertTrue("Buffer server bytes", j2 > 0);
            Assert.assertEquals("Tuple Count processed", 10, i2);
            Assert.assertTrue("Buffer server bytes", j > 0);
            stramLocalCluster.shutdown();
        } catch (Throwable th) {
            stramLocalCluster.shutdown();
            throw th;
        }
    }

    private void baseTestForQueueSize(int i, TestCollector.TestCollectorStatsListener testCollectorStatsListener, DAG.Locality locality) throws Exception {
        LogicalPlan logicalPlan = new LogicalPlan();
        logicalPlan.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(new File("target/baseTestForQueueSize").getAbsolutePath(), (Configuration) null));
        logicalPlan.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 200);
        TestOperator addOperator = logicalPlan.addOperator("TestOperator", TestOperator.class);
        addOperator.setMaxTuples(i);
        TestCollector addOperator2 = logicalPlan.addOperator("Collector", new TestCollector());
        if (testCollectorStatsListener != null) {
            logicalPlan.setOperatorAttribute(addOperator2, OperatorContext.STATS_LISTENERS, Arrays.asList(testCollectorStatsListener));
        }
        logicalPlan.addStream("TestTuples", addOperator.outport, addOperator2.inport1).setLocality(locality);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(logicalPlan);
        stramLocalCluster.runAsync();
        Iterator it = stramLocalCluster.getStreamingContainerManager().getPhysicalPlan().getAllOperators().values().iterator();
        while (it.hasNext()) {
            StramTestSupport.waitForActivation(stramLocalCluster, (PTOperator) it.next());
        }
        long currentTimeMillis = System.currentTimeMillis();
        if (testCollectorStatsListener != null) {
            while (testCollectorStatsListener.collectorOperatorStats.isEmpty() && StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - currentTimeMillis) {
                Thread.sleep(300L);
                LOG.debug("Waiting for stats");
            }
        } else {
            while (addOperator2.collectorOperatorStats.isEmpty() && StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - currentTimeMillis) {
                Thread.sleep(300L);
                LOG.debug("Waiting for stats");
            }
        }
        if (testCollectorStatsListener != null) {
            testCollectorStatsListener.validateStats();
        } else {
            addOperator2.validateStats();
        }
        stramLocalCluster.shutdown();
    }

    @Test
    public void testQueueSizeForContainerLocalOperators() throws Exception {
        baseTestForQueueSize(10, new TestCollector.TestCollectorStatsListener(), DAG.Locality.CONTAINER_LOCAL);
    }

    @Test
    public void testQueueSize() throws Exception {
        baseTestForQueueSize(10, new TestCollector.TestCollectorStatsListener(), null);
    }

    @Test
    public void testQueueSizeWithQueueAwareStatsListenerForContainerLocalOperators() throws Exception {
        baseTestForQueueSize(0, new TestCollector.QueueAwareTestCollectorStatsListener(), DAG.Locality.CONTAINER_LOCAL);
    }

    @Test
    public void testQueueSizeWithQueueAwareStatsListener() throws Exception {
        baseTestForQueueSize(0, new TestCollector.QueueAwareTestCollectorStatsListener(), null);
    }

    @Test
    public void testQueueSizeWithOperatorAsStatsListener() throws Exception {
        baseTestForQueueSize(0, null, null);
    }
}
