package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.regex.Pattern;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
import org.apache.nifi.distributed.cache.client.AtomicDistributedMapCacheClient;
import org.apache.nifi.distributed.cache.client.Deserializer;
import org.apache.nifi.distributed.cache.client.Serializer;
import org.apache.nifi.processors.standard.WaitNotifyProtocol;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/**
 * Unit tests for the {@code Notify} processor, executed through a NiFi {@code TestRunner}
 * against an in-memory mock of the distributed map cache client.
 */
public class TestNotify {
    private TestRunner runner;
    private MockCacheClient service;

    /**
     * In-memory stand-in for an {@link AtomicDistributedMapCacheClient} keyed by revision number.
     *
     * <p>Entries live in a local concurrent map. Only the operations exercised by these tests
     * ({@code get}, {@code remove}, {@code removeByPattern}, {@code fetch}, {@code replace}) are
     * implemented; the remaining cache operations throw {@link UnsupportedOperationException}.
     * Calling {@link #setFailOnCalls(boolean)} with {@code true} makes every implemented operation
     * throw an {@link IOException}, simulating an unreachable cache service.
     */
    static class MockCacheClient extends AbstractControllerService implements AtomicDistributedMapCacheClient<Long> {
        private final ConcurrentMap<Object, AtomicCacheEntry<Object, Object, Long>> values = new ConcurrentHashMap<>();
        private boolean failOnCalls = false;

        /**
         * Switches the simulated outage on or off.
         *
         * @param z {@code true} to make subsequent cache calls fail with an {@link IOException}
         */
        public void setFailOnCalls(boolean z) {
            this.failOnCalls = z;
        }

        /** Throws an {@link IOException} when the simulated outage is switched on. */
        private void verifyNotFail() throws IOException {
            if (this.failOnCalls) {
                throw new IOException("Could not call to remote service because Unit Test marked service unavailable");
            }
        }

        /** Always throws; marks cache operations these tests do not expect to be invoked. */
        private void unsupported() throws UnsupportedOperationException {
            throw new UnsupportedOperationException("This method shouldn't be used from Notify processor.");
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        public <K, V> boolean putIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
            unsupported();
            return false;
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        public <K, V> V getAndPutIfAbsent(K k, V v, Serializer<K> serializer, Serializer<V> serializer2, Deserializer<V> deserializer) throws IOException {
            unsupported();
            return null;
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        public <K> boolean containsKey(K k, Serializer<K> serializer) throws IOException {
            unsupported();
            return false;
        }

        /** Not supported by this mock; always throws {@link UnsupportedOperationException}. */
        public <K, V> void put(K k, V v, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
            unsupported();
        }

        /**
         * Looks up the entry stored under {@code k} and hands its value, encoded as UTF-8 bytes,
         * to the supplied deserializer.
         *
         * @return the deserialized value, or {@code null} when no entry exists for the key
         * @throws IOException when the simulated outage is switched on
         */
        public <K, V> V get(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
            verifyNotFail();
            final AtomicCacheEntry<Object, Object, Long> entry = this.values.get(k);
            if (entry == null) {
                return null;
            }
            // The stored value is cast to String here, so entries read this way must hold Strings.
            final String stored = (String) entry.getValue();
            return deserializer.deserialize(stored.getBytes(StandardCharsets.UTF_8));
        }

        /** Nothing to release; the mock holds no external resources. */
        public void close() throws IOException {
        }

        /**
         * Removes the entry stored under {@code k}.
         *
         * @return {@code true} if an entry was present and has been removed
         * @throws IOException when the simulated outage is switched on
         */
        public <K> boolean remove(K k, Serializer<K> serializer) throws IOException {
            verifyNotFail();
            return this.values.remove(k) != null;
        }

        /**
         * Removes every entry whose key, rendered with {@code toString()}, fully matches the
         * given regular expression.
         *
         * @param str the regular expression to match keys against
         * @return the number of entries actually removed
         * @throws IOException when the simulated outage is switched on
         */
        public long removeByPattern(String str) throws IOException {
            verifyNotFail();
            final Pattern pattern = Pattern.compile(str);
            long removed = 0;
            // Remove by KEY. The previous version gathered the matched entries (the map values)
            // and passed those to remove(), which never matched a key and so removed nothing.
            // Removing while iterating is safe because the backing map is a ConcurrentHashMap.
            for (Object key : this.values.keySet()) {
                if (pattern.matcher(key.toString()).matches() && this.values.remove(key) != null) {
                    removed++;
                }
            }
            return removed;
        }

        /**
         * Returns the stored entry (key, value and revision) for {@code k}.
         *
         * @return the entry, or {@code null} when the key is absent
         * @throws IOException when the simulated outage is switched on
         */
        @SuppressWarnings("unchecked")
        public <K, V> AtomicCacheEntry<K, V, Long> fetch(K k, Serializer<K> serializer, Deserializer<V> deserializer) throws IOException {
            verifyNotFail();
            // Entries are stored untyped, so the caller's type parameters cannot be checked here.
            return (AtomicCacheEntry<K, V, Long>) this.values.get(k);
        }

        /**
         * Stores {@code atomicCacheEntry} if no entry exists for its key, or if the existing
         * entry's revision equals the one carried by the argument. The stored entry gets the
         * argument's revision plus one (an absent revision counts as 0).
         *
         * @return {@code false} when an entry exists with a different revision, {@code true} otherwise
         * @throws IOException when the simulated outage is switched on
         */
        public <K, V> boolean replace(AtomicCacheEntry<K, V, Long> atomicCacheEntry, Serializer<K> serializer, Serializer<V> serializer2) throws IOException {
            verifyNotFail();
            final Object key = atomicCacheEntry.getKey();
            final AtomicCacheEntry<Object, Object, Long> existing = this.values.get(key);
            if (existing != null && !existing.getRevision().equals(atomicCacheEntry.getRevision())) {
                // Revision mismatch: the caller's copy of the entry is stale.
                return false;
            }
            final Long nextRevision = atomicCacheEntry.getRevision().orElse(0L) + 1;
            this.values.put(key, new AtomicCacheEntry<>(key, atomicCacheEntry.getValue(), nextRevision));
            return true;
        }
    }

    /**
     * Creates a fresh {@code Notify} test runner and a fresh mock cache client before each test,
     * registers and enables the mock under the id {@code "service"}, and points the processor's
     * cache-service property at it.
     *
     * @throws InitializationException if the controller service cannot be added to the runner
     */
    @Before
    public void setup() throws InitializationException {
        final String serviceId = "service";

        runner = TestRunners.newTestRunner(Notify.class);
        service = new MockCacheClient();

        runner.addControllerService(serviceId, service);
        runner.enableControllerService(service);
        runner.setProperty(Notify.DISTRIBUTED_CACHE_SERVICE, serviceId);
    }

    /**
     * Sends a single flow file through the processor and verifies that it is routed to success
     * with {@code notified=true}, and that the signal stored under id {@code "1"} carries the
     * flow file's {@code key} attribute and a total count of at least 1.
     */
    @Test
    public void testNotify() throws InitializationException, IOException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "1");
        attributes.put("key", "value");
        runner.enqueue(new byte[0], attributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
        final MockFlowFile notified = runner.getFlowFilesForRelationship(Notify.REL_SUCCESS).get(0);
        notified.assertAttributeEquals("notified", "true");
        runner.clearTransferState();

        // Read the signal back through the same mock cache the processor wrote to.
        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol(service).getSignal("1");
        Assert.assertEquals("value", signal.getAttributes().get("key"));
        Assert.assertTrue(signal.isTotalCountReached(1L));
    }

    /**
     * Notifies the same signal id three times, one flow file per trigger, with the counter name
     * taken from the {@code status} attribute. Verifies that all three flow files succeed, that
     * the cached {@code key} attribute holds the value from the last flow file, and that the
     * per-counter totals are 2 for {@code success} and 1 for {@code failure}.
     */
    @Test
    public void testNotifyCounters() throws InitializationException, IOException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");

        final Map<String, String> first = new HashMap<>();
        first.put("releaseSignalAttribute", "someDataProcessing");
        first.put("key", "data1");
        first.put("status", "success");
        runner.enqueue(new byte[0], first);

        final Map<String, String> second = new HashMap<>();
        second.put("releaseSignalAttribute", "someDataProcessing");
        second.put("key", "data2");
        second.put("status", "success");
        runner.enqueue(new byte[0], second);

        final Map<String, String> third = new HashMap<>();
        third.put("releaseSignalAttribute", "someDataProcessing");
        third.put("key", "data3");
        third.put("status", "failure");
        runner.enqueue(new byte[0], third);

        // Three iterations: one per enqueued flow file.
        runner.run(3);

        runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS)
                .forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        runner.clearTransferState();

        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
        Assert.assertEquals("Same attribute key will be overwritten by the latest signal", "data3", signal.getAttributes().get("key"));
        Assert.assertTrue(signal.isTotalCountReached(3L));
        Assert.assertEquals(2L, signal.getCount("success"));
        Assert.assertEquals(1L, signal.getCount("failure"));
    }

    /**
     * Same input as the plain counter test, but with the signal buffer count set to 2.
     * The first trigger is expected to transfer two flow files and leave the signal at
     * {@code success=2, failure=0}; the second trigger is expected to transfer the remaining
     * flow file and bring the signal to {@code success=2, failure=1}.
     */
    @Test
    public void testNotifyCountersBatch() throws InitializationException, IOException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "2");

        final Map<String, String> first = new HashMap<>();
        first.put("releaseSignalAttribute", "someDataProcessing");
        first.put("key", "data1");
        first.put("status", "success");
        runner.enqueue(new byte[0], first);

        final Map<String, String> second = new HashMap<>();
        second.put("releaseSignalAttribute", "someDataProcessing");
        second.put("key", "data2");
        second.put("status", "success");
        runner.enqueue(new byte[0], second);

        final Map<String, String> third = new HashMap<>();
        third.put("releaseSignalAttribute", "someDataProcessing");
        third.put("key", "data3");
        third.put("status", "failure");
        runner.enqueue(new byte[0], third);

        // First trigger: only the first two flow files are expected to be processed.
        runner.run();
        runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 2);
        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS)
                .forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        runner.clearTransferState();

        final WaitNotifyProtocol.Signal afterFirstRun = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
        Assert.assertEquals("Same attribute key will be overwritten by the latest signal", "data2", afterFirstRun.getAttributes().get("key"));
        Assert.assertTrue(afterFirstRun.isTotalCountReached(2L));
        Assert.assertEquals(2L, afterFirstRun.getCount("success"));
        Assert.assertEquals(0L, afterFirstRun.getCount("failure"));

        // Second trigger: the remaining flow file is processed.
        runner.run();
        runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS)
                .forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        runner.clearTransferState();

        final WaitNotifyProtocol.Signal afterSecondRun = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
        Assert.assertEquals("Same attribute key will be overwritten by the latest signal", "data3", afterSecondRun.getAttributes().get("key"));
        Assert.assertTrue(afterSecondRun.isTotalCountReached(3L));
        Assert.assertEquals(2L, afterSecondRun.getCount("success"));
        Assert.assertEquals(1L, afterSecondRun.getCount("failure"));
    }

    /**
     * Uses the {@code record.count} attribute as the counter delta instead of incrementing by one.
     * With deltas 1024 and 2048 on {@code success} and 512 on {@code failure}, the signal is
     * expected to hold {@code success=3072}, {@code failure=512} and a total of at least 3584.
     */
    @Test
    public void testNotifyCountersUsingDelta() throws InitializationException, IOException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}");
        runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10");

        final Map<String, String> first = new HashMap<>();
        first.put("releaseSignalAttribute", "someDataProcessing");
        first.put("key", "data1");
        first.put("status", "success");
        first.put("record.count", "1024");
        runner.enqueue(new byte[0], first);

        final Map<String, String> second = new HashMap<>();
        second.put("releaseSignalAttribute", "someDataProcessing");
        second.put("key", "data2");
        second.put("status", "success");
        second.put("record.count", "2048");
        runner.enqueue(new byte[0], second);

        final Map<String, String> third = new HashMap<>();
        third.put("releaseSignalAttribute", "someDataProcessing");
        third.put("key", "data3");
        third.put("status", "failure");
        third.put("record.count", "512");
        runner.enqueue(new byte[0], third);

        runner.run();

        runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 3);
        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS)
                .forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        runner.clearTransferState();

        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
        Assert.assertEquals("Same attribute key will be overwritten by the latest signal", "data3", signal.getAttributes().get("key"));
        Assert.assertTrue(signal.isTotalCountReached(3584L));
        Assert.assertEquals(3072L, signal.getCount("success"));
        Assert.assertEquals(512L, signal.getCount("failure"));
    }

    /**
     * Feeds one flow file whose {@code record.count} is not a number ({@code "2048 records"})
     * between two valid ones. Expects the two valid flow files on success with
     * {@code notified=true}, the invalid one on failure with {@code notified=false}, and a signal
     * that reflects only the valid deltas ({@code success=1024}, {@code failure=512}).
     */
    @Test
    public void testIllegalDelta() throws InitializationException, IOException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, ".*");
        runner.setProperty(Notify.SIGNAL_COUNTER_NAME, "${status}");
        runner.setProperty(Notify.SIGNAL_COUNTER_DELTA, "${record.count}");
        runner.setProperty(Notify.SIGNAL_BUFFER_COUNT, "10");

        final Map<String, String> first = new HashMap<>();
        first.put("releaseSignalAttribute", "someDataProcessing");
        first.put("key", "data1");
        first.put("status", "success");
        first.put("record.count", "1024");
        runner.enqueue(new byte[0], first);

        // This flow file carries the malformed delta.
        final Map<String, String> second = new HashMap<>();
        second.put("releaseSignalAttribute", "someDataProcessing");
        second.put("key", "data2");
        second.put("status", "success");
        second.put("record.count", "2048 records");
        runner.enqueue(new byte[0], second);

        final Map<String, String> third = new HashMap<>();
        third.put("releaseSignalAttribute", "someDataProcessing");
        third.put("key", "data3");
        third.put("status", "failure");
        third.put("record.count", "512");
        runner.enqueue(new byte[0], third);

        runner.run();

        runner.assertTransferCount(Notify.REL_SUCCESS, 2);
        runner.getFlowFilesForRelationship(Notify.REL_SUCCESS)
                .forEach(flowFile -> flowFile.assertAttributeEquals("notified", "true"));
        runner.assertTransferCount(Notify.REL_FAILURE, 1);
        runner.getFlowFilesForRelationship(Notify.REL_FAILURE)
                .forEach(flowFile -> flowFile.assertAttributeEquals("notified", "false"));
        runner.clearTransferState();

        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol(service).getSignal("someDataProcessing");
        Assert.assertEquals("Same attribute key will be overwritten by the latest signal", "data3", signal.getAttributes().get("key"));
        Assert.assertTrue(signal.isTotalCountReached(1536L));
        Assert.assertEquals(1024L, signal.getCount("success"));
        Assert.assertEquals(512L, signal.getCount("failure"));
    }

    /**
     * Restricts the cached attributes with the regex {@code key[0-9]*} and verifies that
     * {@code key1} is stored in the signal while {@code other.key1} is not.
     */
    @Test
    public void testRegex() throws InitializationException, IOException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
        runner.setProperty(Notify.ATTRIBUTE_CACHE_REGEX, "key[0-9]*");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "1");
        attributes.put("key1", "value");
        attributes.put("other.key1", "value");
        runner.enqueue(new byte[0], attributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Notify.REL_SUCCESS, 1);
        runner.clearTransferState();

        final WaitNotifyProtocol.Signal signal = new WaitNotifyProtocol(service).getSignal("1");
        final Map<String, String> cached = signal.getAttributes();
        Assert.assertEquals("value", cached.get("key1"));
        Assert.assertNull(cached.get("other.key1"));
        Assert.assertTrue(signal.isTotalCountReached(1L));
    }

    /**
     * Enqueues a flow file without the {@code releaseSignalAttribute} attribute, so the signal
     * identifier expression has nothing to resolve, and expects the flow file on the failure
     * relationship.
     */
    @Test
    public void testEmptyReleaseSignal() throws InitializationException, InterruptedException {
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        // Deliberately no attributes at all.
        final Map<String, String> noAttributes = new HashMap<>();
        runner.enqueue(new byte[0], noAttributes);

        runner.run();

        runner.assertAllFlowFilesTransferred(Notify.REL_FAILURE, 1);
        runner.clearTransferState();
    }

    /**
     * Puts the mock cache into its failing mode, in which its cache calls throw
     * {@link IOException}, and expects {@code runner.run()} to raise an {@link AssertionError}
     * whose cause is a {@link RuntimeException}.
     */
    @Test
    public void testFailingCacheService() throws InitializationException, IOException {
        service.setFailOnCalls(true);
        runner.setProperty(Notify.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");

        final Map<String, String> attributes = new HashMap<>();
        attributes.put("releaseSignalAttribute", "2");
        runner.enqueue(new byte[0], attributes);

        // Capture the error instead of calling Assert.fail() inside the try block: fail() itself
        // throws AssertionError, which the catch below would intercept and hide its message.
        AssertionError thrown = null;
        try {
            runner.run();
        } catch (AssertionError e) {
            thrown = e;
        } finally {
            service.setFailOnCalls(false);
        }

        Assert.assertNotNull(
                "Processor should throw RuntimeException in case it receives an IO exception from the cache service and yield for a while.",
                thrown);
        Assert.assertTrue(thrown.getCause() instanceof RuntimeException);
    }
}
