package org.apache.flink.kinesis.shaded.io.netty.channel.local;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.TimeUnit;
import org.apache.flink.kinesis.shaded.io.netty.bootstrap.ServerBootstrap;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelDuplexHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandler;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelHandlerContext;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelInitializer;
import org.apache.flink.kinesis.shaded.io.netty.channel.ChannelPromise;
import org.apache.flink.kinesis.shaded.io.netty.channel.DefaultEventLoopGroup;
import org.apache.flink.kinesis.shaded.io.netty.channel.EventLoopGroup;
import org.apache.flink.kinesis.shaded.io.netty.util.ReferenceCountUtil;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultEventExecutorGroup;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.DefaultThreadFactory;
import org.apache.flink.kinesis.shaded.io.netty.util.concurrent.EventExecutorGroup;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

/* loaded from: input_file:org/apache/flink/kinesis/shaded/io/netty/channel/local/LocalTransportThreadModelTest3.class */
public class LocalTransportThreadModelTest3 {
    private static EventLoopGroup group;
    private static LocalAddress localAddr;

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Filler handler placed in front of the {@code "recorder"} handler in the test pipeline.
     *
     * <p>The class overrides nothing, so every event is handled by whatever
     * {@link ChannelDuplexHandler} does by default (presumably passing the event on to the next
     * handler -- confirm against the Netty documentation). The concurrent pipeline-modifier thread
     * in {@code testConcurrentAddRemove} repeatedly removes the first handler of the pipeline and
     * re-inserts it right before the recorder; instances of this class are the handlers being moved.
     */
    @ChannelHandler.Sharable
    /* loaded from: input_file:org/apache/flink/kinesis/shaded/io/netty/channel/local/LocalTransportThreadModelTest3$EventForwarder.class */
    public static final class EventForwarder extends ChannelDuplexHandler {
        // Private constructor: instances are only created by the enclosing test class.
        private EventForwarder() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kinesis/shaded/io/netty/channel/local/LocalTransportThreadModelTest3$EventRecorder.class */
    /**
     * Handler that appends an {@link EventType} to a shared queue for each callback it receives.
     *
     * <p>Lifecycle callbacks (registered / active / inactive / unregistered) and
     * {@code exceptionCaught} are always recorded. Data-path callbacks are filtered by direction:
     * in inbound mode only user events, reads and read-completions are recorded; in outbound mode
     * only {@code write} and {@code read} requests are recorded. No callback passes the event on
     * through the {@link ChannelHandlerContext}.
     */
    public static final class EventRecorder extends ChannelDuplexHandler {
        /** Destination of the recorded events, supplied by the test. */
        private final Queue<EventType> events;
        /** {@code true} to record inbound data events, {@code false} to record outbound operations. */
        private final boolean inbound;

        /**
         * @param queue queue that receives one entry per recorded event
         * @param z     {@code true} for inbound mode, {@code false} for outbound mode
         */
        EventRecorder(Queue<EventType> queue, boolean z) {
            events = queue;
            inbound = z;
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            capture(EventType.EXCEPTION_CAUGHT);
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            captureIf(inbound, EventType.USER_EVENT);
        }

        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            captureIf(inbound, EventType.MESSAGE_RECEIVED_LAST);
        }

        public void channelInactive(ChannelHandlerContext ctx) throws Exception {
            capture(EventType.INACTIVE);
        }

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            capture(EventType.ACTIVE);
        }

        public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
            capture(EventType.UNREGISTERED);
        }

        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
            capture(EventType.REGISTERED);
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            captureIf(inbound, EventType.MESSAGE_RECEIVED);
        }

        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            captureIf(!inbound, EventType.WRITE);
            // The promise is completed in both modes, whether or not the write was recorded.
            promise.setSuccess();
        }

        public void read(ChannelHandlerContext ctx) {
            captureIf(!inbound, EventType.READ);
        }

        /** Appends {@code type} to the event queue unconditionally. */
        private void capture(EventType type) {
            events.add(type);
        }

        /** Appends {@code type} to the event queue only when {@code wanted} is {@code true}. */
        private void captureIf(boolean wanted, EventType type) {
            if (wanted) {
                events.add(type);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/kinesis/shaded/io/netty/channel/local/LocalTransportThreadModelTest3$EventType.class */
    /**
     * Kinds of pipeline events used by this test, both as the script of events to trigger and as
     * the entries written by {@code EventRecorder}.
     */
    public enum EventType {
        // Recorded by exceptionCaught; triggered through fireExceptionCaught.
        EXCEPTION_CAUGHT,
        // Recorded by userEventTriggered (inbound mode only); triggered through fireUserEventTriggered.
        USER_EVENT,
        // Recorded by channelReadComplete (inbound mode only); triggered through fireChannelReadComplete.
        MESSAGE_RECEIVED_LAST,
        // Recorded by channelInactive.
        INACTIVE,
        // Recorded by channelActive.
        ACTIVE,
        // Recorded by channelUnregistered; the test waits for this to be the last recorded entry.
        UNREGISTERED,
        // Recorded by channelRegistered.
        REGISTERED,
        // Recorded by channelRead (inbound mode only); triggered through fireChannelRead.
        MESSAGE_RECEIVED,
        // Recorded by write (outbound mode only); triggered through pipeline().write.
        WRITE,
        // Recorded by read (outbound mode only); triggered through pipeline().read.
        READ
    }

    /**
     * Starts the local server shared by all tests in this class and stores its bound address in
     * {@code localAddr}.
     *
     * <p>Each accepted child channel gets a single handler that releases every message it reads,
     * so the server side simply discards whatever the client sends.
     */
    @BeforeAll
    public static void init() {
        group = new DefaultEventLoopGroup();
        ServerBootstrap bootstrap = new ServerBootstrap();
        bootstrap.group(group)
                .channel(LocalServerChannel.class)
                .childHandler(new ChannelInitializer<LocalChannel>() {
                    public void initChannel(LocalChannel localChannel) throws Exception {
                        localChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
                            public void channelRead(ChannelHandlerContext ctx, Object msg) {
                                // Discard: release the message instead of passing it on.
                                ReferenceCountUtil.release(msg);
                            }
                        });
                    }
                });
        // Channel.localAddress() is declared to return SocketAddress, so the result has to be
        // narrowed explicitly before it can be stored in the LocalAddress field.
        localAddr = (LocalAddress) bootstrap.bind(LocalAddress.ANY).syncUninterruptibly().channel().localAddress();
    }

    /**
     * Shuts down the event loop group created in {@code init()} and blocks until the shutdown
     * future completes.
     *
     * @throws Exception if waiting for the shutdown fails
     */
    @AfterAll
    public static void destroy() throws Exception {
        group.shutdownGracefully()
                .sync();
    }

    /**
     * Runs the inbound add/remove scenario 50 times in a row (currently disabled).
     */
    @Test
    @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
    @Disabled("regression test")
    public void testConcurrentAddRemoveInboundEventsMultiple() throws Throwable {
        int remainingRuns = 50;
        while (remainingRuns-- > 0) {
            testConcurrentAddRemoveInboundEvents();
        }
    }

    /**
     * Runs the outbound add/remove scenario 50 times in a row (currently disabled).
     */
    @Test
    @Timeout(value = 60000, unit = TimeUnit.MILLISECONDS)
    @Disabled("regression test")
    public void testConcurrentAddRemoveOutboundEventsMultiple() throws Throwable {
        int remainingRuns = 50;
        while (remainingRuns-- > 0) {
            testConcurrentAddRemoveOutboundEvents();
        }
    }

    /**
     * Runs the concurrent add/remove scenario in inbound mode (currently disabled).
     */
    @Test
    @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
    @Disabled("needs a fix")
    public void testConcurrentAddRemoveInboundEvents() throws Throwable {
        final boolean inbound = true;
        testConcurrentAddRemove(inbound);
    }

    /**
     * Runs the concurrent add/remove scenario in outbound mode (currently disabled).
     */
    @Test
    @Timeout(value = 30000, unit = TimeUnit.MILLISECONDS)
    @Disabled("needs a fix")
    public void testConcurrentAddRemoveOutboundEvents() throws Throwable {
        final boolean inbound = false;
        testConcurrentAddRemove(inbound);
    }

    /**
     * Checks that events reach the {@code "recorder"} handler in the order they were triggered
     * while a background thread keeps moving the other handlers around in the pipeline.
     *
     * <p>Steps:
     * <ol>
     *   <li>Build a client pipeline of five {@code EventForwarder}s followed by an
     *       {@code EventRecorder} named {@code "recorder"}, register the channel and connect it to
     *       the shared local server.</li>
     *   <li>Start a daemon thread that, every 100 ms while the channel is registered, removes the
     *       first handler and re-inserts it before the recorder on a randomly chosen executor
     *       group.</li>
     *   <li>Trigger a random script of 8192 events, close the channel, and wait until the recorder
     *       has seen {@code UNREGISTERED}.</li>
     *   <li>Compare the recorded events with the script, framed by REGISTERED/ACTIVE at the front
     *       and INACTIVE/UNREGISTERED at the end.</li>
     * </ol>
     *
     * @param z {@code true} to trigger and record inbound events, {@code false} for outbound
     *          operations (auto-read is switched off in that case)
     * @throws Exception if registration, connect, close or executor shutdown fails or is interrupted
     */
    private static void testConcurrentAddRemove(boolean z) throws Exception {
        EventLoopGroup loopGroup = new DefaultEventLoopGroup(4, new DefaultThreadFactory("l"));
        EventExecutorGroup e1 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e1"));
        EventExecutorGroup e2 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e2"));
        EventExecutorGroup e3 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e3"));
        EventExecutorGroup e4 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e4"));
        EventExecutorGroup e5 = new DefaultEventExecutorGroup(4, new DefaultThreadFactory("e5"));
        final EventExecutorGroup[] executorGroups = {e1, e2, e3, e4, e5};
        // Declared outside the try block so the finally block can stop the thread on every exit path.
        Thread pipelineModifier = null;
        try {
            ConcurrentLinkedDeque<EventType> recorded = new ConcurrentLinkedDeque<>();
            final LocalChannel channel = new LocalChannel();
            if (!z) {
                channel.config().setAutoRead(false);
            }
            // All handlers start out on the same executor group; the modifier thread spreads the
            // forwarders over the other groups later on.
            channel.pipeline()
                    .addLast(e1, new EventForwarder())
                    .addLast(e1, new EventForwarder())
                    .addLast(e1, new EventForwarder())
                    .addLast(e1, new EventForwarder())
                    .addLast(e1, new EventForwarder())
                    .addLast(e1, "recorder", new EventRecorder(recorded, z));
            loopGroup.register(channel).sync().channel().connect(localAddr).sync();

            LinkedList<EventType> expected = events(z, 8192);
            Throwable cause = new Throwable();

            pipelineModifier = new Thread(new Runnable() {
                @Override
                public void run() {
                    Random random = new Random();
                    while (true) {
                        try {
                            Thread.sleep(100L);
                        } catch (InterruptedException e) {
                            // Interruption is the stop signal sent by the test thread.
                            return;
                        }
                        if (!channel.isRegistered()) {
                            continue;
                        }
                        ChannelHandler first = channel.pipeline().removeFirst();
                        channel.pipeline().addBefore(
                                executorGroups[random.nextInt(executorGroups.length)],
                                "recorder",
                                UUID.randomUUID().toString(),
                                first);
                    }
                }
            });
            pipelineModifier.setDaemon(true);
            pipelineModifier.start();

            for (EventType event : expected) {
                switch (event) {
                    case EXCEPTION_CAUGHT:
                        channel.pipeline().fireExceptionCaught(cause);
                        break;
                    case MESSAGE_RECEIVED:
                        channel.pipeline().fireChannelRead("");
                        break;
                    case MESSAGE_RECEIVED_LAST:
                        channel.pipeline().fireChannelReadComplete();
                        break;
                    case USER_EVENT:
                        channel.pipeline().fireUserEventTriggered("");
                        break;
                    case WRITE:
                        channel.pipeline().write("");
                        break;
                    case READ:
                        channel.pipeline().read();
                        break;
                    default:
                        // Lifecycle events are never part of the generated script.
                        break;
                }
            }

            channel.close().sync();
            // Poll until the recorder has seen the final lifecycle event.
            while (recorded.peekLast() != EventType.UNREGISTERED) {
                Thread.sleep(10L);
            }

            // Frame the script with the lifecycle events the recorder also captures.
            expected.addFirst(EventType.ACTIVE);
            expected.addFirst(EventType.REGISTERED);
            expected.addLast(EventType.INACTIVE);
            expected.addLast(EventType.UNREGISTERED);

            EventType actual;
            while ((actual = recorded.poll()) != null) {
                // JUnit's argument order is (expected, actual).
                Assertions.assertEquals(expected.poll(), actual);
            }
            Assertions.assertTrue(expected.isEmpty(), "Missing events:" + expected);
        } finally {
            // Stop the modifier thread; otherwise it keeps sleeping and polling forever after the
            // test has finished (one leaked thread per invocation).
            if (pipelineModifier != null) {
                pipelineModifier.interrupt();
            }
            loopGroup.shutdownGracefully();
            e1.shutdownGracefully();
            e2.shutdownGracefully();
            e3.shutdownGracefully();
            e4.shutdownGracefully();
            e5.shutdownGracefully();
            loopGroup.terminationFuture().sync();
            e1.terminationFuture().sync();
            e2.terminationFuture().sync();
            e3.terminationFuture().sync();
            e4.terminationFuture().sync();
            e5.terminationFuture().sync();
        }
    }

    /**
     * Generates a random script of events to trigger on the pipeline.
     *
     * @param z {@code true} to draw from the inbound event kinds (user event, message received,
     *          message received last, exception caught); {@code false} to draw from read, write
     *          and exception caught
     * @param i number of events to generate; a value of zero or less yields an empty list
     * @return a new mutable list of {@code i} randomly chosen events
     */
    private static LinkedList<EventType> events(boolean z, int i) {
        final EventType[] candidates;
        if (z) {
            candidates = new EventType[]{
                EventType.USER_EVENT,
                EventType.MESSAGE_RECEIVED,
                EventType.MESSAGE_RECEIVED_LAST,
                EventType.EXCEPTION_CAUGHT
            };
        } else {
            candidates = new EventType[]{EventType.READ, EventType.WRITE, EventType.EXCEPTION_CAUGHT};
        }

        Random rng = new Random();
        LinkedList<EventType> script = new LinkedList<>();
        int remaining = i;
        while (remaining-- > 0) {
            int pick = rng.nextInt(candidates.length);
            script.add(candidates[pick]);
        }
        return script;
    }
}
