package com.datatorrent.stram;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.api.InputOperator;
import com.datatorrent.api.Operator;
import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
import com.datatorrent.api.annotation.Stateless;
import com.datatorrent.common.util.AsyncFSStorageAgent;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.common.util.FSStorageAgent;
import com.datatorrent.stram.StreamingContainerManager;
import com.datatorrent.stram.api.Checkpoint;
import com.datatorrent.stram.api.StreamingContainerUmbilicalProtocol;
import com.datatorrent.stram.engine.GenericTestOperator;
import com.datatorrent.stram.engine.OperatorContext;
import com.datatorrent.stram.plan.logical.LogicalPlan;
import com.datatorrent.stram.plan.physical.PTContainer;
import com.datatorrent.stram.plan.physical.PTOperator;
import com.datatorrent.stram.plan.physical.PhysicalPlan;
import com.datatorrent.stram.support.StramTestSupport;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

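/**
 * Tests for operator checkpointing: what the storage agents persist when a DAG runs in a
 * local cluster, and how {@link StreamingContainerManager} updates recovery checkpoints and
 * reports blocked operators.
 */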
public class CheckpointTest {
    private static final Logger LOG = LoggerFactory.getLogger(CheckpointTest.class);

    @Rule
    public StramTestSupport.TestMeta testMeta = new StramTestSupport.TestMeta();
    private LogicalPlan dag;

    /**
     * Hand-driven {@link Clock} for tests: {@link #getTime()} simply reports whatever value the
     * test last stored in {@link #time}.
     */
    public class MockClock implements Clock {
        /** Current reading of the clock; starts at zero and only changes when a test assigns it. */
        public long time;

        public MockClock() {
        }

        /** Returns the current value of {@link #time}. */
        public long getTime() {
            return time;
        }
    }

    /**
     * Input operator used by the local-cluster tests. It emits no tuples, calls
     * {@code BaseOperator.shutdown()} when its third window begins, and increments
     * {@link #checkpointState} every time {@link #beforeCheckpoint(long)} is invoked.
     */
    private static class MockInputOperator extends BaseOperator implements InputOperator, Operator.CheckpointNotificationListener {

        @OutputPortFieldAnnotation(optional = true)
        public final transient DefaultOutputPort<Object> outport = new DefaultOutputPort<>();

        /** Number of windows begun so far; transient. */
        private transient int windowCount;

        /** Number of {@code beforeCheckpoint} calls seen so far; non-transient. */
        private int checkpointState;

        private MockInputOperator() {
        }

        /** Counts the window and triggers shutdown once the third one starts. */
        public void beginWindow(long j) {
            if (++windowCount == 3) {
                BaseOperator.shutdown();
            }
        }

        /** Intentionally emits nothing. */
        public void emitTuples() {
        }

        /** Records that another checkpoint is about to be taken. */
        public void beforeCheckpoint(long j) {
            checkpointState += 1;
        }

        /** No-op. */
        public void checkpointed(long j) {
        }

        /** No-op. */
        public void committed(long j) {
        }
    }

    /**
     * {@link GenericTestOperator} variant carrying the {@link Stateless} annotation; it adds no
     * members of its own.
     */
    @Stateless
    public static class StatelessOperator extends GenericTestOperator {
    }

    /** Creates a fresh logical plan from the per-test {@link #testMeta} before every test. */
    @Before
    public void setup() {
        dag = StramTestSupport.createDAG(testMeta);
    }

    /**
     * Runs the {@code o1 -> o2} DAG in a local cluster with an {@link AsyncFSStorageAgent}
     * switched to synchronous checkpointing and then inspects what was stored:
     * <ul>
     *   <li>the plan needs exactly one container;</li>
     *   <li>three window ids, one of them {@code -1}, are stored for {@code o1};</li>
     *   <li>only window id {@code -1} is stored for {@code o2}, which is marked stateless;</li>
     *   <li>{@code o1} keeps two checkpoints in the plan, and each of them loads back as an
     *       object of the operator's own class.</li>
     * </ul>
     *
     * @throws Exception if running the cluster or reading from the storage agent fails
     */
    @Test
    public void testBackup() throws Exception {
        AsyncFSStorageAgent storageAgent = new AsyncFSStorageAgent(this.testMeta.getPath(), (Configuration) null);
        storageAgent.setSyncCheckpoint(true);
        this.dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
        this.dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
        this.dag.setAttribute(LogicalPlan.HEARTBEAT_INTERVAL_MILLIS, 50);
        this.dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
        this.dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 50);

        MockInputOperator o1 = this.dag.addOperator("o1", new MockInputOperator());
        GenericTestOperator o2 = this.dag.addOperator("o2", GenericTestOperator.class);
        this.dag.setAttribute(o2, OperatorContext.STATELESS, true);
        this.dag.addStream("o1.outport", o1.outport, o2.inport1).setLocality(DAG.Locality.CONTAINER_LOCAL);

        StramLocalCluster localCluster = new StramLocalCluster(this.dag);
        localCluster.setHeartbeatMonitoringEnabled(false);
        localCluster.run();

        // The container count is read from the same physical plan that is inspected below
        // (previously this line dereferenced an undeclared variable).
        PhysicalPlan plan = localCluster.dnmgr.getPhysicalPlan();
        Assert.assertEquals("number required containers", 1L, plan.getContainers().size());

        PTOperator o1p1 = plan.getOperators(this.dag.getMeta(o1)).get(0);
        HashSet<Long> o1WindowIds = Sets.newHashSet();
        for (long windowId : storageAgent.getWindowIds(o1p1.getId())) {
            o1WindowIds.add(windowId);
        }
        Assert.assertEquals("number checkpoints " + o1WindowIds, 3L, o1WindowIds.size());
        Assert.assertTrue("contains " + o1WindowIds + " -1", o1WindowIds.contains(-1L));

        PTOperator o2p1 = plan.getOperators(this.dag.getMeta(o2)).get(0);
        HashSet<Long> o2WindowIds = Sets.newHashSet();
        for (long windowId : storageAgent.getWindowIds(o2p1.getId())) {
            o2WindowIds.add(windowId);
        }
        Assert.assertEquals("number checkpoints " + o2WindowIds, 1L, o2WindowIds.size());
        Assert.assertEquals("checkpoints " + o2p1, Sets.newHashSet(new Long[]{-1L}), o2WindowIds);

        Assert.assertEquals("checkpoints " + o1p1 + " " + o1p1.checkpoints, 2L, o1p1.checkpoints.size());
        Assert.assertNotNull("checkpoint not null for statefull operator " + o1p1, o1p1.stats.checkpointStats);

        // Every checkpoint still tracked by the plan must load back as the operator's own class.
        for (Checkpoint checkpoint : o1p1.checkpoints) {
            Object loaded = storageAgent.load(o1p1.getId(), checkpoint.windowId);
            Assert.assertEquals("Stored Operator and Saved State", loaded.getClass(), o1p1.getOperatorMeta().getOperator().getClass());
        }
    }

    /**
     * Walks {@link StreamingContainerManager#updateRecoveryCheckpoints} through a linear
     * {@code o1 -> o2 -> o3SL} plan (the last operator is a {@link StatelessOperator}) and
     * checks the recovery checkpoint of each partition after every change:
     * <ol>
     *   <li>with no checkpoints, every operator stays at {@code Checkpoint.INITIAL_CHECKPOINT};</li>
     *   <li>while the operators are {@code PENDING_DEPLOY}, adding checkpoints does not move
     *       the recovery checkpoint;</li>
     *   <li>once all operators are {@code ACTIVE}, the recovery checkpoints advance as
     *       asserted below;</li>
     *   <li>{@code addCheckpoint} keeps an operator's checkpoint list ordered and ignores
     *       duplicates.</li>
     * </ol>
     *
     * @throws Exception declared by the original test signature
     */
    @Test
    public void testUpdateRecoveryCheckpoint() throws Exception {
        SystemClock clock = new SystemClock();
        this.dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = this.dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = this.dag.addOperator("o2", GenericTestOperator.class);
        GenericTestOperator o3SL = this.dag.addOperator("o3SL", StatelessOperator.class);
        this.dag.addStream("o1.output1", o1.outport1, o2.inport1);
        this.dag.addStream("o2.output1", o2.outport1, o3SL.inport1);

        StreamingContainerManager dnm = new StreamingContainerManager(this.dag);
        PhysicalPlan plan = dnm.getPhysicalPlan();

        for (PTOperator oper : plan.getAllOperators().values()) {
            Assert.assertEquals("activation windowId " + oper, Checkpoint.INITIAL_CHECKPOINT, oper.getRecoveryCheckpoint());
            Assert.assertEquals("checkpoints " + oper, Collections.emptyList(), oper.checkpoints);
        }

        List<PTOperator> o1Partitions = plan.getOperators(this.dag.getMeta(o1));
        Assert.assertNotNull(o1Partitions);
        Assert.assertEquals(1L, o1Partitions.size());
        PTOperator o1p1 = o1Partitions.get(0);
        PTOperator o2p1 = plan.getOperators(this.dag.getMeta(o2)).get(0);
        PTOperator o3SLp1 = plan.getOperators(this.dag.getMeta(o3SL)).get(0);

        for (PTOperator oper : plan.getAllOperators().values()) {
            Assert.assertEquals("", PTOperator.State.PENDING_DEPLOY, oper.getState());
        }

        // No checkpoints anywhere: recovery checkpoints stay at the initial value.
        dnm.updateRecoveryCheckpoints(o2p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("no checkpoints " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());

        StreamingContainerManager.UpdateCheckpointsContext ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        dnm.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("no checkpoints " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
        Assert.assertEquals("number dependencies " + ctx.visited, 3L, ctx.visited.size());

        Checkpoint cp3 = new Checkpoint(3L, 0, 0);
        Checkpoint cp5 = new Checkpoint(5L, 0, 0);
        Checkpoint cp4 = new Checkpoint(4L, 0, 0);

        // Checkpoints added while the operators are still PENDING_DEPLOY do not move recovery.
        o1p1.checkpoints.add(cp3);
        o1p1.checkpoints.add(cp5);
        dnm.updateRecoveryCheckpoints(o1p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());

        o2p1.checkpoints.add(new Checkpoint(3L, 0, 0));
        dnm.updateRecoveryCheckpoints(o1p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());

        dnm.addCheckpoint(o3SLp1, cp5);
        dnm.updateRecoveryCheckpoints(o1p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + o2p1, Checkpoint.INITIAL_CHECKPOINT, o2p1.getRecoveryCheckpoint());

        // Mark everything deployed and re-evaluate.
        for (PTOperator oper : plan.getAllOperators().values()) {
            oper.setState(PTOperator.State.ACTIVE);
        }
        dnm.updateRecoveryCheckpoints(o1p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint());
        // Verify o2's own recovery checkpoint; this assertion used to re-check o1 by mistake.
        Assert.assertEquals("checkpoint " + o2p1, cp3, o2p1.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + o3SLp1, cp5, o3SLp1.getRecoveryCheckpoint());
        Assert.assertNull("checkpoint null for stateless operator " + o3SLp1, o3SLp1.stats.checkpointStats);

        o2p1.checkpoints.add(cp4);
        dnm.updateRecoveryCheckpoints(o1p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("checkpoint " + o1p1, cp3, o1p1.getRecoveryCheckpoint());
        Assert.assertEquals("checkpoint " + o2p1, cp4, o2p1.getRecoveryCheckpoint());

        // Insert window 4 between 3 and 5 for o1; the update is expected to drop window 3.
        o1p1.checkpoints.add(1, cp4);
        Assert.assertEquals(getCheckpoints(3L, 4L, 5L), o1p1.checkpoints);
        dnm.updateRecoveryCheckpoints(o1p1, new StreamingContainerManager.UpdateCheckpointsContext(clock));
        Assert.assertEquals("checkpoint " + o1p1, cp4, o1p1.getRecoveryCheckpoint());
        Assert.assertEquals(getCheckpoints(4L, 5L), o1p1.checkpoints);

        // addCheckpoint keeps the list ordered and skips duplicates.
        dnm.addCheckpoint(o2p1, new Checkpoint(2L, 0, 0));
        Assert.assertEquals("add first", getCheckpoints(2L, 4L), o2p1.checkpoints);

        dnm.addCheckpoint(o2p1, new Checkpoint(3L, 0, 0));
        Assert.assertEquals("add middle", getCheckpoints(2L, 3L, 4L), o2p1.checkpoints);

        dnm.addCheckpoint(o2p1, new Checkpoint(4L, 0, 0));
        Assert.assertEquals("ignore duplicate", getCheckpoints(2L, 3L, 4L), o2p1.checkpoints);

        dnm.addCheckpoint(o2p1, new Checkpoint(5L, 0, 0));
        Assert.assertEquals("add latest", getCheckpoints(2L, 3L, 4L, 5L), o2p1.checkpoints);
    }

    /**
     * Exercises {@code updateRecoveryCheckpoints} with a context built as
     * {@code UpdateCheckpointsContext(clock, true, emptyMap)} on the plan
     * {@code o1 -> o2SL -> (o3SL, o4)}, where only {@code o4} has a checkpoint (window 2) and
     * the mock clock reads 3. Expected results: {@code o1} stays at the initial checkpoint,
     * {@code o2SL} takes {@code o4}'s checkpoint, {@code o3SL} gets a checkpoint whose window
     * id equals the clock reading, and every operator of the plan is visited.
     */
    @Test
    public void testUpdateCheckpointsRecovery() {
        MockClock clock = new MockClock();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
        dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 1);

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        StatelessOperator o2SL = dag.addOperator("o2SL", StatelessOperator.class);
        StatelessOperator o3SL = dag.addOperator("o3SL", StatelessOperator.class);
        GenericTestOperator o4 = dag.addOperator("o4", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2SL.inport1);
        dag.addStream("o2SL.outport1", o2SL.outport1, o3SL.inport1, o4.inport1);

        StreamingContainerManager manager = new StreamingContainerManager(dag, clock);
        PhysicalPlan plan = manager.getPhysicalPlan();

        // Freshly planned operators start without checkpoints.
        for (PTOperator planned : plan.getAllOperators().values()) {
            Assert.assertEquals("activation windowId " + planned, Checkpoint.INITIAL_CHECKPOINT, planned.getRecoveryCheckpoint());
            Assert.assertEquals("checkpoints " + planned, Collections.emptyList(), planned.checkpoints);
        }

        PTOperator o1p1 = (PTOperator) plan.getOperators(dag.getMeta(o1)).get(0);
        PTOperator o2SLp1 = (PTOperator) plan.getOperators(dag.getMeta(o2SL)).get(0);
        PTOperator o3SLp1 = (PTOperator) plan.getOperators(dag.getMeta(o3SL)).get(0);
        PTOperator o4p1 = (PTOperator) plan.getOperators(dag.getMeta(o4)).get(0);

        Checkpoint o4Checkpoint = new Checkpoint(2L, 0, 0);
        clock.time = 3L;
        o4p1.checkpoints.add(o4Checkpoint);

        StreamingContainerManager.UpdateCheckpointsContext recoveryCtx =
                new StreamingContainerManager.UpdateCheckpointsContext(clock, true, Collections.emptyMap());
        manager.updateRecoveryCheckpoints(o1p1, recoveryCtx);

        Assert.assertEquals("initial checkpoint " + o1p1, Checkpoint.INITIAL_CHECKPOINT, o1p1.getRecoveryCheckpoint());
        Assert.assertEquals("initial checkpoint " + o2SLp1, o4Checkpoint, o2SLp1.getRecoveryCheckpoint());
        Checkpoint clockCheckpoint = new Checkpoint(clock.getTime(), 0, 0);
        Assert.assertEquals("initial checkpoint " + o3SLp1, clockCheckpoint, o3SLp1.getRecoveryCheckpoint());
        Assert.assertEquals("number dependencies " + recoveryCtx.visited, plan.getAllOperators().size(), recoveryCtx.visited.size());
    }

    /**
     * Builds a list of checkpoints, one per given window id, in the order given. Each
     * checkpoint is created as {@code new Checkpoint(windowId, 0, 0)}.
     *
     * @param lArr window ids of the checkpoints to create; elements must not be {@code null}
     * @return a new mutable list holding one checkpoint per window id
     */
    public List<Checkpoint> getCheckpoints(Long... lArr) {
        // Typed list instead of a raw ArrayList, so no unchecked conversion on return.
        List<Checkpoint> checkpoints = new ArrayList<>(lArr.length);
        for (Long windowId : lArr) {
            checkpoints.add(new Checkpoint(windowId.longValue(), 0, 0));
        }
        return checkpoints;
    }

    /**
     * Checks which operators {@code updateRecoveryCheckpoints} reports in
     * {@code UpdateCheckpointsContext.blocked} for an active {@code o1 -> o2} plan while the
     * mock clock and the operators' window statistics are manipulated step by step.
     */
    @Test
    public void testUpdateCheckpointsProcessingTimeout() {
        MockClock clock = new MockClock();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1);

        StreamingContainerManager manager = new StreamingContainerManager(dag);
        PhysicalPlan plan = manager.getPhysicalPlan();
        for (PTOperator planned : plan.getAllOperators().values()) {
            planned.setState(PTOperator.State.ACTIVE);
        }

        List o1Partitions = plan.getOperators(dag.getMeta(o1));
        Assert.assertNotNull(o1Partitions);
        Assert.assertEquals(1L, o1Partitions.size());
        PTOperator o1p1 = (PTOperator) o1Partitions.get(0);

        List o2Partitions = plan.getOperators(dag.getMeta(o2));
        Assert.assertNotNull(o2Partitions);
        Assert.assertEquals(1L, o2Partitions.size());
        PTOperator o2p1 = (PTOperator) o2Partitions.get(0);

        // Nothing has been reported yet, so nothing is blocked.
        StreamingContainerManager.UpdateCheckpointsContext ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertTrue("no blocked operators", ctx.blocked.isEmpty());

        // o1 moves to window 1 at time 1; the clock then jumps to timeout + 1.
        o1p1.stats.statsRevs.checkout();
        o1p1.stats.currentWindowId.set(1L);
        o1p1.stats.lastWindowIdChangeTms = 1L;
        o1p1.stats.statsRevs.commit();

        clock.time = o1p1.stats.windowProcessingTimeoutMillis + 1;
        ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("o2 blocked", Sets.newHashSet(new PTOperator[]{o2p1}), ctx.blocked);

        // Put o2's recovery checkpoint one window ahead of o1's.
        Checkpoint o2Recovery = o2p1.getRecoveryCheckpoint();
        Checkpoint aheadOfO1 = new Checkpoint(o1p1.getRecoveryCheckpoint().windowId + 1, o2Recovery.applicationWindowCount, o2Recovery.checkpointWindowCount);
        o2p1.setRecoveryCheckpoint(aheadOfO1);
        ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("no operators blocked (o2 activation window ahead)", Sets.newHashSet(), ctx.blocked);

        // Restore o2's recovery checkpoint.
        o2p1.setRecoveryCheckpoint(o2Recovery);
        ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("o2 blocked", Sets.newHashSet(new PTOperator[]{o2p1}), ctx.blocked);

        // One more tick on the clock.
        clock.time++;
        ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("operators blocked", Sets.newHashSet(new PTOperator[]{o1p1, o2p1}), ctx.blocked);

        // o2 catches up with o1's current window.
        o2p1.stats.statsRevs.checkout();
        o2p1.stats.currentWindowId.set(o1p1.stats.getCurrentWindowId());
        o2p1.stats.statsRevs.commit();

        ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("operators blocked", Sets.newHashSet(new PTOperator[]{o1p1}), ctx.blocked);

        // Stepping the clock back by one tick.
        clock.time--;
        ctx = new StreamingContainerManager.UpdateCheckpointsContext(clock);
        manager.updateRecoveryCheckpoints(o1p1, ctx);
        Assert.assertEquals("operators blocked", Sets.newHashSet(), ctx.blocked);
    }

    /**
     * Deploys {@code o1 -> o2} into two mock containers, lets {@code o1} report window 1 and
     * then stop advancing, and checks that {@code monitorHeartbeat} records a stop request for
     * {@code o1}'s container only after the clock has moved more than the operator's window
     * processing timeout past the last window change.
     */
    @Test
    public void testBlockedOperatorContainerRestart() {
        MockClock clock = new MockClock();
        dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());

        GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
        dag.setAttribute(o1, OperatorContext.TIMEOUT_WINDOW_COUNT, 2);
        GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
        dag.addStream("o1.outport1", o1.outport1, o2.inport1);

        StreamingContainerManager manager = new StreamingContainerManager(dag, false, clock);
        PhysicalPlan plan = manager.getPhysicalPlan();
        List<PTContainer> containers = plan.getContainers();
        Assert.assertEquals("Number of containers", 2L, containers.size());

        HashMap<PTContainer, MockContainer> mockContainers = Maps.newHashMap();
        for (PTContainer container : containers) {
            mockContainers.put(container, new MockContainer(manager, container));
        }
        for (MockContainer deployed : mockContainers.values()) {
            deployed.deploy();
        }

        PTOperator o1p1 = (PTOperator) plan.getOperators(dag.getMeta(o1)).get(0);
        MockContainer o1Container = mockContainers.get(o1p1.getContainer());
        o1Container.stats(o1p1.getId())
                .currentWindowId(1L)
                .deployState(StreamingContainerUmbilicalProtocol.OperatorHeartbeat.DeployState.ACTIVE);

        // First heartbeat: window 1 is observed at clock time 10.
        clock.time = 10L;
        o1Container.sendHeartbeat();
        Assert.assertEquals(clock.time, o1p1.stats.lastWindowIdChangeTms);
        Assert.assertEquals(1L, o1p1.stats.currentWindowId.get());
        Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());

        int timeoutWindows = ((Integer) dag.getMeta(o1).getValue(OperatorContext.TIMEOUT_WINDOW_COUNT)).intValue();
        int windowMillis = ((Integer) dag.getValue(DAG.STREAMING_WINDOW_SIZE_MILLIS)).intValue();
        int timeoutMillis = timeoutWindows * windowMillis;
        Assert.assertEquals("processing timeout", timeoutMillis, o1p1.stats.windowProcessingTimeoutMillis);

        // Exactly at the timeout: no stop request yet.
        clock.time += timeoutMillis;
        o1Container.sendHeartbeat();
        Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
        Assert.assertEquals(10L, o1p1.stats.lastWindowIdChangeTms);
        manager.monitorHeartbeat();
        Assert.assertTrue(manager.containerStopRequests.isEmpty());

        // One tick past the timeout: the container is flagged for stop.
        clock.time++;
        o1Container.sendHeartbeat();
        Assert.assertEquals(PTOperator.State.ACTIVE, o1p1.getState());
        manager.monitorHeartbeat();
        Assert.assertTrue(manager.containerStopRequests.containsKey(o1p1.getContainer().getExternalId()));
    }

    /**
     * Runs {@code o1 -> o2} in a local cluster with an {@link FSStorageAgent} and a checkpoint
     * on every window, then loads each stored checkpoint of {@code o1} in ascending window-id
     * order. Every loaded object must be a {@link MockInputOperator}, and its
     * {@code checkpointState} must equal its position in that order (0, 1, 2, ...).
     *
     * @throws IOException if the storage agent cannot list or load checkpoints
     * @throws ClassNotFoundException declared by the original test signature
     */
    @Test
    public void testBeforeCheckpointNotification() throws IOException, ClassNotFoundException {
        FSStorageAgent storageAgent = new FSStorageAgent(this.testMeta.getPath(), (Configuration) null);
        this.dag.setAttribute(OperatorContext.STORAGE_AGENT, storageAgent);
        this.dag.setAttribute(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 1);
        this.dag.setAttribute(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 50);

        MockInputOperator o1 = this.dag.addOperator("o1", new MockInputOperator());
        GenericTestOperator o2 = this.dag.addOperator("o2", GenericTestOperator.class);
        this.dag.setAttribute(o2, OperatorContext.STATELESS, true);
        this.dag.addStream("o1.outport", o1.outport, o2.inport1);

        StramLocalCluster localCluster = new StramLocalCluster(this.dag);
        localCluster.setHeartbeatMonitoringEnabled(false);
        localCluster.run();

        List<PTOperator> o1Partitions = localCluster.dnmgr.getPhysicalPlan().getOperators(this.dag.getMeta(o1));
        Assert.assertEquals("Number partitions", 1L, o1Partitions.size());
        PTOperator o1p1 = o1Partitions.get(0);

        long[] windowIds = storageAgent.getWindowIds(o1p1.getId());
        Arrays.sort(windowIds);

        int expectedState = 0;
        for (long windowId : windowIds) {
            // Keep the loaded object in a local so both assertions inspect the same instance
            // (the second assertion previously referenced an undeclared variable).
            Object loaded = storageAgent.load(o1p1.getId(), windowId);
            Assert.assertEquals("Checkpointed state class", MockInputOperator.class, loaded.getClass());
            Assert.assertEquals("Checkpoint state", expectedState, ((MockInputOperator) loaded).checkpointState);
            expectedState++;
        }
    }
}
