package org.apache.eventmesh.connector.standalone.producer;

import com.google.common.base.Preconditions;
import io.cloudevents.CloudEvent;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.connector.standalone.broker.StandaloneBroker;
import org.apache.eventmesh.connector.standalone.broker.model.MessageEntity;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/standalone/producer/StandaloneProducer.class */
public class StandaloneProducer {
    private Logger logger = LoggerFactory.getLogger(StandaloneProducer.class);
    private StandaloneBroker standaloneBroker = StandaloneBroker.getInstance();
    private AtomicBoolean isStarted = new AtomicBoolean(false);

    public StandaloneProducer(Properties properties) {
    }

    public boolean isStarted() {
        return this.isStarted.get();
    }

    public boolean isClosed() {
        return !this.isStarted.get();
    }

    public void start() {
        this.isStarted.compareAndSet(false, true);
    }

    public void shutdown() {
        this.isStarted.compareAndSet(true, false);
    }

    public StandaloneProducer init(Properties properties) throws Exception {
        return new StandaloneProducer(properties);
    }

    public SendResult publish(CloudEvent cloudEvent) {
        Preconditions.checkNotNull(cloudEvent);
        try {
            MessageEntity putMessage = this.standaloneBroker.putMessage(cloudEvent.getSubject(), cloudEvent);
            SendResult sendResult = new SendResult();
            sendResult.setTopic(cloudEvent.getSubject());
            sendResult.setMessageId(String.valueOf(putMessage.getOffset()));
            return sendResult;
        } catch (Exception e) {
            this.logger.error("send message error, topic: {}", cloudEvent.getSubject(), e);
            throw new ConnectorRuntimeException(String.format("Send message error, topic: %s", cloudEvent.getSubject()));
        }
    }

    public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
        Preconditions.checkNotNull(cloudEvent);
        Preconditions.checkNotNull(sendCallback);
        try {
            sendCallback.onSuccess(publish(cloudEvent));
        } catch (Exception e) {
            sendCallback.onException(OnExceptionContext.builder().messageId(cloudEvent.getId()).topic(cloudEvent.getSubject()).exception(new ConnectorRuntimeException(e)).build());
        }
    }

    public void sendOneway(CloudEvent cloudEvent) {
        publish(cloudEvent);
    }

    public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
        Preconditions.checkNotNull(cloudEvent, "CloudEvent cannot be null");
        Preconditions.checkNotNull(sendCallback, "Callback cannot be null");
        try {
            sendCallback.onSuccess(publish(cloudEvent));
        } catch (Exception e) {
            sendCallback.onException(OnExceptionContext.builder().messageId(cloudEvent.getId()).topic(cloudEvent.getSubject()).exception(new ConnectorRuntimeException(e)).build());
        }
    }

    public void request(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long j) throws Exception {
        throw new ConnectorRuntimeException("Request is not supported");
    }

    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
        throw new ConnectorRuntimeException("Reply is not supported");
    }

    public void checkTopicExist(String str) throws Exception {
        if (!this.standaloneBroker.checkTopicExist(str)) {
            throw new ConnectorRuntimeException(String.format("topic:%s is not exist", str));
        }
    }

    public void setExtFields() {
    }
}
