package org.apache.seatunnel.connectors.seatunnel.rocketmq.sink;

import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqAdminUtil;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.common.RocketMqBaseConfiguration;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConnectorException;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/rocketmq/sink/RocketMqTransactionSender.class */
/**
 * {@link RocketMqProducerSender} implementation that publishes messages through a
 * {@link TransactionMQProducer}.
 *
 * <p>The producer is obtained from {@link RocketMqAdminUtil#initTransactionMqProducer} and started
 * in the constructor; it is shut down in {@link #close()}.
 */
public class RocketMqTransactionSender implements RocketMqProducerSender {
    /** Second argument used for a transactional send when the message has no keys. */
    private static final String TXN_PARAM = "SeaTunnel-RocketMq";

    private final TransactionMQProducer transactionMQProducer;

    /**
     * Builds the transactional producer from the given configuration and starts it.
     *
     * @param rocketMqBaseConfiguration configuration handed to
     *     {@link RocketMqAdminUtil#initTransactionMqProducer}
     * @throws RocketMqConnectorException with {@code PRODUCER_START_ERROR} if starting the producer
     *     raises an {@link MQClientException}
     */
    public RocketMqTransactionSender(RocketMqBaseConfiguration rocketMqBaseConfiguration) {
        TransactionListener listener = new AlwaysCommitListener();
        this.transactionMQProducer =
                RocketMqAdminUtil.initTransactionMqProducer(rocketMqBaseConfiguration, listener);
        try {
            transactionMQProducer.start();
        } catch (MQClientException startFailure) {
            throw new RocketMqConnectorException(
                    RocketMqConnectorErrorCode.PRODUCER_START_ERROR, startFailure);
        }
    }

    /**
     * Sends the message via {@code sendMessageInTransaction}.
     *
     * <p>The second argument of that call is the message's keys, or {@link #TXN_PARAM} when the
     * keys are empty. The value returned by the producer is not inspected.
     *
     * @param message message to publish
     * @throws RocketMqConnectorException with {@code PRODUCER_SEND_MESSAGE_ERROR} if the producer
     *     raises an {@link MQClientException}
     */
    @Override
    public void send(Message message) {
        String txnArg = message.getKeys();
        if (StringUtils.isEmpty(txnArg)) {
            txnArg = TXN_PARAM;
        }
        try {
            transactionMQProducer.sendMessageInTransaction(message, txnArg);
        } catch (MQClientException sendFailure) {
            throw new RocketMqConnectorException(
                    RocketMqConnectorErrorCode.PRODUCER_SEND_MESSAGE_ERROR, sendFailure);
        }
    }

    /** Shuts down the underlying producer, if there is one. */
    @Override
    public void close() throws Exception {
        if (transactionMQProducer == null) {
            return;
        }
        transactionMQProducer.shutdown();
    }

    /**
     * Listener that reports {@link LocalTransactionState#COMMIT_MESSAGE} from both callbacks,
     * ignoring the arguments it is given.
     */
    private static final class AlwaysCommitListener implements TransactionListener {
        @Override
        public LocalTransactionState executeLocalTransaction(Message message, Object obj) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }

        @Override
        public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
    }
}
