package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.components.state.StateMap;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestMonitorActivity.class */
public class TestMonitorActivity {

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestMonitorActivity$TestableProcessor.class */
    private class TestableProcessor extends MonitorActivity {
        private final long timestampDifference;

        public TestableProcessor(long j) {
            this.timestampDifference = j;
        }

        public void resetLastSuccessfulTransfer() {
            setLastSuccessfulTransfer(System.currentTimeMillis() - this.timestampDifference);
        }
    }

    @Test
    public void testFirstMessage() throws InterruptedException {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(1000L));
        newTestRunner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        newTestRunner.enqueue(new byte[0]);
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        newTestRunner.clearTransferState();
        Thread.sleep(1000L);
        runNext(newTestRunner);
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        newTestRunner.clearTransferState();
        for (int i = 0; i < 10; i++) {
            runNext(newTestRunner);
            newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
            newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
            newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
            Thread.sleep(100L);
        }
        HashMap hashMap = new HashMap();
        hashMap.put("key", "value");
        hashMap.put("key1", "value1");
        newTestRunner.enqueue(new byte[0], hashMap);
        runNext(newTestRunner);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile mockFlowFile = (MockFlowFile) newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        Assert.assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", new String(mockFlowFile.toByteArray())));
        mockFlowFile.assertAttributeNotExists("key");
        mockFlowFile.assertAttributeNotExists("key1");
        newTestRunner.clearTransferState();
        newTestRunner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        Thread.sleep(200L);
        for (int i2 = 0; i2 < 10; i2++) {
            runNext(newTestRunner);
            Thread.sleep(200L);
        }
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 10);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        newTestRunner.clearTransferState();
        newTestRunner.enqueue(new byte[0], hashMap);
        runNext(newTestRunner);
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        MockFlowFile mockFlowFile2 = (MockFlowFile) newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        Assert.assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", new String(mockFlowFile2.toByteArray())));
        mockFlowFile2.assertAttributeNotExists("key");
        mockFlowFile2.assertAttributeNotExists("key1");
    }

    private void runNext(TestRunner testRunner) {
        testRunner.run(1, false, false);
    }

    @Test
    public void testFirstMessageWithInherit() throws InterruptedException, IOException {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(1000L));
        newTestRunner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "100 millis");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.enqueue(new byte[0]);
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        MockFlowFile mockFlowFile = (MockFlowFile) newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS).get(0);
        newTestRunner.clearTransferState();
        Thread.sleep(1000L);
        runNext(newTestRunner);
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE, 1);
        newTestRunner.clearTransferState();
        for (int i = 0; i < 10; i++) {
            runNext(newTestRunner);
            newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
            newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
            newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
            Thread.sleep(100L);
        }
        HashMap hashMap = new HashMap();
        hashMap.put("key", "value");
        hashMap.put("key1", "value1");
        newTestRunner.enqueue(new byte[0], hashMap);
        runNext(newTestRunner);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        MockFlowFile mockFlowFile2 = (MockFlowFile) newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        Assert.assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", new String(mockFlowFile2.toByteArray())));
        mockFlowFile2.assertAttributeEquals("key", "value");
        mockFlowFile2.assertAttributeEquals("key1", "value1");
        mockFlowFile2.assertAttributeNotEquals(CoreAttributes.UUID.key(), mockFlowFile.getAttribute(CoreAttributes.UUID.key()));
        mockFlowFile2.assertAttributeNotEquals(CoreAttributes.FILENAME.key(), mockFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
        Assert.assertTrue(String.format("file sizes match when they shouldn't original=%1$s restored=%2$s", Long.valueOf(mockFlowFile.getSize()), Long.valueOf(mockFlowFile2.getSize())), mockFlowFile2.getSize() != mockFlowFile.getSize());
        Assert.assertTrue(String.format("lineage start dates match when they shouldn't original=%1$s restored=%2$s", Long.valueOf(mockFlowFile.getLineageStartDate()), Long.valueOf(mockFlowFile2.getLineageStartDate())), mockFlowFile2.getLineageStartDate() != mockFlowFile.getLineageStartDate());
        newTestRunner.clearTransferState();
        newTestRunner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "true");
        Thread.sleep(200L);
        for (int i2 = 0; i2 < 10; i2++) {
            runNext(newTestRunner);
            Thread.sleep(200L);
        }
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 10);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        newTestRunner.clearTransferState();
        newTestRunner.enqueue(new byte[0], hashMap);
        runNext(newTestRunner);
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 1);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 1);
        MockFlowFile mockFlowFile3 = (MockFlowFile) newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED).get(0);
        Assert.assertTrue(Pattern.matches("Activity restored at time: (.*) after being inactive for 0 minutes", new String(mockFlowFile3.toByteArray())));
        mockFlowFile3.assertAttributeEquals("key", "value");
        mockFlowFile3.assertAttributeEquals("key1", "value1");
        mockFlowFile3.assertAttributeNotEquals(CoreAttributes.UUID.key(), mockFlowFile.getAttribute(CoreAttributes.UUID.key()));
        mockFlowFile3.assertAttributeNotEquals(CoreAttributes.FILENAME.key(), mockFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
        Assert.assertTrue(String.format("file sizes match when they shouldn't original=%1$s restored=%2$s", Long.valueOf(mockFlowFile.getSize()), Long.valueOf(mockFlowFile3.getSize())), mockFlowFile3.getSize() != mockFlowFile.getSize());
        Assert.assertTrue(String.format("lineage start dates match when they shouldn't original=%1$s restored=%2$s", Long.valueOf(mockFlowFile.getLineageStartDate()), Long.valueOf(mockFlowFile3.getLineageStartDate())), mockFlowFile3.getLineageStartDate() != mockFlowFile.getLineageStartDate());
    }

    @Test(timeout = 5000)
    public void testFirstRunNoMessages() throws InterruptedException, IOException {
        boolean z;
        TestRunner newTestRunner = TestRunners.newTestRunner(new MonitorActivity());
        newTestRunner.setProperty(MonitorActivity.CONTINUALLY_SEND_MESSAGES, "false");
        int i = 100;
        do {
            z = false;
            newTestRunner.setProperty(MonitorActivity.THRESHOLD, i + " millis");
            Thread.sleep(1000L);
            newTestRunner.run();
            newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
            if (newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE).size() == 1) {
                i += i;
                z = true;
            } else {
                newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
            }
            newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
            newTestRunner.clearTransferState();
        } while (z);
    }

    @Test
    public void testClusterMonitorInvalidReportingNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_NODE);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.assertNotValid();
    }

    @Test
    public void testClusterMonitorActive() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        newTestRunner.enqueue("Incoming data");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertNotNull("Latest timestamp should be persisted", state.get("MonitorActivity.latestSuccessTransfer"));
        Assert.assertNull(state.get("key1"));
        Assert.assertNull(state.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveFallbackToNodeScope() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(false);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        newTestRunner.enqueue("Incoming data");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        Assert.assertNull("Latest timestamp should NOT be persisted, because it's running as 'node' scope", newTestRunner.getStateManager().getState(Scope.CLUSTER).get("MonitorActivity.latestSuccessTransfer"));
    }

    @Test
    public void testClusterMonitorActiveWithLatestTimestamp() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        newTestRunner.enqueue("Incoming data");
        HashMap hashMap = new HashMap();
        long currentTimeMillis = System.currentTimeMillis() - 1000;
        hashMap.put("MonitorActivity.latestSuccessTransfer", String.valueOf(currentTimeMillis));
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.getStateManager().setState(hashMap, Scope.CLUSTER);
        newTestRunner.getStateManager().replace(newTestRunner.getStateManager().getState(Scope.CLUSTER), hashMap, Scope.CLUSTER);
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertTrue("Existing timestamp should be updated", currentTimeMillis < Long.parseLong(state.get("MonitorActivity.latestSuccessTransfer")));
        Assert.assertNull(state.get("key1"));
        Assert.assertNull(state.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveMoreRecentTimestampExisted() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        newTestRunner.enqueue("Incoming data");
        HashMap hashMap = new HashMap();
        long currentTimeMillis = System.currentTimeMillis() + 10000;
        hashMap.put("MonitorActivity.latestSuccessTransfer", String.valueOf(currentTimeMillis));
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.getStateManager().setState(hashMap, Scope.CLUSTER);
        newTestRunner.getStateManager().replace(newTestRunner.getStateManager().getState(Scope.CLUSTER), hashMap, Scope.CLUSTER);
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertEquals("Existing timestamp should NOT be updated", String.valueOf(currentTimeMillis), state.get("MonitorActivity.latestSuccessTransfer"));
        Assert.assertEquals(state.get("key1"), hashMap.get("key1"));
        Assert.assertEquals(state.get("key2"), hashMap.get("key2"));
    }

    @Test
    public void testClusterMonitorActiveCopyAttribute() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "1 ms");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.enqueue("Incoming data", hashMap);
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS);
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertNotNull("Latest timestamp should be persisted", state.get("MonitorActivity.latestSuccessTransfer"));
        Assert.assertEquals("value1", state.get("key1"));
        Assert.assertEquals("value2", state.get("key2"));
    }

    @Test
    public void testClusterMonitorInactivity() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assert.assertEquals(1L, flowFilesForRelationship.size());
        MockFlowFile mockFlowFile = (MockFlowFile) flowFilesForRelationship.get(0);
        Assert.assertNotNull(mockFlowFile.getAttribute("inactivityStartMillis"));
        Assert.assertNotNull(mockFlowFile.getAttribute("inactivityDurationMillis"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityFallbackToNodeScope() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(false);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assert.assertEquals(1L, flowFilesForRelationship.size());
        MockFlowFile mockFlowFile = (MockFlowFile) flowFilesForRelationship.get(0);
        Assert.assertNotNull(mockFlowFile.getAttribute("inactivityStartMillis"));
        Assert.assertNotNull(mockFlowFile.getAttribute("inactivityDurationMillis"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityOnPrimaryNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(true);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_INACTIVE);
        Assert.assertEquals(1L, flowFilesForRelationship.size());
        MockFlowFile mockFlowFile = (MockFlowFile) flowFilesForRelationship.get(0);
        Assert.assertNotNull(mockFlowFile.getAttribute("inactivityStartMillis"));
        Assert.assertNotNull(mockFlowFile.getAttribute("inactivityDurationMillis"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorInactivityOnNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelf() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.enqueue("Incoming data", hashMap);
        runNext(newTestRunner);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List flowFilesForRelationship2 = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assert.assertEquals(1L, flowFilesForRelationship.size());
        Assert.assertEquals(1L, flowFilesForRelationship2.size());
        Assert.assertEquals("value1", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key1"));
        Assert.assertEquals("value2", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key2"));
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertNotNull("Latest timestamp should be persisted", state.get("MonitorActivity.latestSuccessTransfer"));
        Assert.assertEquals("value1", state.get("key1"));
        Assert.assertEquals("value2", state.get("key2"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.enqueue("Incoming data", hashMap);
        runNext(newTestRunner);
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_SUCCESS, 1);
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertNotNull("Latest timestamp should be persisted", state.get("MonitorActivity.latestSuccessTransfer"));
        Assert.assertEquals("value1", state.get("key1"));
        Assert.assertEquals("value2", state.get("key2"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(true);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.enqueue("Incoming data", hashMap);
        runNext(newTestRunner);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List flowFilesForRelationship2 = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assert.assertEquals(1L, flowFilesForRelationship.size());
        Assert.assertEquals(1L, flowFilesForRelationship2.size());
        Assert.assertEquals("value1", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key1"));
        Assert.assertEquals("value2", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key2"));
        StateMap state = newTestRunner.getStateManager().getState(Scope.CLUSTER);
        Assert.assertNotNull("Latest timestamp should be persisted", state.get("MonitorActivity.latestSuccessTransfer"));
        Assert.assertEquals("value1", state.get("key1"));
        Assert.assertEquals("value2", state.get("key2"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredBySelfOnPrimaryNodeFallbackToNodeScope() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(false);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.enqueue("Incoming data", hashMap);
        runNext(newTestRunner);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List flowFilesForRelationship2 = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assert.assertEquals(1L, flowFilesForRelationship.size());
        Assert.assertEquals(1L, flowFilesForRelationship2.size());
        Assert.assertEquals("value1", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key1"));
        Assert.assertEquals("value2", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key2"));
        Assert.assertNull("Latest timestamp should NOT be persisted", newTestRunner.getStateManager().getState(Scope.CLUSTER).get("MonitorActivity.latestSuccessTransfer"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("MonitorActivity.latestSuccessTransfer", String.valueOf(System.currentTimeMillis()));
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.getStateManager().setState(hashMap, Scope.CLUSTER);
        newTestRunner.getStateManager().replace(newTestRunner.getStateManager().getState(Scope.CLUSTER), hashMap, Scope.CLUSTER);
        runNext(newTestRunner);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List flowFilesForRelationship2 = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assert.assertEquals("Should be zero since it doesn't have incoming file.", 0L, flowFilesForRelationship.size());
        Assert.assertEquals(1L, flowFilesForRelationship2.size());
        Assert.assertEquals("value1", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key1"));
        Assert.assertEquals("value2", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key2"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNodeOnPrimary() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(true);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "1 hour");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertAllFlowFilesTransferred(MonitorActivity.REL_INACTIVE);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("MonitorActivity.latestSuccessTransfer", String.valueOf(System.currentTimeMillis()));
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.getStateManager().setState(hashMap, Scope.CLUSTER);
        newTestRunner.getStateManager().replace(newTestRunner.getStateManager().getState(Scope.CLUSTER), hashMap, Scope.CLUSTER);
        runNext(newTestRunner);
        List flowFilesForRelationship = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_SUCCESS);
        List flowFilesForRelationship2 = newTestRunner.getFlowFilesForRelationship(MonitorActivity.REL_ACTIVITY_RESTORED);
        Assert.assertEquals("Should be zero since it doesn't have incoming file.", 0L, flowFilesForRelationship.size());
        Assert.assertEquals(1L, flowFilesForRelationship2.size());
        Assert.assertEquals("value1", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key1"));
        Assert.assertEquals("value2", ((MockFlowFile) flowFilesForRelationship2.get(0)).getAttribute("key2"));
        newTestRunner.clearTransferState();
    }

    @Test
    public void testClusterMonitorActivityRestoredByOtherNodeOnNode() throws Exception {
        TestRunner newTestRunner = TestRunners.newTestRunner(new TestableProcessor(TimeUnit.MINUTES.toMillis(120L)));
        newTestRunner.setClustered(true);
        newTestRunner.setPrimaryNode(false);
        newTestRunner.setProperty(MonitorActivity.MONITORING_SCOPE, MonitorActivity.SCOPE_CLUSTER);
        newTestRunner.setProperty(MonitorActivity.REPORTING_NODE, MonitorActivity.REPORT_NODE_PRIMARY);
        newTestRunner.setProperty(MonitorActivity.THRESHOLD, "3 mins");
        newTestRunner.setProperty(MonitorActivity.COPY_ATTRIBUTES, "true");
        newTestRunner.run();
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        newTestRunner.clearTransferState();
        HashMap hashMap = new HashMap();
        hashMap.put("MonitorActivity.latestSuccessTransfer", String.valueOf(System.currentTimeMillis()));
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        newTestRunner.getStateManager().setState(hashMap, Scope.CLUSTER);
        newTestRunner.getStateManager().replace(newTestRunner.getStateManager().getState(Scope.CLUSTER), hashMap, Scope.CLUSTER);
        runNext(newTestRunner);
        newTestRunner.assertTransferCount(MonitorActivity.REL_SUCCESS, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_INACTIVE, 0);
        newTestRunner.assertTransferCount(MonitorActivity.REL_ACTIVITY_RESTORED, 0);
        newTestRunner.clearTransferState();
    }
}
