package org.apache.eventmesh.connector.openfunction.service;

import com.google.protobuf.Timestamp;
import io.grpc.stub.StreamObserver;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEventBatch;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.PublisherServiceGrpc;
import org.apache.eventmesh.common.protocol.grpc.common.StatusCode;
import org.apache.eventmesh.connector.openfunction.config.OpenFunctionServerConfig;
import org.apache.eventmesh.connector.openfunction.source.connector.OpenFunctionSourceConnector;
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.api.data.RecordOffset;
import org.apache.eventmesh.openconnect.api.data.RecordPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/openfunction/service/ProducerService.class */
/**
 * gRPC publisher endpoint of the OpenFunction source connector.
 *
 * <p>Each CloudEvent received through {@link #publish} is converted into a {@link ConnectRecord}
 * and placed on the queue obtained from the {@link OpenFunctionSourceConnector}; the caller is
 * answered with a CloudEvent carrying {@code status_code}, {@code response_message} and
 * {@code time} attributes.
 */
public class ProducerService extends PublisherServiceGrpc.PublisherServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(ProducerService.class);

    /** Response attribute holding the {@link StatusCode} return code. */
    private static final String ATTR_STATUS_CODE = "status_code";
    /** Response attribute holding the {@link StatusCode} message. */
    private static final String ATTR_RESPONSE_MESSAGE = "response_message";
    /** Response attribute holding the time at which the request was received. */
    private static final String ATTR_TIME = "time";

    /** Connector that owns the record queue this service feeds. */
    public OpenFunctionSourceConnector openFunctionSourceConnector;
    /** Queue taken from the connector at construction time; receives every converted event. */
    public BlockingQueue<ConnectRecord> queue;
    /** Server configuration handed in by the creator of this service. */
    public OpenFunctionServerConfig config;

    /**
     * Creates the service and caches the connector's record queue.
     *
     * @param openFunctionSourceConnector connector whose {@code queue()} receives published events
     * @param openFunctionServerConfig server configuration to keep alongside the service
     */
    public ProducerService(OpenFunctionSourceConnector openFunctionSourceConnector, OpenFunctionServerConfig openFunctionServerConfig) {
        this.openFunctionSourceConnector = openFunctionSourceConnector;
        this.queue = openFunctionSourceConnector.queue();
        this.config = openFunctionServerConfig;
    }

    /**
     * Enqueues a single CloudEvent as a {@link ConnectRecord} and answers the caller.
     *
     * <p>On success the response carries {@link StatusCode#SUCCESS}. If the thread is interrupted
     * while waiting for queue capacity, the interrupt flag is restored and the response carries
     * {@link StatusCode#EVENTMESH_SEND_ASYNC_MSG_ERR}; in both cases the call is completed so the
     * client is never left without an answer.
     *
     * @param cloudEvent event to publish
     * @param streamObserver observer used to send the single response and complete the call
     */
    public void publish(CloudEvent cloudEvent, StreamObserver<CloudEvent> streamObserver) {
        log.info("receive cloudevents {}", cloudEvent);
        Instant receivedAt = now();
        try {
            // put() blocks while the queue is full, hence the InterruptedException handling below.
            this.queue.put(convertCloudEventToConnectorRecord(cloudEvent));
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            // Trailing throwable argument makes SLF4J log the stack trace as well.
            log.error("publish event error {}", e.getMessage(), e);
            streamObserver.onNext(buildResponse(StatusCode.EVENTMESH_SEND_ASYNC_MSG_ERR, receivedAt));
            streamObserver.onCompleted();
            return;
        }
        streamObserver.onNext(buildResponse(StatusCode.SUCCESS, receivedAt));
        streamObserver.onCompleted();
    }

    /** Builds the response CloudEvent carrying the status code, its message and a timestamp. */
    private static CloudEvent buildResponse(StatusCode status, Instant timestamp) {
        Timestamp protoTime = Timestamp.newBuilder()
            .setSeconds(timestamp.getEpochSecond())
            .setNanos(timestamp.getNano())
            .build();
        return CloudEvent.newBuilder()
            .putAttributes(ATTR_STATUS_CODE,
                CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(status.getRetCode()).build())
            .putAttributes(ATTR_RESPONSE_MESSAGE,
                CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(status.getErrMsg()).build())
            .putAttributes(ATTR_TIME,
                CloudEvent.CloudEventAttributeValue.newBuilder().setCeTimestamp(protoTime).build())
            .build();
    }

    /**
     * Converts a CloudEvent into a {@link ConnectRecord} whose data is the event's text data and
     * whose timestamp is the current wall-clock time in milliseconds.
     *
     * <p>Every attribute is copied as an extension using its string value, then {@code id},
     * {@code source} and {@code type} are set from the event's own fields (overriding attributes
     * of the same name).
     */
    private ConnectRecord convertCloudEventToConnectorRecord(CloudEvent cloudEvent) {
        ConnectRecord connectRecord = new ConnectRecord((RecordPartition) null, (RecordOffset) null, System.currentTimeMillis(), cloudEvent.getTextData());
        // NOTE(review): only the ce_string view of each attribute is copied; attributes of other
        // kinds presumably arrive as their default (empty) string — confirm whether that is intended.
        cloudEvent.getAttributesMap().forEach((name, value) -> connectRecord.addExtension(name, value.getCeString()));
        connectRecord.addExtension("id", cloudEvent.getId());
        connectRecord.addExtension("source", cloudEvent.getSource());
        connectRecord.addExtension("type", cloudEvent.getType());
        return connectRecord;
    }

    /**
     * Batch publishing is not implemented by this connector.
     *
     * <p>Delegates to the base implementation so the call is terminated instead of being left
     * open. NOTE(review): gRPC-generated service bases normally answer with an UNIMPLEMENTED
     * status — confirm against the generated {@code PublisherServiceGrpc}.
     *
     * @param cloudEventBatch batch of events (ignored)
     * @param streamObserver observer handed to the base implementation
     */
    public void batchPublish(CloudEventBatch cloudEventBatch, StreamObserver<CloudEvent> streamObserver) {
        super.batchPublish(cloudEventBatch, streamObserver);
    }

    /** Returns the current instant on the UTC timeline, independent of the system time zone. */
    private static Instant now() {
        return Instant.now();
    }
}
