package com.datatorrent.stram.engine;

import com.datatorrent.api.AutoMetric;
import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.Operator;
import com.datatorrent.api.Partitioner;
import com.datatorrent.api.Stats;
import com.datatorrent.api.StatsListener;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.stram.StramLocalCluster;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.logical.LogicalPlanConfiguration;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import javax.validation.constraints.NotNull;
import org.apache.hadoop.conf.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest.class */
public class AutoMetricTest {
    private static final Logger LOG = LoggerFactory.getLogger(AutoMetricTest.class);
    private LogicalPlan dag;

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();

    /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$MockAggregator.class */
    /**
     * Aggregator stub used by the tests. Every call to {@link #aggregate} sums the
     * {@code "progress"} metric (and {@code "myMetric"} where a partition reports it) across the
     * supplied physical contexts, stores the totals in {@link #result} and counts the latch down
     * once so that the waiting test can proceed.
     */
    private static class MockAggregator implements AutoMetric.Aggregator, Serializable {
        private static final long serialVersionUID = 201503311744L;

        /** Most recent "progress" total; {@code -1} until the first aggregation. */
        long cachedSum = -1L;
        /** Aggregated values from the most recent call, keyed by metric name. */
        Map<String, Object> result = Maps.newHashMap();
        /** Signals the test that an aggregation happened; excluded from serialization. */
        transient CountDownLatch latch;

        private MockAggregator(CountDownLatch countDownLatch) {
            latch = Preconditions.checkNotNull(countDownLatch);
        }

        /**
         * Sums the integer metrics of all partitions.
         *
         * @param j          window identifier (unused)
         * @param collection per-partition metric contexts
         * @return the shared {@link #result} map holding the "progress" and "myMetric" totals
         */
        public Map<String, Object> aggregate(long j, Collection<AutoMetric.PhysicalMetricsContext> collection) {
            long progressTotal = 0;
            int myMetricTotal = 0;
            for (AutoMetric.PhysicalMetricsContext partition : collection) {
                progressTotal += (Integer) partition.getMetrics().get("progress");
                // "myMetric" is only reported by operators that declare it.
                if (partition.getMetrics().containsKey("myMetric")) {
                    myMetricTotal += (Integer) partition.getMetrics().get("myMetric");
                }
            }
            cachedSum = progressTotal;
            result.put("progress", cachedSum);
            result.put("myMetric", myMetricTotal);
            latch.countDown();
            return result;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$OperatorAndAggregator.class */
    /**
     * Operator that is also its own {@link AutoMetric.Aggregator}. Its {@link #aggregate}
     * implementation copies the "progress" metric of the first physical context into
     * {@link #result} and counts the latch down.
     */
    public static class OperatorAndAggregator extends OperatorWithMetrics implements AutoMetric.Aggregator {
        /** Holds the value published by the most recent aggregation. */
        Map<String, Object> result = Maps.newHashMap();
        /** Released on every aggregation; {@code null} when built via the no-arg constructor. */
        private final transient CountDownLatch latch;

        /** No-arg constructor; leaves the latch unset. */
        private OperatorAndAggregator() {
            latch = null;
        }

        /**
         * @param countDownLatch latch to count down on each aggregation; must not be {@code null}
         */
        OperatorAndAggregator(@NotNull CountDownLatch countDownLatch) {
            latch = Preconditions.checkNotNull(countDownLatch);
        }

        /**
         * Publishes the "progress" metric of the first context in {@code collection}.
         *
         * @param j          window identifier (unused)
         * @param collection per-partition metric contexts; the first element is read
         * @return the shared {@link #result} map
         */
        public Map<String, Object> aggregate(long j, Collection<AutoMetric.PhysicalMetricsContext> collection) {
            AutoMetric.PhysicalMetricsContext firstContext = collection.iterator().next();
            result.put("progress", firstContext.getMetrics().get("progress"));
            latch.countDown();
            return result;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$OperatorWithMetricMethod.class */
    /**
     * {@link OperatorWithMetrics} variant that additionally declares a metric through an
     * {@code @AutoMetric}-annotated getter instead of a field.
     */
    public static class OperatorWithMetricMethod extends OperatorWithMetrics {
        /**
         * Always returns {@code 3}; {@code testMetricsAnnotatedMethod} expects this value to show
         * up under the key {@code "myMetric"}.
         */
        @AutoMetric
        public int getMyMetric() {
            return 3;
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$OperatorWithMetrics.class */
    /**
     * Test operator exposing a single {@code @AutoMetric} field, {@code progress}, which is set to
     * {@code 1} at the end of every window.
     */
    public static class OperatorWithMetrics extends GenericTestOperator {

        /** Metric field read by the aggregators in this test under the key {@code "progress"}. */
        @AutoMetric
        protected int progress;

        /** Sets {@code progress} to 1 and then delegates to the superclass implementation. */
        public void endWindow() {
            this.progress = 1;
            super.endWindow();
        }
    }

    /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$TestOperator.class */
    public static class TestOperator extends TestGeneratorInputOperator implements Partitioner<TestOperator>, StatsListener {
        private static TestOperatorStats lastMetric = null;
        private static Thread processStatsThread = null;
        private static Thread definePartitionsThread = null;
        private transient boolean propVal;

        @AutoMetric
        private TestOperatorStats operatorMetric;

        /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$TestOperator$TestOperatorStats.class */
        static class TestOperatorStats implements Serializable {
            private String message;
            private boolean attributeListenerCalled;
            private static final long serialVersionUID = -8096838101190642798L;
            private boolean currentPropVal;

            TestOperatorStats() {
            }
        }

        /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$TestOperator$TestStatsListener.class */
        public static class TestStatsListener implements StatsListener, Serializable {
            private static final long serialVersionUID = 1;
            private boolean lastPropVal;

            /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$TestOperator$TestStatsListener$SetPropertyRequest.class */
            public static class SetPropertyRequest implements StatsListener.OperatorRequest, Serializable {
                private static final long serialVersionUID = 1;

                public StatsListener.OperatorResponse execute(Operator operator, int i, long j) throws IOException {
                    if (operator instanceof TestOperator) {
                        AutoMetricTest.LOG.debug("Setting property");
                        ((TestOperator) operator).propVal = true;
                    }
                    return new TestOperatorResponse();
                }
            }

            /* loaded from: input_file:com/datatorrent/stram/engine/AutoMetricTest$TestOperator$TestStatsListener$TestOperatorResponse.class */
            public static class TestOperatorResponse implements StatsListener.OperatorResponse, Serializable {
                private static final long serialVersionUID = 2;

                public Object getResponseId() {
                    return 1;
                }

                public Object getResponse() {
                    return "test";
                }
            }

            public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
                for (Stats.OperatorStats operatorStats : batchedOperatorStats.getLastWindowedStats()) {
                    Assert.assertNotNull("metrics", operatorStats.metrics.get("operatorMetric"));
                    TestOperatorStats testOperatorStats = (TestOperatorStats) operatorStats.metrics.get("operatorMetric");
                    testOperatorStats.attributeListenerCalled = true;
                    this.lastPropVal = testOperatorStats.currentPropVal;
                }
                if (this.lastPropVal) {
                    Assert.assertNotNull(batchedOperatorStats.getOperatorResponse());
                    Assert.assertTrue(1 == batchedOperatorStats.getOperatorResponse().size());
                    Assert.assertEquals("test", ((StatsListener.OperatorResponse) batchedOperatorStats.getOperatorResponse().get(0)).getResponse());
                }
                StatsListener.Response response = new StatsListener.Response();
                response.operatorRequests = Lists.newArrayList(new SetPropertyRequest[]{new SetPropertyRequest()});
                return response;
            }
        }

        public void partitioned(Map<Integer, Partitioner.Partition<TestOperator>> map) {
        }

        public Collection<Partitioner.Partition<TestOperator>> definePartitions(Collection<Partitioner.Partition<TestOperator>> collection, Partitioner.PartitioningContext partitioningContext) {
            ArrayList newArrayList = Lists.newArrayList();
            newArrayList.addAll(collection);
            Iterator<Partitioner.Partition<TestOperator>> it = collection.iterator();
            while (it.hasNext()) {
                StatsListener.BatchedOperatorStats stats = it.next().getStats();
                if (stats != null) {
                    definePartitionsThread = Thread.currentThread();
                    for (Stats.OperatorStats operatorStats : stats.getLastWindowedStats()) {
                        if (operatorStats.metrics.get("operatorMetric") != null) {
                            lastMetric = (TestOperatorStats) operatorStats.metrics.get("operatorMetric");
                        }
                    }
                }
            }
            return newArrayList;
        }

        @Override // com.datatorrent.stram.engine.TestGeneratorInputOperator
        public void endWindow() {
            super.endWindow();
            this.operatorMetric = new TestOperatorStats();
            this.operatorMetric.message = "interesting";
            this.operatorMetric.currentPropVal = this.propVal;
        }

        public StatsListener.Response processStats(StatsListener.BatchedOperatorStats batchedOperatorStats) {
            processStatsThread = Thread.currentThread();
            Iterator it = batchedOperatorStats.getLastWindowedStats().iterator();
            while (it.hasNext()) {
                Assert.assertNotNull("metric in listener", ((Stats.OperatorStats) it.next()).metrics.get("operatorMetric"));
            }
            StatsListener.Response response = new StatsListener.Response();
            response.repartitionRequired = true;
            return response;
        }
    }

    /** Builds a fresh logical plan from the rule-provided test metadata before each test. */
    @Before
    public void setup() {
        dag = StramTestSupport.createDAG(testMeta);
    }

    /**
     * Runs a {@link TestOperator} feeding a collector in a single container and verifies that the
     * operator's metric object reaches both the attribute-registered stats listener and the
     * partitioner, that both callbacks ran on the same thread, and that the property request
     * issued by the listener took effect.
     *
     * @throws Exception if the test thread is interrupted or the cluster fails
     */
    @Test
    public void testMetricPropagation() throws Exception {
        this.dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
        this.dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        TestOperator testOperator = this.dag.addOperator("TestOperator", TestOperator.class);
        TestOperator.TestStatsListener testStatsListener = new TestOperator.TestStatsListener();
        this.dag.setAttribute(testOperator, Context.OperatorContext.STATS_LISTENERS, Lists.<StatsListener>newArrayList(testStatsListener));
        this.dag.addStream("TestTuples", testOperator.outport, this.dag.addOperator("Collector", new GenericTestOperator()).inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        try {
            // Both waits share one deadline measured from cluster start.
            long startMillis = System.currentTimeMillis();
            while (TestOperator.lastMetric == null && StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startMillis) {
                Thread.sleep(300L);
                LOG.debug("Waiting for stats");
            }
            while (StramTestSupport.DEFAULT_TIMEOUT_MILLIS > System.currentTimeMillis() - startMillis && !testStatsListener.lastPropVal) {
                Thread.sleep(100L);
                LOG.debug("Waiting for property set");
            }
        } finally {
            // Always stop the cluster, even if a wait is interrupted, so it cannot leak into
            // subsequent tests.
            stramLocalCluster.shutdown();
        }
        Assert.assertNotNull("metric received", TestOperator.lastMetric);
        Assert.assertEquals("metric message", "interesting", TestOperator.lastMetric.message);
        Assert.assertTrue("attribute defined stats listener called", TestOperator.lastMetric.attributeListenerCalled);
        Assert.assertSame("single thread", TestOperator.definePartitionsThread, TestOperator.processStatsThread);
        Assert.assertTrue("property set", testStatsListener.lastPropVal);
    }

    /**
     * Attaches a {@link MockAggregator} to an {@link OperatorWithMetrics} and verifies that the
     * aggregated "progress" value is 1 after the first aggregation.
     *
     * @throws Exception if the test thread is interrupted or the cluster fails
     */
    @Test
    public void testMetrics() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration());
        TestGeneratorInputOperator input = this.dag.addOperator("input", TestGeneratorInputOperator.class);
        OperatorWithMetrics o1 = this.dag.addOperator("o1", OperatorWithMetrics.class);
        MockAggregator mockAggregator = new MockAggregator(countDownLatch);
        this.dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, mockAggregator);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        this.dag.addStream("TestTuples", input.outport, o1.inport1);
        logicalPlanConfiguration.prepareDAG(this.dag, (StreamingApplication) null, "AutoMetricTest");
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        try {
            countDownLatch.await();
            Assert.assertEquals("progress", 1L, ((Long) mockAggregator.result.get("progress")).longValue());
        } finally {
            // Shut down even when the assertion fails so the cluster does not outlive the test.
            stramLocalCluster.shutdown();
        }
    }

    /**
     * Same as {@code testMetrics} but with the operator split into two partitions by a
     * {@link StatelessPartitioner}; expects the aggregated "progress" to be 2 after two
     * aggregations. Currently ignored.
     *
     * @throws Exception if the test thread is interrupted or the cluster fails
     */
    @Test
    @Ignore
    public void testMetricsAggregations() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration());
        TestGeneratorInputOperator input = this.dag.addOperator("input", TestGeneratorInputOperator.class);
        OperatorWithMetrics o1 = this.dag.addOperator("o1", OperatorWithMetrics.class);
        MockAggregator mockAggregator = new MockAggregator(countDownLatch);
        this.dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, mockAggregator);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        this.dag.setAttribute(o1, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<OperatorWithMetrics>(2));
        this.dag.addStream("TestTuples", input.outport, o1.inport1);
        logicalPlanConfiguration.prepareDAG(this.dag, (StreamingApplication) null, "AutoMetricTest");
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        try {
            countDownLatch.await();
            Assert.assertEquals("progress", 2L, ((Long) mockAggregator.result.get("progress")).longValue());
        } finally {
            // Shut down even when the assertion fails so the cluster does not outlive the test.
            stramLocalCluster.shutdown();
        }
    }

    /**
     * Verifies that preparing a DAG whose operator declares metrics but no explicit aggregator
     * leaves the operator's meta with a non-null aggregator.
     *
     * @throws Exception if DAG preparation fails
     */
    @Test
    public void testInjectionOfDefaultMetricsAggregator() throws Exception {
        LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(new Configuration());
        TestGeneratorInputOperator input = dag.addOperator("input", TestGeneratorInputOperator.class);
        OperatorWithMetricMethod o1 = dag.addOperator("o1", OperatorWithMetricMethod.class);
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        dag.addStream("TestTuples", input.outport, o1.inport1);
        planConfiguration.prepareDAG(dag, (StreamingApplication) null, "AutoMetricTest");

        Object injectedAggregator = dag.getOperatorMeta("o1").getMetricAggregatorMeta().getAggregator();
        Assert.assertNotNull("default aggregator injected", injectedAggregator);
    }

    /**
     * Uses an operator that is its own aggregator ({@link OperatorAndAggregator}), checks that an
     * aggregator is present on the operator meta after DAG preparation, then runs the DAG and
     * verifies that the operator's {@code aggregate} published a "progress" value of 1.
     *
     * @throws Exception if the test thread is interrupted or the cluster fails
     */
    @Test
    public void testDefaultMetricsAggregator() throws Exception {
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration());
        TestGeneratorInputOperator input = this.dag.addOperator("input", TestGeneratorInputOperator.class);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        OperatorAndAggregator o1 = this.dag.addOperator("o1", new OperatorAndAggregator(countDownLatch));
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        this.dag.addStream("TestTuples", input.outport, o1.inport1);
        logicalPlanConfiguration.prepareDAG(this.dag, (StreamingApplication) null, "AutoMetricTest");
        Assert.assertNotNull("default aggregator injected", this.dag.getOperatorMeta("o1").getMetricAggregatorMeta().getAggregator());
        // NOTE(review): the DAG is prepared a second time here; kept as-is - confirm whether
        // the repeated call is intentional.
        logicalPlanConfiguration.prepareDAG(this.dag, (StreamingApplication) null, "AutoMetricTest");
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        try {
            countDownLatch.await();
            Assert.assertEquals("progress", 1, o1.result.get("progress"));
        } finally {
            // Shut down even when the assertion fails so the cluster does not outlive the test.
            stramLocalCluster.shutdown();
        }
    }

    /**
     * Verifies that a metric declared through an {@code @AutoMetric}-annotated getter
     * ({@link OperatorWithMetricMethod#getMyMetric()}) reaches the aggregator: after the first
     * aggregation the "myMetric" total must be 3.
     *
     * @throws Exception if the test thread is interrupted or the cluster fails
     */
    @Test
    public void testMetricsAnnotatedMethod() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LogicalPlanConfiguration logicalPlanConfiguration = new LogicalPlanConfiguration(new Configuration());
        TestGeneratorInputOperator input = this.dag.addOperator("input", TestGeneratorInputOperator.class);
        OperatorWithMetricMethod o1 = this.dag.addOperator("o1", OperatorWithMetricMethod.class);
        // Keep a reference to the aggregator so its result can be inspected after the run.
        MockAggregator mockAggregator = new MockAggregator(countDownLatch);
        this.dag.setAttribute(o1, Context.OperatorContext.METRICS_AGGREGATOR, mockAggregator);
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        this.dag.addStream("TestTuples", input.outport, o1.inport1);
        logicalPlanConfiguration.prepareDAG(this.dag, (StreamingApplication) null, "AutoMetricTest");
        StramLocalCluster stramLocalCluster = new StramLocalCluster(this.dag);
        stramLocalCluster.runAsync();
        try {
            countDownLatch.await();
            Assert.assertEquals("myMetric", 3L, ((Integer) mockAggregator.result.get("myMetric")).intValue());
        } finally {
            // Shut down even when the assertion fails so the cluster does not outlive the test.
            stramLocalCluster.shutdown();
        }
    }
}
