package org.apache.inlong.audit.sink.pulsar;

import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.FlumeException;
import org.apache.inlong.audit.sink.EventStat;
import org.apache.inlong.audit.utils.LogCounter;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/audit/sink/pulsar/PulsarClientService.class */
/**
 * Wraps a single {@link PulsarClient} and a per-topic cache of {@link Producer}s, all configured
 * from a Flume {@link Context}.
 *
 * <p>Typical use, as far as this class shows: construct it from the sink context, call
 * {@link #initCreateConnection(CreatePulsarClientCallBack)} to build the client, send events with
 * {@link #sendMessage(String, Event, SendMessageCallBack, EventStat)} and finally call
 * {@link #close()}.
 */
public class PulsarClientService {
    private static final Logger logger = LoggerFactory.getLogger(PulsarClientService.class);
    /** Rate limiter for error logs that can be emitted once per event. */
    private static final LogCounter logPrinterA = new LogCounter(10, 100000, 60000);

    // Configuration keys read from the Flume context.
    private static final String PULSAR_SERVER_URL = "pulsar_server_url";
    private static final String PULSAR_ENABLE_AUTH = "enable_token_auth";
    private static final String PULSAR_ENABLE_AUTH_TOKEN = "auth_token";
    private static final String SEND_TIMEOUT = "send_timeout_ms";
    private static final String CLIENT_TIMEOUT = "client_op_timeout_second";
    private static final String ENABLE_BATCH = "enable_batch";
    private static final String BLOCK_IF_QUEUE_FULL = "block_if_queue_full";
    private static final String MAX_PENDING_MESSAGES = "max_pending_messages";
    private static final String MAX_BATCHING_MESSAGES = "max_batching_messages";

    // Defaults applied when the corresponding key is absent from the context.
    private static final int DEFAULT_SEND_TIMEOUT_MILL = 30000;
    private static final int DEFAULT_CLIENT_TIMEOUT_SECOND = 30;
    private static final boolean DEFAULT_ENABLE_BATCH = true;
    private static final boolean DEFAULT_BLOCK_IF_QUEUE_FULL = true;
    private static final int DEFAULT_MAX_PENDING_MESSAGES = 10000;
    private static final int DEFAULT_MAX_BATCHING_MESSAGES = 1000;
    private static final boolean DEFAULT_PULSAR_ENABLE_TOKEN_AUTH = false;
    private static final String DEFAULT_PULSAR_TOKEN_AUTH = "";

    /** Producer send timeout, in milliseconds. */
    private int sendTimeout;
    /** Timeout, in seconds, handed to the client builder (see {@link #initPulsarClient(String)}). */
    private int clientOpTimeout;
    private boolean enableBatch = DEFAULT_ENABLE_BATCH;
    private boolean blockIfQueueFull = DEFAULT_BLOCK_IF_QUEUE_FULL;
    private int maxPendingMessages = DEFAULT_MAX_PENDING_MESSAGES;
    private int maxBatchingMessages = DEFAULT_MAX_BATCHING_MESSAGES;
    /** Topic name to producer; entries are created lazily by {@link #getProducer(String)}. */
    public ConcurrentHashMap<String, Producer> producerInfoMap;
    /** The shared client; {@code null} until created and again after {@link #close()}. */
    public PulsarClient pulsarClient;
    public String pulsarServerUrl;
    public boolean pulsarEnableTokenAuth;
    public String pulsarTokenAuth;
    /** Value attached to every outgoing message as the {@code auditIp} property. */
    private String localIp = "127.0.0.1";

    /**
     * Reads all settings from the given context. No connection is opened here.
     *
     * @param context Flume context holding the sink configuration
     * @throws IllegalStateException if {@code pulsar_server_url} is not configured
     * @throws IllegalArgumentException if {@code send_timeout_ms} is not greater than zero
     */
    public PulsarClientService(Context context) {
        this.pulsarServerUrl = context.getString(PULSAR_SERVER_URL);
        Preconditions.checkState(this.pulsarServerUrl != null, "No pulsar server url specified");
        this.sendTimeout = context.getInteger(SEND_TIMEOUT, Integer.valueOf(DEFAULT_SEND_TIMEOUT_MILL));
        this.clientOpTimeout = context.getInteger(CLIENT_TIMEOUT, Integer.valueOf(DEFAULT_CLIENT_TIMEOUT_SECOND));
        logger.debug("PulsarClientService {} {}", SEND_TIMEOUT, this.sendTimeout);
        Preconditions.checkArgument(this.sendTimeout > 0, "sendTimeout must be > 0");
        this.enableBatch = context.getBoolean(ENABLE_BATCH, Boolean.valueOf(DEFAULT_ENABLE_BATCH));
        this.blockIfQueueFull = context.getBoolean(BLOCK_IF_QUEUE_FULL, Boolean.valueOf(DEFAULT_BLOCK_IF_QUEUE_FULL));
        this.maxPendingMessages = context.getInteger(MAX_PENDING_MESSAGES, Integer.valueOf(DEFAULT_MAX_PENDING_MESSAGES));
        this.maxBatchingMessages = context.getInteger(MAX_BATCHING_MESSAGES, Integer.valueOf(DEFAULT_MAX_BATCHING_MESSAGES));
        this.producerInfoMap = new ConcurrentHashMap<>();
        this.localIp = NetworkUtils.getLocalIp();
        this.pulsarEnableTokenAuth = context.getBoolean(PULSAR_ENABLE_AUTH, Boolean.valueOf(DEFAULT_PULSAR_ENABLE_TOKEN_AUTH));
        this.pulsarTokenAuth = context.getString(PULSAR_ENABLE_AUTH_TOKEN, DEFAULT_PULSAR_TOKEN_AUTH);
    }

    /**
     * Creates the Pulsar client if it does not exist yet and reports the outcome through the
     * callback. If a {@link FlumeException} escapes, it is logged and the service is closed.
     *
     * @param createPulsarClientCallBack receives the success or failure notification
     */
    public void initCreateConnection(CreatePulsarClientCallBack createPulsarClientCallBack) {
        try {
            createConnection(createPulsarClientCallBack);
        } catch (FlumeException e) {
            logger.error("Unable to create pulsar client. Exception follows.", e);
            close();
        }
    }

    /**
     * Sends the event body asynchronously to the given topic.
     *
     * <p>The message carries an {@code auditIp} property plus the {@code inlongStreamId} and
     * {@code inlongGroupId} headers when the event has them. Completion is reported through
     * {@code sendMessageCallBack}.
     *
     * @param str topic name
     * @param event event whose body is sent as the message value
     * @param sendMessageCallBack notified when the asynchronous send succeeds or fails
     * @param eventStat passed through unchanged to the callback
     * @return {@code false} if no producer could be obtained for the topic; {@code true} once the
     *         asynchronous send has been issued (not necessarily completed)
     */
    public boolean sendMessage(String str, Event event, SendMessageCallBack sendMessageCallBack, EventStat eventStat) {
        Producer producer = null;
        try {
            producer = getProducer(str);
        } catch (Exception e) {
            if (logPrinterA.shouldPrint()) {
                logger.error("Get producer failed!", e);
            }
        }
        if (producer == null) {
            logger.error("Get producer is null!");
            return false;
        }
        HashMap<String, String> properties = new HashMap<>();
        properties.put("auditIp", this.localIp);
        copyHeaderIfPresent(event, properties, "inlongStreamId");
        copyHeaderIfPresent(event, properties, "inlongGroupId");
        logger.debug("producer send msg!");
        producer.newMessage().properties(properties).value(event.getBody()).sendAsync().thenAccept(messageId -> {
            sendMessageCallBack.handleMessageSendSuccess((MessageIdImpl) messageId, eventStat);
        }).exceptionally(failure -> {
            sendMessageCallBack.handleMessageSendException(eventStat, failure);
            return null;
        });
        return true;
    }

    /** Copies one event header into the message properties when the event defines that header. */
    private static void copyHeaderIfPresent(Event event, HashMap<String, String> target, String key) {
        if (event.getHeaders().containsKey(key)) {
            target.put(key, event.getHeaders().get(key));
        }
    }

    /**
     * Builds the client unless one already exists. Failures are reported to the callback and
     * logged; they are not rethrown.
     */
    private void createConnection(CreatePulsarClientCallBack createPulsarClientCallBack) throws FlumeException {
        if (this.pulsarClient != null) {
            return;
        }
        try {
            this.pulsarClient = initPulsarClient(this.pulsarServerUrl);
            createPulsarClientCallBack.handleCreateClientSuccess(this.pulsarServerUrl);
        } catch (PulsarClientException e) {
            createPulsarClientCallBack.handleCreateClientException(this.pulsarServerUrl);
            // The trailing throwable makes SLF4J print the stack trace in addition to the message.
            logger.error("create connnection error in metasink, maybe pulsar master set error, please re-check.url{}, ex1 {}", this.pulsarServerUrl, e.getMessage(), e);
        } catch (Throwable th) {
            createPulsarClientCallBack.handleCreateClientException(this.pulsarServerUrl);
            logger.error("create connnection error in metasink, maybe pulsar master set error/shutdown in progress, please re-check. url{}, ex2 {}", this.pulsarServerUrl, th.getMessage(), th);
        }
    }

    /**
     * Builds a client for the given service URL, adding token authentication when it is enabled
     * and a non-empty token is configured.
     *
     * <p>NOTE(review): the value read from {@code client_op_timeout_second} is applied as the
     * connection timeout — confirm that this, rather than an operation timeout, is intended.
     */
    private PulsarClient initPulsarClient(String str) throws Exception {
        ClientBuilder builder = PulsarClient.builder();
        if (this.pulsarEnableTokenAuth && StringUtils.isNotEmpty(this.pulsarTokenAuth)) {
            builder.authentication(AuthenticationFactory.token(this.pulsarTokenAuth));
        }
        return builder.serviceUrl(str).connectionTimeout(this.clientOpTimeout, TimeUnit.SECONDS).build();
    }

    /**
     * Creates a new producer for the topic using the configured send timeout, batching and
     * queue settings. The producer is not added to {@link #producerInfoMap} by this method.
     *
     * @param str topic name
     * @return the new producer, or {@code null} if the client is not initialized or creation fails
     */
    public Producer initTopicProducer(String str) {
        logger.info("initTopicProducer topic = {}", str);
        // Read the field once: it is null when client creation failed or after close().
        PulsarClient client = this.pulsarClient;
        if (client == null) {
            if (logPrinterA.shouldPrint()) {
                logger.error("pulsar client is not initialized, cannot create producer for topic = {}", str);
            }
            return null;
        }
        Producer producer = null;
        try {
            producer = client.newProducer()
                    .sendTimeout(this.sendTimeout, TimeUnit.MILLISECONDS)
                    .topic(str)
                    .enableBatching(this.enableBatch)
                    .blockIfQueueFull(this.blockIfQueueFull)
                    .maxPendingMessages(this.maxPendingMessages)
                    .batchingMaxMessages(this.maxBatchingMessages)
                    .create();
        } catch (PulsarClientException e) {
            logger.error("create pulsar producer has error, topic = {}", str, e);
        }
        return producer;
    }

    /**
     * Returns the cached producer for the topic, creating it on first use. When creation yields
     * {@code null} nothing is cached, so a later call tries again.
     */
    private Producer getProducer(String str) {
        return this.producerInfoMap.computeIfAbsent(str, topic -> initTopicProducer(topic));
    }

    /**
     * Removes the topic's producer from the cache and, if one was cached, closes it asynchronously.
     *
     * @param str topic name
     */
    public void closeTopicProducer(String str) {
        logger.info("closeTopicProducer topic = {}", str);
        Producer remove = this.producerInfoMap.remove(str);
        if (remove != null) {
            remove.closeAsync();
        }
    }

    /** Drops all cached producers, shuts the client down if present and clears the reference. */
    private void destroyConnection() {
        this.producerInfoMap.clear();
        if (this.pulsarClient != null) {
            try {
                this.pulsarClient.shutdown();
            } catch (PulsarClientException e) {
                logger.error("destroy pulsarClient error in PulsarSink, PulsarClientException {}", e.getMessage(), e);
            } catch (Exception e2) {
                logger.error("destroy pulsarClient error in PulsarSink, ex {}", e2.getMessage(), e2);
            }
        }
        this.pulsarClient = null;
        logger.debug("closed meta producer");
    }

    /** Releases the producers and the client held by this service. */
    public void close() {
        destroyConnection();
    }
}
