package org.apache.flink.connectors.tubemq;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashSet;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.inlong.tubemq.client.config.TubeClientConfig;
import org.apache.inlong.tubemq.client.factory.MessageSessionFactory;
import org.apache.inlong.tubemq.client.factory.TubeSingleSessionFactory;
import org.apache.inlong.tubemq.client.producer.MessageProducer;
import org.apache.inlong.tubemq.client.producer.MessageSentResult;
import org.apache.inlong.tubemq.corebase.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/connectors/tubemq/TubemqSinkFunction.class */
public class TubemqSinkFunction<T> extends RichSinkFunction<T> implements CheckpointedFunction {
    private static final Logger LOG = LoggerFactory.getLogger(TubemqSinkFunction.class);
    private static final String SYSTEM_HEADER_TIME_FORMAT = "yyyyMMddHHmm";
    private final String masterAddress;
    private final String topic;
    private final String streamId;
    private final SerializationSchema<T> serializationSchema;
    private transient MessageProducer producer;
    private transient MessageSessionFactory sessionFactory;
    private final int maxRetries;

    public TubemqSinkFunction(String str, String str2, SerializationSchema<T> serializationSchema, Configuration configuration) {
        Preconditions.checkNotNull(str, "The topic must not be null.");
        Preconditions.checkNotNull(str2, "The master address must not be null.");
        Preconditions.checkNotNull(serializationSchema, "The serialization schema must not be null.");
        Preconditions.checkNotNull(configuration, "The configuration must not be null.");
        this.topic = str;
        this.masterAddress = str2;
        this.serializationSchema = serializationSchema;
        this.streamId = configuration.getString(TubemqOptions.STREAM_ID);
        this.maxRetries = configuration.getInteger(TubemqOptions.MAX_RETRIES);
    }

    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) {
    }

    public void initializeState(FunctionInitializationContext functionInitializationContext) {
    }

    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        this.sessionFactory = new TubeSingleSessionFactory(new TubeClientConfig(this.masterAddress));
        this.producer = this.sessionFactory.createProducer();
        HashSet hashSet = new HashSet();
        hashSet.add(this.topic);
        this.producer.publish(hashSet);
    }

    public void invoke(T t, SinkFunction.Context context) throws Exception {
        MessageSentResult sendMessage;
        int i = 0;
        Exception exc = null;
        while (true) {
            if (this.maxRetries > 0 && i >= this.maxRetries) {
                throw new IOException("Could not properly send the message to hippo.", exc);
            }
            try {
                Message message = new Message(this.topic, this.serializationSchema.serialize(t));
                if (StringUtils.isNotBlank(this.streamId)) {
                    message.putSystemHeader(this.streamId, new SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT).format(new Date(System.currentTimeMillis())));
                }
                sendMessage = this.producer.sendMessage(message);
            } catch (Exception e) {
                LOG.warn("Could not properly send the message to hippo (retries: {}).", Integer.valueOf(i), e);
                i++;
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
            }
            if (sendMessage.isSuccess()) {
                return;
            } else {
                LOG.warn("Send msg fail, error code: {}, error message: {}", Integer.valueOf(sendMessage.getErrCode()), sendMessage.getErrMsg());
            }
        }
    }

    public void close() throws Exception {
        try {
            if (this.producer != null) {
                this.producer.shutdown();
                this.producer = null;
            }
            if (this.sessionFactory != null) {
                this.sessionFactory.shutdown();
                this.sessionFactory = null;
            }
        } catch (Throwable th) {
            LOG.error("Shutdown producer error", th);
        } finally {
            super.close();
        }
    }
}
