package org.apache.inlong.sdk.dataproxy.pb;

import com.alibaba.fastjson.JSON;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.inlong.sdk.dataproxy.pb.context.ProfileEvent;
import org.apache.inlong.sdk.dataproxy.pb.context.SdkSinkContext;
import org.apache.inlong.sdk.dataproxy.pb.dispatch.DispatchManager;
import org.apache.inlong.sdk.dataproxy.pb.dispatch.DispatchProfile;
import org.apache.inlong.sdk.dataproxy.pb.network.IpPort;
import org.jboss.netty.channel.ChannelException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/pb/ProxySdkSink.class */
public class ProxySdkSink extends AbstractSink implements Configurable {
    public static final Logger LOG = LoggerFactory.getLogger(ProxySdkSink.class);
    private Context parentContext;
    private SdkSinkContext context;
    private DispatchManager dispatchManager;
    protected Timer sinkTimer;
    protected Timer processTimer;
    private LinkedBlockingQueue<DispatchProfile> dispatchQueue = new LinkedBlockingQueue<>();
    private final ConcurrentHashMap<String, SdkProxyChannelManager> proxyManagers = new ConcurrentHashMap<>();
    private final List<SdkProxyChannelManager> deletingProxyManager = new ArrayList();

    public void start() {
        try {
            this.context = new SdkSinkContext(this.parentContext, getChannel());
            if (getChannel() == null) {
                LOG.error("channel is null");
            }
            this.context.start();
            this.dispatchManager = new DispatchManager(this.parentContext, this.dispatchQueue);
            reload();
            setReloadTimer();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
        super.start();
    }

    protected void setReloadTimer() {
        this.sinkTimer = new Timer(true);
        this.sinkTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sdk.dataproxy.pb.ProxySdkSink.1
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                ProxySdkSink.this.reload();
            }
        }, new Date(System.currentTimeMillis() + this.context.getReloadInterval()), this.context.getReloadInterval());
        this.sinkTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sdk.dataproxy.pb.ProxySdkSink.2
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                ProxySdkSink.this.dispatchManager.setNeedOutputOvertimeData();
            }
        }, new Date(System.currentTimeMillis() + this.dispatchManager.getDispatchTimeout()), this.dispatchManager.getDispatchTimeout());
        this.processTimer = new Timer(true);
        this.processTimer.schedule(new TimerTask() { // from class: org.apache.inlong.sdk.dataproxy.pb.ProxySdkSink.3
            @Override // java.util.TimerTask, java.lang.Runnable
            public void run() {
                try {
                    ProxySdkSink.this.outputProxyQueue();
                } catch (Exception e) {
                    ProxySdkSink.LOG.error(e.getMessage(), e);
                }
            }
        }, new Date(System.currentTimeMillis() + this.context.getProcessInterval()), this.context.getProcessInterval());
    }

    public void reload() {
        try {
            LOG.info("All proxy managers start status,proxy size:{},proxys:{},metricItemSize:{}", new Object[]{Integer.valueOf(this.proxyManagers.size()), this.proxyManagers.keySet(), Integer.valueOf(this.context.getMetricItemSet().getItemMap().size())});
            Iterator<SdkProxyChannelManager> it = this.deletingProxyManager.iterator();
            while (it.hasNext()) {
                it.next().close();
            }
            this.deletingProxyManager.clear();
            for (Map.Entry<String, Set<IpPort>> entry : this.context.getProxyIpListMap().entrySet()) {
                if (!this.proxyManagers.containsKey(entry.getKey())) {
                    SdkProxyChannelManager sdkProxyChannelManager = new SdkProxyChannelManager(entry.getKey(), this.context);
                    this.proxyManagers.put(entry.getKey(), sdkProxyChannelManager);
                    sdkProxyChannelManager.start();
                }
            }
            HashSet hashSet = new HashSet();
            for (Map.Entry<String, SdkProxyChannelManager> entry2 : this.proxyManagers.entrySet()) {
                if (!this.context.getProxyIpListMap().containsKey(entry2.getKey())) {
                    hashSet.add(entry2.getKey());
                }
            }
            Iterator it2 = hashSet.iterator();
            while (it2.hasNext()) {
                this.deletingProxyManager.add(this.proxyManagers.remove((String) it2.next()));
            }
            LOG.info("All proxy managers end status,proxy size:{},proxys:{},metricItemSize:{}", new Object[]{Integer.valueOf(this.proxyManagers.size()), this.proxyManagers.keySet(), Integer.valueOf(this.context.getMetricItemSet().getItemMap().size())});
        } catch (Throwable th) {
            LOG.error(th.getMessage(), th);
        }
    }

    public void stop() {
        try {
            Iterator<Map.Entry<String, SdkProxyChannelManager>> it = this.proxyManagers.entrySet().iterator();
            while (it.hasNext()) {
                it.next().getValue().close();
            }
            this.sinkTimer.cancel();
            this.context.close();
            super.stop();
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
        }
    }

    public void configure(Context context) {
        LOG.info("start to configure:{}, context:{}.", getClass().getSimpleName(), context.toString());
        this.parentContext = context;
    }

    public Sink.Status process() throws EventDeliveryException {
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            try {
                Event take = channel.take();
                if (take == null) {
                    transaction.commit();
                    Sink.Status status = Sink.Status.BACKOFF;
                    transaction.close();
                    return status;
                }
                if (!(take instanceof ProfileEvent)) {
                    transaction.commit();
                    this.context.addSendFailMetric();
                    Sink.Status status2 = Sink.Status.READY;
                    transaction.close();
                    return status2;
                }
                this.dispatchManager.addEvent((ProfileEvent) take);
                transaction.commit();
                Sink.Status status3 = Sink.Status.READY;
                transaction.close();
                return status3;
            } catch (Throwable th) {
                LOG.error("Process event failed!" + getName(), th);
                try {
                    transaction.rollback();
                } catch (Throwable th2) {
                    LOG.error("Channel take transaction rollback exception:" + getName(), th2);
                }
                Sink.Status status4 = Sink.Status.BACKOFF;
                transaction.close();
                return status4;
            }
        } catch (Throwable th3) {
            transaction.close();
            throw th3;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void outputProxyQueue() {
        DispatchProfile poll = this.dispatchQueue.poll();
        while (true) {
            DispatchProfile dispatchProfile = poll;
            if (dispatchProfile == null) {
                return;
            }
            String uid = dispatchProfile.getUid();
            String proxyClusterId = this.context.getProxyClusterId(uid);
            if (proxyClusterId == null) {
                LOG.error("can not find uid:{}", uid);
                this.context.addSendResultMetric(dispatchProfile, uid, false, 0L);
                ChannelException channelException = new ChannelException(String.format("can not find proxyClusterId:%s", uid));
                dispatchProfile.getEvents().forEach(profileEvent -> {
                    try {
                        profileEvent.getProfile().getCallback().onException(channelException);
                    } catch (Exception e) {
                        LOG.error(e.getMessage(), e);
                    }
                });
                poll = this.dispatchQueue.poll();
            } else {
                SdkProxyChannelManager sdkProxyChannelManager = this.proxyManagers.get(proxyClusterId);
                if (sdkProxyChannelManager == null) {
                    LOG.error("can not find proxy:{},proxyManagers:{}", proxyClusterId, JSON.toJSONString(this.proxyManagers));
                    this.context.addSendResultMetric(dispatchProfile, uid, false, 0L);
                    ChannelException channelException2 = new ChannelException(String.format("can not find proxyClusterId manager:%s", proxyClusterId));
                    dispatchProfile.getEvents().forEach(profileEvent2 -> {
                        try {
                            profileEvent2.getProfile().getCallback().onException(channelException2);
                        } catch (Exception e) {
                            LOG.error(e.getMessage(), e);
                        }
                    });
                    poll = this.dispatchQueue.poll();
                } else {
                    sdkProxyChannelManager.offerDispatchQueue(dispatchProfile);
                    poll = this.dispatchQueue.poll();
                }
            }
        }
    }
}
