package org.apache.eventmesh.runtime.core.protocol.grpc.push;

import io.grpc.stub.StreamObserver;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.collections4.MapUtils;
import org.apache.commons.lang3.RandomUtils;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.grpc.cloudevents.CloudEvent;
import org.apache.eventmesh.common.protocol.grpc.common.EventMeshCloudEventUtils;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.grpc.consumer.consumergroup.StreamTopicConfig;
import org.apache.eventmesh.runtime.core.protocol.grpc.service.EventEmitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest.class */
public class StreamPushRequest extends AbstractPushRequest {
    private static final Logger log = LoggerFactory.getLogger(StreamPushRequest.class);
    private final Map<String, List<EventEmitter<CloudEvent>>> idcEmitters;
    private final List<EventEmitter<CloudEvent>> totalEmitters;
    private final SubscriptionMode subscriptionMode;
    private final int startIdx;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.eventmesh.runtime.core.protocol.grpc.push.StreamPushRequest$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/grpc/push/StreamPushRequest$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$eventmesh$common$protocol$SubscriptionMode = new int[SubscriptionMode.values().length];

        static {
            try {
                $SwitchMap$org$apache$eventmesh$common$protocol$SubscriptionMode[SubscriptionMode.CLUSTERING.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$common$protocol$SubscriptionMode[SubscriptionMode.BROADCASTING.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public StreamPushRequest(HandleMsgContext handleMsgContext, Map<String, Set<AbstractPushRequest>> map) {
        super(handleMsgContext, map);
        StreamTopicConfig streamTopicConfig = (StreamTopicConfig) handleMsgContext.getConsumeTopicConfig();
        this.idcEmitters = streamTopicConfig.getIdcEmitters();
        this.totalEmitters = streamTopicConfig.getTotalEmitters();
        this.subscriptionMode = streamTopicConfig.getSubscriptionMode();
        this.startIdx = RandomUtils.nextInt(0, this.totalEmitters.size());
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.grpc.push.AbstractPushRequest
    public void tryPushRequest() {
        if (this.eventMeshCloudEvent == null) {
            return;
        }
        for (EventEmitter<CloudEvent> eventEmitter : selectEmitter()) {
            this.lastPushTime = System.currentTimeMillis();
            this.eventMeshCloudEvent = CloudEvent.newBuilder(this.eventMeshCloudEvent).putAttributes(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, CloudEvent.CloudEventAttributeValue.newBuilder().setCeString(String.valueOf(this.lastPushTime)).build()).build();
            try {
                StreamObserver<CloudEvent> emitter = eventEmitter.getEmitter();
                synchronized (emitter) {
                    emitter.onNext(this.eventMeshCloudEvent);
                }
                log.info("message|eventMesh2client|emitter|topic={}|bizSeqNo={}|uniqueId={}|cost={}", new Object[]{EventMeshCloudEventUtils.getSubject(this.eventMeshCloudEvent), EventMeshCloudEventUtils.getSeqNum(this.eventMeshCloudEvent), EventMeshCloudEventUtils.getUniqueId(this.eventMeshCloudEvent), Long.valueOf(System.currentTimeMillis() - this.lastPushTime)});
                complete();
            } catch (Throwable th) {
                log.error("message|eventMesh2client|exception={} |emitter|topic={}|bizSeqNo={}|uniqueId={}|cost={}", new Object[]{th.getMessage(), EventMeshCloudEventUtils.getSubject(this.eventMeshCloudEvent), EventMeshCloudEventUtils.getSeqNum(this.eventMeshCloudEvent), EventMeshCloudEventUtils.getUniqueId(this.eventMeshCloudEvent), Long.valueOf(System.currentTimeMillis() - this.lastPushTime), th});
                delayRetry();
            }
        }
    }

    private List<EventEmitter<CloudEvent>> selectEmitter() {
        List<EventEmitter<CloudEvent>> list = (List) MapUtils.getObject(this.idcEmitters, this.eventMeshGrpcConfiguration.getEventMeshIDC(), (Object) null);
        if (CollectionUtils.isNotEmpty(list)) {
            return getEventEmitters(list);
        }
        if (CollectionUtils.isNotEmpty(this.totalEmitters)) {
            return getEventEmitters(this.totalEmitters);
        }
        log.error("No event emitters from subscriber, no message returning.");
        return Collections.emptyList();
    }

    private List<EventEmitter<CloudEvent>> getEventEmitters(List<EventEmitter<CloudEvent>> list) {
        switch (AnonymousClass1.$SwitchMap$org$apache$eventmesh$common$protocol$SubscriptionMode[this.subscriptionMode.ordinal()]) {
            case 1:
                return Collections.singletonList(list.get((this.startIdx + this.retryTimes) % list.size()));
            case 2:
                return list;
            default:
                log.error("Invalid Subscription Mode, no message returning back to subscriber.");
                return Collections.emptyList();
        }
    }
}
