package org.apache.inlong.sdk.dataproxy.pb.network;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/pb/network/TcpChannelGroup.class */
/**
 * Pool of Netty TCP channels used to send data for one business id ({@code bid}).
 *
 * <p>The group keeps two slots (index 0 and 1), each made of a queue of borrowable channels and a
 * list of every channel created for that slot. {@link #mIndex} selects the active slot. When
 * {@link #updateConfig(Set)} sees a changed address set it builds new connections in the standby
 * slot ({@code mIndex ^ 1}) and then flips {@code mIndex}; the previously active slot is only torn
 * down on the following {@code updateConfig} call.
 *
 * <p>Every channel gets a numeric id stored as the Netty channel attachment; {@link #channelMap}
 * maps that id back to its {@link TcpChannel} so handlers can report exceptions and release
 * permits via {@link #exceptionChannel(Channel)} and {@link #releaseChannel(Channel)}.
 */
public class TcpChannelGroup {
    public static final Logger LOG = LoggerFactory.getLogger(TcpChannelGroup.class);

    private final String bid;
    private final int senderThreadNum;
    private final ClientBootstrap client = new ClientBootstrap();
    /** Address set of the last applied configuration; replaced (never mutated) by updateConfig. */
    private volatile Set<IpPort> currentIpLists = new HashSet<>();
    /** Borrowable channels per slot; always two entries until {@link #close()} is called. */
    private final List<LinkedBlockingQueue<TcpChannel>> channelQueues = new ArrayList<>();
    /** Every channel created per slot, used for tear-down. */
    private final List<List<TcpChannel>> channelLists = new ArrayList<>();
    /** Index (0 or 1) of the active slot; volatile because sender threads read it in send(). */
    private volatile int mIndex = 0;
    /** Channel id (the Netty attachment) to TcpChannel. */
    private final ConcurrentHashMap<Object, TcpChannel> channelMap = new ConcurrentHashMap<>();
    private final AtomicLong channelId = new AtomicLong(0);

    /**
     * Creates the group and configures the Netty client bootstrap. No connection is opened here;
     * connections are created by {@link #updateConfig(Set)}.
     *
     * @param bid business id, used in log messages
     * @param senderThreadNum worker count passed to the NIO client socket channel factory
     * @param decoder upstream handler registered in the pipeline under the name "decoder"
     * @param handler handler registered in the pipeline under the name "handler"
     */
    public TcpChannelGroup(String bid, int senderThreadNum, ChannelUpstreamHandler decoder,
            SimpleChannelHandler handler) {
        this.bid = bid;
        this.senderThreadNum = senderThreadNum;
        LOG.info("TcpChannelGroup netty thread pool size is " + senderThreadNum);
        this.client.setFactory(new NioClientSocketChannelFactory(Executors.newCachedThreadPool(),
                Executors.newCachedThreadPool(), this.senderThreadNum));
        // The same decoder/handler instances are installed in every pipeline this factory builds.
        this.client.setPipelineFactory(() -> {
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", decoder);
            pipeline.addLast("encoder", new ByteArrayToBinaryEncoder());
            pipeline.addLast("handler", handler);
            return pipeline;
        });
        this.client.setOption("tcpNoDelay", false);
        this.client.setOption("child.tcpNoDelay", false);
        this.client.setOption("keepAlive", true);
        this.client.setOption("child.keepAlive", true);
        this.client.setOption("reuseAddr", false);
        // Two slots: one active, one standby.
        this.channelQueues.add(new LinkedBlockingQueue<>());
        this.channelQueues.add(new LinkedBlockingQueue<>());
        this.channelLists.add(new ArrayList<>());
        this.channelLists.add(new ArrayList<>());
    }

    /**
     * Applies a new set of target addresses.
     *
     * <p>The standby slot is always torn down first (its channels are closed and removed from the
     * id map). If the address set is unchanged, the status of every known channel is logged and
     * its {@code hasException}/{@code reconnectFail} flags are reset. Otherwise one connection per
     * address is opened into the standby slot, which then becomes the active slot.
     *
     * @param set the addresses to connect to
     */
    public void updateConfig(Set<IpPort> set) {
        boolean hasIpChange = !this.currentIpLists.equals(set);
        LOG.info("TcpChannelGroup updateConfig bid:{},hasIpChange:{},currentIpLists:{},ipLists:{}",
                new Object[]{this.bid, hasIpChange, this.currentIpLists, set});

        int standbyIndex = this.mIndex ^ 1;
        LinkedBlockingQueue<TcpChannel> standbyQueue = this.channelQueues.get(standbyIndex);
        List<TcpChannel> standbyList = this.channelLists.get(standbyIndex);

        // Tear down whatever the standby slot still holds from the previous switch.
        standbyQueue.clear();
        for (TcpChannel staleChannel : standbyList) {
            LOG.info(String.format("TcpChannelGroup updateConfig bid:%s,disconnect and close:%s", this.bid,
                    staleChannel));
            this.channelMap.remove(staleChannel.getChannel().getAttachment());
            staleChannel.close();
        }
        standbyList.clear();

        if (!hasIpChange) {
            // Same addresses: report status and give flagged channels another chance.
            for (Map.Entry<Object, TcpChannel> entry : this.channelMap.entrySet()) {
                TcpChannel tcpChannel = entry.getValue();
                LOG.info("TcpChannelGroup channel status index:{},isConnected:{},isHasException:{},"
                        + "isReconnectFail:{},availablePermits:{}",
                        new Object[]{tcpChannel.getChannel().getAttachment(),
                                tcpChannel.getChannel().isConnected(),
                                tcpChannel.isHasException(),
                                tcpChannel.isReconnectFail(),
                                tcpChannel.getPackToken().availablePermits()});
                tcpChannel.setHasException(false);
                tcpChannel.setReconnectFail(false);
            }
            return;
        }

        for (IpPort ipPort : set) {
            try {
                // The connect result is not checked here; an unconnected channel is handled by
                // the reconnect logic when it is later borrowed in getTcpChannel.
                Channel channel = this.client.connect(ipPort.addr).await().getChannel();
                Long id = this.channelId.getAndIncrement();
                channel.setAttachment(id);
                TcpChannel tcpChannel = new TcpChannel(channel, ipPort);
                standbyQueue.add(tcpChannel);
                standbyList.add(tcpChannel);
                this.channelMap.put(id, tcpChannel);
            } catch (Throwable th) {
                LOG.error(String.format("bid:%s,ipPort:%s,connect failed:%s", this.bid, ipPort, th.getMessage()),
                        th);
            }
        }
        // Keep a private copy so later changes to the caller's set cannot skew change detection.
        this.currentIpLists = new HashSet<>(set);
        this.mIndex = standbyIndex;
    }

    /**
     * Borrows a channel from the active slot, writes the buffer and waits for the write to finish.
     * The channel is always returned to its queue before this method returns.
     *
     * @param channelBuffer the data to write
     * @return the send result; unsuccessful when no channel could be borrowed or the write failed
     */
    public TcpResult send(ChannelBuffer channelBuffer) {
        LinkedBlockingQueue<TcpChannel> activeQueue = this.channelQueues.get(this.mIndex);
        TcpChannel tcpChannel = getTcpChannel(activeQueue);
        try {
            if (tcpChannel == null) {
                return new TcpResult("", 0, false, "can not acquire a channel");
            }
            // NOTE(review): the permit acquired here is only released through releaseChannel();
            // nothing in this class releases it when the write fails — confirm the handlers do.
            tcpChannel.acquireUninterruptibly();
            ChannelFuture writeFuture = tcpChannel.getChannel().write(channelBuffer).sync().await();
            if (writeFuture.isSuccess()) {
                activeQueue.offer(tcpChannel);
                TcpResult tcpResult = new TcpResult(tcpChannel.getIpPort(), true, "");
                tcpResult.channelId = (Long) tcpChannel.getChannel().getAttachment();
                return tcpResult;
            }
            tcpChannel.setHasException(true);
            activeQueue.offer(tcpChannel);
            String message = writeFuture.getCause() != null ? writeFuture.getCause().getMessage() : "write fail";
            LOG.error(String.format("bid:%s,write failed:%s", this.bid, message), writeFuture.getCause());
            return new TcpResult(tcpChannel.getIpPort(), false, message);
        } catch (Throwable th) {
            LOG.error(String.format("bid:%s,netty send failed:%s", this.bid, th.getMessage()), th);
            if (tcpChannel != null) {
                activeQueue.offer(tcpChannel);
            }
            return new TcpResult("", 0, false, th.getMessage());
        }
    }

    /**
     * Borrows the first usable channel from the queue, trying at most {@code queue.size()} times.
     *
     * <p>Channels flagged {@code reconnectFail} are skipped and put back. A channel that is not
     * connected or is flagged {@code hasException} is passed to {@link #reconnect(TcpChannel)}
     * first. A channel that is still not connected afterwards is flagged {@code reconnectFail}
     * and put back into the queue, so it stays in the pool and is retried once
     * {@link #updateConfig(Set)} resets the flag.
     *
     * @param channelQueue the queue of the active slot
     * @return a connected channel removed from the queue (the caller must offer it back), or
     *         {@code null} if none is usable or an error occurred
     */
    private TcpChannel getTcpChannel(LinkedBlockingQueue<TcpChannel> channelQueue) {
        TcpChannel tcpChannel = null;
        try {
            for (int attempt = 0; attempt < channelQueue.size(); attempt++) {
                // take() blocks if another thread drains the queue after the size check above.
                tcpChannel = channelQueue.take();
                if (tcpChannel.isReconnectFail()) {
                    channelQueue.offer(tcpChannel);
                    tcpChannel = null;
                    continue;
                }
                if (!tcpChannel.getChannel().isConnected() || tcpChannel.isHasException()) {
                    reconnect(tcpChannel);
                }
                if (tcpChannel.getChannel().isConnected()) {
                    tcpChannel.setHasException(false);
                    break;
                }
                LOG.info("reconnect fail,channel:{}", tcpChannel);
                tcpChannel.setReconnectFail(true);
                // Put the failed channel back: dropping it would shrink the queue permanently and
                // cut this loop short before the remaining channels are tried.
                channelQueue.offer(tcpChannel);
                tcpChannel = null;
            }
            return tcpChannel;
        } catch (Throwable th) {
            LOG.error(String.format("bid:%s,get channel error:%s", this.bid, th.getMessage()), th);
            if (tcpChannel != null) {
                channelQueue.offer(tcpChannel);
            }
            return null;
        }
    }

    /**
     * Opens a new connection for the given channel wrapper when its current Netty channel is
     * missing or no longer open; does nothing otherwise. The new channel inherits the old
     * channel's attachment (its id) and the old channel is disconnected and closed. Failures are
     * logged and not propagated.
     *
     * @param tcpChannel the channel wrapper to reconnect; also used as the lock
     */
    private void reconnect(TcpChannel tcpChannel) {
        try {
            synchronized (tcpChannel) {
                Channel staleChannel = tcpChannel.getChannel();
                if (staleChannel != null && staleChannel.isOpen()) {
                    return;
                }
                LOG.info("reconnect channel:{}", tcpChannel);
                Channel freshChannel = this.client.connect(tcpChannel.getIpPort().addr).await().getChannel();
                if (staleChannel != null) {
                    // Carry the id over before publishing the channel so channelMap lookups by
                    // attachment keep resolving to this TcpChannel.
                    freshChannel.setAttachment(staleChannel.getAttachment());
                }
                tcpChannel.setChannel(freshChannel);
                if (staleChannel != null) {
                    staleChannel.disconnect();
                    staleChannel.close();
                }
            }
        } catch (Throwable th) {
            LOG.error("reconnect failed:" + th.getMessage(), th);
        }
    }

    /**
     * @return the business id this group was created for
     */
    public String getBid() {
        return this.bid;
    }

    /**
     * Empties both slots, closes every channel created by this group and clears the id map.
     * The slot queues themselves are removed, so the group must not be used afterwards.
     */
    public void close() {
        for (LinkedBlockingQueue<TcpChannel> channelQueue : this.channelQueues) {
            channelQueue.clear();
        }
        this.channelQueues.clear();
        for (List<TcpChannel> channels : this.channelLists) {
            for (TcpChannel tcpChannel : channels) {
                LOG.info("TcpChannelGroup close bid:{},disconnect and close:{}", this.bid, tcpChannel);
                tcpChannel.close();
            }
            // Drop the references so a repeated close() does not close the same channels again.
            channels.clear();
        }
        this.channelMap.clear();
        // NOTE(review): the bootstrap's thread pools are not released here; consider
        // ClientBootstrap.releaseExternalResources() once it is confirmed that close() is never
        // invoked from a Netty I/O thread.
    }

    /**
     * Flags the TcpChannel that owns the given Netty channel as having hit an exception.
     * Unknown channels are ignored.
     *
     * @param channel the Netty channel whose attachment identifies the TcpChannel
     */
    public void exceptionChannel(Channel channel) {
        TcpChannel tcpChannel = this.channelMap.get(channel.getAttachment());
        if (tcpChannel != null) {
            tcpChannel.setHasException(true);
        }
    }

    /**
     * Calls {@code release()} on the TcpChannel that owns the given Netty channel.
     * Unknown channels are ignored.
     *
     * @param channel the Netty channel whose attachment identifies the TcpChannel
     */
    public void releaseChannel(Channel channel) {
        TcpChannel tcpChannel = this.channelMap.get(channel.getAttachment());
        if (tcpChannel != null) {
            tcpChannel.release();
        }
    }
}
