package com.github.chanhohang.akka.actor;

import akka.persistence.RecoveryCompleted;
import akka.persistence.UntypedPersistentActor;
import akka.stream.ActorMaterializer;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.chanhohang.akka.AkkaClusterMessengerConstants;
import com.github.chanhohang.akka.cluster.ClusterMessageRouter;
import com.github.chanhohang.akka.cluster.MessagePublishRouter;
import com.github.chanhohang.akka.journal.ClusterJdbcReadJournal;
import com.github.chanhohang.akka.journal.ClusterReadJournal;
import com.github.chanhohang.akka.message.Acknowledgement;
import com.github.chanhohang.akka.message.MessageEnvelopeImpl;
import com.github.chanhohang.akka.message.persist.AcknowledgementPersist;
import com.github.chanhohang.akka.message.resend.ResendComplete;
import com.github.chanhohang.akka.message.resend.ResendRequest;
import com.github.chanhohang.akka.message.reset.ResetSequenceNumberRequest;
import java.beans.ConstructorProperties;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component(AkkaClusterMessengerConstants.PersistentId.Cluster.Receiver)
@Lazy
/* loaded from: input_file:com/github/chanhohang/akka/actor/ClusterMessengerReceiver.class */
public class ClusterMessengerReceiver extends UntypedPersistentActor {
    private Logger log;
    private ObjectMapper mapper;
    private Map<String, AtomicLong> sequenceNumberMap;

    @Value("${actp.akka.role}")
    private String role;

    @Autowired(required = false)
    private ClusterMessageRouter router;

    @Autowired(required = false)
    private ClusterReadJournal readJournal;
    private boolean resendTrigger;

    @PostConstruct
    private void postConstruct() {
        if (this.readJournal == null) {
            this.readJournal = new ClusterJdbcReadJournal();
        }
        if (this.router == null) {
            this.router = new MessagePublishRouter(context().system());
        }
    }

    public String persistenceId() {
        return this.role + AkkaClusterMessengerConstants.PersistentId.SEPARATOR + AkkaClusterMessengerConstants.PersistentId.Cluster.Receiver;
    }

    public void onReceiveCommand(Object obj) throws Exception {
        try {
            this.log.info("onReceiveCommand {} from sender {}", obj, getSender());
            if (obj instanceof ResendComplete) {
                handleResendComplete((ResendComplete) obj);
            } else if (obj instanceof ResetSequenceNumberRequest) {
                handleResetSequenceNumber((ResetSequenceNumberRequest) obj);
            } else if (obj instanceof MessageEnvelopeImpl) {
                handleMessage((MessageEnvelopeImpl) obj);
            } else {
                unhandled(obj);
            }
        } catch (Exception e) {
            this.log.error("Unexpected Exception.", e);
            try {
                getSender().tell(e, self());
            } catch (Exception e2) {
                this.log.error("Unexpected replyException.", e2);
            }
        }
    }

    private void handleMessage(MessageEnvelopeImpl messageEnvelopeImpl) throws JsonParseException, JsonMappingException, IOException {
        sendAcknowledgement(messageEnvelopeImpl, this.role);
        String role = messageEnvelopeImpl.getRole();
        AtomicLong sequenceNumber = getSequenceNumber(role);
        if (messageEnvelopeImpl.getSequenceNumber() <= sequenceNumber.get()) {
            this.log.info("Sequence Number {} from {} is already processed.", sequenceNumber, role);
            return;
        }
        if (messageEnvelopeImpl.getSequenceNumber() > sequenceNumber.get() + 1) {
            if (this.resendTrigger) {
                this.log.info("resend from {} is in progressed.", role);
                return;
            }
            this.resendTrigger = true;
            ResendRequest resendRequest = new ResendRequest(sequenceNumber.get() + 1, messageEnvelopeImpl.getSequenceNumber());
            resendRequest.setTargetRole(getRole());
            this.log.info("start resend. {}", resendRequest);
            getSender().tell(resendRequest, getSelf());
            return;
        }
        sequenceNumber.set(messageEnvelopeImpl.getSequenceNumber());
        Object obj = null;
        if (messageEnvelopeImpl.getPayload() == null || messageEnvelopeImpl.getPayloadClass() == null) {
            this.log.info("Skip parsing unrecognized payloadClass {} and payload {}", messageEnvelopeImpl.getPayloadClass(), messageEnvelopeImpl.getPayload());
        } else {
            obj = this.mapper.readValue(messageEnvelopeImpl.getPayload(), messageEnvelopeImpl.getPayloadClass());
        }
        AcknowledgementPersist acknowledgementPersist = new AcknowledgementPersist();
        acknowledgementPersist.setRole(role);
        acknowledgementPersist.setSequenceNumber(messageEnvelopeImpl.getSequenceNumber());
        acknowledgementPersist.setPayloadClass(messageEnvelopeImpl.getPayloadClass());
        acknowledgementPersist.setPayload(obj);
        acknowledgementPersist.setTargetRole(messageEnvelopeImpl.getRole());
        persistAsync(acknowledgementPersist, acknowledgementPersist2 -> {
            if (acknowledgementPersist2.getPayload() != null) {
                this.router.route(acknowledgementPersist2);
            }
        });
    }

    private void sendAcknowledgement(MessageEnvelopeImpl messageEnvelopeImpl, String str) {
        Acknowledgement acknowledgement = new Acknowledgement();
        acknowledgement.setDeliveryId(messageEnvelopeImpl.getDeliveryId());
        acknowledgement.setTargetRole(str);
        acknowledgement.setRole(getRole());
        this.log.info("Sending Acknowledgement {} to sender {}", acknowledgement, getSender());
        getSender().tell(acknowledgement, getSelf());
    }

    private void handleResetSequenceNumber(ResetSequenceNumberRequest resetSequenceNumberRequest) {
        long sequenceNumber = resetSequenceNumberRequest.getSequenceNumber();
        AtomicLong sequenceNumber2 = getSequenceNumber(resetSequenceNumberRequest.getRole());
        String role = resetSequenceNumberRequest.getRole();
        ArrayList arrayList = new ArrayList();
        if (sequenceNumber2.get() == sequenceNumber) {
            this.log.info("handleResetSequenceNumber already reset {}, role {}", Long.valueOf(sequenceNumber), role);
        } else {
            this.log.info("handleResetSequenceNumber for reset sequence number {}, role {}, current sequence number {}", new Object[]{Long.valueOf(sequenceNumber), role, sequenceNumber2});
            sequenceNumber2.set(sequenceNumber);
            this.readJournal.currentEventsByPersistenceId(getContext().system(), persistenceId(), 0L, Long.MAX_VALUE).runForeach(eventEnvelope -> {
                if (eventEnvelope.event() instanceof AcknowledgementPersist) {
                    AcknowledgementPersist acknowledgementPersist = (AcknowledgementPersist) eventEnvelope.event();
                    if (resetSequenceNumberRequest.getRole() == null || resetSequenceNumberRequest.getRole().equals(acknowledgementPersist.getTargetRole())) {
                        long sequenceNr = eventEnvelope.sequenceNr();
                        if (acknowledgementPersist.getSequenceNumber() >= sequenceNumber) {
                            arrayList.add(Long.valueOf(sequenceNr));
                        }
                    }
                }
            }, ActorMaterializer.create(getContext().system())).thenRun(() -> {
                this.log.info("Delete Event Store Sequence Number {}", arrayList);
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    deleteMessages(((Long) it.next()).longValue());
                }
            });
        }
        sendAcknowledgement(resetSequenceNumberRequest, resetSequenceNumberRequest.getRole());
    }

    private void handleResendComplete(ResendComplete resendComplete) {
        long endSequenceNumber = resendComplete.getEndSequenceNumber();
        AtomicLong sequenceNumber = getSequenceNumber(resendComplete.getRole());
        if (sequenceNumber.get() == endSequenceNumber) {
            this.log.info("handleResendComplete Complete for end sequence number {}, role {}", Long.valueOf(endSequenceNumber), resendComplete.getRole());
        } else {
            this.log.info("handleResendComplete incomplete and reset sequence number to {}, role {}, ", Long.valueOf(endSequenceNumber), resendComplete.getRole());
            sequenceNumber.set(endSequenceNumber);
        }
        this.resendTrigger = false;
        sendAcknowledgement(resendComplete, resendComplete.getRole());
    }

    private AtomicLong getSequenceNumber(String str) {
        AtomicLong atomicLong = this.sequenceNumberMap.get(str);
        if (atomicLong == null) {
            atomicLong = new AtomicLong();
            this.sequenceNumberMap.put(str, atomicLong);
        }
        return atomicLong;
    }

    public void onReceiveRecover(Object obj) {
        this.log.info("onReceiveRecover {}", obj);
        if (!(obj instanceof AcknowledgementPersist)) {
            if (obj instanceof RecoveryCompleted) {
                this.log.info("sequenceNumberMap {}", this.sequenceNumberMap);
            }
        } else {
            AcknowledgementPersist acknowledgementPersist = (AcknowledgementPersist) obj;
            AtomicLong sequenceNumber = getSequenceNumber(acknowledgementPersist.getRole());
            if (sequenceNumber.get() < acknowledgementPersist.getSequenceNumber()) {
                sequenceNumber.set(acknowledgementPersist.getSequenceNumber());
            }
        }
    }

    public Logger getLog() {
        return this.log;
    }

    public ObjectMapper getMapper() {
        return this.mapper;
    }

    public Map<String, AtomicLong> getSequenceNumberMap() {
        return this.sequenceNumberMap;
    }

    public String getRole() {
        return this.role;
    }

    public ClusterMessageRouter getRouter() {
        return this.router;
    }

    public ClusterReadJournal getReadJournal() {
        return this.readJournal;
    }

    public boolean isResendTrigger() {
        return this.resendTrigger;
    }

    public void setLog(Logger logger) {
        this.log = logger;
    }

    public void setMapper(ObjectMapper objectMapper) {
        this.mapper = objectMapper;
    }

    public void setSequenceNumberMap(Map<String, AtomicLong> map) {
        this.sequenceNumberMap = map;
    }

    public void setRole(String str) {
        this.role = str;
    }

    public void setRouter(ClusterMessageRouter clusterMessageRouter) {
        this.router = clusterMessageRouter;
    }

    public void setReadJournal(ClusterReadJournal clusterReadJournal) {
        this.readJournal = clusterReadJournal;
    }

    public void setResendTrigger(boolean z) {
        this.resendTrigger = z;
    }

    public String toString() {
        return "ClusterMessengerReceiver(log=" + getLog() + ", mapper=" + getMapper() + ", sequenceNumberMap=" + getSequenceNumberMap() + ", role=" + getRole() + ", router=" + getRouter() + ", readJournal=" + getReadJournal() + ", resendTrigger=" + isResendTrigger() + ")";
    }

    @ConstructorProperties({"log", "mapper", "sequenceNumberMap", "role", "router", "readJournal", "resendTrigger"})
    public ClusterMessengerReceiver(Logger logger, ObjectMapper objectMapper, Map<String, AtomicLong> map, String str, ClusterMessageRouter clusterMessageRouter, ClusterReadJournal clusterReadJournal, boolean z) {
        this.log = LoggerFactory.getLogger(getClass());
        this.mapper = new ObjectMapper();
        this.sequenceNumberMap = new HashMap();
        this.resendTrigger = false;
        this.log = logger;
        this.mapper = objectMapper;
        this.sequenceNumberMap = map;
        this.role = str;
        this.router = clusterMessageRouter;
        this.readJournal = clusterReadJournal;
        this.resendTrigger = z;
    }

    public ClusterMessengerReceiver() {
        this.log = LoggerFactory.getLogger(getClass());
        this.mapper = new ObjectMapper();
        this.sequenceNumberMap = new HashMap();
        this.resendTrigger = false;
    }

    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof ClusterMessengerReceiver)) {
            return false;
        }
        ClusterMessengerReceiver clusterMessengerReceiver = (ClusterMessengerReceiver) obj;
        if (!clusterMessengerReceiver.canEqual(this) || !super/*java.lang.Object*/.equals(obj)) {
            return false;
        }
        Logger log = getLog();
        Logger log2 = clusterMessengerReceiver.getLog();
        if (log == null) {
            if (log2 != null) {
                return false;
            }
        } else if (!log.equals(log2)) {
            return false;
        }
        ObjectMapper mapper = getMapper();
        ObjectMapper mapper2 = clusterMessengerReceiver.getMapper();
        if (mapper == null) {
            if (mapper2 != null) {
                return false;
            }
        } else if (!mapper.equals(mapper2)) {
            return false;
        }
        Map<String, AtomicLong> sequenceNumberMap = getSequenceNumberMap();
        Map<String, AtomicLong> sequenceNumberMap2 = clusterMessengerReceiver.getSequenceNumberMap();
        if (sequenceNumberMap == null) {
            if (sequenceNumberMap2 != null) {
                return false;
            }
        } else if (!sequenceNumberMap.equals(sequenceNumberMap2)) {
            return false;
        }
        String role = getRole();
        String role2 = clusterMessengerReceiver.getRole();
        if (role == null) {
            if (role2 != null) {
                return false;
            }
        } else if (!role.equals(role2)) {
            return false;
        }
        ClusterMessageRouter router = getRouter();
        ClusterMessageRouter router2 = clusterMessengerReceiver.getRouter();
        if (router == null) {
            if (router2 != null) {
                return false;
            }
        } else if (!router.equals(router2)) {
            return false;
        }
        ClusterReadJournal readJournal = getReadJournal();
        ClusterReadJournal readJournal2 = clusterMessengerReceiver.getReadJournal();
        if (readJournal == null) {
            if (readJournal2 != null) {
                return false;
            }
        } else if (!readJournal.equals(readJournal2)) {
            return false;
        }
        return isResendTrigger() == clusterMessengerReceiver.isResendTrigger();
    }

    protected boolean canEqual(Object obj) {
        return obj instanceof ClusterMessengerReceiver;
    }

    public int hashCode() {
        int hashCode = (1 * 59) + super/*java.lang.Object*/.hashCode();
        Logger log = getLog();
        int hashCode2 = (hashCode * 59) + (log == null ? 43 : log.hashCode());
        ObjectMapper mapper = getMapper();
        int hashCode3 = (hashCode2 * 59) + (mapper == null ? 43 : mapper.hashCode());
        Map<String, AtomicLong> sequenceNumberMap = getSequenceNumberMap();
        int hashCode4 = (hashCode3 * 59) + (sequenceNumberMap == null ? 43 : sequenceNumberMap.hashCode());
        String role = getRole();
        int hashCode5 = (hashCode4 * 59) + (role == null ? 43 : role.hashCode());
        ClusterMessageRouter router = getRouter();
        int hashCode6 = (hashCode5 * 59) + (router == null ? 43 : router.hashCode());
        ClusterReadJournal readJournal = getReadJournal();
        return (((hashCode6 * 59) + (readJournal == null ? 43 : readJournal.hashCode())) * 59) + (isResendTrigger() ? 79 : 97);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 538208042:
                if (implMethodName.equals("lambda$handleResetSequenceNumber$aeaa109c$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/github/chanhohang/akka/actor/ClusterMessengerReceiver") && serializedLambda.getImplMethodSignature().equals("(Lcom/github/chanhohang/akka/message/reset/ResetSequenceNumberRequest;JLjava/util/List;Lakka/persistence/query/EventEnvelope;)V")) {
                    ResetSequenceNumberRequest resetSequenceNumberRequest = (ResetSequenceNumberRequest) serializedLambda.getCapturedArg(0);
                    long longValue = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    List list = (List) serializedLambda.getCapturedArg(2);
                    return eventEnvelope -> {
                        if (eventEnvelope.event() instanceof AcknowledgementPersist) {
                            AcknowledgementPersist acknowledgementPersist = (AcknowledgementPersist) eventEnvelope.event();
                            if (resetSequenceNumberRequest.getRole() == null || resetSequenceNumberRequest.getRole().equals(acknowledgementPersist.getTargetRole())) {
                                long sequenceNr = eventEnvelope.sequenceNr();
                                if (acknowledgementPersist.getSequenceNumber() >= longValue) {
                                    list.add(Long.valueOf(sequenceNr));
                                }
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
