package io.zeebe.client.task.impl.subscription;

import io.zeebe.client.event.impl.GeneralEventImpl;
import io.zeebe.client.impl.Loggers;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.CheckedConsumer;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.state.WaitState;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.agrona.concurrent.ManyToManyConcurrentArrayQueue;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriber.class */
public abstract class EventSubscriber {
    protected static final Logger LOGGER = Loggers.SUBSCRIPTION_LOGGER;
    protected static final String LOG_MESSAGE_PREFIX = "Subscriber {}: ";
    protected static final double REPLENISHMENT_THRESHOLD = 0.3d;
    protected static final int TRANSITION_DEFAULT = 0;
    protected static final int TRANSITION_REOPEN = 2;
    protected static final int TRANSITION_ABORT = 3;
    protected static final int TRANSITION_CLOSE = 4;
    protected long subscriberKey;
    protected final ManyToManyConcurrentArrayQueue<GeneralEventImpl> pendingEvents;
    protected final int capacity;
    protected final EventAcquisition acquisition;
    protected RemoteAddress eventSource;
    protected int partitionId;
    protected final OpeningState openingState = new OpeningState();
    protected final OpenState openState = new OpenState();
    protected final ClosingState closingState = new ClosingState();
    protected final ClosedState closedState = new ClosedState();
    private final StateMachine<SimpleStateMachineContext> stateMachine = StateMachine.builder(stateMachine -> {
        return new SimpleStateMachineContext(stateMachine);
    }).initialState(this.openingState).from(this.openingState).take(0).to(this.openState).from(this.openingState).take(3).to(this.closedState).from(this.openState).take(2).to(this.openingState).from(this.openState).take(3).to(this.closedState).from(this.openState).take(4).to(this.closingState).from(this.closingState).take(0).to(this.closedState).from(this.closingState).take(4).to(this.closedState).from(this.closedState).take(4).to(this.closedState).build();
    private StateMachineAgent<SimpleStateMachineContext> stateMachineAgent = new StateMachineAgent<>(this.stateMachine);
    protected AtomicBoolean isCloseIssued = new AtomicBoolean(false);
    protected final AtomicInteger eventsInProcessing = new AtomicInteger(0);
    protected final AtomicInteger eventsProcessedSinceLastReplenishment = new AtomicInteger(0);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriber$ClosedState.class */
    public class ClosedState implements WaitState<SimpleStateMachineContext> {
        ClosedState() {
        }

        @Override // io.zeebe.util.state.State
        public void onEnter(SimpleStateMachineContext simpleStateMachineContext) {
            EventSubscriber.this.acquisition.deactivateSubscriber(EventSubscriber.this);
        }

        @Override // io.zeebe.util.state.WaitState
        public void work(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriber$ClosingState.class */
    class ClosingState implements State<SimpleStateMachineContext> {
        ClosingState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            if (EventSubscriber.this.hasEventsInProcessing()) {
                return 0;
            }
            try {
                EventSubscriber.this.requestSubscriptionClose();
            } catch (Exception e) {
                EventSubscriber.LOGGER.warn("Subscriber {}: Exception when closing subscription", EventSubscriber.this, e);
            }
            simpleStateMachineContext.take(0);
            return 1;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriber$OpenState.class */
    public class OpenState implements State<SimpleStateMachineContext> {
        OpenState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            return EventSubscriber.this.replenishEventSource() ? 1 : 0;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriber$OpeningState.class */
    class OpeningState implements State<SimpleStateMachineContext> {
        protected Future<? extends EventSubscriptionCreationResult> subscriptionFuture;

        OpeningState() {
        }

        @Override // io.zeebe.util.state.State
        public boolean isInterruptable() {
            return false;
        }

        @Override // io.zeebe.util.state.State
        public void onExit() {
            this.subscriptionFuture = null;
        }

        @Override // io.zeebe.util.state.State
        public int doWork(SimpleStateMachineContext simpleStateMachineContext) throws Exception {
            if (this.subscriptionFuture == null) {
                EventSubscriber.LOGGER.debug("Subscriber {}: Opening", EventSubscriber.this);
                this.subscriptionFuture = EventSubscriber.this.requestNewSubscription();
                return 1;
            }
            if (!this.subscriptionFuture.isDone()) {
                return 0;
            }
            try {
                EventSubscriptionCreationResult eventSubscriptionCreationResult = this.subscriptionFuture.get();
                EventSubscriber.LOGGER.debug("Subscriber {} opened", EventSubscriber.this);
                EventSubscriber.this.subscriberKey = eventSubscriptionCreationResult.getSubscriberKey();
                EventSubscriber.this.partitionId = eventSubscriptionCreationResult.getPartitionId();
                EventSubscriber.this.eventSource = eventSubscriptionCreationResult.getEventPublisher();
                EventSubscriber.this.resetProcessingState();
                EventSubscriber.this.acquisition.activateSubscriber(EventSubscriber.this);
                simpleStateMachineContext.take(0);
                return 1;
            } catch (Exception e) {
                EventSubscriber.LOGGER.error("Subscriber {}; Could not open subscriber remotely. Aborting", EventSubscriber.this, e);
                simpleStateMachineContext.take(3);
                return 1;
            }
        }
    }

    public EventSubscriber(int i, int i2, EventAcquisition eventAcquisition) {
        this.pendingEvents = new ManyToManyConcurrentArrayQueue<>(i2);
        this.capacity = i2;
        this.acquisition = eventAcquisition;
        this.partitionId = i;
    }

    public int maintainState() {
        return this.stateMachineAgent.doWork();
    }

    public RemoteAddress getEventSource() {
        return this.eventSource;
    }

    public void closeAsync() {
        this.isCloseIssued.set(true);
        if (isClosed()) {
            return;
        }
        this.stateMachineAgent.addCommand(simpleStateMachineContext -> {
            simpleStateMachineContext.tryTake(4);
        });
    }

    public void reopenAsync() {
        this.stateMachineAgent.addCommand(simpleStateMachineContext -> {
            simpleStateMachineContext.tryTake(2);
        });
    }

    public boolean isOpen() {
        return this.stateMachine.isInState(this.openState);
    }

    public boolean isOpening() {
        return this.stateMachine.isInState(this.openingState);
    }

    public boolean isClosed() {
        return this.stateMachine.isInState(this.closedState);
    }

    public int size() {
        return this.pendingEvents.size();
    }

    protected boolean replenishEventSource() {
        int i = this.eventsProcessedSinceLastReplenishment.get();
        boolean z = ((double) (this.capacity - i)) < ((double) this.capacity) * REPLENISHMENT_THRESHOLD;
        if (z) {
            requestEventSourceReplenishment(i);
            this.eventsProcessedSinceLastReplenishment.addAndGet(-i);
        }
        return z;
    }

    public long getSubscriberKey() {
        return this.subscriberKey;
    }

    protected abstract void requestEventSourceReplenishment(int i);

    public boolean addEvent(GeneralEventImpl generalEventImpl) {
        boolean offer = this.pendingEvents.offer(generalEventImpl);
        if (!offer) {
            LOGGER.warn("Subscriber {}: Cannot add any more events. Event queue saturated. Postponing event {}.", this, generalEventImpl);
        }
        return offer;
    }

    protected void resetProcessingState() {
        this.pendingEvents.clear();
        this.eventsInProcessing.set(0);
        this.eventsProcessedSinceLastReplenishment.set(0);
    }

    protected boolean hasEventsInProcessing() {
        return this.eventsInProcessing.get() > 0;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        GeneralEventImpl poll;
        int size = size();
        int i = 0;
        while (true) {
            if (i >= size || !isOpen() || this.isCloseIssued.get() || (poll = this.pendingEvents.poll()) == null) {
                break;
            }
            this.eventsInProcessing.incrementAndGet();
            try {
                if (!isOpen()) {
                    break;
                }
                i++;
                logHandling(poll);
                try {
                    checkedConsumer.accept(poll);
                } catch (Exception e) {
                    onUnhandledEventHandlingException(poll, e);
                }
                this.eventsInProcessing.decrementAndGet();
                this.eventsProcessedSinceLastReplenishment.incrementAndGet();
            } finally {
                this.eventsInProcessing.decrementAndGet();
                this.eventsProcessedSinceLastReplenishment.incrementAndGet();
            }
        }
        return i;
    }

    protected void logHandling(GeneralEventImpl generalEventImpl) {
        try {
            LOGGER.trace("Subscriber {}: Handling event {}", this, generalEventImpl);
        } catch (Exception e) {
            LOGGER.warn("Could not construct or write log message", (Throwable) e);
        }
    }

    protected void onUnhandledEventHandlingException(GeneralEventImpl generalEventImpl, Exception exc) {
        throw new RuntimeException("Exception during handling of event " + generalEventImpl.getMetadata().getKey(), exc);
    }

    public abstract String getTopicName();

    public int getPartitionId() {
        return this.partitionId;
    }

    protected abstract Future<? extends EventSubscriptionCreationResult> requestNewSubscription();

    protected abstract void requestSubscriptionClose();
}
