package org.apache.nifi.processors.standard;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.json.JsonTreeReader;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.StandardEvent;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.serialization.record.MockRecordWriter;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/nifi/processors/standard/TestListenUDPRecord.class */
public class TestListenUDPRecord {
    static final String SCHEMA_TEXT = "{\n  \"name\": \"syslogRecord\",\n  \"namespace\": \"nifi\",\n  \"type\": \"record\",\n  \"fields\": [\n    { \"name\": \"timestamp\", \"type\": \"string\" },\n    { \"name\": \"logsource\", \"type\": \"string\" },\n    { \"name\": \"message\", \"type\": \"string\" }\n  ]\n}";
    static final String DATAGRAM_1 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"} ]";
    static final String DATAGRAM_2 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"} ]";
    static final String DATAGRAM_3 = "[ {\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"} ]";
    static final String MULTI_DATAGRAM_1 = "[{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 1\"},{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 2\"},{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 3\"}]";
    static final String MULTI_DATAGRAM_2 = "[{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 4\"},{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 5\"},{\"timestamp\" : \"123456789\", \"logsource\" : \"syslog\", \"message\" : \"This is a test 6\"}]";
    private TestableListenUDPRecord proc;
    private TestRunner runner;
    private MockRecordWriter mockRecordWriter;

    /* loaded from: input_file:org/apache/nifi/processors/standard/TestListenUDPRecord$TestableListenUDPRecord.class */
    private static class TestableListenUDPRecord extends ListenUDPRecord {
        private volatile BlockingQueue<StandardEvent> testEvents;
        private volatile BlockingQueue<StandardEvent> testErrorEvents;

        private TestableListenUDPRecord() {
            this.testEvents = new LinkedBlockingQueue();
            this.testErrorEvents = new LinkedBlockingQueue();
        }

        protected ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<StandardEvent> blockingQueue) throws IOException {
            return (ChannelDispatcher) Mockito.mock(ChannelDispatcher.class);
        }

        public void addEvent(StandardEvent standardEvent) {
            this.testEvents.add(standardEvent);
        }

        public void addErrorEvent(StandardEvent standardEvent) {
            this.testErrorEvents.add(standardEvent);
        }

        /* JADX INFO: Access modifiers changed from: protected */
        /* renamed from: getMessage, reason: merged with bridge method [inline-methods] */
        public StandardEvent m28getMessage(boolean z, boolean z2, ProcessSession processSession) {
            StandardEvent standardEvent = null;
            if (z2) {
                standardEvent = this.testErrorEvents.poll();
            }
            if (standardEvent == null) {
                try {
                    standardEvent = z ? this.testEvents.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS) : this.testEvents.poll();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return standardEvent;
        }
    }

    @Before
    public void setup() throws InitializationException {
        this.proc = new TestableListenUDPRecord();
        this.runner = TestRunners.newTestRunner(this.proc);
        this.runner.setProperty(ListenUDP.PORT, "1");
        JsonTreeReader jsonTreeReader = new JsonTreeReader();
        this.runner.addControllerService("record-reader", jsonTreeReader);
        this.runner.setProperty(jsonTreeReader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.SCHEMA_TEXT_PROPERTY.getValue());
        this.runner.setProperty(jsonTreeReader, SchemaAccessUtils.SCHEMA_TEXT, SCHEMA_TEXT);
        this.runner.enableControllerService(jsonTreeReader);
        this.mockRecordWriter = new MockRecordWriter("timestamp, logsource, message");
        this.runner.addControllerService("record-writer", this.mockRecordWriter);
        this.runner.enableControllerService(this.mockRecordWriter);
        this.runner.setProperty(ListenUDPRecord.RECORD_READER, "record-reader");
        this.runner.setProperty(ListenUDPRecord.RECORD_WRITER, "record-writer");
    }

    @Test
    public void testSuccessWithBatchSizeGreaterThanAvailableRecords() {
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_1.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_2.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_3.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
        ((MockFlowFile) this.runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0)).assertAttributeEquals("record.count", "3");
    }

    @Test
    public void testSuccessWithBatchLessThanAvailableRecords() {
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_1.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_2.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_3.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.runner.setProperty(ListenUDPRecord.BATCH_SIZE, "1");
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
        ((MockFlowFile) this.runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0)).assertAttributeEquals("record.count", "1");
        this.runner.clearTransferState();
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
        ((MockFlowFile) this.runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0)).assertAttributeEquals("record.count", "1");
        this.runner.clearTransferState();
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
        ((MockFlowFile) this.runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0)).assertAttributeEquals("record.count", "1");
        this.runner.clearTransferState();
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
    }

    @Test
    public void testMultipleRecordsPerDatagram() {
        this.proc.addEvent(new StandardEvent("foo", MULTI_DATAGRAM_1.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", MULTI_DATAGRAM_2.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 1);
        ((MockFlowFile) this.runner.getFlowFilesForRelationship(ListenUDPRecord.REL_SUCCESS).get(0)).assertAttributeEquals("record.count", "6");
    }

    @Test
    public void testParseFailure() {
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_1.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", "WILL NOT PARSE".getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.runner.run();
        this.runner.assertTransferCount(ListenUDPRecord.REL_SUCCESS, 1);
        this.runner.assertTransferCount(ListenUDPRecord.REL_PARSE_FAILURE, 1);
        ((MockFlowFile) this.runner.getFlowFilesForRelationship(ListenUDPRecord.REL_PARSE_FAILURE).get(0)).assertContentEquals("WILL NOT PARSE");
    }

    @Test
    public void testWriterFailure() throws InitializationException {
        this.mockRecordWriter = new MockRecordWriter("timestamp, logsource, message", false, 2);
        this.runner.addControllerService("record-writer", this.mockRecordWriter);
        this.runner.enableControllerService(this.mockRecordWriter);
        this.runner.setProperty(ListenUDPRecord.RECORD_WRITER, "record-writer");
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_1.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_2.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.proc.addEvent(new StandardEvent("foo", DATAGRAM_3.getBytes(StandardCharsets.UTF_8), (ChannelResponder) null));
        this.runner.run();
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_SUCCESS, 0);
        this.runner.assertAllFlowFilesTransferred(ListenUDPRecord.REL_PARSE_FAILURE, 0);
    }
}
