package org.apache.nifi.processors.standard;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.distributed.cache.client.exception.DeserializationException;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;
import org.apache.nifi.processors.standard.util.FlowFileAttributesSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.stubbing.Answer;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestWaitNotifyProtocol.class */
public class TestWaitNotifyProtocol {
    private AtomicDistributedMapCacheClient<Long> cache;
    private final Map<String, AtomicCacheEntry<String, String, Long>> cacheEntries = new HashMap();
    private final ObjectMapper mapper = new ObjectMapper();
    private final Answer<?> successfulReplace = invocationOnMock -> {
        AtomicCacheEntry atomicCacheEntry = (AtomicCacheEntry) invocationOnMock.getArgument(0);
        this.cacheEntries.put(atomicCacheEntry.getKey(), new AtomicCacheEntry(atomicCacheEntry.getKey(), atomicCacheEntry.getValue(), Long.valueOf(((Long) atomicCacheEntry.getRevision().orElse(0L)).longValue() + 1)));
        return true;
    };

    @BeforeEach
    public void before() throws Exception {
        this.cacheEntries.clear();
        this.cache = (AtomicDistributedMapCacheClient) Mockito.mock(AtomicDistributedMapCacheClient.class);
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(invocationOnMock -> {
            return this.cacheEntries.get(invocationOnMock.getArguments()[0]);
        }).when(this.cache)).fetch(ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Deserializer) ArgumentMatchers.any());
    }

    @Test
    public void testNotifyRetryFailure() throws Exception {
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(invocationOnMock -> {
            return false;
        }).when(this.cache)).replace((AtomicCacheEntry) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any());
        WaitNotifyProtocol waitNotifyProtocol = new WaitNotifyProtocol(this.cache);
        Assertions.assertThrows(ConcurrentModificationException.class, () -> {
            waitNotifyProtocol.notify("signal-id", "a", 1, (Map) null);
        });
    }

    @Test
    public void testNotifyFirst() throws Exception {
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(this.successfulReplace).when(this.cache)).replace((AtomicCacheEntry) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any());
        WaitNotifyProtocol.Signal notify = new WaitNotifyProtocol(this.cache).notify("signal-id", "a", 1, (Map) null);
        Assertions.assertNotNull(notify);
        Assertions.assertEquals(1L, (Long) notify.getCounts().get("a"));
        Assertions.assertTrue(this.cacheEntries.containsKey("signal-id"));
        AtomicCacheEntry<String, String, Long> atomicCacheEntry = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(1L, ((Long) atomicCacheEntry.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":1},\"attributes\":{},\"releasableCount\":0}", (String) atomicCacheEntry.getValue());
    }

    @Test
    public void testNotifyCounters() throws Exception {
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(this.successfulReplace).when(this.cache)).replace((AtomicCacheEntry) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any());
        WaitNotifyProtocol waitNotifyProtocol = new WaitNotifyProtocol(this.cache);
        waitNotifyProtocol.notify("signal-id", "a", 1, (Map) null);
        waitNotifyProtocol.notify("signal-id", "a", 1, (Map) null);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(2L, ((Long) atomicCacheEntry.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":2},\"attributes\":{},\"releasableCount\":0}", (String) atomicCacheEntry.getValue());
        waitNotifyProtocol.notify("signal-id", "a", 10, (Map) null);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry2 = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(3L, ((Long) atomicCacheEntry2.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":12},\"attributes\":{},\"releasableCount\":0}", (String) atomicCacheEntry2.getValue());
        waitNotifyProtocol.notify("signal-id", "b", 2, (Map) null);
        waitNotifyProtocol.notify("signal-id", "c", 3, (Map) null);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry3 = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(5L, ((Long) atomicCacheEntry3.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":12,\"b\":2,\"c\":3},\"attributes\":{},\"releasableCount\":0}", (String) atomicCacheEntry3.getValue());
        HashMap hashMap = new HashMap();
        hashMap.put("a", 10);
        hashMap.put("b", 25);
        waitNotifyProtocol.notify("signal-id", hashMap, (Map) null);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry4 = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(6L, ((Long) atomicCacheEntry4.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":22,\"b\":27,\"c\":3},\"attributes\":{},\"releasableCount\":0}", (String) atomicCacheEntry4.getValue());
        waitNotifyProtocol.notify("signal-id", "b", 0, (Map) null);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry5 = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(7L, ((Long) atomicCacheEntry5.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":22,\"b\":0,\"c\":3},\"attributes\":{},\"releasableCount\":0}", (String) atomicCacheEntry5.getValue());
    }

    @Test
    public void testNotifyAttributes() throws Exception {
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(this.successfulReplace).when(this.cache)).replace((AtomicCacheEntry) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any());
        WaitNotifyProtocol waitNotifyProtocol = new WaitNotifyProtocol(this.cache);
        HashMap hashMap = new HashMap();
        hashMap.put("p1", "a1");
        hashMap.put("p2", "a1");
        waitNotifyProtocol.notify("signal-id", "a", 1, hashMap);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(1L, ((Long) atomicCacheEntry.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":1},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a1\"},\"releasableCount\":0}", (String) atomicCacheEntry.getValue());
        HashMap hashMap2 = new HashMap();
        hashMap2.put("p2", "a2");
        hashMap2.put("p3", "a2");
        waitNotifyProtocol.notify("signal-id", "a", 1, hashMap2);
        AtomicCacheEntry<String, String, Long> atomicCacheEntry2 = this.cacheEntries.get("signal-id");
        Assertions.assertEquals(2L, ((Long) atomicCacheEntry2.getRevision().orElse(-1L)).longValue());
        assertValueEquals("{\"counts\":{\"a\":2},\"attributes\":{\"p1\":\"a1\",\"p2\":\"a2\",\"p3\":\"a2\"},\"releasableCount\":0}", (String) atomicCacheEntry2.getValue());
    }

    @Test
    public void testSignalCount() throws Exception {
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(this.successfulReplace).when(this.cache)).replace((AtomicCacheEntry) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any());
        WaitNotifyProtocol waitNotifyProtocol = new WaitNotifyProtocol(this.cache);
        Assertions.assertNull(waitNotifyProtocol.getSignal("signal-id"), "Should be null since there's no signal yet");
        waitNotifyProtocol.notify("signal-id", "success", 1, (Map) null);
        WaitNotifyProtocol.Signal signal = waitNotifyProtocol.getSignal("signal-id");
        Assertions.assertNotNull(signal);
        Assertions.assertEquals(1L, signal.getCount("success"));
        Assertions.assertTrue(signal.isCountReached("success", 1L));
        Assertions.assertFalse(signal.isCountReached("success", 2L));
        Assertions.assertTrue(signal.isTotalCountReached(1L));
        Assertions.assertFalse(signal.isTotalCountReached(2L));
        waitNotifyProtocol.notify("signal-id", "failure", 1, (Map) null);
        WaitNotifyProtocol.Signal signal2 = waitNotifyProtocol.getSignal("signal-id");
        Assertions.assertNotNull(signal2);
        Assertions.assertEquals(1L, signal2.getCount("success"));
        Assertions.assertEquals(1L, signal2.getCount("failure"));
        Assertions.assertTrue(signal2.isCountReached("failure", 1L));
        Assertions.assertFalse(signal2.isCountReached("failure", 2L));
        Assertions.assertTrue(signal2.isTotalCountReached(1L));
        Assertions.assertTrue(signal2.isTotalCountReached(2L));
    }

    @Test
    public void testNiFiVersionUpgrade() throws Exception {
        ((AtomicDistributedMapCacheClient) Mockito.doAnswer(this.successfulReplace).when(this.cache)).replace((AtomicCacheEntry) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any(), (Serializer) ArgumentMatchers.any());
        FlowFileAttributesSerializer flowFileAttributesSerializer = new FlowFileAttributesSerializer();
        HashMap hashMap = new HashMap();
        hashMap.put("key1", "value1");
        hashMap.put("key2", "value2");
        hashMap.put("key3", "value3");
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        flowFileAttributesSerializer.serialize(hashMap, byteArrayOutputStream);
        this.cacheEntries.put("old-entry", new AtomicCacheEntry<>("old-entry", new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8), 0L));
        WaitNotifyProtocol waitNotifyProtocol = new WaitNotifyProtocol(this.cache);
        WaitNotifyProtocol.Signal signal = waitNotifyProtocol.getSignal("old-entry");
        Assertions.assertEquals(1L, signal.getCount("default"));
        Assertions.assertEquals("value1", signal.getAttributes().get("key1"));
        Assertions.assertEquals("value2", signal.getAttributes().get("key2"));
        Assertions.assertEquals("value3", signal.getAttributes().get("key3"));
        this.cacheEntries.put("old-entry", new AtomicCacheEntry<>("old-entry", "UNSUPPORTED_FORMAT", 0L));
        Assertions.assertThrows(DeserializationException.class, () -> {
            waitNotifyProtocol.getSignal("old-entry");
        });
    }

    @Test
    public void testReleaseCandidate() throws Exception {
        List list = (List) IntStream.range(0, 10).boxed().collect(Collectors.toList());
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol.Signal();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        BiConsumer biConsumer = (l, num) -> {
            arrayList.clear();
            arrayList2.clear();
            long longValue = l.longValue();
            int intValue = num.intValue();
            arrayList.getClass();
            Consumer consumer = (v1) -> {
                r5.addAll(v1);
            };
            arrayList2.getClass();
            signal.releaseCandidates("default", longValue, intValue, list, consumer, (v1) -> {
                r6.addAll(v1);
            });
        };
        Field declaredField = WaitNotifyProtocol.Signal.class.getDeclaredField("releasableCount");
        declaredField.setAccessible(true);
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(0, arrayList.size());
        Assertions.assertEquals(10, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount("default"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("default", 1L);
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(0, arrayList.size());
        Assertions.assertEquals(10, arrayList2.size());
        Assertions.assertEquals(1L, signal.getCount("default"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("default", 3L);
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(1, arrayList.size());
        Assertions.assertEquals(9, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount("default"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("default", 6L);
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(2, arrayList.size());
        Assertions.assertEquals(8, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount("default"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("default", 11L);
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(3, arrayList.size());
        Assertions.assertEquals(7, arrayList2.size());
        Assertions.assertEquals(2L, signal.getCount("default"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("default", 6L);
        biConsumer.accept(3L, 2);
        Assertions.assertEquals(4, arrayList.size());
        Assertions.assertEquals(6, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount("default"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("default", 50L);
        biConsumer.accept(3L, 2);
        Assertions.assertEquals(10, arrayList.size());
        Assertions.assertEquals(0, arrayList2.size());
        Assertions.assertEquals(2L, signal.getCount("default"));
        Assertions.assertEquals(22L, declaredField.getLong(signal));
    }

    @Test
    public void testReleaseCandidateTotal() throws Exception {
        List list = (List) IntStream.range(0, 10).boxed().collect(Collectors.toList());
        WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol.Signal();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        String str = null;
        BiConsumer biConsumer = (l, num) -> {
            arrayList.clear();
            arrayList2.clear();
            long longValue = l.longValue();
            int intValue = num.intValue();
            arrayList.getClass();
            Consumer consumer = (v1) -> {
                r5.addAll(v1);
            };
            arrayList2.getClass();
            signal.releaseCandidates(str, longValue, intValue, list, consumer, (v1) -> {
                r6.addAll(v1);
            });
        };
        Field declaredField = WaitNotifyProtocol.Signal.class.getDeclaredField("releasableCount");
        declaredField.setAccessible(true);
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(0, arrayList.size());
        Assertions.assertEquals(10, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount((String) null));
        Assertions.assertEquals(0L, signal.getCount("consumed"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("counterA", 1L);
        signal.getCounts().remove("consumed");
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(0, arrayList.size());
        Assertions.assertEquals(10, arrayList2.size());
        Assertions.assertEquals(1L, signal.getCount((String) null));
        Assertions.assertEquals(0L, signal.getCount("consumed"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("counterA", 1L);
        signal.getCounts().put("counterB", 1L);
        signal.getCounts().put("counterC", 1L);
        signal.getCounts().remove("consumed");
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(1, arrayList.size());
        Assertions.assertEquals(9, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount((String) null));
        Assertions.assertEquals(-3L, signal.getCount("consumed"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("counterA", 1L);
        signal.getCounts().put("counterB", 2L);
        signal.getCounts().put("counterC", 3L);
        signal.getCounts().remove("consumed");
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(2, arrayList.size());
        Assertions.assertEquals(8, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount((String) null));
        Assertions.assertEquals(-6L, signal.getCount("consumed"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("counterA", 3L);
        signal.getCounts().put("counterB", 3L);
        signal.getCounts().put("counterC", 5L);
        signal.getCounts().remove("consumed");
        biConsumer.accept(3L, 1);
        Assertions.assertEquals(3, arrayList.size());
        Assertions.assertEquals(7, arrayList2.size());
        Assertions.assertEquals(2L, signal.getCount((String) null));
        Assertions.assertEquals(-9L, signal.getCount("consumed"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("counterA", 1L);
        signal.getCounts().put("counterB", 2L);
        signal.getCounts().put("counterC", 3L);
        signal.getCounts().remove("consumed");
        biConsumer.accept(3L, 2);
        Assertions.assertEquals(4, arrayList.size());
        Assertions.assertEquals(6, arrayList2.size());
        Assertions.assertEquals(0L, signal.getCount((String) null));
        Assertions.assertEquals(-6L, signal.getCount("consumed"));
        Assertions.assertEquals(0L, declaredField.getLong(signal));
        signal.getCounts().put("counterA", 10L);
        signal.getCounts().put("counterB", 20L);
        signal.getCounts().put("counterC", 20L);
        signal.getCounts().remove("consumed");
        biConsumer.accept(3L, 2);
        Assertions.assertEquals(10, arrayList.size());
        Assertions.assertEquals(0, arrayList2.size());
        Assertions.assertEquals(2L, signal.getCount((String) null));
        Assertions.assertEquals(-48L, signal.getCount("consumed"));
        Assertions.assertEquals(22L, declaredField.getLong(signal));
    }

    public void assertValueEquals(String str, String str2) throws Exception {
        Assertions.assertEquals(this.mapper.readTree(str), this.mapper.readTree(str2));
    }
}
