/*
 * Decompiled with CFR 0.152.
 */
package net.dryuf.netty.echo;

import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.ChannelInputShutdownEvent;
import io.netty.channel.socket.DuplexChannel;
import io.netty.channel.socket.DuplexChannelConfig;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import net.dryuf.netty.address.AddressSpec;
import net.dryuf.netty.core.NettyServer;
import net.dryuf.netty.pipeline.FullFlowControlHandler;
import net.dryuf.netty.test.ClientServerTester;
import net.dryuf.netty.util.NettyFutures;
import org.apache.commons.lang3.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class EchoEndTester {
    private static final Logger log = LogManager.getLogger(EchoEndTester.class);
    private static final String output = StringUtils.repeat((String)"Hello world\n", (int)1000);

    public static void testEcho() throws Exception {
        try (ClientServerTester tester = new ClientServerTester();){
            InetSocketAddress serverAddress = EchoEndTester.runEchoServer(tester);
            try (ClientServerTester tester2 = new ClientServerTester();){
                EchoEndTester.runEchoClient(tester2, serverAddress, 1);
            }
        }
    }

    public static InetSocketAddress runEchoServer(ClientServerTester tester) {
        return EchoEndTester.runEchoServer(tester, InetSocketAddress.createUnresolved("localhost", 0));
    }

    public static <T extends SocketAddress> T runEchoServer(ClientServerTester tester, T listenAddress) {
        final AtomicInteger serverCount = new AtomicInteger();
        NettyServer server = new NettyServer((Channel)tester.nettyEngine().listen(AddressSpec.fromSocketAddress(listenAddress), new ChannelInitializer<DuplexChannel>(){

            protected void initChannel(DuplexChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new EchoServerHandler(ch, serverCount)});
            }
        }).join());
        tester.addServer(server);
        SocketAddress address = server.listenAddress();
        log.info("EchoServer listening: {}", (Object)address);
        return (T)address;
    }

    public static double runEchoClient(ClientServerTester tester, SocketAddress serverAddress, int attempts) {
        final AtomicInteger pending = new AtomicInteger();
        return tester.runNettyClientLoop(ClientServerTester.TestConfig.builder().batchSize(1000).build(), serverAddress, future -> new ChannelInitializer<DuplexChannel>(){

            protected void initChannel(DuplexChannel ch) throws Exception {
                ch.pipeline().addLast(new ChannelHandler[]{new StringDecoder(), new FullFlowControlHandler(), new EchoClientHandler(future, ch, pending)});
            }
        }, channel -> {
            final AtomicInteger counter = new AtomicInteger(1000);
            return CompletableFuture.completedFuture(null).thenComposeAsync(new Function<Void, CompletableFuture<Void>>(){

                @Override
                public CompletableFuture<Void> apply(Void arg) {
                    if (counter.decrementAndGet() < 0) {
                        return NettyFutures.toCompletable(channel.shutdownOutput());
                    }
                    return NettyFutures.toCompletable(channel.writeAndFlush((Object)Unpooled.wrappedBuffer((byte[])"Hello world\n".getBytes(StandardCharsets.UTF_8)))).thenComposeAsync(this::apply);
                }
            });
        });
    }

    public static class EchoClientHandler
    extends ChannelInboundHandlerAdapter {
        private static final Logger log = LogManager.getLogger(EchoClientHandler.class);
        StringBuilder sb = new StringBuilder();
        private final CompletableFuture<Void> closedPromise;
        AtomicInteger pending;

        public EchoClientHandler(CompletableFuture<Void> closedPromise, DuplexChannel channel, AtomicInteger pending) {
            this.closedPromise = closedPromise;
            ((DuplexChannelConfig)channel.config()).setAutoRead(false);
            ((DuplexChannelConfig)channel.config()).setAllowHalfClosure(true);
            this.pending = pending;
            NettyFutures.copy(channel.closeFuture(), closedPromise);
        }

        public void channelActive(ChannelHandlerContext ctx) {
            this.pending.incrementAndGet();
            ctx.read();
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            this.sb.append((String)msg);
            ReferenceCountUtil.release((Object)msg);
            ctx.read();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled force condition propagation
         * Lifted jumps to return sites
         */
        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof ChannelInputShutdownEvent) {
                try {
                    if (this.sb.toString().equals(output)) return;
                    throw new IllegalStateException("Unexpected output: expected=[" + output + "] actual=[" + this.sb.toString() + "]");
                }
                catch (Throwable ex) {
                    this.closedPromise.completeExceptionally((Throwable)((Object)new AssertionError("Channel test failed: " + String.valueOf(ctx.channel()), ex)));
                    return;
                }
                finally {
                    log.info("Pending: {}", (Object)this.pending.decrementAndGet());
                    NettyFutures.copy(ctx.close(), this.closedPromise);
                }
            } else {
                ctx.read();
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("Exception in client:", cause);
            this.closedPromise.completeExceptionally(cause);
            ctx.close();
        }
    }

    public static class EchoServerHandler
    extends ChannelInboundHandlerAdapter {
        private static final Logger log = LogManager.getLogger(EchoServerHandler.class);
        private final AtomicInteger serverCount;

        public EchoServerHandler(DuplexChannel channel, AtomicInteger serverCount) {
            this.serverCount = serverCount;
            ((DuplexChannelConfig)channel.config()).setAutoRead(false);
            ((DuplexChannelConfig)channel.config()).setAllowHalfClosure(true);
            channel.closeFuture().addListener(f -> log.debug("Pending server: {}", (Object)serverCount.decrementAndGet()));
        }

        public void channelActive(ChannelHandlerContext ctx) {
            this.serverCount.incrementAndGet();
            ctx.read();
        }

        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            ctx.writeAndFlush(msg);
            ctx.read();
        }

        public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
            if (evt instanceof ChannelInputShutdownEvent) {
                ctx.writeAndFlush((Object)Unpooled.EMPTY_BUFFER).addListener(f -> {
                    ChannelFuture newFuture = f;
                    if (f.isSuccess()) {
                        newFuture = ((DuplexChannel)ctx.channel()).shutdownOutput();
                    }
                    newFuture.addListener((GenericFutureListener)ChannelFutureListener.CLOSE);
                });
            }
        }

        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            log.error("Exception in server:", cause);
            ctx.close();
        }
    }
}

