package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiConsumer;
import java.util.stream.IntStream;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processors.standard.TestNotify;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestWait.class */
public class TestWait {
    private TestRunner runner;
    private TestNotify.MockCacheClient service;

    /**
     * Helper that runs the enclosing test's runner once and checks which FlowFiles landed on the
     * {@code success}, {@code wait} and {@code failure} relationships.
     *
     * <p>Callers fill the {@code expected*} lists with the expected FlowFile contents (in order)
     * before each {@link #run()}. After a successful run the actual FlowFiles stay available in
     * {@link #released}, {@link #waiting} and {@link #failed} until the next run, while the
     * expectations are reset.
     */
    private class TestIteration {
        final List<MockFlowFile> released = new ArrayList<>();
        final List<MockFlowFile> waiting = new ArrayList<>();
        final List<MockFlowFile> failed = new ArrayList<>();
        final List<String> expectedReleased = new ArrayList<>();
        final List<String> expectedWaiting = new ArrayList<>();
        final List<String> expectedFailed = new ArrayList<>();

        /**
         * Executes the processor once, collects the transferred FlowFiles per relationship and
         * asserts that their number and contents match the registered expectations.
         */
        void run() {
            // Drop the results of the previous iteration before collecting new ones.
            released.clear();
            waiting.clear();
            failed.clear();

            TestWait.this.runner.run();

            released.addAll(TestWait.this.runner.getFlowFilesForRelationship(Wait.REL_SUCCESS));
            waiting.addAll(TestWait.this.runner.getFlowFilesForRelationship(Wait.REL_WAIT));
            failed.addAll(TestWait.this.runner.getFlowFilesForRelationship(Wait.REL_FAILURE));

            // Sizes are verified first so the positional content checks below cannot run out of bounds.
            Assert.assertEquals(expectedReleased.size(), released.size());
            Assert.assertEquals(expectedWaiting.size(), waiting.size());
            Assert.assertEquals(expectedFailed.size(), failed.size());

            assertContents(expectedReleased, released);
            assertContents(expectedWaiting, waiting);
            assertContents(expectedFailed, failed);

            // Reset runner state and expectations so the instance can be reused for the next iteration.
            TestWait.this.runner.clearTransferState();
            expectedReleased.clear();
            expectedWaiting.clear();
            expectedFailed.clear();
        }

        /**
         * Asserts that the i-th actual FlowFile carries the i-th expected content.
         *
         * @param expected expected FlowFile contents, in order
         * @param actual   FlowFiles transferred by the processor, in order
         */
        private void assertContents(final List<String> expected, final List<MockFlowFile> actual) {
            for (int i = 0; i < expected.size(); i++) {
                actual.get(i).assertContentEquals(expected.get(i));
            }
        }
    }

    /**
     * Creates a fresh {@link Wait} test runner backed by a mock cache client before every test.
     *
     * @throws InitializationException if the controller service cannot be registered
     */
    @Before
    public void setup() throws InitializationException {
        final String serviceId = "service";

        service = new TestNotify.MockCacheClient();
        runner = TestRunners.newTestRunner(Wait.class);

        // Register and enable the mock cache, then point the processor at it.
        runner.addControllerService(serviceId, service);
        runner.enableControllerService(service);
        runner.setProperty(Wait.DISTRIBUTED_CACHE_SERVICE, serviceId);
    }

    /**
     * Enqueues a single FlowFile without sending any notification and asserts that it is routed
     * to {@code wait} carrying the {@code wait.start.timestamp} attribute.
     */
    @Test
    public void testWait() throws InitializationException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], props);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();
    }

    /**
     * With {@code WAIT_MODE} set to keep-in-upstream, asserts that an un-notified FlowFile is
     * still sitting in the runner's input queue after one run.
     */
    @Test
    public void testWaitKeepInUpstreamConnection() throws InitializationException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.WAIT_MODE, Wait.WAIT_MODE_KEEP_IN_UPSTREAM);

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], props);

        runner.run();

        runner.assertQueueNotEmpty();
        runner.clearTransferState();
    }

    /**
     * Asserts that a waiting FlowFile is routed to {@code expired}, without the
     * {@code wait.start.timestamp} attribute, once it is processed again after the configured
     * 100 ms expiration duration has elapsed.
     */
    @Test
    public void testExpired() throws InitializationException, InterruptedException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], props);

        // First pass: no signal yet, so the FlowFile is expected on the wait relationship.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();

        // Second pass: feed the same FlowFile back after sleeping just past the expiration duration.
        runner.enqueue(waitingFlowFile);
        Thread.sleep(101L);
        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
        final MockFlowFile expiredFlowFile = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
        expiredFlowFile.assertAttributeNotExists("wait.start.timestamp");
        runner.clearTransferState();
    }

    /**
     * Asserts that a FlowFile which expires before the target signal count (5) is reached is
     * routed to {@code expired} and carries the counter values and signal attributes that were
     * notified up to that point.
     */
    @Test
    public void testCounterExpired() throws InitializationException, InterruptedException, IOException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "5");
        runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "notification-id");
        runner.enqueue(new byte[0], props);

        // First pass: nothing has been notified, so the FlowFile is expected on the wait relationship.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Notify a total of 3 across two counters, which is below the target of 5.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        final Map<String, String> signalAttributes = new HashMap<>();
        signalAttributes.put("signal-attr-1", "signal-attr-1-value");
        signalAttributes.put("signal-attr-2", "signal-attr-2-value");
        protocol.notify("notification-id", "counter-A", 1, signalAttributes);
        protocol.notify("notification-id", "counter-B", 2, signalAttributes);

        // Second pass after sleeping just past the expiration duration.
        Thread.sleep(101L);
        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_EXPIRED, 1);
        final MockFlowFile expiredFlowFile = runner.getFlowFilesForRelationship(Wait.REL_EXPIRED).get(0);
        expiredFlowFile.assertAttributeNotExists("wait.start.timestamp");
        expiredFlowFile.assertAttributeEquals("wait.counter.total", "3");
        expiredFlowFile.assertAttributeEquals("wait.counter.counter-A", "1");
        expiredFlowFile.assertAttributeEquals("wait.counter.counter-B", "2");
        expiredFlowFile.assertAttributeEquals("signal-attr-1", "signal-attr-1-value");
        expiredFlowFile.assertAttributeEquals("signal-attr-2", "signal-attr-2-value");
        runner.clearTransferState();
    }

    /**
     * Asserts that a FlowFile arriving with a non-numeric {@code wait.start.timestamp} attribute
     * is routed to {@code failure} and that the attribute is no longer present on it.
     */
    @Test
    public void testBadWaitStartTimestamp() throws InitializationException, InterruptedException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.EXPIRATION_DURATION, "100 ms");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "1");
        props.put("wait.start.timestamp", "blue bunny");
        runner.enqueue(new byte[0], props);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
        final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
        failedFlowFile.assertAttributeNotExists("wait.start.timestamp");
        runner.clearTransferState();
    }

    /**
     * Asserts that a FlowFile without the attribute referenced by the release signal identifier
     * expression is routed to {@code failure} with neither {@code wait.start.timestamp} nor
     * {@code wait.counter.total} set.
     */
    @Test
    public void testEmptyReleaseSignal() throws InitializationException, InterruptedException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        // Deliberately no attributes: the signal identifier expression has nothing to resolve.
        final Map<String, String> noAttributes = new HashMap<>();
        runner.enqueue(new byte[0], noAttributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_FAILURE, 1);
        final MockFlowFile failedFlowFile = runner.getFlowFilesForRelationship(Wait.REL_FAILURE).get(0);
        failedFlowFile.assertAttributeNotExists("wait.start.timestamp");
        failedFlowFile.assertAttributeNotExists("wait.counter.total");
        runner.clearTransferState();
    }

    /**
     * Asserts that running the processor while the mock cache service is set to fail surfaces an
     * {@link AssertionError} whose cause is a {@link ProcessException} wrapping an
     * {@link IOException}.
     */
    @Test
    public void testFailingCacheService() throws InitializationException, IOException {
        service.setFailOnCalls(true);
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "2");
        runner.enqueue(new byte[0], props);

        // Capture the error instead of calling Assert.fail() inside the try block: fail() itself
        // throws AssertionError, which the catch would swallow and replace with a message-less failure.
        AssertionError runFailure = null;
        try {
            runner.run();
        } catch (final AssertionError e) {
            runFailure = e;
        } finally {
            // Always restore the mock so a failure here cannot leave it in failing mode.
            service.setFailOnCalls(false);
        }

        Assert.assertNotNull(
                "Expect the processor to receive an IO exception from the cache service and throws ProcessException.",
                runFailure);
        Assert.assertTrue(runFailure.getCause() instanceof ProcessException);
        Assert.assertTrue(runFailure.getCause().getCause() instanceof IOException);
    }

    /**
     * With a one-hour wait penalty configured, asserts that the first waiting FlowFile registers
     * a penalty entry for signal id {@code "1"} and that a second FlowFile with the same signal
     * id is not transferred to {@code wait} on the following run.
     */
    @Test
    public void testWaitPenaltyDuration() throws InitializationException {
        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.WAIT_PENALTY_DURATION, "1 hour");

        final Map<String, String> props = new HashMap<>();
        props.put("releaseSignalAttribute", "1");
        runner.enqueue(new byte[0], props);

        // run(1, false): single iteration without stopping the processor afterwards.
        runner.run(1, false);

        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        final MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        runner.clearTransferState();

        // getSignalIdPenalties() is declared on Wait, not on the Processor type returned by the runner.
        final Map<?, ?> signalIdPenalties = ((Wait) runner.getProcessor()).getSignalIdPenalties();
        Assert.assertEquals(1L, signalIdPenalties.size());
        Assert.assertTrue(signalIdPenalties.containsKey("1"));

        // A second FlowFile for the penalized signal id is expected not to reach the wait relationship.
        runner.enqueue(new byte[0], props);
        runner.run(1, false);
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 0);
        runner.clearTransferState();
    }

    /**
     * With the attribute copy mode set to "replace", asserts that a released FlowFile takes the
     * notified value for an attribute present on both sides, keeps its own wait-only attribute,
     * gains the notify-only attribute, and that the signal is gone from the cache afterwards.
     */
    @Test
    public void testReplaceAttributes() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // Put the signal in the cache before the FlowFile arrives.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "default", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_REPLACE.getValue());

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        runner.enqueue("content".getBytes(StandardCharsets.UTF_8), waitAttributes);

        Assert.assertNotNull(protocol.getSignal("key"));

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        runner.assertTransferCount(Wait.REL_SUCCESS, 1);
        final MockFlowFile releasedFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        releasedFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assert.assertEquals("notifyValue", releasedFlowFile.getAttribute("notify.only"));
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("wait.only"));
        // Replace mode: the notified value wins for the attribute defined on both sides.
        Assert.assertEquals("notifyValue", releasedFlowFile.getAttribute("both"));
        runner.clearTransferState();

        Assert.assertNull(protocol.getSignal("key"));
    }

    /**
     * With the attribute copy mode set to "keep original", asserts that a released FlowFile keeps
     * its own value for an attribute present on both sides while still gaining the notify-only
     * attribute.
     */
    @Test
    public void testKeepOriginalAttributes() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // Put the signal in the cache before the FlowFile arrives.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "default", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.ATTRIBUTE_COPY_MODE, Wait.ATTRIBUTE_COPY_KEEP_ORIGINAL.getValue());

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        runner.enqueue("content".getBytes(StandardCharsets.UTF_8), waitAttributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        runner.assertTransferCount(Wait.REL_SUCCESS, 1);
        final MockFlowFile releasedFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        releasedFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assert.assertEquals("notifyValue", releasedFlowFile.getAttribute("notify.only"));
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("wait.only"));
        // Keep-original mode: the FlowFile's own value wins for the attribute defined on both sides.
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("both"));
        runner.clearTransferState();
    }

    /**
     * With a target signal count of 3 and no specific counter name configured, asserts that the
     * FlowFile stays on {@code wait} (with an unchanged {@code wait.start.timestamp}) while only
     * one or two notifications exist, and is released to {@code success} after the third
     * notification, after which the signal is gone from the cache.
     */
    @Test
    public void testWaitForTotalCount() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // Notification #1 (counter-A) is already cached when the FlowFile arrives.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "counter-A", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "3");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        runner.enqueue("content".getBytes(StandardCharsets.UTF_8), waitAttributes);

        // Pass 1: one notification so far -> still waiting.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        final String waitStartTimestamp = waitingFlowFile.getAttribute("wait.start.timestamp");
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Pass 2: notification #2 (counter-B) -> still waiting, timestamp must be preserved.
        protocol.notify("key", "counter-B", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", waitStartTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Pass 3: no new notification -> still waiting, timestamp must be preserved.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", waitStartTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Pass 4: notification #3 (counter-C) reaches the target -> released.
        protocol.notify("key", "counter-C", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        final MockFlowFile releasedFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        releasedFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assert.assertEquals("notifyValue", releasedFlowFile.getAttribute("notify.only"));
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("wait.only"));
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("both"));
        runner.clearTransferState();

        Assert.assertNull("The key no longer exist", protocol.getSignal("key"));
    }

    /**
     * With the counter name set to {@code counter-B} and a target count of 2, asserts that
     * notifications on other counters leave the FlowFile on {@code wait} and that it is released
     * to {@code success} after the second {@code counter-B} notification, carrying the
     * per-counter and total count attributes.
     */
    @Test
    public void testWaitForSpecificCount() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // counter-A is notified up front; it is not the counter the FlowFile waits on.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "counter-A", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "2");
        waitAttributes.put("signalCounterName", "counter-B");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        runner.enqueue("content".getBytes(StandardCharsets.UTF_8), waitAttributes);

        // Pass 1: counter-B has no notifications yet -> waiting.
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        MockFlowFile waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        final String waitStartTimestamp = waitingFlowFile.getAttribute("wait.start.timestamp");
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Pass 2: counter-B = 1 -> still waiting, timestamp must be preserved.
        protocol.notify("key", "counter-B", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", waitStartTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Pass 3: counter-C is notified, counter-B unchanged -> still waiting.
        protocol.notify("key", "counter-C", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_WAIT, 1);
        waitingFlowFile = runner.getFlowFilesForRelationship(Wait.REL_WAIT).get(0);
        waitingFlowFile.assertAttributeExists("wait.start.timestamp");
        waitingFlowFile.assertAttributeEquals("wait.start.timestamp", waitStartTimestamp);
        runner.clearTransferState();
        runner.enqueue(waitingFlowFile);

        // Pass 4: counter-B = 2 reaches the target -> released.
        protocol.notify("key", "counter-B", 1, cachedAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        final MockFlowFile releasedFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        releasedFlowFile.assertAttributeNotExists("wait.start.timestamp");
        Assert.assertEquals("notifyValue", releasedFlowFile.getAttribute("notify.only"));
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("wait.only"));
        Assert.assertEquals("waitValue", releasedFlowFile.getAttribute("both"));
        releasedFlowFile.assertAttributeEquals("wait.counter.total", "4");
        releasedFlowFile.assertAttributeEquals("wait.counter.counter-A", "1");
        releasedFlowFile.assertAttributeEquals("wait.counter.counter-B", "2");
        releasedFlowFile.assertAttributeEquals("wait.counter.counter-C", "1");
        runner.clearTransferState();
    }

    /**
     * After notifying the same counter twice with a target count of 1, asserts that two
     * consecutive FlowFiles are both released to {@code success}, that the cached signal reports
     * counter 0 and releasable count 1 after the first release, and that the signal is gone from
     * the cache after the second.
     */
    @Test
    public void testDecrementCache() throws ConcurrentModificationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("both", "notifyValue");
        cachedAttributes.put("uuid", "notifyUuid");
        cachedAttributes.put("notify.only", "notifyValue");

        // Two notifications on the same counter.
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
        protocol.notify("key", "counter", 1, cachedAttributes);
        protocol.notify("key", "counter", 1, cachedAttributes);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1");
        runner.assertValid();

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("signalCounterName", "counter");
        waitAttributes.put("wait.only", "waitValue");
        waitAttributes.put("both", "waitValue");
        waitAttributes.put("uuid", UUID.randomUUID().toString());
        final byte[] content = "content".getBytes(StandardCharsets.UTF_8);

        // First FlowFile: released, and it reports the counter value of 2.
        runner.enqueue(content, waitAttributes);
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        final MockFlowFile firstReleased = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        firstReleased.assertAttributeNotExists("wait.start.timestamp");
        firstReleased.assertAttributeEquals("wait.counter.counter", "2");

        // Compare the numbers directly rather than via their string form.
        Assert.assertEquals(0L, protocol.getSignal("key").getCount("counter"));
        Assert.assertEquals(1L, protocol.getSignal("key").getReleasableCount());

        // Second FlowFile: released as well, after which the signal is expected to be removed.
        runner.enqueue(content, waitAttributes);
        runner.clearTransferState();
        runner.run();
        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
        final MockFlowFile secondReleased = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
        secondReleased.assertAttributeNotExists("wait.start.timestamp");
        secondReleased.assertAttributeEquals("wait.counter.counter", "0");

        Assert.assertNull("The key no longer exist", protocol.getSignal("key"));
        runner.clearTransferState();
    }

    /**
     * With a wait buffer count of 2, enqueues six FlowFiles spread over signal ids
     * {@code key-A}, {@code key-B} and one without a signal id, then asserts which contents end
     * up on {@code wait}, {@code failure} and {@code success} over two iterations.
     */
    @Test
    public void testWaitBufferCount() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("notified", "notified-value");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");

        final Map<String, String> waitAttributesA = new HashMap<>();
        waitAttributesA.put("releaseSignalAttribute", "key-A");
        waitAttributesA.put("targetSignalCount", "1");
        waitAttributesA.put("signalCounterName", "counter");

        final Map<String, String> waitAttributesB = new HashMap<>();
        waitAttributesB.put("releaseSignalAttribute", "key-B");
        waitAttributesB.put("targetSignalCount", "3");
        waitAttributesB.put("signalCounterName", "counter");

        // Same as B but without the attribute the signal identifier expression reads.
        final Map<String, String> waitAttributesInvalid = new HashMap<>(waitAttributesB);
        waitAttributesInvalid.remove("releaseSignalAttribute");

        final TestIteration testIteration = new TestIteration();

        runner.enqueue("1".getBytes(StandardCharsets.UTF_8), waitAttributesB);
        runner.enqueue("2".getBytes(StandardCharsets.UTF_8), waitAttributesA);
        runner.enqueue("3".getBytes(StandardCharsets.UTF_8), waitAttributesInvalid);
        runner.enqueue("4".getBytes(StandardCharsets.UTF_8), waitAttributesA);
        runner.enqueue("5".getBytes(StandardCharsets.UTF_8), waitAttributesB);
        runner.enqueue("6".getBytes(StandardCharsets.UTF_8), waitAttributesB);

        // Iteration 1: nothing notified yet; "1" and "5" are expected on wait, "3" on failure.
        testIteration.expectedWaiting.addAll(Arrays.asList("1", "5"));
        testIteration.expectedFailed.add("3");
        testIteration.run();

        // Iteration 2: key-B reaches its target of 3; "6" is expected on success, "1" on wait.
        protocol.notify("key-B", "counter", 3, cachedAttributes);
        testIteration.expectedReleased.add("6");
        testIteration.expectedWaiting.add("1");
        // Feed the FlowFiles that waited in the previous iteration back into the queue.
        testIteration.waiting.forEach(waitingFlowFile -> runner.enqueue(waitingFlowFile));
        testIteration.run();
    }

    /**
     * With a wait buffer of 2 and a releasable FlowFile count of 6 per reached target, enqueues
     * six FlowFiles for the same signal and asserts that, after a single notification of 3, they
     * are released two per iteration while the cached releasable count goes 4, then 2.
     */
    @Test
    public void testReleaseMultipleFlowFiles() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("notified", "notified-value");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
        runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "${fragmentCount}");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "3");
        waitAttributes.put("signalCounterName", "counter");
        waitAttributes.put("fragmentCount", "6");

        final TestIteration testIteration = new TestIteration();

        // FlowFiles with contents "1" through "6".
        IntStream.range(1, 7).forEach(i ->
                runner.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), waitAttributes));

        // Iteration 1: nothing notified yet; "1" and "2" are expected on wait.
        testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
        testIteration.run();
        Assert.assertNull(protocol.getSignal("key"));

        // Iteration 2: after notifying 3, "3" and "4" are expected on success.
        protocol.notify("key", "counter", 3, cachedAttributes);
        testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
        testIteration.waiting.forEach(waitingFlowFile -> runner.enqueue(waitingFlowFile));
        testIteration.run();

        // NOTE(review): the counter configured above is "counter", so looking up "count" probably
        // reads a counter that was never notified; confirm whether "counter" was intended.
        Assert.assertEquals(0L, protocol.getSignal("key").getCount("count"));
        Assert.assertEquals(4L, protocol.getSignal("key").getReleasableCount());

        // Iteration 3: "5" and "6" are expected on success.
        testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
        testIteration.waiting.forEach(waitingFlowFile -> runner.enqueue(waitingFlowFile));
        testIteration.run();

        // NOTE(review): same "count" vs "counter" question as above.
        Assert.assertEquals(0L, protocol.getSignal("key").getCount("count"));
        Assert.assertEquals(2L, protocol.getSignal("key").getReleasableCount());
    }

    /**
     * With the releasable FlowFile count set to 0, asserts that once the signal reaches its
     * target of 3, FlowFiles keep being released on every iteration while the cached counter
     * stays at 3 and the releasable count stays at 0.
     */
    @Test
    public void testOpenGate() throws InitializationException, IOException {
        final Map<String, String> cachedAttributes = new HashMap<>();
        cachedAttributes.put("notified", "notified-value");
        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);

        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "${targetSignalCount}");
        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
        runner.setProperty(Wait.WAIT_BUFFER_COUNT, "2");
        runner.setProperty(Wait.RELEASABLE_FLOWFILE_COUNT, "0");

        final Map<String, String> waitAttributes = new HashMap<>();
        waitAttributes.put("releaseSignalAttribute", "key");
        waitAttributes.put("targetSignalCount", "3");
        waitAttributes.put("signalCounterName", "counter");

        final TestIteration testIteration = new TestIteration();

        // FlowFiles with contents "1" through "6".
        IntStream.range(1, 7).forEach(i ->
                runner.enqueue(String.valueOf(i).getBytes(StandardCharsets.UTF_8), waitAttributes));

        // Iteration 1: nothing notified yet; "1" and "2" are expected on wait.
        testIteration.expectedWaiting.addAll(Arrays.asList("1", "2"));
        testIteration.run();
        Assert.assertNull(protocol.getSignal("key"));

        // Iteration 2: after notifying 3, "3" and "4" are expected on success.
        protocol.notify("key", "counter", 3, cachedAttributes);
        testIteration.expectedReleased.addAll(Arrays.asList("3", "4"));
        testIteration.waiting.forEach(waitingFlowFile -> runner.enqueue(waitingFlowFile));
        testIteration.run();

        // The cached counter is asserted to be untouched by the release.
        Assert.assertEquals(3L, protocol.getSignal("key").getCount("counter"));
        Assert.assertEquals(0L, protocol.getSignal("key").getReleasableCount());

        // Iteration 3: "5" and "6" are expected on success, with the cached signal unchanged.
        testIteration.expectedReleased.addAll(Arrays.asList("5", "6"));
        testIteration.waiting.forEach(waitingFlowFile -> runner.enqueue(waitingFlowFile));
        testIteration.run();

        Assert.assertEquals(3L, protocol.getSignal("key").getCount("counter"));
        Assert.assertEquals(0L, protocol.getSignal("key").getReleasableCount());
    }
}
