package org.apache.servicecomb.pack.alpha.spec.saga.akka;

import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.cluster.sharding.ShardRegion;
import akka.japi.pf.FI;
import akka.japi.pf.FSMTransitionHandlerBuilder;
import akka.persistence.fsm.AbstractPersistentFSM;
import akka.persistence.fsm.PersistentFSM;
import akka.persistence.fsm.japi.pf.FSMStateFunctionBuilder;
import akka.persistence.fsm.japi.pf.FSMStopBuilder;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.servicecomb.pack.alpha.core.AlphaException;
import org.apache.servicecomb.pack.alpha.core.fsm.SuspendedType;
import org.apache.servicecomb.pack.alpha.core.fsm.TxState;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaAbortedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.SagaTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxAbortedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckFailedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxCompensateAckSucceedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxEndedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.TxStartedEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.base.BaseEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.CompensateAckTimeoutEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.ComponsitedCheckEvent;
import org.apache.servicecomb.pack.alpha.core.fsm.event.internal.StopEvent;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.AddTxEventDomain;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.DomainEvent;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.SagaEndedDomain;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.SagaStartedDomain;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.domain.UpdateTxEventDomain;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.SagaData;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.model.TxEntity;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SagaDataExtension;
import org.apache.servicecomb.pack.alpha.spec.saga.akka.spring.integration.akka.SpringAkkaExtension;
import org.apache.servicecomb.pack.common.Environment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.duration.Duration;

/* loaded from: input_file:BOOT-INF/lib/alpha-spec-saga-akka-0.7.0.jar:org/apache/servicecomb/pack/alpha/spec/saga/akka/SagaActor.class */
public class SagaActor extends AbstractPersistentFSM<SagaActorState, SagaData, DomainEvent> {
    private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
    private String persistenceId;
    private long sagaBeginTime;
    private long sagaEndTime;

    public static Props props(String str) {
        return Props.create((Class<?>) SagaActor.class, str);
    }

    public SagaActor(String str) {
        if (str != null) {
            this.persistenceId = str;
        } else {
            this.persistenceId = getSelf().path().name();
        }
        startWith(SagaActorState.IDLE, SagaData.builder().build());
        when((SagaActor) SagaActorState.IDLE, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(SagaStartedEvent.class, (sagaStartedEvent, sagaData) -> {
            this.sagaBeginTime = System.currentTimeMillis();
            SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaBeginCounter();
            DomainEvent sagaStartedDomain = new SagaStartedDomain(sagaStartedEvent);
            if (sagaStartedEvent.getTimeout() <= 0) {
                return goTo(SagaActorState.READY).applying(sagaStartedDomain);
            }
            sagaData.setTimeout(sagaStartedEvent.getTimeout());
            return goTo(SagaActorState.READY).applying(sagaStartedDomain).forMax(Duration.create(sagaStartedEvent.getTimeout(), TimeUnit.SECONDS));
        }));
        when((SagaActor) SagaActorState.READY, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(TxStartedEvent.class, SagaData.class, (txStartedEvent, sagaData2) -> {
            DomainEvent addTxEventDomain = new AddTxEventDomain(txStartedEvent);
            return sagaData2.getExpirationTime() != null ? goTo(SagaActorState.PARTIALLY_ACTIVE).applying(addTxEventDomain).forMax(Duration.create(sagaData2.getTimeout(), TimeUnit.MILLISECONDS)) : goTo(SagaActorState.PARTIALLY_ACTIVE).applying(addTxEventDomain);
        }).event(SagaEndedEvent.class, (FI.Apply2<P, SagaData, PersistentFSM.State<S, SagaData, DomainEvent>>) (sagaEndedEvent, sagaData3) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(sagaEndedEvent, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE));
        }).event(SagaAbortedEvent.class, (sagaAbortedEvent, sagaData4) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(sagaAbortedEvent, SagaActorState.SUSPENDED, SuspendedType.UNPREDICTABLE));
        }).event(Collections.singletonList(StateTimeout()), SagaData.class, (obj, sagaData5) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }));
        when((SagaActor) SagaActorState.PARTIALLY_ACTIVE, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(TxEndedEvent.class, SagaData.class, (txEndedEvent, sagaData6) -> {
            DomainEvent updateTxEventDomain = new UpdateTxEventDomain(txEndedEvent);
            return sagaData6.getExpirationTime() != null ? goTo(SagaActorState.PARTIALLY_COMMITTED).applying(updateTxEventDomain).forMax(Duration.create(sagaData6.getTimeout(), TimeUnit.MILLISECONDS)) : goTo(SagaActorState.PARTIALLY_COMMITTED).applying(updateTxEventDomain);
        }).event(TxStartedEvent.class, (FI.Apply2<P, SagaData, PersistentFSM.State<S, SagaData, DomainEvent>>) (txStartedEvent2, sagaData7) -> {
            DomainEvent addTxEventDomain = new AddTxEventDomain(txStartedEvent2);
            return sagaData7.getExpirationTime() != null ? stay().applying(addTxEventDomain).forMax(Duration.create(sagaData7.getTimeout(), TimeUnit.MILLISECONDS)) : stay().applying(addTxEventDomain);
        }).event(SagaTimeoutEvent.class, (sagaTimeoutEvent, sagaData8) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(sagaTimeoutEvent, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }).event(TxAbortedEvent.class, (txAbortedEvent, sagaData9) -> {
            return goTo(SagaActorState.FAILED).applying(new UpdateTxEventDomain(txAbortedEvent));
        }).event(Collections.singletonList(StateTimeout()), SagaData.class, (obj2, sagaData10) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }));
        when((SagaActor) SagaActorState.PARTIALLY_COMMITTED, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(TxStartedEvent.class, (txStartedEvent3, sagaData11) -> {
            DomainEvent addTxEventDomain = new AddTxEventDomain(txStartedEvent3);
            return sagaData11.getExpirationTime() != null ? goTo(SagaActorState.PARTIALLY_ACTIVE).applying(addTxEventDomain).forMax(Duration.create(sagaData11.getTimeout(), TimeUnit.MILLISECONDS)) : goTo(SagaActorState.PARTIALLY_ACTIVE).applying(addTxEventDomain);
        }).event(TxEndedEvent.class, (FI.Apply2<P, SagaData, PersistentFSM.State<S, SagaData, DomainEvent>>) (txEndedEvent2, sagaData12) -> {
            DomainEvent updateTxEventDomain = new UpdateTxEventDomain(txEndedEvent2);
            return sagaData12.getExpirationTime() != null ? stay().applying(updateTxEventDomain).forMax(Duration.create(sagaData12.getTimeout(), TimeUnit.MILLISECONDS)) : stay().applying(updateTxEventDomain);
        }).event(SagaTimeoutEvent.class, (sagaTimeoutEvent2, sagaData13) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(sagaTimeoutEvent2, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }).event(SagaEndedEvent.class, (sagaEndedEvent2, sagaData14) -> {
            return goTo(SagaActorState.COMMITTED).applying(new SagaEndedDomain(sagaEndedEvent2, SagaActorState.COMMITTED));
        }).event(SagaAbortedEvent.class, (sagaAbortedEvent2, sagaData15) -> {
            return goTo(SagaActorState.FAILED).applying(new SagaEndedDomain(sagaAbortedEvent2, SagaActorState.FAILED));
        }).event(TxAbortedEvent.class, (txAbortedEvent2, sagaData16) -> {
            return goTo(SagaActorState.FAILED).applying(new UpdateTxEventDomain(txAbortedEvent2));
        }).event(Collections.singletonList(StateTimeout()), SagaData.class, (obj3, sagaData17) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(null, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }));
        when((SagaActor) SagaActorState.FAILED, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(SagaTimeoutEvent.class, SagaData.class, (sagaTimeoutEvent3, sagaData18) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(sagaTimeoutEvent3, SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }).event(TxCompensateAckSucceedEvent.class, SagaData.class, (FI.Apply2<P, Q, PersistentFSM.State<S, SagaData, DomainEvent>>) (txCompensateAckSucceedEvent, sagaData19) -> {
            return stay().applying(new UpdateTxEventDomain(txCompensateAckSucceedEvent)).andThen(exec(sagaData19 -> {
                self().tell(ComponsitedCheckEvent.builder().serviceName(txCompensateAckSucceedEvent.getServiceName()).instanceId(txCompensateAckSucceedEvent.getInstanceId()).globalTxId(txCompensateAckSucceedEvent.getGlobalTxId()).localTxId(txCompensateAckSucceedEvent.getLocalTxId()).parentTxId(txCompensateAckSucceedEvent.getParentTxId()).preState(TxState.COMPENSATED_SUCCEED).build(), self());
            }));
        }).event(TxCompensateAckFailedEvent.class, SagaData.class, (txCompensateAckFailedEvent, sagaData20) -> {
            return stay().applying(new UpdateTxEventDomain(txCompensateAckFailedEvent)).andThen(exec(sagaData20 -> {
                self().tell(ComponsitedCheckEvent.builder().serviceName(txCompensateAckFailedEvent.getServiceName()).instanceId(txCompensateAckFailedEvent.getInstanceId()).globalTxId(txCompensateAckFailedEvent.getGlobalTxId()).localTxId(txCompensateAckFailedEvent.getLocalTxId()).parentTxId(txCompensateAckFailedEvent.getParentTxId()).preState(TxState.COMPENSATED_FAILED).build(), self());
            }));
        }).event(CompensateAckTimeoutEvent.class, SagaData.class, (compensateAckTimeoutEvent, sagaData21) -> {
            return stay().applying(new UpdateTxEventDomain(compensateAckTimeoutEvent)).andThen(exec(sagaData21 -> {
                self().tell(ComponsitedCheckEvent.builder().serviceName(compensateAckTimeoutEvent.getServiceName()).instanceId(compensateAckTimeoutEvent.getInstanceId()).globalTxId(compensateAckTimeoutEvent.getGlobalTxId()).localTxId(compensateAckTimeoutEvent.getLocalTxId()).parentTxId(compensateAckTimeoutEvent.getParentTxId()).preState(TxState.COMPENSATED_FAILED).build(), self());
            }));
        }).event(ComponsitedCheckEvent.class, SagaData.class, (componsitedCheckEvent, sagaData22) -> {
            if (sagaData22.getTxEntities().hasCompensationSentTx() || sagaData22.getTxEntities().hasCompensationFailedTx()) {
                return stay().applying(new UpdateTxEventDomain(componsitedCheckEvent));
            }
            if (sagaData22.getSuspendedType() == SuspendedType.COMPENSATE_FAILED) {
                return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(componsitedCheckEvent, SagaActorState.SUSPENDED, SuspendedType.COMPENSATE_FAILED));
            }
            return goTo(SagaActorState.COMPENSATED).applying(new SagaEndedDomain(componsitedCheckEvent, SagaActorState.COMPENSATED));
        }).event(SagaAbortedEvent.class, SagaData.class, (sagaAbortedEvent3, sagaData23) -> {
            if (sagaData23.getTxEntities().hasCommittedTx()) {
                return stay().applying(new SagaEndedDomain(sagaAbortedEvent3, SagaActorState.FAILED));
            }
            if (sagaData23.getTxEntities().hasCompensationSentTx()) {
                return stay().applying(new SagaEndedDomain(sagaAbortedEvent3, SagaActorState.FAILED));
            }
            return goTo(SagaActorState.COMPENSATED).applying(new SagaEndedDomain(sagaAbortedEvent3, SagaActorState.COMPENSATED));
        }).event(TxStartedEvent.class, SagaData.class, (txStartedEvent4, sagaData24) -> {
            return stay().applying(new AddTxEventDomain(txStartedEvent4));
        }).event(TxEndedEvent.class, SagaData.class, (txEndedEvent3, sagaData25) -> {
            UpdateTxEventDomain updateTxEventDomain = new UpdateTxEventDomain(txEndedEvent3);
            return stay().applying(updateTxEventDomain).andThen(exec(sagaData25 -> {
                compensation(updateTxEventDomain, sagaData25.getTxEntities().get(txEndedEvent3.getLocalTxId()), sagaData25);
            }));
        }).event(Arrays.asList(StateTimeout()), SagaData.class, (obj4, sagaData26) -> {
            return goTo(SagaActorState.SUSPENDED).applying(new SagaEndedDomain(SagaActorState.SUSPENDED, SuspendedType.TIMEOUT));
        }));
        when((SagaActor) SagaActorState.COMMITTED, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(StopEvent.class, (stopEvent, sagaData27) -> {
            beforeStop(stopEvent, (SagaActorState) stateName(), sagaData27);
            return stop();
        }));
        when((SagaActor) SagaActorState.SUSPENDED, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(StopEvent.class, (stopEvent2, sagaData28) -> {
            beforeStop(stopEvent2, (SagaActorState) stateName(), sagaData28);
            return stop();
        }));
        when((SagaActor) SagaActorState.COMPENSATED, (FSMStateFunctionBuilder<SagaActor, D, E>) matchEvent(StopEvent.class, (stopEvent3, sagaData29) -> {
            beforeStop(stopEvent3, (SagaActorState) stateName(), sagaData29);
            return stop();
        }));
        whenUnhandled((FSMStateFunctionBuilder) matchAnyEvent((obj5, sagaData30) -> {
            if (obj5 instanceof BaseEvent) {
                LOG.debug("Unhandled event {}", obj5);
            }
            return stay();
        }));
        onTransition((FSMTransitionHandlerBuilder) matchState((Object) null, (Object) null, (FI.UnitApply2<Object, Object>) (sagaActorState, sagaActorState2) -> {
            if (stateData().getGlobalTxId() != null) {
                stateData().setLastState(sagaActorState2);
                SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()).putSagaData(stateData().getGlobalTxId(), stateData());
            }
            if (LOG.isDebugEnabled()) {
                LOG.debug("transition [{}] {} -> {}", stateData().getGlobalTxId(), sagaActorState, sagaActorState2);
            }
            if (sagaActorState2 == SagaActorState.COMMITTED || sagaActorState2 == SagaActorState.SUSPENDED || sagaActorState2 == SagaActorState.COMPENSATED) {
                self().tell(StopEvent.builder().build(), self());
            }
        }));
        onTermination((FSMStopBuilder) matchStop(Normal(), (sagaActorState3, sagaData31) -> {
            LOG.info("stopped [{}] {}", sagaData31.getGlobalTxId(), sagaActorState3);
        }));
    }

    private void beforeStop(BaseEvent baseEvent, SagaActorState sagaActorState, SagaData sagaData) {
        if (LOG.isDebugEnabled()) {
            LOG.debug("stop [{}] {}", sagaData.getGlobalTxId(), sagaActorState);
        }
        try {
            this.sagaEndTime = System.currentTimeMillis();
            sagaData.setLastState(sagaActorState);
            sagaData.setEndTime(new Date());
            sagaData.setTerminated(true);
            SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(getContext().getSystem()).stopSagaData(sagaData.getGlobalTxId(), sagaData);
            SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaEndCounter();
            SagaDataExtension.SAGA_DATA_EXTENSION_PROVIDER.get(context().system()).doSagaAvgTime(this.sagaEndTime - this.sagaBeginTime);
            getContext().getParent().tell(new ShardRegion.Passivate(PoisonPill.getInstance()), getSelf());
            deleteMessages(lastSequenceNr());
            deleteSnapshot(snapshotSequenceNr());
        } catch (Exception e) {
            LOG.error("stop [{}] fail", sagaData.getGlobalTxId());
            throw e;
        }
    }

    @Override // akka.persistence.fsm.PersistentFSM
    public SagaData applyEvent(DomainEvent domainEvent, SagaData sagaData) {
        LOG.debug("apply domain event {}", domainEvent.getEvent());
        try {
            if (recoveryRunning()) {
                LOG.info("recovery {}", domainEvent.getEvent());
            } else if (LOG.isDebugEnabled()) {
                LOG.debug("persistence {}", domainEvent.getEvent());
            }
            if (domainEvent.getEvent() != null && !(domainEvent.getEvent() instanceof ComponsitedCheckEvent)) {
                sagaData.logEvent(domainEvent.getEvent());
            }
            if (domainEvent instanceof SagaStartedDomain) {
                SagaStartedDomain sagaStartedDomain = (SagaStartedDomain) domainEvent;
                sagaData.setServiceName(sagaStartedDomain.getEvent().getServiceName());
                sagaData.setInstanceId(sagaStartedDomain.getEvent().getInstanceId());
                sagaData.setGlobalTxId(sagaStartedDomain.getEvent().getGlobalTxId());
                sagaData.setBeginTime(sagaStartedDomain.getEvent().getCreateTime());
                sagaData.setExpirationTime(sagaStartedDomain.getExpirationTime());
            } else if (domainEvent instanceof AddTxEventDomain) {
                AddTxEventDomain addTxEventDomain = (AddTxEventDomain) domainEvent;
                if (sagaData.getTxEntities().exists(addTxEventDomain.getEvent().getLocalTxId())) {
                    LOG.warn("TxEntity {} already exists", addTxEventDomain.getEvent().getLocalTxId());
                } else {
                    TxEntity build = TxEntity.builder().serviceName(addTxEventDomain.getEvent().getServiceName()).instanceId(addTxEventDomain.getEvent().getInstanceId()).globalTxId(addTxEventDomain.getEvent().getGlobalTxId()).localTxId(addTxEventDomain.getEvent().getLocalTxId()).parentTxId(addTxEventDomain.getEvent().getParentTxId()).compensationMethod(addTxEventDomain.getCompensationMethod()).payloads(addTxEventDomain.getPayloads()).state(addTxEventDomain.getState()).reverseRetries(addTxEventDomain.getReverseRetries()).reverseTimeout(addTxEventDomain.getReverseTimeout()).retryDelayInMilliseconds(addTxEventDomain.getRetryDelayInMilliseconds()).beginTime(addTxEventDomain.getEvent().getCreateTime()).build();
                    sagaData.getTxEntities().put(build.getLocalTxId(), build);
                }
            } else if (domainEvent instanceof UpdateTxEventDomain) {
                UpdateTxEventDomain updateTxEventDomain = (UpdateTxEventDomain) domainEvent;
                TxEntity txEntity = sagaData.getTxEntities().get(updateTxEventDomain.getLocalTxId());
                txEntity.setEndTime(updateTxEventDomain.getEvent().getCreateTime());
                if (updateTxEventDomain.getState() == TxState.COMMITTED) {
                    txEntity.setState(updateTxEventDomain.getState());
                } else if (updateTxEventDomain.getState() == TxState.FAILED) {
                    txEntity.setState(updateTxEventDomain.getState());
                    txEntity.setThrowablePayLoads(updateTxEventDomain.getThrowablePayLoads());
                    sagaData.getTxEntities().forEachReverse((str, txEntity2) -> {
                        if (txEntity2.getState() == TxState.COMMITTED && compensation(updateTxEventDomain, txEntity2, sagaData)) {
                        }
                    });
                } else if (updateTxEventDomain.getState() == TxState.COMPENSATED_SUCCEED) {
                    sagaData.getCompensationRunningCounter().decrementAndGet();
                    txEntity.setState(TxState.COMPENSATED_SUCCEED);
                    LOG.info("compensate is succeed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId());
                } else if (updateTxEventDomain.getState() == TxState.COMPENSATED_FAILED) {
                    sagaData.getCompensationRunningCounter().decrementAndGet();
                    txEntity.setState(TxState.COMPENSATED_FAILED);
                    txEntity.setThrowablePayLoads(updateTxEventDomain.getThrowablePayLoads());
                    if (txEntity.getReverseRetries() <= 0 || txEntity.getRetriesCounter().get() >= txEntity.getReverseRetries()) {
                        sagaData.setSuspendedType(SuspendedType.COMPENSATE_FAILED);
                        self().tell(ComponsitedCheckEvent.builder().serviceName(txEntity.getServiceName()).instanceId(txEntity.getInstanceId()).globalTxId(txEntity.getGlobalTxId()).localTxId(txEntity.getLocalTxId()).preState(TxState.COMPENSATED_FAILED).parentTxId(txEntity.getParentTxId()).build(), self());
                    } else {
                        sagaData.getTxEntities().forEachReverse((str2, txEntity3) -> {
                            if ((txEntity3.getState() == TxState.COMMITTED || txEntity3.getState() == TxState.COMPENSATED_FAILED) && !compensation(updateTxEventDomain, txEntity3, sagaData)) {
                            }
                        });
                    }
                }
            } else if (domainEvent instanceof SagaEndedDomain) {
                SagaEndedDomain sagaEndedDomain = (SagaEndedDomain) domainEvent;
                if (sagaEndedDomain.getState() == SagaActorState.FAILED) {
                    sagaData.getTxEntities().forEachReverse((str3, txEntity4) -> {
                        if (txEntity4.getState() == TxState.COMMITTED && compensation(sagaEndedDomain, txEntity4, sagaData)) {
                        }
                    });
                } else if (sagaEndedDomain.getState() == SagaActorState.SUSPENDED) {
                    sagaData.setEndTime(domainEvent.getEvent() != null ? domainEvent.getEvent().getCreateTime() : new Date());
                    sagaData.setSuspendedType(sagaEndedDomain.getSuspendedType());
                } else if (sagaEndedDomain.getState() == SagaActorState.COMPENSATED) {
                    sagaData.setEndTime(domainEvent.getEvent() != null ? domainEvent.getEvent().getCreateTime() : new Date());
                } else if (sagaEndedDomain.getState() == SagaActorState.COMMITTED) {
                    sagaData.setEndTime(domainEvent.getEvent() != null ? domainEvent.getEvent().getCreateTime() : new Date());
                }
            }
        } catch (Exception e) {
            LOG.error("apply {}", domainEvent.getEvent(), e);
            LOG.error(e.getMessage(), (Throwable) e);
            beforeStop(domainEvent.getEvent(), SagaActorState.SUSPENDED, sagaData);
            stop();
        }
        return sagaData;
    }

    @Override // akka.persistence.fsm.AbstractPersistentFSM, akka.persistence.fsm.PersistentFSM
    public void onRecoveryCompleted() {
        if (stateName() != SagaActorState.IDLE) {
            LOG.info("recovery completed [{}] state={}", stateData().getGlobalTxId(), stateName());
        }
    }

    @Override // akka.persistence.fsm.AbstractPersistentFSM
    public Class<DomainEvent> domainEventClass() {
        return DomainEvent.class;
    }

    @Override // akka.persistence.PersistenceIdentity
    public String persistenceId() {
        return this.persistenceId;
    }

    private boolean compensation(DomainEvent domainEvent, TxEntity txEntity, SagaData sagaData) {
        sagaData.getCompensationRunningCounter().incrementAndGet();
        txEntity.setState(TxState.COMPENSATION_SENT);
        try {
            LOG.info("compensate {} {} [{}] {}", txEntity.getServiceName(), txEntity.getInstanceId(), txEntity.getGlobalTxId(), txEntity.getLocalTxId());
            SpringAkkaExtension.SPRING_EXTENSION_PROVIDER.get(context().system()).compensate(txEntity);
            return true;
        } catch (Exception e) {
            LOG.error("compensate failed [{}] {}", txEntity.getGlobalTxId(), txEntity.getLocalTxId(), e);
            if (txEntity.getReverseRetries() > 0 && txEntity.getRetriesCounter().incrementAndGet() < txEntity.getReverseRetries()) {
                LOG.info("Retry compensate {}/{} [{}] {} after {} ms", Integer.valueOf(txEntity.getRetriesCounter().get() + 1), Integer.valueOf(txEntity.getReverseRetries()), txEntity.getGlobalTxId(), txEntity.getLocalTxId(), Integer.valueOf(txEntity.getRetryDelayInMilliseconds()));
                try {
                    Thread.sleep(txEntity.getRetryDelayInMilliseconds());
                } catch (InterruptedException e2) {
                    LOG.error(e2.getMessage(), (Throwable) e2);
                }
            }
            if (e instanceof TimeoutException) {
                StringWriter stringWriter = new StringWriter();
                e.printStackTrace(new PrintWriter(stringWriter));
                String stringWriter2 = stringWriter.toString();
                if (stringWriter2.length() > Environment.getInstance().getPayloadsMaxLength()) {
                    stringWriter2 = stringWriter2.substring(0, Environment.getInstance().getPayloadsMaxLength());
                }
                self().tell(CompensateAckTimeoutEvent.builder().createTime(new Date(System.currentTimeMillis())).globalTxId(txEntity.getGlobalTxId()).parentTxId(txEntity.getParentTxId()).localTxId(txEntity.getLocalTxId()).serviceName(txEntity.getServiceName()).instanceId(txEntity.getInstanceId()).payloads(stringWriter2.getBytes()).build(), self());
            }
            if (!(e instanceof AlphaException)) {
                return false;
            }
            self().tell(TxCompensateAckFailedEvent.builder().serviceName(txEntity.getServiceName()).instanceId(txEntity.getInstanceId()).globalTxId(txEntity.getGlobalTxId()).localTxId(txEntity.getLocalTxId()).parentTxId(txEntity.getParentTxId()).payloads(e.getMessage().getBytes()).build(), self());
            return false;
        }
    }
}
