package org.apache.eventmesh.connector.openfunction.service;

import io.cloudevents.SpecVersion;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.ConsumerServiceGrpc;
import org.apache.eventmesh.connector.openfunction.client.CallbackServiceGrpc;
import org.apache.eventmesh.connector.openfunction.config.OpenFunctionServerConfig;
import org.apache.eventmesh.connector.openfunction.sink.connector.OpenFunctionSinkConnector;
import org.apache.eventmesh.openconnect.api.data.ConnectRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/connector/openfunction/service/ConsumerService.class */
/**
 * gRPC consumer service that drains {@link ConnectRecord}s from the sink connector's queue,
 * converts each one to a {@link CloudEvent} and pushes it to the callback service through a
 * blocking stub.
 *
 * <p>A single background worker thread is started from the constructor. It keeps forwarding
 * records for as long as {@link OpenFunctionSinkConnector#isRunning()} returns {@code true};
 * once the loop ends, the gRPC channel and the worker executor are shut down.
 */
public class ConsumerService extends ConsumerServiceGrpc.ConsumerServiceImplBase {
    private static final Logger log = LoggerFactory.getLogger(ConsumerService.class);

    /** Host of the callback service the events are pushed to. */
    private static final String CALLBACK_HOST = "127.0.0.1";

    /** Port of the callback service the events are pushed to. */
    private static final int CALLBACK_PORT = 10115;

    /**
     * Upper bound on how long one queue poll blocks. Bounding the wait lets the worker re-check
     * the connector's running flag regularly without spinning on an empty queue.
     */
    private static final long POLL_TIMEOUT_MILLIS = 500L;

    public OpenFunctionSinkConnector openFunctionSinkConnector;
    public BlockingQueue<ConnectRecord> queue;
    // NOTE(review): the config is stored but never read in this class; the callback endpoint
    // above is hard-coded. Confirm whether it should come from this config instead.
    public OpenFunctionServerConfig config;
    private final transient ManagedChannel channel = ManagedChannelBuilder.forAddress(CALLBACK_HOST, CALLBACK_PORT).usePlaintext().build();
    private final CallbackServiceGrpc.CallbackServiceBlockingStub publisherClient = CallbackServiceGrpc.newBlockingStub(this.channel);
    private final ExecutorService handleService = Executors.newSingleThreadExecutor();

    /**
     * Creates the service and immediately starts the background forwarding loop.
     *
     * @param openFunctionSinkConnector connector whose queue is drained and whose running flag
     *                                  controls the lifetime of the forwarding loop
     * @param openFunctionServerConfig  server configuration, stored in {@link #config}
     */
    public ConsumerService(OpenFunctionSinkConnector openFunctionSinkConnector, OpenFunctionServerConfig openFunctionServerConfig) {
        this.openFunctionSinkConnector = openFunctionSinkConnector;
        this.queue = openFunctionSinkConnector.queue();
        this.config = openFunctionServerConfig;
        this.handleService.execute(this::startHandleConsumeEvents);
    }

    /**
     * Forwarding loop executed on the worker thread.
     *
     * <p>Waits (with a timeout) for the next record, converts it and sends it to the callback
     * service. A failure while converting or sending one record is logged and that record is
     * dropped, so a single bad record or a transient RPC error does not stop the worker. The
     * loop ends when the connector stops running or the worker thread is interrupted; in both
     * cases the channel and the executor are released.
     */
    private void startHandleConsumeEvents() {
        try {
            while (this.openFunctionSinkConnector.isRunning()) {
                ConnectRecord connectRecord;
                try {
                    // Timed poll instead of a non-blocking poll(): avoids a 100% CPU busy loop
                    // while the queue is empty.
                    connectRecord = this.queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    // Restore the flag so code further up the stack can observe the interrupt.
                    Thread.currentThread().interrupt();
                    log.warn("Consume event handler interrupted, stopping");
                    return;
                }
                if (connectRecord == null) {
                    continue;
                }
                try {
                    this.publisherClient.onTopicEvent(convertRecordToEvent(connectRecord));
                } catch (RuntimeException e) {
                    log.error("Failed to forward record to the callback service, record dropped", e);
                }
            }
        } finally {
            // Nothing else in this class uses the channel or submits work to the executor, so
            // both can be released as soon as the loop is over. Shutting the executor down also
            // lets its non-daemon worker thread terminate.
            this.channel.shutdown();
            this.handleService.shutdown();
        }
    }

    /**
     * Builds a {@link CloudEvent} from a connect record.
     *
     * <p>The {@code id}, {@code source} and {@code type} extensions populate the corresponding
     * event fields, the spec version is fixed to {@link SpecVersion#V1}, the record data is
     * decoded as UTF-8 text, and every other extension is copied as a string attribute.
     *
     * @param connectRecord record taken from the connector queue; its data is cast to
     *                      {@code byte[]}
     * @return the populated event
     */
    private CloudEvent convertRecordToEvent(ConnectRecord connectRecord) {
        CloudEvent.Builder eventBuilder = CloudEvent.newBuilder();
        eventBuilder.setId(connectRecord.getExtension("id"));
        eventBuilder.setSource(connectRecord.getExtension("source"));
        eventBuilder.setSpecVersion(SpecVersion.V1.toString());
        eventBuilder.setType(connectRecord.getExtension("type"));
        // Explicit charset: the no-charset String constructor depends on the platform default.
        eventBuilder.setTextData(new String((byte[]) connectRecord.getData(), StandardCharsets.UTF_8));
        for (String key : connectRecord.getExtensions().keySet()) {
            // id/source/type are already mapped to dedicated fields above.
            if (!StringUtils.equalsAny(key, "id", "source", "type")) {
                eventBuilder.putAttributes(key, CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(connectRecord.getExtension(key)).build());
            }
        }
        return eventBuilder.build();
    }
}
