/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.nacos.naming.push;

import com.alibaba.nacos.api.naming.pojo.ServiceInfo;
import com.alibaba.nacos.api.remote.PushCallBack;
import com.alibaba.nacos.common.utils.JacksonUtils;
import com.alibaba.nacos.naming.constants.Constants;
import com.alibaba.nacos.naming.core.Service;
import com.alibaba.nacos.naming.core.v2.upgrade.UpgradeJudgement;
import com.alibaba.nacos.naming.misc.GlobalExecutor;
import com.alibaba.nacos.naming.misc.Loggers;
import com.alibaba.nacos.naming.misc.SwitchDomain;
import com.alibaba.nacos.naming.misc.UtilsAndCommons;
import com.alibaba.nacos.naming.monitor.MetricsMonitor;
import com.alibaba.nacos.naming.pojo.Subscriber;
import com.alibaba.nacos.naming.push.v1.ClientInfo;
import com.alibaba.nacos.naming.push.v1.NamingSubscriberServiceV1Impl;
import com.alibaba.nacos.naming.push.v1.PushClient;
import com.alibaba.nacos.naming.push.v1.ServiceChangeEvent;
import com.alibaba.nacos.naming.remote.udp.AckEntry;
import com.alibaba.nacos.naming.remote.udp.AckPacket;
import com.alibaba.nacos.naming.remote.udp.UdpConnector;
import com.alibaba.nacos.sys.utils.ApplicationUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.zip.GZIPOutputStream;
import org.apache.commons.collections.MapUtils;
import org.codehaus.jackson.util.VersionUtil;
import org.javatuples.Pair;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;

@Component
public class UdpPushService
implements ApplicationContextAware,
ApplicationListener<ServiceChangeEvent> {
    @Autowired
    private SwitchDomain switchDomain;
    @Autowired
    private NamingSubscriberServiceV1Impl subscriberServiceV1;
    private ApplicationContext applicationContext;
    private static volatile ConcurrentMap<String, AckEntry> ackMap = new ConcurrentHashMap<String, AckEntry>();
    private static volatile ConcurrentMap<String, Long> udpSendTimeMap = new ConcurrentHashMap<String, Long>();
    private static DatagramSocket udpSocket;
    private final UdpConnector udpConnector;
    private static ConcurrentMap<String, Future> futureMap;

    public UdpPushService(UdpConnector udpConnector) {
        this.udpConnector = udpConnector;
    }

    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    public void onApplicationEvent(ServiceChangeEvent event) {
        if (((UpgradeJudgement)((Object)ApplicationUtils.getBean(UpgradeJudgement.class))).isUseGrpcFeatures()) {
            return;
        }
        Service service = event.getService();
        String serviceName = service.getName();
        String namespaceId = service.getNamespaceId();
        if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName))) {
            return;
        }
        ScheduledFuture<?> future = GlobalExecutor.scheduleUdpSender(() -> {
            try {
                Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
                ConcurrentMap clients = (ConcurrentMap)this.subscriberServiceV1.getClientMap().get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
                if (MapUtils.isEmpty((Map)clients)) {
                    return;
                }
                HashMap<String, Pair> cache = new HashMap<String, Pair>(16);
                long lastRefTime = System.nanoTime();
                for (PushClient client : clients.values()) {
                    AckEntry ackEntry;
                    if (client.zombie()) {
                        Loggers.PUSH.debug("client is zombie: " + client);
                        clients.remove(client.toString());
                        Loggers.PUSH.debug("client is zombie: " + client);
                        continue;
                    }
                    Loggers.PUSH.debug("push serviceName: {} to client: {}", (Object)serviceName, (Object)client);
                    String key = UdpPushService.getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                    byte[] compressData = null;
                    Map data = null;
                    if (this.switchDomain.getDefaultPushCacheMillis() >= 20000L && cache.containsKey(key)) {
                        Pair pair = (Pair)cache.get(key);
                        compressData = (byte[])pair.getValue0();
                        data = (Map)pair.getValue1();
                        Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", (Object)serviceName, (Object)client.getAddrStr());
                    }
                    if (compressData != null) {
                        ackEntry = UdpPushService.prepareAckEntry(client, compressData, data, lastRefTime);
                    } else {
                        ackEntry = UdpPushService.prepareAckEntry(client, UdpPushService.prepareHostsData(client), lastRefTime);
                        if (ackEntry != null) {
                            cache.put(key, new Pair((Object)ackEntry.getOrigin().getData(), ackEntry.getData()));
                        }
                    }
                    Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", new Object[]{client.getServiceName(), client.getAddrStr(), client.getAgent(), ackEntry == null ? null : ackEntry.getKey()});
                    UdpPushService.udpPush(ackEntry);
                }
            }
            catch (Exception e) {
                Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", (Object)serviceName, (Object)e);
            }
            finally {
                futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            }
        }, 1000L, TimeUnit.MILLISECONDS);
        futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    }

    public void pushDataWithoutCallback(Subscriber subscriber, ServiceInfo serviceInfo) {
        String serviceName = subscriber.getServiceName();
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            AckEntry ackEntry = this.prepareAckEntry(subscriber, serviceInfo);
            Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", new Object[]{serviceInfo, subscriber.getAddrStr(), subscriber.getAgent(), ackEntry == null ? null : ackEntry.getKey()});
            this.udpConnector.sendData(ackEntry);
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", (Object)serviceName, (Object)e);
        }
    }

    public void pushDataWithCallback(Subscriber subscriber, ServiceInfo serviceInfo, PushCallBack pushCallBack) {
        String serviceName = subscriber.getServiceName();
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            AckEntry ackEntry = this.prepareAckEntry(subscriber, serviceInfo);
            Loggers.PUSH.info("serviceName: {} changed, schedule push for: {}, agent: {}, key: {}", new Object[]{serviceInfo, subscriber.getAddrStr(), subscriber.getAgent(), ackEntry == null ? null : ackEntry.getKey()});
            this.udpConnector.sendDataWithCallback(ackEntry, pushCallBack);
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push serviceName: {} to client, error: {}", (Object)serviceName, (Object)e);
        }
    }

    private AckEntry prepareAckEntry(Subscriber subscriber, ServiceInfo serviceInfo) {
        InetSocketAddress socketAddress = new InetSocketAddress(subscriber.getIp(), subscriber.getPort());
        long lastRefTime = System.nanoTime();
        return UdpPushService.prepareAckEntry(socketAddress, UdpPushService.prepareHostsData(JacksonUtils.toJson((Object)serviceInfo)), lastRefTime);
    }

    private static AckEntry prepareAckEntry(PushClient client, Map<String, Object> data, long lastRefTime) {
        return UdpPushService.prepareAckEntry(client.getSocketAddr(), data, lastRefTime);
    }

    private static AckEntry prepareAckEntry(InetSocketAddress socketAddress, Map<String, Object> data, long lastRefTime) {
        if (MapUtils.isEmpty(data)) {
            Loggers.PUSH.error("[NACOS-PUSH] pushing empty data for client is not allowed: {}", (Object)socketAddress);
            return null;
        }
        data.put("lastRefTime", lastRefTime);
        String dataStr = JacksonUtils.toJson(data);
        try {
            byte[] dataBytes = dataStr.getBytes(StandardCharsets.UTF_8);
            dataBytes = UdpPushService.compressIfNecessary(dataBytes);
            return UdpPushService.prepareAckEntry(socketAddress, dataBytes, data, lastRefTime);
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to compress data: {} to client: {}, error: {}", new Object[]{data, socketAddress, e});
            return null;
        }
    }

    private static AckEntry prepareAckEntry(PushClient client, byte[] dataBytes, Map<String, Object> data, long lastRefTime) {
        return UdpPushService.prepareAckEntry(client.getSocketAddr(), dataBytes, data, lastRefTime);
    }

    private static AckEntry prepareAckEntry(InetSocketAddress socketAddress, byte[] dataBytes, Map<String, Object> data, long lastRefTime) {
        String key = AckEntry.getAckKey(socketAddress.getAddress().getHostAddress(), socketAddress.getPort(), lastRefTime);
        try {
            DatagramPacket packet = new DatagramPacket(dataBytes, dataBytes.length, socketAddress);
            AckEntry ackEntry = new AckEntry(key, packet);
            ackEntry.setData(data);
            return ackEntry;
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to prepare data: {} to client: {}, error: {}", new Object[]{data, socketAddress, e});
            return null;
        }
    }

    public static String getPushCacheKey(String serviceName, String clientIP, String agent) {
        return serviceName + "@@@@" + agent;
    }

    public void serviceChanged(Service service) {
        this.applicationContext.publishEvent((ApplicationEvent)new ServiceChangeEvent(this, service));
    }

    public boolean canEnablePush(String agent) {
        if (!this.switchDomain.isPushEnabled()) {
            return false;
        }
        ClientInfo clientInfo = new ClientInfo(agent);
        if (ClientInfo.ClientType.JAVA == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion((String)this.switchDomain.getPushJavaVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.DNS == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion((String)this.switchDomain.getPushPythonVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.C == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion((String)this.switchDomain.getPushCVersion())) >= 0) {
            return true;
        }
        if (ClientInfo.ClientType.GO == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion((String)this.switchDomain.getPushGoVersion())) >= 0) {
            return true;
        }
        return ClientInfo.ClientType.CSHARP == clientInfo.type && clientInfo.version.compareTo(VersionUtil.parseVersion((String)this.switchDomain.getPushCSharpVersion())) >= 0;
    }

    public static List<AckEntry> getFailedPushes() {
        return new ArrayList<AckEntry>(ackMap.values());
    }

    public static void resetPushState() {
        ackMap.clear();
    }

    private static byte[] compressIfNecessary(byte[] dataBytes) throws IOException {
        int maxDataSizeUncompress = 1024;
        if (dataBytes.length < maxDataSizeUncompress) {
            return dataBytes;
        }
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        GZIPOutputStream gzip = new GZIPOutputStream(out);
        gzip.write(dataBytes);
        gzip.close();
        return out.toByteArray();
    }

    private static Map<String, Object> prepareHostsData(PushClient client) throws Exception {
        return UdpPushService.prepareHostsData(client.getDataSource().getData(client));
    }

    private static Map<String, Object> prepareHostsData(String dataContent) {
        HashMap<String, Object> result = new HashMap<String, Object>(2);
        result.put("type", "dom");
        result.put("data", dataContent);
        return result;
    }

    private static AckEntry udpPush(AckEntry ackEntry) {
        if (ackEntry == null) {
            Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
            return null;
        }
        if (ackEntry.getRetryTimes() > 1) {
            Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", (Object)ackEntry.getRetryTimes(), (Object)ackEntry.getKey());
            ackMap.remove(ackEntry.getKey());
            udpSendTimeMap.remove(ackEntry.getKey());
            MetricsMonitor.incrementFailPush();
            return ackEntry;
        }
        try {
            if (!ackMap.containsKey(ackEntry.getKey())) {
                MetricsMonitor.incrementPush();
            }
            ackMap.put(ackEntry.getKey(), ackEntry);
            udpSendTimeMap.put(ackEntry.getKey(), System.currentTimeMillis());
            Loggers.PUSH.info("send udp packet: " + ackEntry.getKey());
            udpSocket.send(ackEntry.getOrigin());
            ackEntry.increaseRetryTime();
            GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry), TimeUnit.NANOSECONDS.toMillis(Constants.ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
            return ackEntry;
        }
        catch (Exception e) {
            Loggers.PUSH.error("[NACOS-PUSH] failed to push data: {} to client: {}, error: {}", new Object[]{ackEntry.getData(), ackEntry.getOrigin().getAddress().getHostAddress(), e});
            ackMap.remove(ackEntry.getKey());
            udpSendTimeMap.remove(ackEntry.getKey());
            MetricsMonitor.incrementFailPush();
            return null;
        }
    }

    static {
        futureMap = new ConcurrentHashMap<String, Future>();
        try {
            udpSocket = new DatagramSocket();
            Receiver receiver = new Receiver();
            Thread inThread = new Thread(receiver);
            inThread.setDaemon(true);
            inThread.setName("com.alibaba.nacos.naming.push.receiver");
            inThread.start();
        }
        catch (SocketException e) {
            Loggers.SRV_LOG.error("[NACOS-PUSH] failed to init push service");
        }
    }

    public static class Receiver
    implements Runnable {
        @Override
        public void run() {
            while (true) {
                byte[] buffer = new byte[65536];
                DatagramPacket packet = new DatagramPacket(buffer, buffer.length);
                try {
                    udpSocket.receive(packet);
                    String json = new String(packet.getData(), 0, packet.getLength(), StandardCharsets.UTF_8).trim();
                    AckPacket ackPacket = (AckPacket)JacksonUtils.toObj((String)json, AckPacket.class);
                    InetSocketAddress socketAddress = (InetSocketAddress)packet.getSocketAddress();
                    String ip = socketAddress.getAddress().getHostAddress();
                    int port = socketAddress.getPort();
                    if (System.nanoTime() - ackPacket.lastRefTime > Constants.ACK_TIMEOUT_NANOS) {
                        Loggers.PUSH.warn("ack takes too long from {} ack json: {}", (Object)packet.getSocketAddress(), (Object)json);
                    }
                    String ackKey = AckEntry.getAckKey(ip, port, ackPacket.lastRefTime);
                    AckEntry ackEntry = (AckEntry)ackMap.remove(ackKey);
                    if (ackEntry == null) {
                        throw new IllegalStateException("unable to find ackEntry for key: " + ackKey + ", ack json: " + json);
                    }
                    long pushCost = System.currentTimeMillis() - (Long)udpSendTimeMap.get(ackKey);
                    Loggers.PUSH.info("received ack: {} from: {}:{}, cost: {} ms, unacked: {}, total push: {}", new Object[]{json, ip, port, pushCost, ackMap.size(), MetricsMonitor.getTotalPushMonitor().get()});
                    MetricsMonitor.incrementPushCost(pushCost);
                    udpSendTimeMap.remove(ackKey);
                    continue;
                }
                catch (Throwable e) {
                    Loggers.PUSH.error("[NACOS-PUSH] error while receiving ack data", e);
                    continue;
                }
                break;
            }
        }
    }

    public static class Retransmitter
    implements Runnable {
        AckEntry ackEntry;

        public Retransmitter(AckEntry ackEntry) {
            this.ackEntry = ackEntry;
        }

        @Override
        public void run() {
            if (ackMap.containsKey(this.ackEntry.getKey())) {
                Loggers.PUSH.info("retry to push data, key: " + this.ackEntry.getKey());
                UdpPushService.udpPush(this.ackEntry);
            }
        }
    }
}

