package org.apache.nifi.processors.standard.relp.handler;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.dispatcher.AsyncChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.handler.ChannelHandlerFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processors.standard.util.TCPTestServer;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/nifi/processors/standard/relp/handler/TestRELPSocketChannelHandler.class */
/**
 * Exercises a {@link SocketChannelDispatcher} wired with a
 * {@link RELPSocketChannelHandlerFactory}: frames are written to the dispatcher over a
 * plain (non-SSL) socket and the test verifies that one event per frame ends up on the
 * shared event queue.
 */
public class TestRELPSocketChannelHandler {
    private EventFactory<TestEvent> eventFactory;
    private ChannelHandlerFactory<TestEvent, AsyncChannelDispatcher> channelHandlerFactory;
    private BlockingQueue<ByteBuffer> byteBuffers;
    private BlockingQueue<TestEvent> events;
    // Assigned in setup(), which runs before every test.
    private ComponentLog logger;
    private int maxConnections;
    private SSLContext sslContext;
    private Charset charset;
    private ChannelDispatcher dispatcher;

    /**
     * Minimal event implementation that simply retains the raw payload and the metadata
     * map it was created with so the tests can inspect them.
     */
    public static class TestEvent implements Event<SocketChannel> {
        private final byte[] data;
        private final Map<String, String> metadata;

        public TestEvent(byte[] bArr, Map<String, String> map) {
            this.data = bArr;
            this.metadata = map;
        }

        /** Returns the value stored under the {@code "sender"} metadata key. */
        public String getSender() {
            return this.metadata.get("sender");
        }

        /** Returns the payload array exactly as it was passed to the constructor. */
        public byte[] getData() {
            return this.data;
        }

        /** Always {@code null}: these tests never send a response through the event. */
        public ChannelResponder<SocketChannel> getResponder() {
            return null;
        }
    }

    /** Event factory that produces {@link TestEvent} instances and ignores the responder. */
    private static class TestEventHolderFactory implements EventFactory<TestEvent> {
        public TestEvent create(byte[] bArr, Map<String, String> map, ChannelResponder channelResponder) {
            return new TestEvent(bArr, map);
        }
    }

    /**
     * Builds a fresh dispatcher for every test: a single 4096-byte buffer in the pool,
     * one allowed connection, no SSL context and UTF-8 as the charset.
     */
    @Before
    public void setup() {
        this.eventFactory = new TestEventHolderFactory();
        this.channelHandlerFactory = new RELPSocketChannelHandlerFactory();
        this.byteBuffers = new LinkedBlockingQueue<>();
        this.byteBuffers.add(ByteBuffer.allocate(4096));
        this.events = new LinkedBlockingQueue<>();
        this.logger = Mockito.mock(ComponentLog.class);
        this.maxConnections = 1;
        this.sslContext = null;
        this.charset = StandardCharsets.UTF_8;
        this.dispatcher = new SocketChannelDispatcher(this.eventFactory, this.channelHandlerFactory, this.byteBuffers, this.events, this.logger, this.maxConnections, this.sslContext, this.charset);
    }

    /**
     * Sends three frames and checks that each produced an event whose
     * {@code relp.txnr} metadata value matches the frame's leading number.
     */
    @Test
    public void testBasicHandling() throws IOException, InterruptedException {
        final List<String> messages = new ArrayList<>();
        messages.add("1 syslog 20 this is message 1234\n");
        messages.add("2 syslog 22 this is message 456789\n");
        messages.add("3 syslog 21 this is message ABCDE\n");

        run(messages);
        Assert.assertEquals(messages.size(), this.events.size());

        boolean sawFirst = false;
        boolean sawSecond = false;
        boolean sawThird = false;

        // Drain the queue; events may arrive in any order, so only record which ids were seen.
        TestEvent event;
        while ((event = this.events.poll()) != null) {
            final Map<String, String> metadata = event.metadata;
            Assert.assertTrue(metadata.containsKey("relp.txnr"));

            final String txnr = metadata.get("relp.txnr");
            if ("1".equals(txnr)) {
                sawFirst = true;
            } else if ("2".equals(txnr)) {
                sawSecond = true;
            } else if ("3".equals(txnr)) {
                sawThird = true;
            }
        }

        Assert.assertTrue(sawFirst);
        Assert.assertTrue(sawSecond);
        Assert.assertTrue(sawThird);
    }

    /** Sends 900 frames (ids 100 through 999) and checks that every one yields an event. */
    @Test
    public void testLotsOfFrames() throws IOException, InterruptedException {
        final List<String> messages = new ArrayList<>();
        for (int i = 100; i < 1000; i++) {
            messages.add(i + " syslog 19 this is message " + i + TCPTestServer.DEFAULT_MESSAGE_DELIMITER);
        }

        run(messages);
        Assert.assertEquals(messages.size(), this.events.size());
    }

    /**
     * Opens the dispatcher on an ephemeral port, starts it on its own thread, writes each
     * entry of {@code list} to it over a client socket and then waits (up to ten seconds)
     * for the same number of events to appear on the queue.
     *
     * <p>The client channel is closed as soon as all frames are written, and the
     * dispatcher is always closed before this method returns, whether or not the
     * exchange succeeded.
     *
     * @param list the frames to send, each encoded with the configured charset
     * @throws IOException if opening the dispatcher or talking to the socket fails
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    protected void run(List<String> list) throws IOException, InterruptedException {
        final ByteBuffer buffer = ByteBuffer.allocate(1024);
        try {
            // Port 0 lets the OS choose a free port; the actual port is read back below.
            this.dispatcher.open((InetAddress) null, 0, 4096);
            new Thread((Runnable) this.dispatcher).start();
            final int port = this.dispatcher.getPort();

            try (SocketChannel channel = SocketChannel.open()) {
                channel.connect(new InetSocketAddress("localhost", port));
                // Short pause after connecting before the first frame is written.
                Thread.sleep(100L);

                for (final String frame : list) {
                    buffer.clear();
                    buffer.put(frame.getBytes(this.charset));
                    buffer.flip();
                    // A single write may not drain the buffer, so loop until it is empty.
                    while (buffer.hasRemaining()) {
                        channel.write(buffer);
                    }
                    Thread.sleep(1L);
                }
            }

            // Poll until every frame has produced an event or the ten second budget runs out.
            final long start = System.currentTimeMillis();
            while (this.events.size() < list.size() && System.currentTimeMillis() - start < 10000) {
                Thread.sleep(100L);
            }
            Assert.assertEquals(list.size(), this.events.size());
        } finally {
            this.dispatcher.close();
        }
    }
}
