package com.github.chanhohang.akka.actor;

import akka.Done;
import akka.actor.ActorPath;
import akka.actor.ActorSelection;
import akka.japi.Function;
import akka.persistence.RecoveryCompleted;
import akka.persistence.UntypedPersistentActorWithAtLeastOnceDelivery;
import akka.persistence.jdbc.query.javadsl.JdbcReadJournal;
import akka.persistence.query.PersistenceQuery;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Source;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.chanhohang.akka.AkkaClusterMessengerConstants;
import com.github.chanhohang.akka.cluster.IClusterStore;
import com.github.chanhohang.akka.message.Acknowledgement;
import com.github.chanhohang.akka.message.BaseMessageEnvelope;
import com.github.chanhohang.akka.message.MessageEnvelopeImpl;
import com.github.chanhohang.akka.message.persist.MessagePersist;
import com.github.chanhohang.akka.message.resend.GapFillMessage;
import com.github.chanhohang.akka.message.resend.ResendComplete;
import com.github.chanhohang.akka.message.resend.ResendRequest;
import com.github.chanhohang.akka.message.reset.ResetSequenceNumberRequest;
import java.beans.ConstructorProperties;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

@Component(AkkaClusterMessengerConstants.PersistentId.Cluster.Sender)
@Lazy
/* loaded from: input_file:com/github/chanhohang/akka/actor/ClusterMessengerSender.class */
public class ClusterMessengerSender extends UntypedPersistentActorWithAtLeastOnceDelivery {
    private Logger log;

    @Autowired
    private IClusterStore clusterStore;
    private ObjectMapper objectMapper;
    private AtomicLong sequenceNumber;
    private Map<String, Boolean> resendMap;

    @Value("${actp.akka.role}")
    private String role;

    public String persistenceId() {
        return this.role + AkkaClusterMessengerConstants.PersistentId.SEPARATOR + AkkaClusterMessengerConstants.PersistentId.Cluster.Sender;
    }

    public void onReceiveCommand(Object obj) throws Throwable {
        this.log.info("onReceiveCommand {}", obj);
        try {
            if (obj instanceof Acknowledgement) {
                confirmDelivery(((Acknowledgement) obj).getDeliveryId());
            } else if (obj instanceof ResetSequenceNumberRequest) {
                handleResetSequenceNumber((ResetSequenceNumberRequest) obj);
            } else if (obj instanceof ResendRequest) {
                handleResendRequest((ResendRequest) obj);
            } else if (obj instanceof BaseMessageEnvelope) {
                handleGenericEvent((BaseMessageEnvelope) obj);
            } else {
                unhandled(obj);
            }
        } catch (Exception e) {
            this.log.error("Unexpected Exception", e);
        }
    }

    private void handleResetSequenceNumber(ResetSequenceNumberRequest resetSequenceNumberRequest) {
        long sequenceNumber = resetSequenceNumberRequest.getSequenceNumber();
        Source currentEventsByPersistenceId = PersistenceQuery.get(getContext().system()).getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()).currentEventsByPersistenceId(persistenceId(), 0L, Long.MAX_VALUE);
        AtomicLong atomicLong = new AtomicLong();
        AtomicLong atomicLong2 = new AtomicLong();
        this.sequenceNumber.set(sequenceNumber);
        ArrayList arrayList = new ArrayList();
        currentEventsByPersistenceId.runForeach(eventEnvelope -> {
            if (eventEnvelope.event() instanceof MessagePersist) {
                MessagePersist messagePersist = (MessagePersist) eventEnvelope.event();
                long sequenceNr = eventEnvelope.sequenceNr();
                atomicLong2.set(sequenceNr);
                if (messagePersist.getSequenceNumber() >= sequenceNumber) {
                    arrayList.add(Long.valueOf(sequenceNr));
                    atomicLong.set(sequenceNr);
                }
            }
        }, ActorMaterializer.create(getContext().system())).thenRun(() -> {
            this.log.info("Delete Event Store {}, lastSequenceNumber {}", arrayList);
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                deleteMessages(((Long) it.next()).longValue());
            }
            for (final ActorSelection actorSelection : this.clusterStore.getActor(null, AkkaClusterMessengerConstants.PersistentId.Cluster.Receiver)) {
                deliver(actorSelection, new Function<Long, Object>() { // from class: com.github.chanhohang.akka.actor.ClusterMessengerSender.1
                    public Object apply(Long l) throws Exception {
                        ResetSequenceNumberRequest resetSequenceNumberRequest2 = new ResetSequenceNumberRequest();
                        resetSequenceNumberRequest2.setSequenceNumber(resetSequenceNumberRequest.getSequenceNumber());
                        resetSequenceNumberRequest2.setDeliveryId(l.longValue());
                        resetSequenceNumberRequest2.setRole(ClusterMessengerSender.this.getRole());
                        ClusterMessengerSender.this.log.info("sending ResetSequenceNumber to destination {}, Message={}", actorSelection, resetSequenceNumberRequest2);
                        return resetSequenceNumberRequest2;
                    }
                });
            }
        });
    }

    private void handleResendRequest(ResendRequest resendRequest) throws InterruptedException, ExecutionException {
        String targetRole = resendRequest.getTargetRole();
        if (this.resendMap.get(targetRole) != null && this.resendMap.get(targetRole).booleanValue()) {
            this.log.info("resend in progress. {}, role {}", resendRequest, targetRole);
            return;
        }
        this.log.info("start handle resend. {}, role {}", resendRequest, resendRequest.getTargetRole());
        this.resendMap.put(targetRole, Boolean.TRUE);
        long startSequenceNumber = resendRequest.getStartSequenceNumber();
        Source currentEventsByPersistenceId = PersistenceQuery.get(getContext().system()).getReadJournalFor(JdbcReadJournal.class, JdbcReadJournal.Identifier()).currentEventsByPersistenceId(persistenceId(), 0L, Long.MAX_VALUE);
        ActorMaterializer create = ActorMaterializer.create(getContext().system());
        ActorPath path = getSender().path();
        AtomicLong atomicLong = new AtomicLong();
        CompletionStage runForeach = currentEventsByPersistenceId.runForeach(eventEnvelope -> {
            if (!(eventEnvelope.event() instanceof MessagePersist)) {
                this.log.info("Unknown event {}", eventEnvelope);
                return;
            }
            final MessagePersist messagePersist = (MessagePersist) eventEnvelope.event();
            this.log.info("Check Message {}", messagePersist);
            atomicLong.set(messagePersist.getSequenceNumber());
            if (messagePersist.getSequenceNumber() < startSequenceNumber) {
                this.log.info("Sequence Number {} is already sent.", Long.valueOf(messagePersist.getSequenceNumber()));
            } else if (isTarget(messagePersist, targetRole)) {
                deliver(path, new Function<Long, Object>() { // from class: com.github.chanhohang.akka.actor.ClusterMessengerSender.2
                    public Object apply(Long l) throws Exception {
                        MessageEnvelopeImpl send = ClusterMessengerSender.this.send(l.longValue(), messagePersist);
                        ClusterMessengerSender.this.log.info("sending message to destination {}, Message={}", path, send);
                        return send;
                    }
                });
            } else {
                deliver(path, new Function<Long, Object>() { // from class: com.github.chanhohang.akka.actor.ClusterMessengerSender.3
                    public Object apply(Long l) throws Exception {
                        GapFillMessage gapFillMessage = new GapFillMessage();
                        gapFillMessage.setSequenceNumber(messagePersist.getSequenceNumber());
                        gapFillMessage.setDeliveryId(l.longValue());
                        gapFillMessage.setRole(ClusterMessengerSender.this.getRole());
                        ClusterMessengerSender.this.log.info("sending message to destination {}, Message={}", path, gapFillMessage);
                        return gapFillMessage;
                    }
                });
            }
        }, create);
        runForeach.thenRun(() -> {
            ResendComplete resendComplete = new ResendComplete();
            resendComplete.setRole(getRole());
            resendComplete.setEndSequenceNumber(atomicLong.get());
            deliver(path, l -> {
                this.resendMap.put(targetRole, Boolean.FALSE);
                resendComplete.setDeliveryId(l.longValue());
                return resendComplete;
            });
        });
        this.log.info("Resend Completed. {}", (Done) runForeach.toCompletableFuture().get());
    }

    private void handleGenericEvent(BaseMessageEnvelope baseMessageEnvelope) {
        MessagePersist messagePersist = new MessagePersist();
        messagePersist.setRole(getRole());
        messagePersist.setPayload(baseMessageEnvelope);
        messagePersist.setPayloadClass(baseMessageEnvelope.getClass());
        messagePersist.setSequenceNumber(this.sequenceNumber.incrementAndGet());
        String targetRole = baseMessageEnvelope.getTargetRole();
        if (targetRole != null) {
            messagePersist.setTargetRole(targetRole);
        }
        persist(messagePersist, messagePersist2 -> {
            for (final ActorSelection actorSelection : this.clusterStore.getActor(targetRole, AkkaClusterMessengerConstants.PersistentId.Cluster.Receiver)) {
                deliver(actorSelection, new Function<Long, Object>() { // from class: com.github.chanhohang.akka.actor.ClusterMessengerSender.4
                    public Object apply(Long l) throws Exception {
                        MessageEnvelopeImpl send = ClusterMessengerSender.this.send(l.longValue(), messagePersist);
                        ClusterMessengerSender.this.log.info("sending message to destination {}, Message={}", actorSelection, send);
                        return send;
                    }
                });
            }
        });
    }

    public void onReceiveRecover(Object obj) throws Throwable {
        this.log.info("onReceiveRecover {}", obj);
        if (obj instanceof MessagePersist) {
            this.sequenceNumber.set(((MessagePersist) obj).getSequenceNumber());
        } else if (obj instanceof RecoveryCompleted) {
            this.log.info("sequenceNumber {}", this.sequenceNumber);
        }
    }

    private boolean isTarget(MessagePersist messagePersist, String str) {
        return messagePersist.getTargetRole() == null || messagePersist.getTargetRole().equals(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public MessageEnvelopeImpl send(long j, MessagePersist messagePersist) throws JsonProcessingException {
        Object payload = messagePersist.getPayload();
        Class<?> payloadClass = messagePersist.getPayloadClass();
        String writeValueAsString = this.objectMapper.writeValueAsString(payload);
        String role = messagePersist.getRole();
        String targetRole = messagePersist.getTargetRole();
        MessageEnvelopeImpl messageEnvelopeImpl = new MessageEnvelopeImpl(j);
        messageEnvelopeImpl.setSequenceNumber(messagePersist.getSequenceNumber());
        messageEnvelopeImpl.setPayloadClass(payloadClass);
        messageEnvelopeImpl.setPayload(writeValueAsString);
        messageEnvelopeImpl.setRole(role);
        messageEnvelopeImpl.setTargetRole(targetRole);
        return messageEnvelopeImpl;
    }

    public Logger getLog() {
        return this.log;
    }

    public IClusterStore getClusterStore() {
        return this.clusterStore;
    }

    public ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    public AtomicLong getSequenceNumber() {
        return this.sequenceNumber;
    }

    public Map<String, Boolean> getResendMap() {
        return this.resendMap;
    }

    public String getRole() {
        return this.role;
    }

    public void setLog(Logger logger) {
        this.log = logger;
    }

    public void setClusterStore(IClusterStore iClusterStore) {
        this.clusterStore = iClusterStore;
    }

    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public void setSequenceNumber(AtomicLong atomicLong) {
        this.sequenceNumber = atomicLong;
    }

    public void setResendMap(Map<String, Boolean> map) {
        this.resendMap = map;
    }

    public void setRole(String str) {
        this.role = str;
    }

    public String toString() {
        return "ClusterMessengerSender(log=" + getLog() + ", clusterStore=" + getClusterStore() + ", objectMapper=" + getObjectMapper() + ", sequenceNumber=" + getSequenceNumber() + ", resendMap=" + getResendMap() + ", role=" + getRole() + ")";
    }

    @ConstructorProperties({"log", "clusterStore", "objectMapper", "sequenceNumber", "resendMap", "role"})
    public ClusterMessengerSender(Logger logger, IClusterStore iClusterStore, ObjectMapper objectMapper, AtomicLong atomicLong, Map<String, Boolean> map, String str) {
        this.log = LoggerFactory.getLogger(getClass());
        this.objectMapper = new ObjectMapper();
        this.sequenceNumber = new AtomicLong();
        this.resendMap = new HashMap();
        this.log = logger;
        this.clusterStore = iClusterStore;
        this.objectMapper = objectMapper;
        this.sequenceNumber = atomicLong;
        this.resendMap = map;
        this.role = str;
    }

    public ClusterMessengerSender() {
        this.log = LoggerFactory.getLogger(getClass());
        this.objectMapper = new ObjectMapper();
        this.sequenceNumber = new AtomicLong();
        this.resendMap = new HashMap();
    }

    public boolean equals(Object obj) {
        if (obj == this) {
            return true;
        }
        if (!(obj instanceof ClusterMessengerSender)) {
            return false;
        }
        ClusterMessengerSender clusterMessengerSender = (ClusterMessengerSender) obj;
        if (!clusterMessengerSender.canEqual(this) || !super/*java.lang.Object*/.equals(obj)) {
            return false;
        }
        Logger log = getLog();
        Logger log2 = clusterMessengerSender.getLog();
        if (log == null) {
            if (log2 != null) {
                return false;
            }
        } else if (!log.equals(log2)) {
            return false;
        }
        IClusterStore clusterStore = getClusterStore();
        IClusterStore clusterStore2 = clusterMessengerSender.getClusterStore();
        if (clusterStore == null) {
            if (clusterStore2 != null) {
                return false;
            }
        } else if (!clusterStore.equals(clusterStore2)) {
            return false;
        }
        ObjectMapper objectMapper = getObjectMapper();
        ObjectMapper objectMapper2 = clusterMessengerSender.getObjectMapper();
        if (objectMapper == null) {
            if (objectMapper2 != null) {
                return false;
            }
        } else if (!objectMapper.equals(objectMapper2)) {
            return false;
        }
        AtomicLong sequenceNumber = getSequenceNumber();
        AtomicLong sequenceNumber2 = clusterMessengerSender.getSequenceNumber();
        if (sequenceNumber == null) {
            if (sequenceNumber2 != null) {
                return false;
            }
        } else if (!sequenceNumber.equals(sequenceNumber2)) {
            return false;
        }
        Map<String, Boolean> resendMap = getResendMap();
        Map<String, Boolean> resendMap2 = clusterMessengerSender.getResendMap();
        if (resendMap == null) {
            if (resendMap2 != null) {
                return false;
            }
        } else if (!resendMap.equals(resendMap2)) {
            return false;
        }
        String role = getRole();
        String role2 = clusterMessengerSender.getRole();
        return role == null ? role2 == null : role.equals(role2);
    }

    protected boolean canEqual(Object obj) {
        return obj instanceof ClusterMessengerSender;
    }

    public int hashCode() {
        int hashCode = (1 * 59) + super/*java.lang.Object*/.hashCode();
        Logger log = getLog();
        int hashCode2 = (hashCode * 59) + (log == null ? 43 : log.hashCode());
        IClusterStore clusterStore = getClusterStore();
        int hashCode3 = (hashCode2 * 59) + (clusterStore == null ? 43 : clusterStore.hashCode());
        ObjectMapper objectMapper = getObjectMapper();
        int hashCode4 = (hashCode3 * 59) + (objectMapper == null ? 43 : objectMapper.hashCode());
        AtomicLong sequenceNumber = getSequenceNumber();
        int hashCode5 = (hashCode4 * 59) + (sequenceNumber == null ? 43 : sequenceNumber.hashCode());
        Map<String, Boolean> resendMap = getResendMap();
        int hashCode6 = (hashCode5 * 59) + (resendMap == null ? 43 : resendMap.hashCode());
        String role = getRole();
        return (hashCode6 * 59) + (role == null ? 43 : role.hashCode());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -771182516:
                if (implMethodName.equals("lambda$handleResendRequest$3424e8b6$1")) {
                    z = false;
                    break;
                }
                break;
            case 124756346:
                if (implMethodName.equals("lambda$handleResetSequenceNumber$b825831$1")) {
                    z = true;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 7 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/github/chanhohang/akka/actor/ClusterMessengerSender") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicLong;JLjava/lang/String;Lakka/actor/ActorPath;Lakka/persistence/query/EventEnvelope;)V")) {
                    ClusterMessengerSender clusterMessengerSender = (ClusterMessengerSender) serializedLambda.getCapturedArg(0);
                    AtomicLong atomicLong = (AtomicLong) serializedLambda.getCapturedArg(1);
                    long longValue = ((Long) serializedLambda.getCapturedArg(2)).longValue();
                    String str = (String) serializedLambda.getCapturedArg(3);
                    ActorPath actorPath = (ActorPath) serializedLambda.getCapturedArg(4);
                    return eventEnvelope -> {
                        if (!(eventEnvelope.event() instanceof MessagePersist)) {
                            this.log.info("Unknown event {}", eventEnvelope);
                            return;
                        }
                        final MessagePersist messagePersist = (MessagePersist) eventEnvelope.event();
                        this.log.info("Check Message {}", messagePersist);
                        atomicLong.set(messagePersist.getSequenceNumber());
                        if (messagePersist.getSequenceNumber() < longValue) {
                            this.log.info("Sequence Number {} is already sent.", Long.valueOf(messagePersist.getSequenceNumber()));
                        } else if (isTarget(messagePersist, str)) {
                            deliver(actorPath, new Function<Long, Object>() { // from class: com.github.chanhohang.akka.actor.ClusterMessengerSender.2
                                public Object apply(Long l) throws Exception {
                                    MessageEnvelopeImpl send = ClusterMessengerSender.this.send(l.longValue(), messagePersist);
                                    ClusterMessengerSender.this.log.info("sending message to destination {}, Message={}", actorPath, send);
                                    return send;
                                }
                            });
                        } else {
                            deliver(actorPath, new Function<Long, Object>() { // from class: com.github.chanhohang.akka.actor.ClusterMessengerSender.3
                                public Object apply(Long l) throws Exception {
                                    GapFillMessage gapFillMessage = new GapFillMessage();
                                    gapFillMessage.setSequenceNumber(messagePersist.getSequenceNumber());
                                    gapFillMessage.setDeliveryId(l.longValue());
                                    gapFillMessage.setRole(ClusterMessengerSender.this.getRole());
                                    ClusterMessengerSender.this.log.info("sending message to destination {}, Message={}", actorPath, gapFillMessage);
                                    return gapFillMessage;
                                }
                            });
                        }
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("akka/japi/function/Procedure") && serializedLambda.getFunctionalInterfaceMethodName().equals("apply") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)V") && serializedLambda.getImplClass().equals("com/github/chanhohang/akka/actor/ClusterMessengerSender") && serializedLambda.getImplMethodSignature().equals("(Ljava/util/concurrent/atomic/AtomicLong;JLjava/util/List;Ljava/util/concurrent/atomic/AtomicLong;Lakka/persistence/query/EventEnvelope;)V")) {
                    AtomicLong atomicLong2 = (AtomicLong) serializedLambda.getCapturedArg(0);
                    long longValue2 = ((Long) serializedLambda.getCapturedArg(1)).longValue();
                    List list = (List) serializedLambda.getCapturedArg(2);
                    AtomicLong atomicLong3 = (AtomicLong) serializedLambda.getCapturedArg(3);
                    return eventEnvelope2 -> {
                        if (eventEnvelope2.event() instanceof MessagePersist) {
                            MessagePersist messagePersist = (MessagePersist) eventEnvelope2.event();
                            long sequenceNr = eventEnvelope2.sequenceNr();
                            atomicLong2.set(sequenceNr);
                            if (messagePersist.getSequenceNumber() >= longValue2) {
                                list.add(Long.valueOf(sequenceNr));
                                atomicLong3.set(sequenceNr);
                            }
                        }
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
