package org.hibernate.search.mapper.orm.outboxpolling.event.impl;

import java.lang.invoke.MethodHandles;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import org.hibernate.search.engine.reporting.FailureHandler;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.Agent;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.AgentPersister;
import org.hibernate.search.mapper.orm.outboxpolling.cluster.impl.AgentReference;
import org.hibernate.search.mapper.orm.outboxpolling.logging.impl.Log;
import org.hibernate.search.util.common.logging.impl.LoggerFactory;
import org.hibernate.search.util.common.spi.ToStringTreeAppendable;
import org.hibernate.search.util.common.spi.ToStringTreeAppender;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/hibernate/search/mapper/orm/outboxpolling/event/impl/AbstractAgentClusterLink.class */
/**
 * Base class for an agent's link to the other agents registered in the agent repository.
 *
 * <p>The entry point is {@link #pulse(AgentClusterLinkContext)}: it makes sure this agent is
 * registered, removes other agents whose expiration is in the past, and otherwise delegates to
 * {@link #doPulse(List, Agent)} to decide what to write and which instructions to return.
 *
 * @param <R> the type of the instructions returned by a pulse.
 */
public abstract class AbstractAgentClusterLink<R> implements ToStringTreeAppendable {
    private static final Log log = LoggerFactory.make(Log.class, MethodHandles.lookup());

    protected final FailureHandler failureHandler;
    /** Source of every "now" used by this class. */
    protected final Clock clock;
    /** Delay passed to {@link #instructCommitAndRetryPulseAfterDelay(Instant, Duration)} after a cleanup. */
    protected final Duration pollingInterval;
    protected final Duration pulseInterval;
    /** Added to the current instant to compute this agent's expiration on registration and on each write. */
    protected final Duration pulseExpiration;
    private final AgentPersister agentPersister;

    /**
     * A write prepared by {@link #doPulse(List, Agent)} and applied by
     * {@link #pulse(AgentClusterLinkContext)} after it has committed and begun a new transaction.
     *
     * @param <R> the type of the instructions returned once the write has been applied.
     */
    /* loaded from: input_file:org/hibernate/search/mapper/orm/outboxpolling/event/impl/AbstractAgentClusterLink$WriteAction.class */
    protected interface WriteAction<R> {
        /**
         * @param instant the instant read from the clock right before this call.
         * @param agent the freshly reloaded agent representing self, with its expiration already pushed back.
         * @param agentPersister the persister of the calling link.
         * @return the instructions to return from the pulse.
         */
        R applyAndReturnInstructions(Instant instant, Agent agent, AgentPersister agentPersister);
    }

    /**
     * @param agentPersister used to find, create and remove the agent representing self.
     * @param failureHandler stored in {@link #failureHandler}; not used by this class directly.
     * @param clock stored in {@link #clock}.
     * @param duration stored in {@link #pollingInterval}.
     * @param duration2 stored in {@link #pulseInterval}.
     * @param duration3 stored in {@link #pulseExpiration}.
     */
    public AbstractAgentClusterLink(AgentPersister agentPersister, FailureHandler failureHandler, Clock clock, Duration duration, Duration duration2, Duration duration3) {
        this.agentPersister = agentPersister;
        this.failureHandler = failureHandler;
        this.clock = clock;
        this.pollingInterval = duration;
        this.pulseInterval = duration2;
        this.pulseExpiration = duration3;
    }

    @Override
    public String toString() {
        return toStringTree();
    }

    /**
     * Appends the persister and the three configured durations as attributes.
     *
     * @param toStringTreeAppender the appender to add attributes to.
     */
    public void appendTo(ToStringTreeAppender toStringTreeAppender) {
        toStringTreeAppender
                .attribute("agentPersister", agentPersister)
                .attribute("pollingInterval", pollingInterval)
                .attribute("pulseInterval", pulseInterval)
                .attribute("pulseExpiration", pulseExpiration);
    }

    /**
     * Runs one pulse for this agent.
     *
     * <p>Steps, in order:
     * <ol>
     * <li>register self if not registered yet;</li>
     * <li>list all agents and read the clock;</li>
     * <li>if any agent other than self has an expiration before that instant, delete those agents
     * and return {@link #instructCommitAndRetryPulseAfterDelay(Instant, Duration)} with the
     * polling interval, without touching self;</li>
     * <li>otherwise ask {@link #doPulse(List, Agent)} for a write action, commit and begin a new
     * transaction, reload self, push back its expiration and apply the write action.</li>
     * </ol>
     *
     * @param agentClusterLinkContext gives access to the agent repository and to transaction control.
     * @return the instructions produced by the cleanup branch or by the write action.
     */
    public final R pulse(AgentClusterLinkContext agentClusterLinkContext) {
        Agent self = ensureRegistered(agentClusterLinkContext);
        // All agents are listed before anything is written in this pulse.
        List<Agent> allAgents = agentClusterLinkContext.agentRepository().findAllOrderById();
        Instant pulseStart = clock.instant();
        log.tracef("Agent '%s': starting pulse at %s with self = %s, all agents = %s", selfReference(), pulseStart, self, allAgents);

        List<Agent> timedOutAgents = findTimedOutAgents(allAgents, self, pulseStart);
        if (!timedOutAgents.isEmpty()) {
            log.removingTimedOutAgents(selfReference(), timedOutAgents);
            agentClusterLinkContext.agentRepository().delete(timedOutAgents);
            log.infof("Agent '%s': reassessing the new situation in the next pulse", selfReference());
            return instructCommitAndRetryPulseAfterDelay(pulseStart, pollingInterval);
        }

        // The decision is taken from the data read above; the write itself happens after the commit below.
        WriteAction<R> writeAction = doPulse(allAgents, self);
        agentClusterLinkContext.commitAndBeginNewTransaction();
        Instant writeTime = clock.instant();
        Agent reloadedSelf = findSelfExpectRegistered(agentClusterLinkContext);
        // Every write pushes back the expiration of self.
        reloadedSelf.setExpiration(expirationFrom(writeTime));
        R instructions = writeAction.applyAndReturnInstructions(writeTime, reloadedSelf, agentPersister);
        log.tracef("Agent '%s': ending pulse at %s with self = %s", selfReference(), writeTime, reloadedSelf);
        return instructions;
    }

    /** Returns the agents, other than {@code self}, whose expiration is strictly before {@code now}. */
    private List<Agent> findTimedOutAgents(List<Agent> allAgents, Agent self, Instant now) {
        Predicate<Agent> isNotSelf = Predicate.<Agent>isEqual(self).negate();
        Predicate<Agent> isExpired = candidate -> candidate.getExpiration().isBefore(now);
        return allAgents.stream()
                .filter(isNotSelf.and(isExpired))
                .collect(Collectors.toList());
    }

    /** Returns the expiration to assign to self for a write happening at {@code now}. */
    private Instant expirationFrom(Instant now) {
        return now.plus(pulseExpiration);
    }

    /**
     * Returns the agent representing self, creating it first if the persister cannot find it.
     * On creation, a commit happens before self is looked up again.
     */
    private Agent ensureRegistered(AgentClusterLinkContext context) {
        Agent existing = agentPersister.findSelf(context.agentRepository());
        if (existing != null) {
            return existing;
        }
        agentPersister.createSelf(context.agentRepository(), expirationFrom(clock.instant()));
        context.commitAndBeginNewTransaction();
        return findSelfExpectRegistered(context);
    }

    /** Returns the agent representing self, or throws the logger-provided exception if it cannot be found. */
    private Agent findSelfExpectRegistered(AgentClusterLinkContext context) {
        Agent self = agentPersister.findSelf(context.agentRepository());
        if (self != null) {
            return self;
        }
        throw log.agentRegistrationIneffective(selfReference());
    }

    /**
     * Decides what this pulse should write.
     *
     * @param list all agents, as returned by {@code findAllOrderById()} at the start of the pulse.
     * @param agent the agent representing self, as loaded at the start of the pulse.
     * @return the write action that {@link #pulse(AgentClusterLinkContext)} applies after committing.
     */
    protected abstract WriteAction<R> doPulse(List<Agent> list, Agent agent);

    /**
     * Builds the instructions returned by a pulse that only deleted timed-out agents.
     *
     * @param instant the instant read at the start of the pulse.
     * @param duration the delay to apply; {@link #pulse(AgentClusterLinkContext)} passes the polling interval.
     * @return the instructions to return from the pulse.
     */
    protected abstract R instructCommitAndRetryPulseAfterDelay(Instant instant, Duration duration);

    /**
     * Delegates to the persister's {@code leaveCluster} with the context's agent repository.
     *
     * @param agentClusterLinkContext gives access to the agent repository.
     */
    public final void leaveCluster(AgentClusterLinkContext agentClusterLinkContext) {
        agentPersister.leaveCluster(agentClusterLinkContext.agentRepository());
    }

    /**
     * @return the persister's reference to self, used in log messages and exceptions.
     */
    /* JADX INFO: Access modifiers changed from: protected */
    public AgentReference selfReference() {
        return agentPersister.selfReference();
    }

    /**
     * @return the persister given to the constructor; judging by the name, intended for tests only.
     */
    final AgentPersister getAgentPersisterForTests() {
        return agentPersister;
    }
}
