package org.apache.eventmesh.connector.kafka.producer;

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventSerializer;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/kafka/producer/ProducerImpl.class */
public class ProducerImpl {
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
    private KafkaProducer<String, CloudEvent> producer;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    Properties properties = new Properties();

    public ProducerImpl(Properties properties) {
        this.properties.put("bootstrap.servers", properties.getProperty("bootstrap.servers"));
        this.properties.put("key.serializer", StringSerializer.class);
        this.properties.put("value.serializer", CloudEventSerializer.class);
        this.producer = new KafkaProducer<>(this.properties);
    }

    public boolean isStarted() {
        return this.isStarted.get();
    }

    public boolean isClosed() {
        return !this.isStarted.get();
    }

    public void start() {
        this.isStarted.compareAndSet(false, true);
    }

    public void shutdown() {
        this.isStarted.compareAndSet(true, false);
    }

    public ProducerImpl init(Properties properties) throws Exception {
        return new ProducerImpl(properties);
    }

    public void send(CloudEvent cloudEvent) {
        try {
            this.producer.send(new ProducerRecord(cloudEvent.getSubject(), cloudEvent));
        } catch (Exception e) {
            log.error(String.format("Send message oneway Exception, %s", cloudEvent), e);
        }
    }

    public void checkTopicExist(String str) throws ExecutionException, InterruptedException, ConnectorRuntimeException {
        Admin create = Admin.create(this.properties);
        Set set = (Set) create.listTopics().names().get();
        create.close();
        if (!set.contains(str)) {
            throw new ConnectorRuntimeException(String.format("topic:%s is not exist", str));
        }
    }

    public void request(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long j) throws Exception {
        throw new ConnectorRuntimeException("Request is not supported");
    }

    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
        throw new ConnectorRuntimeException("Reply is not supported");
    }

    public void sendOneway(CloudEvent cloudEvent) {
    }

    public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
        try {
            this.producer.send(new ProducerRecord(cloudEvent.getSubject(), cloudEvent));
        } catch (Exception e) {
            log.error(String.format("Send message oneway Exception, %s", cloudEvent), e);
        }
    }
}
