package org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.redis;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import java.lang.invoke.MethodHandles;
import org.apache.servicecomb.pack.alpha.core.NodeStatus;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.channel.AbstractEventConsumer;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.metrics.MetricsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;

/* loaded from: input_file:BOOT-INF/lib/alpha-spec-saga-akka-0.7.0.jar:org/apache/servicecomb/pack/alpha/spec/saga/akka/channel/redis/RedisSagaEventConsumer.class */
public class RedisSagaEventConsumer extends AbstractEventConsumer implements MessageListener {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private NodeStatus nodeStatus;
    private MessageSerializer messageSerializer;

    public RedisSagaEventConsumer(ActorSystem actorSystem, ActorRef actorRef, MetricsService metricsService, NodeStatus nodeStatus) {
        super(actorSystem, actorRef, metricsService);
        this.messageSerializer = new MessageSerializer();
        this.nodeStatus = nodeStatus;
    }

    @Override // org.springframework.data.redis.connection.MessageListener
    public void onMessage(Message message, byte[] bArr) {
        if (this.nodeStatus.isMaster()) {
            this.messageSerializer.deserialize(message.getBody()).ifPresent(obj -> {
                BaseEvent baseEvent = (BaseEvent) obj;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("event = [{}]", baseEvent);
                }
                try {
                    long currentTimeMillis = System.currentTimeMillis();
                    this.metricsService.metrics().doActorReceived();
                    this.sagaShardRegionActor.tell(baseEvent, this.sagaShardRegionActor);
                    long currentTimeMillis2 = System.currentTimeMillis();
                    this.metricsService.metrics().doActorAccepted();
                    this.metricsService.metrics().doActorAvgTime(currentTimeMillis2 - currentTimeMillis);
                } catch (Exception e) {
                    this.metricsService.metrics().doActorRejected();
                    LOG.error("subscriber Exception = [{}]", e.getMessage(), e);
                }
            });
        }
    }
}
