package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/LatencyTest.class */
public class LatencyTest {
    private static final Logger LOG = LoggerFactory.getLogger(LatencyTest.class);

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    private LogicalPlan dag;
    private StreamingContainerManager scm;
    private PTOperator o1p1;
    private PTOperator o2p1;
    private PTOperator o3p1;
    private static final int windowWidthMillis = 600;
    private static final int heartbeatTimeoutMillis = 30000;

    @Before
    public void setup() {
        this.dag = StramTestSupport.createDAG(this.testMeta);
        this.dag.setAttribute(Context.DAGContext.STREAMING_WINDOW_SIZE_MILLIS, Integer.valueOf(windowWidthMillis));
        this.dag.setAttribute(Context.DAGContext.HEARTBEAT_TIMEOUT_MILLIS, Integer.valueOf(heartbeatTimeoutMillis));
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        GenericTestOperator addOperator = this.dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator addOperator2 = this.dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator addOperator3 = this.dag.addOperator("o3", GenericTestOperator.class);
        this.dag.addStream("o1.output1", addOperator.outport1, addOperator3.inport1);
        this.dag.addStream("o2.output1", addOperator2.outport1, addOperator3.inport2);
        this.scm = new StreamingContainerManager(this.dag);
        PhysicalPlan physicalPlan = this.scm.getPhysicalPlan();
        this.o1p1 = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator)).get(0);
        this.o2p1 = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator2)).get(0);
        this.o3p1 = (PTOperator) physicalPlan.getOperators(this.dag.getMeta(addOperator3)).get(0);
    }

    private long getLatency(long j, long j2, long j3, final boolean z, final long j4, final long j5, final long j6) {
        this.o1p1.stats.statsRevs.checkout();
        this.o1p1.stats.currentWindowId.set(j);
        this.o1p1.stats.statsRevs.commit();
        this.o2p1.stats.statsRevs.checkout();
        this.o2p1.stats.currentWindowId.set(j2);
        this.o2p1.stats.statsRevs.commit();
        this.o3p1.stats.statsRevs.checkout();
        this.o3p1.stats.currentWindowId.set(j3);
        this.o3p1.stats.statsRevs.commit();
        return this.scm.updateOperatorLatency(this.o3p1, new StreamingContainerManager.UpdateOperatorLatencyContext() { // from class: com.datatorrent.stram.LatencyTest.1
            long getRPCLatency(PTOperator pTOperator) {
                return 0L;
            }

            boolean endWindowStatsExists(long j7) {
                return z;
            }

            long getEndWindowEmitTimestamp(long j7, PTOperator pTOperator) {
                if (pTOperator == LatencyTest.this.o1p1) {
                    return j4;
                }
                if (pTOperator == LatencyTest.this.o2p1) {
                    return j5;
                }
                if (pTOperator == LatencyTest.this.o3p1) {
                    return j6;
                }
                Assert.fail();
                return 0L;
            }
        });
    }

    @Test
    public void testLatency() {
        Assert.assertEquals(100L, getLatency(1000L, 1000L, 1000L, true, 1000L, 1500L, 1600L));
        Assert.assertEquals(5940000L, getLatency(10000L, 10000L, 100L, true, 1000L, 1500L, 1600L));
        Assert.assertEquals(1800L, getLatency(1000L, 1000L, 997L, false, 1000L, 1500L, 1600L));
        Assert.assertEquals(-1L, getLatency(1000L, 1000L, 1001L, true, -1L, -1L, 1600L));
        Assert.assertEquals(-1L, getLatency(1000L, 1000L, 0L, false, -1L, -1L, -1L));
        Assert.assertEquals(0L, getLatency(1000L, 90L, 1000L, false, -1L, -1L, -1L));
    }
}
