package com.github.paganini2008.embeddedio;

import com.github.paganini2008.devtools.Observable;
import com.github.paganini2008.embeddedio.ChannelEvent;
import java.io.IOException;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/* loaded from: input_file:com/github/paganini2008/embeddedio/NioConnector.class */
public class NioConnector extends NioReactor implements IoConnector {
    // Reader that successfully connected channels are registered with (see process); destroyed in close().
    private final NioReader reader;
    // Publishes ACTIVE / FATAL channel events; handlers subscribe through addHandler().
    private final ChannelEventPublisher channelEventPublisher;
    // Handed to every NioChannel created by connect(); the constructor installs a SerializationTransformer.
    private Transformer transformer;
    // Handed to every NioChannel created by connect(); the constructor sets it to 1.
    private int writerBatchSize;
    // Applied as the socket send-buffer size in connect() when positive; the constructor sets it to 1024.
    private int writerBufferSize;
    // Handed to every NioChannel created by connect(); the constructor sets it to 0.
    private int autoFlushInterval;
    // Relays connection outcomes from ChannelFutureHandler to the ChannelPromise observers added in connect().
    private final Observable observable;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/github/paganini2008/embeddedio/NioConnector$ChannelFutureHandler.class */
    /**
     * Internal handler that forwards channel life-cycle callbacks to the enclosing
     * connector's {@code observable}, which is how promises passed to
     * {@code connect} get completed.
     */
    public class ChannelFutureHandler implements ChannelHandler {
        private ChannelFutureHandler() {
        }

        /**
         * Notifies the observers keyed by the string form of the channel's remote
         * address, passing the channel itself as the argument.
         *
         * @param channel the channel that became active
         */
        @Override // com.github.paganini2008.embeddedio.ChannelHandler
        public void fireChannelActive(Channel channel) throws IOException {
            final String observerKey = channel.getRemoteAddr().toString();
            observable.notifyObservers(observerKey, channel);
        }

        /**
         * Notifies observers with the failure cause. No address key is supplied
         * here. NOTE(review): which observers this un-keyed notification reaches
         * depends on {@code Observable} -- confirm.
         *
         * @param channel the channel that failed (not used by this handler)
         * @param th      the failure cause forwarded to the observers
         */
        @Override // com.github.paganini2008.embeddedio.ChannelHandler
        public void fireChannelFatal(Channel channel, Throwable th) {
            observable.notifyObservers(th);
        }
    }

    /**
     * Creates a connector whose channel events are dispatched on a newly created
     * cached thread pool.
     * NOTE(review): whether that pool is shut down by {@code close()} depends on
     * the event publisher's {@code destroy()} -- confirm.
     */
    public NioConnector() {
        this(Executors.newCachedThreadPool());
    }

    /**
     * Creates a connector that hands the given executor to its channel event
     * publisher, sets the default writer settings, and finally calls
     * {@link #initialize()}.
     *
     * @param executor executor passed to the {@code DefaultChannelEventPublisher}
     */
    public NioConnector(Executor executor) {
        super(true);
        // Plain numeric defaults first.
        writerBatchSize = 1;
        writerBufferSize = 1024;
        autoFlushInterval = 0;
        // Collaborators, created in the same relative order as before.
        transformer = new SerializationTransformer();
        observable = Observable.unrepeatable();
        reader = new NioReader();
        channelEventPublisher = new DefaultChannelEventPublisher(executor);
        initialize();
    }

    /**
     * @return the writer batch size currently configured for new channels
     */
    public int getWriterBatchSize() {
        return writerBatchSize;
    }

    /**
     * Sets the writer batch size handed to channels created by later
     * {@code connect} calls.
     *
     * @param i the new writer batch size
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public void setWriterBatchSize(int i) {
        writerBatchSize = i;
    }

    /**
     * @return the transformer currently configured for new channels
     */
    public Transformer getTransformer() {
        return transformer;
    }

    /**
     * Replaces the transformer handed to channels created by later
     * {@code connect} calls.
     *
     * @param transformer the transformer to use from now on
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public void setTransformer(Transformer transformer) {
        // The parameter shadows the field, so the qualifier is required here.
        this.transformer = transformer;
    }

    /**
     * @return the configured writer buffer size; {@code connect} applies it as
     *         the socket send-buffer size when it is positive
     */
    public int getWriterBufferSize() {
        return writerBufferSize;
    }

    /**
     * Sets the writer buffer size. {@code connect} applies it as the socket
     * send-buffer size only when it is greater than zero.
     *
     * @param i the new writer buffer size
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public void setWriterBufferSize(int i) {
        writerBufferSize = i;
    }

    /**
     * @return the auto-flush interval currently configured for new channels
     */
    public int getAutoFlushInterval() {
        return autoFlushInterval;
    }

    /**
     * Sets the auto-flush interval handed to channels created by later
     * {@code connect} calls.
     *
     * @param i the new auto-flush interval
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public void setAutoFlushInterval(int i) {
        autoFlushInterval = i;
    }

    /**
     * Subscribes the given handler to the channel events published by this
     * connector.
     *
     * @param channelHandler the handler to subscribe
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public void addHandler(ChannelHandler channelHandler) {
        channelEventPublisher.subscribeChannelEvent(channelHandler);
    }

    /**
     * Installs the internal {@code ChannelFutureHandler} that bridges channel
     * events to connection promises. Called at the end of the constructor.
     */
    protected void initialize() {
        ChannelHandler promiseBridge = new ChannelFutureHandler();
        addHandler(promiseBridge);
    }

    /**
     * Opens a non-blocking socket channel, starts connecting it to the given
     * address and registers it with this reactor for connect readiness.
     *
     * <p>The socket is configured with keep-alive, address reuse and TCP no-delay;
     * the send-buffer size is set only when {@code writerBufferSize} is positive.
     * If a promise is supplied, an observer keyed by
     * {@code socketAddress.toString()} is added before the channel is registered,
     * so the outcome cannot be published before the observer exists.
     * NOTE(review): the success notification is keyed by the channel's remote
     * address string; confirm it always equals {@code socketAddress.toString()}.
     *
     * <p>If any step after opening the channel fails, the channel is closed before
     * the exception propagates so the underlying socket is not leaked.
     *
     * @param socketAddress  the remote address to connect to
     * @param channelPromise optional callback; receives {@code onSuccess} with the
     *                       channel or {@code onFailure} with the cause; may be
     *                       {@code null}
     * @return the channel wrapper registered for the pending connection
     * @throws IOException if the channel cannot be opened, configured, connected
     *                     or registered
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public Channel connect(SocketAddress socketAddress, ChannelPromise<Channel> channelPromise) throws IOException {
        SocketChannel open = SocketChannel.open();
        try {
            Socket socket = open.socket();
            socket.setKeepAlive(true);
            socket.setReuseAddress(true);
            socket.setTcpNoDelay(true);
            if (this.writerBufferSize > 0) {
                socket.setSendBufferSize(this.writerBufferSize);
            }
            open.configureBlocking(false);
            open.connect(socketAddress);
            if (channelPromise != null) {
                this.observable.addObserver(socketAddress.toString(), (source, result) -> {
                    if (result instanceof Throwable) {
                        channelPromise.onFailure((Throwable) result);
                    } else {
                        channelPromise.onSuccess((Channel) result);
                    }
                });
            }
            NioChannel nioChannel = new NioChannel(open, this.channelEventPublisher, this.transformer, this.writerBatchSize, this.autoFlushInterval);
            register(open, SelectionKey.OP_CONNECT, nioChannel);
            return nioChannel;
        } catch (IOException | RuntimeException e) {
            // Nothing else holds the freshly opened channel yet, so release it here.
            try {
                open.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }

    /**
     * Shuts the connector down: destroys the event publisher, then the reader,
     * then this reactor itself, in that order.
     */
    @Override // com.github.paganini2008.embeddedio.IoConnector
    public void close() {
        channelEventPublisher.destroy();
        reader.destroy();
        destroy();
    }

    /**
     * Accepts only keys whose ready set contains {@code OP_CONNECT}.
     *
     * @param selectionKey the key to test
     * @return {@code true} when the key's channel is ready to finish connecting
     */
    @Override // com.github.paganini2008.embeddedio.NioReactor
    protected boolean isSelectable(SelectionKey selectionKey) {
        // Documented equivalent of SelectionKey.isConnectable().
        final int ready = selectionKey.readyOps();
        return (ready & SelectionKey.OP_CONNECT) != 0;
    }

    /**
     * Completes a pending connection for the selected key.
     *
     * <p>When the connection is still pending, {@code finishConnect()} is polled
     * until it reports completion. If it throws, a {@code FATAL} event carrying
     * the exception is published and the channel is not handed to the reader.
     * On success the socket channel is registered with the reader for
     * {@code OP_READ} and an {@code ACTIVE} event is published.
     *
     * @param selectionKey key whose channel is a {@code SocketChannel} and whose
     *                     attachment is the {@code Channel} wrapper set in
     *                     {@code connect}
     * @throws IOException if registering with the reader or publishing fails
     */
    @Override // com.github.paganini2008.embeddedio.NioReactor
    protected void process(SelectionKey selectionKey) throws IOException {
        SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
        Channel channel = (Channel) selectionKey.attachment();
        boolean connected;
        if (socketChannel.isConnectionPending()) {
            try {
                // finishConnect() must sit inside the try so a failed connect
                // reaches the FATAL handler below instead of escaping.
                while (!socketChannel.finishConnect()) {
                    // Not complete yet; poll again.
                }
                connected = true;
            } catch (IOException e) {
                // Per the SocketChannel contract a failed finishConnect() has
                // already closed the channel, so only the event is published.
                connected = false;
                this.channelEventPublisher.publishChannelEvent(new ChannelEvent(channel, ChannelEvent.EventType.FATAL, null, e));
            }
        } else {
            connected = socketChannel.isConnected();
        }
        if (connected) {
            this.reader.register(socketChannel, SelectionKey.OP_READ, channel);
            this.channelEventPublisher.publishChannelEvent(new ChannelEvent(channel, ChannelEvent.EventType.ACTIVE));
        }
    }
}
