package org.apache.eventmesh.connector.knative.producer;

import io.cloudevents.CloudEvent;
import java.io.IOException;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.exception.ConnectorRuntimeException;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.connector.knative.cloudevent.KnativeMessageFactory;
import org.apache.eventmesh.connector.knative.cloudevent.impl.KnativeHeaders;
import org.apache.eventmesh.connector.knative.utils.CloudEventUtils;
import org.asynchttpclient.Response;
import org.asynchttpclient.util.HttpConstants;

/* loaded from: input_file:org/apache/eventmesh/connector/knative/producer/ProducerImpl.class */
public class ProducerImpl extends AbstractProducer {
    public ProducerImpl(Properties properties) throws IOException {
        super(properties);
    }

    public Properties attributes() {
        return this.properties;
    }

    public void send(CloudEvent cloudEvent, SendCallback sendCallback) {
        try {
            Response response = (Response) super.getAsyncHttpClient().preparePost("http://" + attributes().getProperty("url")).addHeader(KnativeHeaders.CONTENT_TYPE, cloudEvent.getDataContentType()).addHeader(KnativeHeaders.CE_ID, cloudEvent.getId()).addHeader(KnativeHeaders.CE_SPECVERSION, String.valueOf(cloudEvent.getSpecVersion())).addHeader(KnativeHeaders.CE_TYPE, cloudEvent.getType()).addHeader(KnativeHeaders.CE_SOURCE, String.valueOf(cloudEvent.getSource())).setBody(KnativeMessageFactory.createReader(cloudEvent)).execute().get(10L, TimeUnit.SECONDS);
            if (response.getStatusCode() != HttpConstants.ResponseStatusCodes.OK_200) {
                throw new IllegalStateException("HTTP response code error: " + response.getStatusCode());
            }
            sendCallback.onSuccess(CloudEventUtils.convertSendResult(cloudEvent));
        } catch (Exception e) {
            ConnectorRuntimeException checkProducerException = checkProducerException(cloudEvent, e);
            OnExceptionContext onExceptionContext = new OnExceptionContext();
            onExceptionContext.setTopic(KnativeMessageFactory.createReader(cloudEvent));
            onExceptionContext.setException(checkProducerException);
            sendCallback.onException(onExceptionContext);
        }
    }

    public void sendAsync(CloudEvent cloudEvent, SendCallback sendCallback) {
        try {
            send(cloudEvent, sendCallback);
        } catch (Exception e) {
            throw new ConnectorRuntimeException("Send cloudevent message Exception.", e);
        }
    }

    public void init(Properties properties) throws Exception {
        new ProducerImpl(properties);
    }

    public void publish(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
        sendAsync(cloudEvent, sendCallback);
    }

    public void sendOneway(CloudEvent cloudEvent) {
        throw new ConnectorRuntimeException("SendOneWay is not supported");
    }

    public void request(CloudEvent cloudEvent, RequestReplyCallback requestReplyCallback, long j) throws Exception {
        throw new ConnectorRuntimeException("Request is not supported");
    }

    public boolean reply(CloudEvent cloudEvent, SendCallback sendCallback) throws Exception {
        throw new ConnectorRuntimeException("Reply is not supported");
    }

    public void checkTopicExist(String str) throws Exception {
        throw new ConnectorRuntimeException("CheckTopicExist is not supported");
    }

    public void setExtFields() {
        throw new ConnectorRuntimeException("SetExtFields is not supported");
    }

    public void start() {
        this.started.set(true);
    }

    public void shutdown() {
        this.started.set(false);
    }
}
