package org.apache.inlong.sdk.dataproxy.pb;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.flume.Context;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.lifecycle.LifecycleState;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.sdk.commons.protocol.SdkEvent;
import org.apache.inlong.sdk.dataproxy.MessageSender;
import org.apache.inlong.sdk.dataproxy.common.SendMessageCallback;
import org.apache.inlong.sdk.dataproxy.common.SendResult;
import org.apache.inlong.sdk.dataproxy.network.ProxysdkException;
import org.apache.inlong.sdk.dataproxy.pb.channel.BufferQueueChannel;
import org.apache.inlong.sdk.dataproxy.pb.context.CallbackProfile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/sdk/dataproxy/pb/PbProtocolMessageSender.class */
/**
 * {@link MessageSender} implementation that wraps each message in an {@link SdkEvent}, puts it
 * into a {@link BufferQueueChannel} inside a channel transaction, and relies on a
 * {@link ProxySdkSink} attached to that channel to deliver it.
 *
 * <p>Usage order visible in this class: construct, {@link #configure(Context)} (creates the
 * channel and sink), {@link #start()}, send, {@link #close()}.
 *
 * <p>Synchronous sends block on a {@link CountDownLatch} that is released by the
 * {@link SendMessageCallback} attached to every queued event; asynchronous sends hand the
 * caller's callback to the queued event and return immediately.
 */
public class PbProtocolMessageSender implements MessageSender, Configurable {
    public static final Logger LOG = LoggerFactory.getLogger(PbProtocolMessageSender.class);
    private final String name;
    private final String localIp;
    private LifecycleState lifecycleState = LifecycleState.IDLE;
    private Context context;
    private BufferQueueChannel channel;
    private ProxySdkSink sink;

    /**
     * Creates a sender.
     *
     * @param str sender name; when {@code null}, a name is derived from the class simple name
     *            and the name of the constructing thread
     */
    public PbProtocolMessageSender(String str) {
        this.name = str == null ? PbProtocolMessageSender.class.getSimpleName() + "-" + Thread.currentThread().getName() : str;
        this.localIp = NetworkUtils.getLocalIp();
    }

    /**
     * Marks the sender as started and starts the channel, then the sink.
     * {@link #configure(Context)} must have been called first, since it creates both.
     */
    public void start() {
        this.lifecycleState = LifecycleState.START;
        this.channel.start();
        this.sink.start();
    }

    /**
     * Marks the sender as stopped and stops the sink, then the channel.
     *
     * <p>The channel is stopped even if stopping the sink throws, and components that were never
     * created (no {@link #configure(Context)} call) are skipped instead of raising a
     * {@link NullPointerException}.
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void close() {
        this.lifecycleState = LifecycleState.STOP;
        try {
            if (this.sink != null) {
                this.sink.stop();
            }
        } finally {
            if (this.channel != null) {
                this.channel.stop();
            }
        }
    }

    /**
     * Creates and configures the channel and the sink from the given context and wires the sink
     * to the channel. Component names are derived from this sender's name.
     *
     * @param context configuration passed to both the channel and the sink
     */
    public void configure(Context context) {
        this.context = context;
        this.channel = new BufferQueueChannel();
        this.channel.setName(this.name + "-channel");
        this.channel.configure(context);
        this.sink = new ProxySdkSink();
        this.sink.setName(this.name + "-sink");
        this.sink.configure(context);
        this.sink.setChannel(this.channel);
    }

    /**
     * Puts one profile into the channel inside its own transaction.
     *
     * <p>On failure the transaction is rolled back (rollback errors are only logged) and the
     * original exception is rethrown; the transaction is closed exactly once in every case.
     */
    private void put(CallbackProfile callbackProfile) {
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        try {
            this.channel.put(callbackProfile);
            transaction.commit();
        } catch (Exception e) {
            LOG.error("Put event failed:{}", this.name, e);
            rollbackQuietly(transaction);
            throw e;
        } finally {
            transaction.close();
        }
    }

    /**
     * Puts all profiles into the channel inside a single transaction, so that either the whole
     * batch is committed or the transaction is rolled back.
     *
     * <p>Error handling matches {@link #put(CallbackProfile)}.
     */
    private void putAll(List<CallbackProfile> list) {
        Transaction transaction = this.channel.getTransaction();
        transaction.begin();
        try {
            for (CallbackProfile profile : list) {
                this.channel.put(profile);
            }
            transaction.commit();
        } catch (Exception e) {
            LOG.error("Put event failed:{}", this.name, e);
            rollbackQuietly(transaction);
            throw e;
        } finally {
            transaction.close();
        }
    }

    /** Rolls the transaction back, logging (not propagating) any failure of the rollback itself. */
    private void rollbackQuietly(Transaction transaction) {
        try {
            transaction.rollback();
        } catch (Throwable th) {
            LOG.error("Channel put transaction rollback exception:" + this.name, th);
        }
    }

    /** Builds the event for one message body; headers are only set when {@code headers} is non-null. */
    private SdkEvent buildEvent(byte[] body, String groupId, String streamId, long msgTime, Map<String, String> headers) {
        SdkEvent sdkEvent = new SdkEvent();
        sdkEvent.setInlongGroupId(groupId);
        sdkEvent.setInlongStreamId(streamId);
        sdkEvent.setBody(body);
        sdkEvent.setMsgTime(msgTime);
        sdkEvent.setSourceIp(this.localIp);
        if (headers != null) {
            sdkEvent.setHeaders(headers);
        }
        return sdkEvent;
    }

    /**
     * Creates the callback used by the synchronous send paths: it records the outcome and
     * releases one latch count per invocation.
     */
    private SendMessageCallback newLatchCallback(final CountDownLatch latch, final AtomicReference<SendResult> result) {
        return new SendMessageCallback() {
            @Override // org.apache.inlong.sdk.dataproxy.common.SendMessageCallback
            public void onMessageAck(SendResult sendResult) {
                // Only fill in OK when nothing was recorded yet, so that a failure (or timeout)
                // already reported for the same call is not overwritten by a later ack.
                result.compareAndSet(null, SendResult.OK);
                latch.countDown();
            }

            @Override // org.apache.inlong.sdk.dataproxy.common.SendMessageCallback
            public void onException(Throwable th) {
                PbProtocolMessageSender.LOG.error(th.getMessage(), th);
                result.set(SendResult.CONNECTION_BREAK);
                latch.countDown();
            }
        };
    }

    /**
     * Waits for the latch and converts the wait outcome into a {@link SendResult}.
     *
     * @return {@code TIMEOUT} when the latch was not released in time, {@code UNKOWN_ERROR} when
     *         waiting failed (including interruption, in which case the interrupt flag is
     *         restored), otherwise the result recorded by the callbacks; {@code OK} when no
     *         callback was expected at all (empty batch)
     */
    private SendResult awaitResult(CountDownLatch latch, AtomicReference<SendResult> result, long timeout, TimeUnit unit) {
        try {
            if (!latch.await(timeout, unit)) {
                result.set(SendResult.TIMEOUT);
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
            LOG.error(e.getMessage(), e);
            result.set(SendResult.UNKOWN_ERROR);
        } catch (Exception e) {
            LOG.error(e.getMessage(), e);
            result.set(SendResult.UNKOWN_ERROR);
        }
        SendResult outcome = result.get();
        // A null outcome is only possible when the latch started at zero (nothing was sent).
        return outcome != null ? outcome : SendResult.OK;
    }

    /** @return the lifecycle state last set by the constructor default, {@link #start()} or {@link #close()} */
    public LifecycleState getLifecycleState() {
        return this.lifecycleState;
    }

    /** @return the context passed to {@link #configure(Context)}, or {@code null} if not configured */
    public Context getContext() {
        return this.context;
    }

    /**
     * This overload is not implemented by this sender: nothing is sent.
     *
     * @return always {@code SendResult.INVALID_ATTRIBUTES}
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, TimeUnit timeUnit) {
        return SendResult.INVALID_ATTRIBUTES;
    }

    /**
     * Synchronously sends one message without extra headers; see the overload taking a header map.
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) {
        return sendMessage(bArr, str, str2, j, str3, j2, timeUnit, (Map<String, String>) null);
    }

    /**
     * Synchronously sends one message and waits for its callback.
     *
     * @param bArr     message body
     * @param str      inlong group id set on the event
     * @param str2     inlong stream id set on the event
     * @param j        message time set on the event
     * @param str3     not used by this implementation
     * @param j2       maximum time to wait for the callback
     * @param timeUnit unit of {@code j2}
     * @param map      event headers, may be {@code null}
     * @return {@code OK} on ack, {@code CONNECTION_BREAK} when the callback reports an exception,
     *         {@code TIMEOUT} when no callback arrives in time, {@code UNKOWN_ERROR} when waiting fails
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) {
        SdkEvent sdkEvent = buildEvent(bArr, str, str2, j, map);
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AtomicReference<SendResult> resultRef = new AtomicReference<>();
        put(new CallbackProfile(sdkEvent, newLatchCallback(countDownLatch, resultRef)));
        return awaitResult(countDownLatch, resultRef, j2, timeUnit);
    }

    /**
     * Synchronously sends a batch without extra headers; see the overload taking a header map.
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) {
        return sendMessage(list, str, str2, j, str3, j2, timeUnit, (Map<String, String>) null);
    }

    /**
     * Synchronously sends a batch of messages (one event per body, queued in one transaction)
     * and waits until every event's callback has fired.
     *
     * @param list     message bodies
     * @param str      inlong group id set on every event
     * @param str2     inlong stream id set on every event
     * @param j        message time set on every event
     * @param str3     not used by this implementation
     * @param j2       maximum time to wait for all callbacks
     * @param timeUnit unit of {@code j2}
     * @param map      headers set on every event, may be {@code null}
     * @return {@code OK} when every event was acked (or the batch is empty),
     *         {@code CONNECTION_BREAK} when any callback reported an exception, {@code TIMEOUT}
     *         when not all callbacks arrive in time, {@code UNKOWN_ERROR} when waiting fails
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public SendResult sendMessage(List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) {
        AtomicReference<SendResult> resultRef = new AtomicReference<>();
        CountDownLatch countDownLatch = new CountDownLatch(list.size());
        SendMessageCallback callback = newLatchCallback(countDownLatch, resultRef);
        List<CallbackProfile> profiles = new ArrayList<>(list.size());
        for (byte[] bArr : list) {
            profiles.add(new CallbackProfile(buildEvent(bArr, str, str2, j, map), callback));
        }
        putAll(profiles);
        return awaitResult(countDownLatch, resultRef, j2, timeUnit);
    }

    /**
     * This overload is not supported by this sender.
     *
     * @throws ProxysdkException always
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, TimeUnit timeUnit) throws ProxysdkException {
        throw new ProxysdkException("Not support");
    }

    /**
     * Queues one message and returns without waiting; the outcome is reported to the callback.
     *
     * @param sendMessageCallback callback attached to the queued event
     * @param bArr                message body
     * @param str                 inlong group id set on the event
     * @param str2                inlong stream id set on the event
     * @param j                   message time set on the event
     * @param str3                not used by this implementation
     * @param j2                  not used by this implementation
     * @param timeUnit            not used by this implementation
     * @param map                 event headers, may be {@code null}
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) throws ProxysdkException {
        put(new CallbackProfile(buildEvent(bArr, str, str2, j, map), sendMessageCallback));
    }

    /** Queues one message without extra headers; see the overload taking a header map. */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, byte[] bArr, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, j, str3, j2, timeUnit, (Map<String, String>) null);
    }

    /** Queues a batch without extra headers; see the overload taking a header map. */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, j, str3, j2, timeUnit, (Map<String, String>) null);
    }

    /**
     * Queues a batch of messages in one transaction and returns without waiting. The same
     * callback instance is attached to every queued event.
     *
     * @param sendMessageCallback callback attached to every queued event
     * @param list                message bodies
     * @param str                 inlong group id set on every event
     * @param str2                inlong stream id set on every event
     * @param j                   message time set on every event
     * @param str3                not used by this implementation
     * @param j2                  not used by this implementation
     * @param timeUnit            not used by this implementation
     * @param map                 headers set on every event, may be {@code null}
     */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(SendMessageCallback sendMessageCallback, List<byte[]> list, String str, String str2, long j, String str3, long j2, TimeUnit timeUnit, Map<String, String> map) throws ProxysdkException {
        List<CallbackProfile> profiles = new ArrayList<>(list.size());
        for (byte[] bArr : list) {
            profiles.add(new CallbackProfile(buildEvent(bArr, str, str2, j, map), sendMessageCallback));
        }
        putAll(profiles);
    }

    /** Queues one message using the current system time as message time and no headers. */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(String str, String str2, byte[] bArr, SendMessageCallback sendMessageCallback) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, bArr, str, str2, System.currentTimeMillis(), (String) null, 0L, (TimeUnit) null, (Map<String, String>) null);
    }

    /** Queues a batch using the current system time as message time and no headers. */
    @Override // org.apache.inlong.sdk.dataproxy.MessageSender
    public void asyncSendMessage(String str, String str2, List<byte[]> list, SendMessageCallback sendMessageCallback) throws ProxysdkException {
        asyncSendMessage(sendMessageCallback, list, str, str2, System.currentTimeMillis(), (String) null, 0L, (TimeUnit) null, (Map<String, String>) null);
    }
}
