package org.apache.inlong.sdk.dataproxy.pb;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.inlong.sdk.commons.protocol.ProxySdk;
import org.apache.inlong.sdk.dataproxy.SendResult;
import org.apache.inlong.sdk.dataproxy.pb.context.SdkProfile;
import org.apache.inlong.sdk.dataproxy.pb.context.SdkSinkContext;
import org.apache.inlong.sdk.dataproxy.pb.dispatch.DispatchProfile;
import org.apache.inlong.sdk.dataproxy.pb.network.IpPort;
import org.apache.inlong.sdk.dataproxy.pb.network.TcpChannelGroup;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/pb/SdkProxyChannelManager.class */
public class SdkProxyChannelManager {
    public static final Logger LOG = LoggerFactory.getLogger(SdkProxyChannelManager.class);
    public static final int DEFAULT_LENGTH_FIELD_OFFSET = 0;
    public static final int DEFAULT_LENGTH_FIELD_LENGTH = 4;
    public static final int DEFAULT_LENGTH_ADJUSTMENT = -4;
    public static final int DEFAULT_INITIAL_BYTES_TO_STRIP = 0;
    public static final boolean DEFAULT_FAIL_FAST = true;
    private String proxyClusterId;
    private SdkSinkContext context;
    private Timer reloadTimer;
    private TcpChannelGroup sender;
    private LinkedBlockingQueue<DispatchProfile> proxyDispatchQueue = new LinkedBlockingQueue<>();
    private AtomicLong offerCounter = new AtomicLong(0);
    private ConcurrentHashMap<String, AtomicLong> offerGroupCounter = new ConcurrentHashMap<>();
    private AtomicLong takeCounter = new AtomicLong(0);
    private List<SdkChannelWorker> workers = new ArrayList();
    private AtomicLong sdkPackId = new AtomicLong(RandomUtils.nextLong());
    private ConcurrentHashMap<Long, SdkProfile> profileMap = new ConcurrentHashMap<>();

    public SdkProxyChannelManager(String str, SdkSinkContext sdkSinkContext) {
        this.proxyClusterId = str;
        this.context = sdkSinkContext;
    }

    public void start() {
        try {
            LOG.info("start to SdkProxyChannelManager:{}", this.proxyClusterId);
            this.sender = new TcpChannelGroup(this.proxyClusterId, this.context.getMaxThreads(), new LengthFieldBasedFrameDecoder(SdkSinkContext.MAX_RESPONSE_LENGTH, 0, 4, -4, 0, true), new SdkSenderClientHandler(this));
            for (int i = 0; i < this.context.getMaxThreads(); i++) {
                SdkChannelWorker sdkChannelWorker = new SdkChannelWorker(this, i);
                this.workers.add(sdkChannelWorker);
                sdkChannelWorker.start();
            }
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOG.error("proxyClusterId:{},error:{}", this.proxyClusterId, e);
        }
    }

    private void setReloadTimer() {
        this.reloadTimer = new Timer(true);
        this.reloadTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sdk.dataproxy.pb.SdkProxyChannelManager.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    SdkProxyChannelManager.this.reload();
                } catch (Exception e) {
                    SdkProxyChannelManager.LOG.error("proxyClusterId:{},error:{}", SdkProxyChannelManager.this.proxyClusterId, e);
                }
            }
        }, new Date(System.currentTimeMillis() + this.context.getReloadInterval()), this.context.getReloadInterval());
    }

    public long nextPackId() {
        return this.sdkPackId.getAndIncrement();
    }

    public void reload() {
        try {
            this.sender.updateConfig(this.context.getProxyIpListMap().get(this.proxyClusterId));
            clearTimeoutPack();
        } catch (Exception e) {
            LOG.error("proxyClusterId:{},error:{}", this.proxyClusterId, e);
        }
    }

    private void clearTimeoutPack() {
        LOG.info("ProxyClusterIdChannelManager clearTimeoutPack proxyClusterId:{},queueSize:{},waitingSize:{},offerCount:{},takeCount:{},offerGroupCount:{}", new Object[]{this.proxyClusterId, Integer.valueOf(this.proxyDispatchQueue.size()), Integer.valueOf(this.profileMap.size()), Long.valueOf(this.offerCounter.getAndSet(0L)), Long.valueOf(this.takeCounter.getAndSet(0L)), this.offerGroupCounter});
        this.offerGroupCounter.clear();
        long currentTimeMillis = System.currentTimeMillis();
        ArrayList arrayList = new ArrayList(this.profileMap.size());
        for (Map.Entry<Long, SdkProfile> entry : this.profileMap.entrySet()) {
            if (currentTimeMillis - entry.getValue().getSendTime() > this.context.getSdkPackTimeout()) {
                arrayList.add(entry.getKey());
            }
        }
        if (arrayList.size() > 0) {
            LOG.info("clearTimeoutPack timeoutSize:{}", Integer.valueOf(arrayList.size()));
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                SdkProfile remove = this.profileMap.remove((Long) it.next());
                if (remove != null) {
                    offerDispatchQueue(remove.getDispatchProfile());
                }
            }
        }
    }

    public void putWaitCompletedProfile(SdkProfile sdkProfile) {
        this.profileMap.put(Long.valueOf(sdkProfile.getSdkPackId()), sdkProfile);
    }

    public void removeWaitCompletedProfile(SdkProfile sdkProfile) {
        this.profileMap.remove(Long.valueOf(sdkProfile.getSdkPackId()));
    }

    public void setChannelException(Channel channel) {
        SocketAddress remoteAddress = channel.getRemoteAddress();
        if (remoteAddress instanceof InetSocketAddress) {
            IpPort ipPort = new IpPort((InetSocketAddress) remoteAddress);
            ArrayList arrayList = new ArrayList(this.profileMap.size());
            for (Map.Entry<Long, SdkProfile> entry : this.profileMap.entrySet()) {
                if (ipPort.equals(entry.getValue().getIpPort())) {
                    arrayList.add(entry.getKey());
                }
            }
            LOG.warn("proxyClusterId:{},clear channel:local:{},remote:{},profile size:{}", new Object[]{this.proxyClusterId, channel.getLocalAddress(), channel.getRemoteAddress(), Integer.valueOf(arrayList.size())});
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                SdkProfile remove = this.profileMap.remove((Long) it.next());
                if (remove != null) {
                    offerDispatchQueue(remove.getDispatchProfile());
                    this.context.addSendResultMetric(remove.getDispatchProfile(), this.proxyClusterId, false, remove.getSendTime());
                }
            }
            this.sender.exceptionChannel(channel);
            this.sender.releaseChannel(channel);
        }
    }

    public static InetSocketAddress parseInetSocketAddress(Channel channel) {
        return channel.getRemoteAddress() instanceof InetSocketAddress ? (InetSocketAddress) channel.getRemoteAddress() : channel.getRemoteAddress() != null ? new InetSocketAddress(channel.getRemoteAddress().toString(), 0) : new InetSocketAddress("127.0.0.1", 0);
    }

    public void onMessageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) {
        if (!(messageEvent.getMessage() instanceof ChannelBuffer)) {
            LOG.error("proxyClusterId:{},onMessageReceived e.getMessage:{}", this.proxyClusterId, messageEvent.getMessage());
            setChannelException(messageEvent.getChannel());
            return;
        }
        ChannelBuffer channelBuffer = (ChannelBuffer) messageEvent.getMessage();
        int readInt = channelBuffer.readInt();
        short readShort = channelBuffer.readShort();
        if (readShort != 1) {
            LOG.error("proxyClusterId:{},Error result from:{},error pack version:{}", new Object[]{this.proxyClusterId, String.valueOf(messageEvent.getChannel().getRemoteAddress()), Integer.valueOf(readShort)});
            setChannelException(messageEvent.getChannel());
            return;
        }
        byte[] bArr = new byte[readInt - 2];
        channelBuffer.readBytes(bArr);
        try {
            ProxySdk.ResponseInfo parseFrom = ProxySdk.ResponseInfo.parseFrom(bArr);
            ProxySdk.ResultCode result = parseFrom.getResult();
            if (result != ProxySdk.ResultCode.SUCCUSS) {
                LOG.error("proxyClusterId:{},Error result from:{},resultCode:{}", new Object[]{this.proxyClusterId, String.valueOf(messageEvent.getChannel().getRemoteAddress()), result.toString()});
                setChannelException(messageEvent.getChannel());
                return;
            }
            long packId = parseFrom.getPackId();
            SdkProfile remove = this.profileMap.remove(Long.valueOf(packId));
            if (remove != null) {
                onMessageOK(remove, messageEvent.getChannel());
            } else {
                LOG.error("proxyClusterId:%s,Can not find MessageDispatchProfile by sdkPackId:%d,from:%s", new Object[]{this.proxyClusterId, Long.valueOf(packId), String.valueOf(messageEvent.getChannel().getRemoteAddress())});
                setChannelException(messageEvent.getChannel());
            }
        } catch (Exception e) {
            LOG.error("proxyClusterId:{},Error result from:{},parseFrom exception:{}", new Object[]{this.proxyClusterId, String.valueOf(messageEvent.getChannel().getRemoteAddress()), e});
            setChannelException(messageEvent.getChannel());
        }
    }

    private void onMessageOK(SdkProfile sdkProfile, Channel channel) {
        this.context.addSendResultMetric(sdkProfile.getDispatchProfile(), this.proxyClusterId, true, sdkProfile.getSendTime());
        this.sender.releaseChannel(channel);
        sdkProfile.getDispatchProfile().getEvents().forEach(profileEvent -> {
            try {
                profileEvent.getProfile().getCallback().onMessageAck(SendResult.OK);
            } catch (Exception e) {
                LOG.error(e.getMessage(), e);
            }
        });
    }

    public void close() {
        LOG.info("begin to close proxyClusterId:{}.", this.proxyClusterId);
        this.reloadTimer.cancel();
        Iterator<SdkChannelWorker> it = this.workers.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
        this.workers.clear();
        this.profileMap.clear();
        this.sender.close();
        LOG.info("end to close proxyClusterId:{}.", this.proxyClusterId);
    }

    public String getProxyClusterId() {
        return this.proxyClusterId;
    }

    public SdkSinkContext getContext() {
        return this.context;
    }

    public boolean offerDispatchQueue(DispatchProfile dispatchProfile) {
        this.offerCounter.incrementAndGet();
        this.offerGroupCounter.computeIfAbsent(new Exception().getStackTrace()[1].toString(), str -> {
            return new AtomicLong(0L);
        }).incrementAndGet();
        return this.proxyDispatchQueue.offer(dispatchProfile);
    }

    public DispatchProfile takeDispatchQueue() throws InterruptedException {
        this.takeCounter.incrementAndGet();
        return this.proxyDispatchQueue.take();
    }

    public TcpChannelGroup getSender() {
        return this.sender;
    }
}
