package org.apache.eventmesh.storage.kafka.producer;

import io.cloudevents.CloudEvent;
import io.cloudevents.kafka.CloudEventSerializer;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.api.exception.StorageRuntimeException;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/storage/kafka/producer/ProducerImpl.class */
/**
 * Kafka-backed producer for {@link CloudEvent}s.
 *
 * <p>Every event is published to the Kafka topic named by the event's subject
 * ({@link CloudEvent#getSubject()}), with a {@code null} record key. Request/reply
 * messaging is not supported and always fails with a {@link StorageRuntimeException}.
 *
 * <p>The underlying {@link KafkaProducer} is created in the constructor and closed by
 * {@link #shutdown()}; an instance cannot be used for sending again after it has been
 * shut down.
 */
public class ProducerImpl {
    private static final Logger log = LoggerFactory.getLogger(ProducerImpl.class);
    private final KafkaProducer<String, CloudEvent> producer;
    private final Properties properties = new Properties();
    private final AtomicBoolean isStarted = new AtomicBoolean(false);

    /**
     * Creates the Kafka producer. Only {@code bootstrap.servers} is read from the supplied
     * properties; the key and value serializers are fixed to {@link StringSerializer} and
     * {@link CloudEventSerializer}.
     *
     * @param properties configuration that must contain {@code bootstrap.servers}
     * @throws NullPointerException if {@code properties} is null or has no
     *                              {@code bootstrap.servers} entry
     */
    public ProducerImpl(Properties properties) {
        // Properties (a Hashtable) rejects null values with a message-less NPE; fail with a clear one instead.
        String bootstrapServers = Objects.requireNonNull(
                properties.getProperty("bootstrap.servers"), "bootstrap.servers must be configured");
        this.properties.put("bootstrap.servers", bootstrapServers);
        this.properties.put("key.serializer", StringSerializer.class);
        this.properties.put("value.serializer", CloudEventSerializer.class);
        this.producer = new KafkaProducer<>(this.properties);
    }

    /**
     * @return {@code true} once {@link #start()} has been called and {@link #shutdown()} has not
     */
    public boolean isStarted() {
        return this.isStarted.get();
    }

    /**
     * @return the negation of {@link #isStarted()}; also {@code true} before the first start
     */
    public boolean isClosed() {
        return !this.isStarted.get();
    }

    /**
     * Marks this producer as started. The Kafka client itself is already created by the
     * constructor, so this only flips the lifecycle flag.
     */
    public void start() {
        this.isStarted.compareAndSet(false, true);
    }

    /**
     * Marks this producer as stopped and closes the underlying Kafka producer so that its
     * resources are released instead of leaking. After this call the instance must not be
     * used for sending.
     */
    public void shutdown() {
        this.isStarted.set(false);
        // Closed unconditionally: the Kafka producer exists since construction, even if start() was never called.
        this.producer.close();
    }

    /**
     * Merges the given properties into this instance's configuration. The Kafka producer has
     * already been built by the constructor, so within this class the merged values are only
     * read when {@link #checkTopicExist(String)} creates its admin client.
     *
     * @param properties additional configuration entries
     */
    public void init(Properties properties) {
        this.properties.putAll(properties);
    }

    /**
     * Publishes the event to the topic named by its subject without reporting the outcome.
     * Failures raised while handing the record to Kafka are logged and swallowed.
     *
     * @param cloudEvent event to publish; its subject is used as the topic
     */
    public void send(CloudEvent cloudEvent) {
        try {
            ProducerRecord<String, CloudEvent> record =
                    new ProducerRecord<>(Objects.requireNonNull(cloudEvent.getSubject()), cloudEvent);
            this.producer.send(record);
        } catch (Exception e) {
            log.error("Send message oneway Exception, {}", cloudEvent, e);
        }
    }

    /**
     * Verifies that a topic exists by listing the topic names through a short-lived
     * {@link Admin} client built from this instance's properties.
     *
     * @param topic name of the topic to look up
     * @throws StorageRuntimeException if the topic is not among the listed topic names
     * @throws ExecutionException      if listing the topics fails
     * @throws InterruptedException    if the calling thread is interrupted while waiting
     */
    public void checkTopicExist(String topic) throws ExecutionException, InterruptedException, StorageRuntimeException {
        try (Admin admin = Admin.create(this.properties)) {
            Set<String> topicNames = admin.listTopics().names().get();
            if (!topicNames.contains(topic)) {
                throw new StorageRuntimeException(String.format("topic:%s is not exist", topic));
            }
        }
    }

    /**
     * Request/reply is not supported by this producer.
     *
     * @param cloudEvent           ignored
     * @param requestReplyCallback ignored
     * @param j                    ignored (presumably a timeout; not used here)
     * @throws StorageRuntimeException always
     */
    public void request(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long j) throws Exception {
        throw new StorageRuntimeException("Request is not supported");
    }

    /**
     * Reply is not supported by this producer.
     *
     * @param cloudEvent   ignored
     * @param sendCallback ignored
     * @return never returns normally
     * @throws StorageRuntimeException always
     */
    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
        throw new StorageRuntimeException("Reply is not supported");
    }

    /**
     * Not implemented: the event is NOT published. A warning is logged so that the dropped
     * event is visible instead of disappearing silently.
     *
     * @param cloudEvent event that is discarded
     */
    public void sendOneway(CloudEvent cloudEvent) {
        log.warn("sendOneway is not implemented by the Kafka producer; event dropped: {}", cloudEvent);
    }

    /**
     * Publishes the event to the topic named by its subject and reports the outcome through
     * the callback: {@code onSuccess} with the topic and message id when Kafka acknowledges
     * the record, {@code onException} otherwise. Failures raised before the record is handed
     * to Kafka (for example a missing subject) are logged and also reported through
     * {@code onException}, so the caller always learns the outcome.
     *
     * @param cloudEvent   event to publish; its subject is used as the topic
     * @param sendCallback receiver of the send outcome
     */
    public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
        try {
            ProducerRecord<String, CloudEvent> record =
                    new ProducerRecord<>(Objects.requireNonNull(cloudEvent.getSubject()), cloudEvent);
            this.producer.send(record, (recordMetadata, exc) -> {
                if (exc != null) {
                    sendCallback.onException(failureContext(cloudEvent.getSubject(), exc));
                    return;
                }
                SendResult sendResult = new SendResult();
                sendResult.setTopic(cloudEvent.getSubject());
                sendResult.setMessageId(cloudEvent.getId());
                sendCallback.onSuccess(sendResult);
            });
        } catch (Exception e) {
            log.error("Send message async Exception, {}", cloudEvent, e);
            // Without this the caller would wait forever for a callback that never fires.
            if (sendCallback != null) {
                sendCallback.onException(failureContext(cloudEvent == null ? null : cloudEvent.getSubject(), e));
            }
        }
    }

    /** Builds the exception context handed to a {@link SendCallback} for a failed send. */
    private static OnExceptionContext failureContext(String topic, Exception cause) {
        OnExceptionContext context = new OnExceptionContext();
        context.setTopic(topic);
        context.setException(new StorageRuntimeException(cause.getMessage(), cause));
        return context;
    }
}
