package org.apache.inlong.audit;

import java.io.Serializable;
import java.util.Calendar;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.StringJoiner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.commons.lang3.ClassUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.inlong.audit.entity.AuditComponent;
import org.apache.inlong.audit.entity.AuditInformation;
import org.apache.inlong.audit.entity.AuditMetric;
import org.apache.inlong.audit.entity.FlowType;
import org.apache.inlong.audit.loader.SocketAddressListLoader;
import org.apache.inlong.audit.protocol.AuditApi;
import org.apache.inlong.audit.send.ProxyManager;
import org.apache.inlong.audit.send.SenderManager;
import org.apache.inlong.audit.util.AuditConfig;
import org.apache.inlong.audit.util.AuditDimensions;
import org.apache.inlong.audit.util.AuditManagerUtils;
import org.apache.inlong.audit.util.AuditValues;
import org.apache.inlong.audit.util.Config;
import org.apache.inlong.audit.util.RequestIdUtils;
import org.apache.inlong.audit.util.StatInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/AuditReporterImpl.class */
public class AuditReporterImpl implements Serializable {
    /** Serialization version of this class. */
    private static final long serialVersionUID = 1;
    private static final Logger LOGGER = LoggerFactory.getLogger(AuditReporterImpl.class);
    /** Separator between the fields of a statistics key; {@code send} splits the key on it again. */
    private static final String FIELD_SEPARATORS = ":";
    /** Audit version used by the {@code add} overloads that take no explicit version. */
    private static final long DEFAULT_AUDIT_VERSION = -1;
    /** Initial flush threshold and the body-count threshold at which {@code send} emits a package. */
    private static final int BATCH_NUM = 100;
    /** One minute in milliseconds; the same value drives the scheduler, the key time bucket and flush-time expiry. */
    private static final int PERIOD = 60000;
    /** Isolate key value 0, the key used when a caller does not supply one. */
    public static final long DEFAULT_ISOLATE_KEY = 0;
    /** Sender for audit requests; created in {@code init()}, so it is null until the reporter is initialised. */
    private SenderManager manager;
    // NOTE(review): no code in this class acquires this lock.
    private final ReentrantLock GLOBAL_LOCK = new ReentrantLock();
    /** Live counters: isolate key -> (statistics key -> counters), updated by {@code addByKey}. */
    private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> preStatMap = new ConcurrentHashMap<>();
    /** Counters drained from the live and expired maps during a flush; emptied by {@code send}. */
    private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> summaryStatMap = new ConcurrentHashMap<>();
    /** Counters moved out of {@code preStatMap} by {@code clearExpiredKey}; summed at the start of a later flush. */
    private final ConcurrentHashMap<Long, ConcurrentHashMap<String, StatInfo>> expiredStatMap = new ConcurrentHashMap<>();
    /** Per isolate key, the statistics keys whose count was zero when {@code summaryPreStatMap} inspected them. */
    private final ConcurrentHashMap<Long, HashSet<String>> expiredKeyList = new ConcurrentHashMap<>();
    /** Isolate key -> time of the flush accepted for it; old entries are dropped by {@code checkFlushTime}. */
    private final ConcurrentHashMap<Long, Long> flushTime = new ConcurrentHashMap<>();
    /** Local settings; supplies the IP and docker id written into the message header. */
    private final Config config = new Config();
    /** Runs the periodic task registered in {@code init()}. */
    // NOTE(review): the class is Serializable but this field's declared type is not — confirm instances are never serialized.
    private final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
    /** Packet id written into the message header; reset to 1 by {@code resetStat}. */
    private int packageId = 1;
    /** Number of message bodies counted towards the current package; reset to 0 by {@code resetStat}. */
    private int dataId = 0;
    /** Set to true by {@code checkInitStatus} once {@code init()} has run. */
    private volatile boolean initialized = false;
    /** Number of flush attempts since the last {@code checkFlushTime}. */
    private AtomicInteger flushStat = new AtomicInteger(0);
    /** Configuration handed to the {@code SenderManager}; {@code init()} creates a default when it is null. */
    private AuditConfig auditConfig = null;
    /** Optional source of proxy addresses used by {@code loadIpPortList}. */
    private SocketAddressListLoader loader = null;
    /** Upper bound for {@code flushStat}; flushes beyond it are skipped. */
    private int flushStatThreshold = BATCH_NUM;
    /** Whether the periodic task also flushes isolate key 0. */
    private boolean autoFlush = true;
    /** Counters that are logged and reset at the end of every flush. */
    private AuditMetric auditMetric = new AuditMetric();

    /**
     * Sets how many flush attempts are executed before further attempts are skipped.
     * {@code flush(long)} returns early once the attempt counter exceeds this value;
     * {@code checkFlushTime()} resets the counter.
     *
     * @param i the new threshold
     */
    public void setFlushStatThreshold(int i) {
        this.flushStatThreshold = i;
    }

    /**
     * Enables or disables the flush issued by the periodic task registered in {@code init()}.
     *
     * @param z {@code true} to let the periodic task flush, {@code false} to leave flushing to callers
     */
    public void setAutoFlush(boolean z) {
        this.autoFlush = z;
    }

    /**
     * One-time setup: initialises the local config, schedules the periodic task
     * (reload proxy addresses, expire flush timestamps, optionally flush) and creates
     * the sender. Does nothing when the reporter is already marked as initialised.
     */
    private void init() {
        if (this.initialized) {
            return;
        }
        this.config.init();

        // Any exception is logged and swallowed here: an exception escaping a task scheduled with
        // scheduleWithFixedDelay suppresses all subsequent executions.
        Runnable periodicTask = () -> {
            try {
                loadIpPortList();
                checkFlushTime();
                if (this.autoFlush) {
                    flush(DEFAULT_ISOLATE_KEY);
                }
            } catch (Exception e) {
                LOGGER.error("Audit run has exception!", e);
            }
        };
        this.timeoutExecutor.scheduleWithFixedDelay(periodicTask, PERIOD, PERIOD, TimeUnit.MILLISECONDS);

        // Fall back to a default configuration when the caller supplied none.
        if (this.auditConfig == null) {
            this.auditConfig = new AuditConfig();
        }
        this.manager = new SenderManager(this.auditConfig);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Reloads the audit proxy addresses from the configured loader and applies them through
     * {@link #setAuditProxy(HashSet)}. Does nothing when no loader is set or when the loader
     * returns {@code null} or an empty list. Failures are logged and never propagated.
     */
    public void loadIpPortList() {
        SocketAddressListLoader currentLoader = this.loader;
        if (currentLoader == null) {
            return;
        }
        try {
            List<String> addresses = currentLoader.loadSocketAddressList();
            if (addresses == null || addresses.isEmpty()) {
                return;
            }
            setAuditProxy(new HashSet<>(addresses));
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is kept; getMessage() alone may be null.
            LOGGER.error("Failed to load the audit proxy address list", e);
        }
    }

    /**
     * Sets the loader that {@code loadIpPortList()} queries for the proxy address list.
     *
     * @param socketAddressListLoader the loader to use; {@code null} disables reloading
     */
    public void setLoader(SocketAddressListLoader socketAddressListLoader) {
        this.loader = socketAddressListLoader;
    }

    /**
     * Instantiates the named class through its no-argument constructor and installs it as the
     * address-list loader. An empty or {@code null} name is ignored. Instantiation failures are
     * logged and the current loader is kept.
     *
     * @param str fully qualified name of a class implementing {@code SocketAddressListLoader}
     */
    public void setLoaderClass(String str) {
        if (StringUtils.isEmpty(str)) {
            return;
        }
        try {
            Object candidate = ClassUtils.getClass(str).getDeclaredConstructor().newInstance();
            if (candidate instanceof SocketAddressListLoader) {
                this.loader = (SocketAddressListLoader) candidate;
                LOGGER.info("Audit list loader class:{}", str);
            } else {
                // A wrong class name used to be dropped without a trace; make the misconfiguration visible.
                LOGGER.warn("Audit list loader class:{} does not implement {}, ignored", str,
                        SocketAddressListLoader.class.getName());
            }
        } catch (Throwable th) {
            // The trailing throwable is logged with its stack trace by SLF4J.
            LOGGER.error("Fail to init list loader class:{},error:{}", str, th.getMessage(), th);
        }
    }

    /**
     * Makes sure the reporter is initialised, then passes the address set to the
     * {@code ProxyManager} singleton.
     *
     * @param hashSet audit proxy addresses
     */
    public void setAuditProxy(HashSet<String> hashSet) {
        checkInitStatus();
        ProxyManager.getInstance().setAuditProxy(hashSet);
    }

    /**
     * Stores the local IP in the reporter's config; {@code send} writes that value into the
     * message header.
     *
     * @param str the local IP
     */
    public void setLocalIP(String str) {
        this.config.setLocalIP(str);
    }

    /**
     * Makes sure the reporter is initialised, then forwards the arguments unchanged to
     * {@code ProxyManager.setManagerConfig}.
     *
     * @param auditComponent forwarded as the first argument
     * @param str forwarded as the second argument
     * @param str2 forwarded as the third argument
     * @param str3 forwarded as the fourth argument
     */
    // NOTE(review): the meaning of the three strings is defined by ProxyManager and not visible here — confirm.
    public void setAuditProxy(AuditComponent auditComponent, String str, String str2, String str3) {
        checkInitStatus();
        ProxyManager.getInstance().setManagerConfig(auditComponent, str, str2, str3);
    }

    /**
     * Runs {@code init()} on the first call and records that initialisation has happened.
     * Later calls are no-ops.
     */
    private synchronized void checkInitStatus() {
        if (!this.initialized) {
            init();
            this.initialized = true;
        }
    }

    /**
     * Stores the audit configuration and, when the sender already exists, applies it to the
     * sender as well.
     *
     * <p>The sender is only created by {@code init()}. Before that point the configuration is
     * just remembered, and {@code init()} hands it to the sender on creation.
     *
     * @param auditConfig the configuration to use
     */
    public void setAuditConfig(AuditConfig auditConfig) {
        this.auditConfig = auditConfig;
        // manager is null until init() has run; dereferencing it unconditionally would throw.
        SenderManager currentManager = this.manager;
        if (currentManager != null) {
            currentManager.setAuditConfig(auditConfig);
        }
    }

    /**
     * Records statistics with audit tag {@code "-1"} and the default audit version by
     * delegating to the eight-argument overload.
     *
     * @param i audit id
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param j log time in milliseconds
     * @param j2 count
     * @param j3 size
     */
    public void add(int i, String str, String str2, long j, long j2, long j3) {
        add(i, "-1", str, str2, j, j2, j3, DEFAULT_AUDIT_VERSION);
    }

    /**
     * Records statistics for an explicit isolate key, deriving the delay from the current time.
     *
     * <p>The delay is the elapsed time since the log time multiplied by the count, which is what
     * the overload without an isolate key does too. The delay counter is accumulated alongside
     * the count counter, so an unweighted delay would under-report whenever the count exceeds 1.
     *
     * @param j isolate key
     * @param i audit id
     * @param str audit tag
     * @param str2 inlong group id
     * @param str3 inlong stream id
     * @param j2 log time in milliseconds
     * @param j3 count
     * @param j4 size
     * @param j5 audit version
     */
    public void add(long j, int i, String str, String str2, String str3, long j2, long j3, long j4, long j5) {
        long delayTime = System.currentTimeMillis() - j2;
        add(j, i, str, str2, str3, j2, j3, j4, delayTime * j3, j5);
    }

    /**
     * Records statistics under isolate key 0, deriving the delay as the time elapsed since the
     * log time multiplied by the count.
     *
     * @param i audit id
     * @param str audit tag
     * @param str2 inlong group id
     * @param str3 inlong stream id
     * @param j log time in milliseconds
     * @param j2 count
     * @param j3 size
     * @param j4 audit version
     */
    public void add(int i, String str, String str2, String str3, long j, long j2, long j3, long j4) {
        final long elapsed = System.currentTimeMillis() - j;
        final long weightedDelay = elapsed * j2;
        add(DEFAULT_ISOLATE_KEY, i, str, str2, str3, j, j2, j3, weightedDelay, j4);
    }

    /**
     * Records statistics under isolate key 0 with audit tag {@code "-1"}, the default audit
     * version and a caller-supplied delay.
     *
     * @param i audit id
     * @param str inlong group id
     * @param str2 inlong stream id
     * @param j log time in milliseconds
     * @param j2 count
     * @param j3 size
     * @param j4 delay, added to the delay counter as given
     */
    public void add(int i, String str, String str2, long j, long j2, long j3, long j4) {
        add(0L, i, "-1", str, str2, j, j2, j3, j4, DEFAULT_AUDIT_VERSION);
    }

    /**
     * Builds the statistics key
     * {@code minute:groupId:streamId:auditId:auditTag:auditVersion} and adds the given values to
     * the counters stored under it.
     *
     * @param j isolate key
     * @param i audit id
     * @param str audit tag
     * @param str2 inlong group id
     * @param str3 inlong stream id
     * @param j2 log time in milliseconds; truncated to a whole minute for the key
     * @param j3 count
     * @param j4 size
     * @param j5 delay
     * @param j6 audit version
     */
    public void add(long j, int i, String str, String str2, String str3, long j2, long j3, long j4, long j5, long j6) {
        final String statKey = String.join(FIELD_SEPARATORS,
                String.valueOf(j2 / PERIOD),
                str2,
                str3,
                String.valueOf(i),
                str,
                String.valueOf(j6));
        addByKey(j, statKey, j3, j4, j5);
    }

    /**
     * Builds the statistics key from the dimensions (log-time minute, group id, stream id,
     * audit id, audit tag, audit version) and adds the count, size and delay of
     * {@code auditValues} to the counters under the dimensions' isolate key.
     *
     * @param auditDimensions the key fields and the isolate key
     * @param auditValues the values to accumulate
     */
    public void add(AuditDimensions auditDimensions, AuditValues auditValues) {
        final String statKey = String.join(FIELD_SEPARATORS,
                String.valueOf(auditDimensions.getLogTime() / PERIOD),
                auditDimensions.getInlongGroupID(),
                auditDimensions.getInlongStreamID(),
                String.valueOf(auditDimensions.getAuditID()),
                auditDimensions.getAuditTag(),
                String.valueOf(auditDimensions.getAuditVersion()));
        addByKey(auditDimensions.getIsolateKey(), statKey, auditValues.getCount(), auditValues.getSize(),
                auditValues.getDelayTime());
    }

    /**
     * Adds the given values to the counters stored under {@code (j, str)}, creating the
     * per-isolate-key map and the counter object on first use.
     *
     * <p>Each lookup tries a plain {@code get} first to stay lock-free on the hot path and falls
     * back to {@code computeIfAbsent} only when the entry is missing. The returned reference is
     * used directly instead of calling {@code get} again after {@code putIfAbsent}: the flush
     * thread removes empty maps and idle keys, so a second lookup could yield {@code null} and
     * fail with a {@code NullPointerException}.
     *
     * @param j isolate key
     * @param str statistics key
     * @param j2 count to add
     * @param j3 size to add
     * @param j4 delay to add
     */
    private void addByKey(long j, String str, long j2, long j3, long j4) {
        final Long isolateKey = Long.valueOf(j);
        ConcurrentHashMap<String, StatInfo> statMap = this.preStatMap.get(isolateKey);
        if (statMap == null) {
            statMap = this.preStatMap.computeIfAbsent(isolateKey, key -> new ConcurrentHashMap<>());
        }
        StatInfo statInfo = statMap.get(str);
        if (statInfo == null) {
            statInfo = statMap.computeIfAbsent(str, key -> new StatInfo(0L, 0L, 0L));
        }
        statInfo.count.addAndGet(j2);
        statInfo.size.addAndGet(j3);
        statInfo.delay.addAndGet(j4);
    }

    /**
     * Flushes the statistics of isolate key 0; see {@link #flush(long)}.
     */
    public synchronized void flush() {
        flush(0L);
    }

    /**
     * Summarises and sends the statistics of every isolate key that is less than or equal to
     * {@code j}.
     *
     * <p>The call is skipped when the sender has not been created yet, when a flush for
     * {@code j} has already been recorded and not yet expired by {@code checkFlushTime()}, or
     * when the number of flush attempts since the last {@code checkFlushTime()} exceeds the
     * flush threshold. Exceptions raised while flushing are logged, not propagated; the sender's
     * socket is closed in every case.
     *
     * @param j isolate key up to which statistics are flushed
     */
    public synchronized void flush(long j) {
        if (this.manager == null) {
            // The sender only exists after init(); without it every call below would throw.
            LOGGER.debug("Audit flush skipped, reporter is not initialized, isolate key {}", j);
            return;
        }
        if (this.flushTime.putIfAbsent(Long.valueOf(j), Long.valueOf(System.currentTimeMillis())) != null
                || this.flushStat.addAndGet(1) > this.flushStatThreshold) {
            return;
        }
        long startTime = System.currentTimeMillis();
        LOGGER.info("Audit flush isolate key {} ", Long.valueOf(j));
        try {
            this.manager.checkFailedData();
            resetStat();
            // Late updates that landed on already-expired counters are picked up first.
            summaryExpiredStatMap(j);
            Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> it = this.preStatMap.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> next = it.next();
                if (next.getValue().isEmpty()) {
                    LOGGER.info("Remove the key of pre stat map: {},isolate key: {} ", next.getKey(), Long.valueOf(j));
                    it.remove();
                    continue;
                }
                if (next.getKey().longValue() > j) {
                    continue;
                }
                summaryPreStatMap(next.getKey().longValue(), next.getValue());
                send(next.getKey().longValue());
            }
            clearExpiredKey(j);
        } catch (Exception e) {
            LOGGER.error("Flush audit has exception!", e);
        } finally {
            this.manager.closeSocket();
        }
        LOGGER.info("Success report {} package, Failed report {} package, total {} message, memory size {}, cost: {} ms",
                this.auditMetric.getSuccessPack(), this.auditMetric.getFailedPack(), this.auditMetric.getTotalMsg(),
                this.auditMetric.getMemorySize(), Long.valueOf(System.currentTimeMillis() - startTime));
        this.auditMetric.reset();
    }

    /**
     * Wraps the request in a {@code BaseCommand} of type {@code AUDIT_REQUEST}, sends it through
     * the sender and counts one successful or one failed package depending on the result.
     *
     * @param auditRequest the request to send
     */
    private void sendByBaseCommand(AuditApi.AuditRequest auditRequest) {
        // Build the command once; the previous code built it twice and discarded the first result.
        AuditApi.BaseCommand command = AuditApi.BaseCommand.newBuilder()
                .setType(AuditApi.BaseCommand.Type.AUDIT_REQUEST)
                .setAuditRequest(auditRequest)
                .build();
        if (this.manager.send(command, auditRequest)) {
            this.auditMetric.addSuccessPack(1L);
        } else {
            this.auditMetric.addFailedPack(1L);
        }
    }

    /**
     * Drains the given counters into the summary entry for {@code (j, str)}. The count is read
     * and zeroed first; when it was zero nothing else is touched. Otherwise count, size and
     * delay are moved to the summary counters, which are created on demand.
     *
     * @param j isolate key of the summary bucket
     * @param str statistics key
     * @param statInfo counters to drain
     */
    private void sumThreadGroup(long j, String str, StatInfo statInfo) {
        final long drainedCount = statInfo.count.getAndSet(0L);
        if (drainedCount != 0) {
            ConcurrentHashMap<String, StatInfo> bucket =
                    this.summaryStatMap.computeIfAbsent(Long.valueOf(j), isolateKey -> new ConcurrentHashMap<>());
            StatInfo total = bucket.computeIfAbsent(str, statKey -> new StatInfo(0L, 0L, 0L));
            total.count.addAndGet(drainedCount);
            total.size.addAndGet(statInfo.size.getAndSet(0L));
            total.delay.addAndGet(statInfo.delay.getAndSet(0L));
        }
    }

    /**
     * Resets the per-flush counters: the body counter back to 0 and the packet id back to 1.
     */
    private void resetStat() {
        this.dataId = 0;
        this.packageId = 1;
    }

    /**
     * Walks the expired-counter map: empty buckets are removed, and buckets whose isolate key is
     * less than or equal to {@code j} are drained into the summary bucket of {@code j} and then
     * cleared.
     *
     * @param j isolate key being flushed
     */
    private void summaryExpiredStatMap(long j) {
        Iterator<Map.Entry<Long, ConcurrentHashMap<String, StatInfo>>> cursor =
                this.expiredStatMap.entrySet().iterator();
        while (cursor.hasNext()) {
            Map.Entry<Long, ConcurrentHashMap<String, StatInfo>> bucket = cursor.next();
            ConcurrentHashMap<String, StatInfo> expiredStats = bucket.getValue();
            if (expiredStats.isEmpty()) {
                LOGGER.info("Remove the key of expired stat map: {},isolate key: {} ", bucket.getKey(), Long.valueOf(j));
                cursor.remove();
                continue;
            }
            if (bucket.getKey().longValue() > j) {
                continue;
            }
            for (Map.Entry<String, StatInfo> stat : expiredStats.entrySet()) {
                // Summed under the flushed key j, not under the bucket's own key.
                sumThreadGroup(j, stat.getKey(), stat.getValue());
            }
            expiredStats.clear();
        }
    }

    /**
     * Drains every live counter of one isolate key into the summary map. Counters whose count is
     * currently zero are not drained; their keys are remembered in the expired-key set of
     * {@code j} instead.
     *
     * @param j isolate key the counters belong to
     * @param concurrentHashMap live counters of that isolate key
     */
    private void summaryPreStatMap(long j, ConcurrentHashMap<String, StatInfo> concurrentHashMap) {
        final HashSet<String> idleKeys =
                this.expiredKeyList.computeIfAbsent(Long.valueOf(j), isolateKey -> new HashSet<>());
        for (Map.Entry<String, StatInfo> stat : concurrentHashMap.entrySet()) {
            if (stat.getValue().count.get() != 0) {
                sumThreadGroup(j, stat.getKey(), stat.getValue());
            } else {
                idleKeys.add(stat.getKey());
            }
        }
    }

    /**
     * Moves the counters recorded as idle out of the live map and into the expired map.
     * Empty key sets are removed. For sets whose isolate key is less than or equal to {@code j},
     * the set is removed when no live map exists for that key; otherwise each listed counter is
     * transferred and the set is cleared.
     *
     * @param j isolate key being flushed
     */
    private void clearExpiredKey(long j) {
        Iterator<Map.Entry<Long, HashSet<String>>> cursor = this.expiredKeyList.entrySet().iterator();
        while (cursor.hasNext()) {
            Map.Entry<Long, HashSet<String>> pending = cursor.next();
            HashSet<String> idleKeys = pending.getValue();
            if (idleKeys.isEmpty()) {
                LOGGER.info("Remove the key of expired key list: {},isolate key: {}", pending.getKey(), Long.valueOf(j));
                cursor.remove();
                continue;
            }
            if (pending.getKey().longValue() > j) {
                continue;
            }
            ConcurrentHashMap<String, StatInfo> liveStats = this.preStatMap.get(pending.getKey());
            if (liveStats == null) {
                cursor.remove();
                continue;
            }
            ConcurrentHashMap<String, StatInfo> expiredStats =
                    this.expiredStatMap.computeIfAbsent(pending.getKey(), isolateKey -> new ConcurrentHashMap<>());
            for (String statKey : idleKeys) {
                expiredStats.put(statKey, liveStats.remove(statKey));
            }
            idleKeys.clear();
        }
    }

    /**
     * Converts the summary counters of one isolate key into audit message bodies and sends them
     * in packages, then clears the summary. Nothing is sent when no summary exists; an empty
     * summary bucket is removed.
     *
     * <p>Keys with fewer than six fields or with a non-numeric minute or version are logged and
     * skipped.
     *
     * @param j isolate key whose summary is sent
     */
    private void send(long j) {
        final Long isolateKey = Long.valueOf(j);
        final ConcurrentHashMap<String, StatInfo> summary = this.summaryStatMap.get(isolateKey);
        if (summary == null) {
            return;
        }
        if (summary.isEmpty()) {
            this.summaryStatMap.remove(isolateKey);
            return;
        }
        AuditApi.AuditMessageHeader header = AuditApi.AuditMessageHeader.newBuilder()
                .setIp(this.config.getLocalIP())
                .setDockerId(this.config.getDockerId())
                .setThreadId(String.valueOf(Thread.currentThread().getId()))
                .setSdkTs(Calendar.getInstance().getTimeInMillis())
                .setPacketId(this.packageId)
                .build();
        AuditApi.AuditRequest.Builder requestBuilder = AuditApi.AuditRequest.newBuilder();
        requestBuilder.setMsgHeader(header);
        for (Map.Entry<String, StatInfo> entry : summary.entrySet()) {
            // Key layout: minute:groupId:streamId:auditId:auditTag:auditVersion
            String[] fields = entry.getKey().split(FIELD_SEPARATORS);
            if (fields.length < 6) {
                LOGGER.error("Number of keys {} <6", Integer.valueOf(fields.length));
                continue;
            }
            try {
                long logTs = Long.parseLong(fields[0]) * PERIOD;
                long auditVersion = Long.parseLong(fields[5]);
                StatInfo stat = entry.getValue();
                // Keep the built body in a local so its serialized size can be accounted for.
                AuditApi.AuditMessageBody body = AuditApi.AuditMessageBody.newBuilder()
                        .setLogTs(logTs)
                        .setInlongGroupId(fields[1])
                        .setInlongStreamId(fields[2])
                        .setAuditId(fields[3])
                        .setAuditTag(fields[4])
                        .setCount(stat.count.get())
                        .setSize(stat.size.get())
                        .setDelay(stat.delay.get())
                        .setAuditVersion(auditVersion)
                        .build();
                requestBuilder.addMsgBody(body);
                this.auditMetric.addMemorySize(body.toByteArray().length);
                // Post-increment: the package goes out once the counter had already reached BATCH_NUM.
                if (this.dataId++ >= BATCH_NUM) {
                    this.dataId = 0;
                    this.packageId++;
                    sendData(requestBuilder);
                }
            } catch (NumberFormatException e) {
                LOGGER.error("Failed to parse long from string", e);
            }
        }
        // Send whatever is left over after the last full package.
        if (requestBuilder.getMsgBodyCount() > 0) {
            sendData(requestBuilder);
        }
        summary.clear();
    }

    /**
     * Stamps the builder with a fresh request id, sends the built request, adds the number of
     * bodies to the total-message metric and empties the builder's body list for reuse.
     *
     * @param builder request builder holding the bodies to send
     */
    private void sendData(AuditApi.AuditRequest.Builder builder) {
        final long requestId = RequestIdUtils.nextRequestId().longValue();
        builder.setRequestId(requestId);
        final AuditApi.AuditRequest request = builder.build();
        sendByBaseCommand(request);
        this.auditMetric.addTotalMsg(builder.getMsgBodyCount());
        builder.clearMsgBody();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Resets the flush-attempt counter and forgets every recorded flush time that is more than
     * one period old, so those isolate keys can be flushed again.
     */
    public void checkFlushTime() {
        this.flushStat.set(0);
        final long now = Calendar.getInstance().getTimeInMillis();
        Iterator<Map.Entry<Long, Long>> cursor = this.flushTime.entrySet().iterator();
        while (cursor.hasNext()) {
            Map.Entry<Long, Long> recorded = cursor.next();
            if (now - recorded.getValue().longValue() > PERIOD) {
                cursor.remove();
            }
        }
    }

    /**
     * Builds an audit id by delegating to {@code AuditManagerUtils.buildAuditId} with the same
     * arguments.
     *
     * @param auditIdEnum base audit id
     * @param z first flag
     * @param z2 second flag
     * @param z3 third flag
     * @param z4 fourth flag
     * @return the audit id computed by {@code AuditManagerUtils}
     */
    // NOTE(review): judging by the convenience wrappers in this class, z looks like the success flag,
    // z3 the discard flag and z4 the retry flag; the meaning of z2 is not visible here — confirm
    // against AuditManagerUtils.
    public int buildAuditId(AuditIdEnum auditIdEnum, boolean z, boolean z2, boolean z3, boolean z4) {
        return AuditManagerUtils.buildAuditId(auditIdEnum, z, z2, z3, z4);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, true, true, false, false)}.
     *
     * @param auditIdEnum base audit id
     * @return the resulting audit id
     */
    public int buildSuccessfulAuditId(AuditIdEnum auditIdEnum) {
        return buildAuditId(auditIdEnum, true, true, false, false);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, true, z, false, false)}.
     *
     * @param auditIdEnum base audit id
     * @param z value passed as the second flag of {@code buildAuditId}
     * @return the resulting audit id
     */
    public int buildSuccessfulAuditId(AuditIdEnum auditIdEnum, boolean z) {
        return buildAuditId(auditIdEnum, true, z, false, false);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, false, true, false, false)}.
     *
     * @param auditIdEnum base audit id
     * @return the resulting audit id
     */
    public int buildFailedAuditId(AuditIdEnum auditIdEnum) {
        return buildAuditId(auditIdEnum, false, true, false, false);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, false, z, false, false)}.
     *
     * @param auditIdEnum base audit id
     * @param z value passed as the second flag of {@code buildAuditId}
     * @return the resulting audit id
     */
    public int buildFailedAuditId(AuditIdEnum auditIdEnum, boolean z) {
        return buildAuditId(auditIdEnum, false, z, false, false);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, true, true, true, false)}.
     *
     * @param auditIdEnum base audit id
     * @return the resulting audit id
     */
    public int buildDiscardAuditId(AuditIdEnum auditIdEnum) {
        return buildAuditId(auditIdEnum, true, true, true, false);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, true, z, true, false)}.
     *
     * @param auditIdEnum base audit id
     * @param z value passed as the second flag of {@code buildAuditId}
     * @return the resulting audit id
     */
    public int buildDiscardAuditId(AuditIdEnum auditIdEnum, boolean z) {
        return buildAuditId(auditIdEnum, true, z, true, false);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, true, true, false, true)}.
     *
     * @param auditIdEnum base audit id
     * @return the resulting audit id
     */
    public int buildRetryAuditId(AuditIdEnum auditIdEnum) {
        return buildAuditId(auditIdEnum, true, true, false, true);
    }

    /**
     * Shorthand for {@code buildAuditId(auditIdEnum, true, z, false, true)}.
     *
     * @param auditIdEnum base audit id
     * @param z value passed as the second flag of {@code buildAuditId}
     * @return the resulting audit id
     */
    public int buildRetryAuditId(AuditIdEnum auditIdEnum, boolean z) {
        return buildAuditId(auditIdEnum, true, z, false, true);
    }

    /**
     * Delegates to {@code AuditManagerUtils.buildAuditInformation} with the same arguments.
     *
     * @param str forwarded as the first argument
     * @param flowType forwarded as the second argument
     * @param z first flag
     * @param z2 second flag
     * @param z3 third flag
     * @param z4 fourth flag
     * @return the audit information produced by {@code AuditManagerUtils}
     */
    // NOTE(review): the meaning of str and of the four flags is defined by AuditManagerUtils and not
    // visible here — confirm.
    public AuditInformation buildAuditInformation(String str, FlowType flowType, boolean z, boolean z2, boolean z3, boolean z4) {
        return AuditManagerUtils.buildAuditInformation(str, flowType, z, z2, z3, z4);
    }

    /**
     * Returns the result of {@code AuditManagerUtils.getAllAuditInformation()}.
     *
     * @return the list supplied by {@code AuditManagerUtils}
     */
    public List<AuditInformation> getAllAuditInformation() {
        return AuditManagerUtils.getAllAuditInformation();
    }

    /**
     * Returns the result of {@code AuditManagerUtils.getAllMetricInformation()}.
     *
     * @return the list supplied by {@code AuditManagerUtils}
     */
    public List<AuditInformation> getAllMetricInformation() {
        return AuditManagerUtils.getAllMetricInformation();
    }

    /**
     * Returns the result of {@code AuditManagerUtils.getAllAuditInformation(String)} for the
     * given argument.
     *
     * @param str forwarded unchanged
     * @return the list supplied by {@code AuditManagerUtils}
     */
    // NOTE(review): what str selects is defined by AuditManagerUtils and not visible here — confirm.
    public List<AuditInformation> getAllAuditInformation(String str) {
        return AuditManagerUtils.getAllAuditInformation(str);
    }

    /**
     * Returns the result of {@code AuditManagerUtils.getStartAuditIdForMetric()}.
     *
     * @return the value supplied by {@code AuditManagerUtils}
     */
    public int getStartAuditIdForMetric() {
        return AuditManagerUtils.getStartAuditIdForMetric();
    }

    /**
     * Forwards the timeout to the {@code ProxyManager} singleton.
     *
     * @param i the timeout value
     */
    // NOTE(review): the unit of the timeout is defined by ProxyManager and not visible here — confirm.
    public void setManagerTimeout(int i) {
        ProxyManager.getInstance().setManagerTimeout(i);
    }

    /**
     * Calls {@code setAutoUpdateAuditProxy()} on the {@code ProxyManager} singleton.
     */
    public void setAutoUpdateAuditProxy() {
        ProxyManager.getInstance().setAutoUpdateAuditProxy();
    }

    /**
     * Forwards the update interval to the {@code ProxyManager} singleton.
     *
     * @param i the interval value
     */
    // NOTE(review): the unit of the interval is defined by ProxyManager and not visible here — confirm.
    public void setUpdateInterval(int i) {
        ProxyManager.getInstance().setUpdateInterval(i);
    }

    /**
     * Forwards the limit to the static {@code SenderManager.setMaxGlobalAuditMemory}.
     *
     * @param j the memory limit
     */
    // NOTE(review): the unit of the limit is defined by SenderManager and not visible here — confirm.
    public void setMaxGlobalAuditMemory(long j) {
        SenderManager.setMaxGlobalAuditMemory(j);
    }
}
