package io.zeebe.client.task.impl.subscription;

import io.zeebe.client.ZeebeClient;
import io.zeebe.client.cmd.ClientException;
import io.zeebe.client.event.impl.GeneralEventImpl;
import io.zeebe.client.impl.Loggers;
import io.zeebe.client.task.impl.subscription.EventSubscriber;
import io.zeebe.client.topic.Partition;
import io.zeebe.client.topic.Topic;
import io.zeebe.client.topic.Topics;
import io.zeebe.transport.RemoteAddress;
import io.zeebe.util.CheckedConsumer;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.state.WaitState;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.slf4j.Logger;

/* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup.class */
public abstract class EventSubscriberGroup<T extends EventSubscriber> {
    protected static final Logger LOG = Loggers.SUBSCRIPTION_LOGGER;
    protected static final String LOG_MESSAGE_PREFIX = "Subscriber Group {}: ";
    protected List<Partition> partitions;
    protected CompletableFuture<EventSubscriberGroup<T>> openFuture;
    protected CompletableFuture<EventSubscriberGroup<T>> closeFuture;
    protected static final int TRANSITION_DEFAULT = 0;
    protected static final int TRANSITION_OPEN = 1;
    protected static final int TRANSITION_CLOSE = 2;
    protected final EventAcquisition acquisition;
    protected final ZeebeClient client;
    protected final String topic;
    protected List<T> subscribers = new ArrayList();
    protected final EventSubscriberGroup<T>.InitState initState = new InitState();
    protected final EventSubscriberGroup<T>.DeterminePartitionsState determinePartitionsState = new DeterminePartitionsState();
    protected final EventSubscriberGroup<T>.InitiateSubscribersState initSubscribers = new InitiateSubscribersState();
    protected final EventSubscriberGroup<T>.OpeningState openingState = new OpeningState();
    protected final EventSubscriberGroup<T>.OpenedState openedState = new OpenedState();
    protected final EventSubscriberGroup<T>.InitiateCloseState initCloseState = new InitiateCloseState();
    protected final EventSubscriberGroup<T>.ClosingState closingState = new ClosingState();
    protected final EventSubscriberGroup<T>.ClosedState closedState = new ClosedState();
    private final StateMachine<GroupContext> stateMachine = StateMachine.builder(stateMachine -> {
        return new GroupContext(stateMachine);
    }).initialState(this.initState).from(this.initState).take(1).to(this.determinePartitionsState).from(this.determinePartitionsState).take(0).to(this.initSubscribers).from(this.determinePartitionsState).take(2).to(this.closedState).from(this.initSubscribers).take(0).to(this.openingState).from(this.openingState).take(0).to(this.openedState).from(this.openingState).take(2).to(this.initCloseState).from(this.openedState).take(2).to(this.initCloseState).from(this.initCloseState).take(0).to(this.closingState).from(this.initCloseState).take(2).to(this.initCloseState).from(this.closingState).take(0).to(this.closedState).from(this.closingState).take(2).to(this.closingState).from(this.closedState).take(2).to(this.closedState).build();
    private StateMachineAgent<GroupContext> stateMachineAgent = new StateMachineAgent<>(this.stateMachine);

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$ClosedState.class */
    public class ClosedState implements WaitState<GroupContext> {
        ClosedState() {
        }

        @Override // io.zeebe.util.state.WaitState
        public void work(GroupContext groupContext) throws Exception {
            if (EventSubscriberGroup.this.openFuture != null) {
                EventSubscriberGroup.this.openFuture.completeExceptionally(new ClientException(String.format("Could not create subscriber group %s: %s", EventSubscriberGroup.this.describeGroupSpec(), groupContext.getCloseReason())));
                EventSubscriberGroup.this.openFuture = null;
            }
            if (EventSubscriberGroup.this.closeFuture != null) {
                EventSubscriberGroup.this.closeFuture.complete(EventSubscriberGroup.this);
                EventSubscriberGroup.this.closeFuture = null;
            }
            EventSubscriberGroup.this.acquisition.stopManageGroup(EventSubscriberGroup.this);
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$ClosingState.class */
    class ClosingState implements State<GroupContext> {
        ClosingState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(GroupContext groupContext) throws Exception {
            boolean z = true;
            Iterator<T> it = EventSubscriberGroup.this.subscribers.iterator();
            while (it.hasNext()) {
                z &= it.next().isClosed();
            }
            if (!z) {
                return 0;
            }
            groupContext.take(0);
            return 1;
        }

        @Override // io.zeebe.util.state.State
        public void onExit() {
            Iterator<T> it = EventSubscriberGroup.this.subscribers.iterator();
            while (it.hasNext()) {
                EventSubscriberGroup.this.acquisition.removeSubscriber(it.next());
            }
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$DeterminePartitionsState.class */
    class DeterminePartitionsState implements State<GroupContext> {
        protected Future<Topics> topicsFuture;

        DeterminePartitionsState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(GroupContext groupContext) throws Exception {
            if (this.topicsFuture != null) {
                return determinePartitions(groupContext);
            }
            requestTopics(groupContext);
            return 1;
        }

        @Override // io.zeebe.util.state.State
        public void onExit() {
            this.topicsFuture = null;
        }

        private void requestTopics(GroupContext groupContext) {
            Loggers.SUBSCRIPTION_LOGGER.debug("Subscriber Group {}: Determining partitions of topic", EventSubscriberGroup.this);
            this.topicsFuture = EventSubscriberGroup.this.client.topics().getTopics().executeAsync();
        }

        private int determinePartitions(GroupContext groupContext) {
            if (!this.topicsFuture.isDone()) {
                return 0;
            }
            try {
                Optional<Topic> findFirst = this.topicsFuture.get().getTopics().stream().filter(topic -> {
                    return EventSubscriberGroup.this.topic.equals(topic.getName());
                }).findFirst();
                if (findFirst.isPresent()) {
                    EventSubscriberGroup.this.partitions = findFirst.get().getPartitions();
                    groupContext.take(0);
                    return 1;
                }
                groupContext.setCloseReason(String.format("Topic %s is not known", EventSubscriberGroup.this.topic));
                groupContext.take(2);
                return 1;
            } catch (Exception e) {
                EventSubscriberGroup.LOG.error("Subscriber Group {}: Could not fetch topics", EventSubscriberGroup.this, e);
                groupContext.setCloseReason("Could not fetch topics");
                groupContext.take(2);
                return 1;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$GroupContext.class */
    public static class GroupContext extends SimpleStateMachineContext {
        private String closeReason;

        public GroupContext(StateMachine<?> stateMachine) {
            super(stateMachine);
        }

        public void setCloseReason(String str) {
            this.closeReason = str;
        }

        public String getCloseReason() {
            return this.closeReason;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$InitState.class */
    class InitState implements WaitState<GroupContext> {
        InitState() {
        }

        @Override // io.zeebe.util.state.WaitState
        public void work(GroupContext groupContext) throws Exception {
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$InitiateCloseState.class */
    class InitiateCloseState implements State<GroupContext> {
        InitiateCloseState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(GroupContext groupContext) throws Exception {
            Iterator<T> it = EventSubscriberGroup.this.subscribers.iterator();
            while (it.hasNext()) {
                it.next().closeAsync();
            }
            groupContext.take(0);
            return 1;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$InitiateSubscribersState.class */
    class InitiateSubscribersState implements State<GroupContext> {
        InitiateSubscribersState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(GroupContext groupContext) throws Exception {
            EventSubscriberGroup.LOG.debug("Subscriber Group {}: Subscribing to partitions {}", EventSubscriberGroup.this, EventSubscriberGroup.this.partitions);
            Iterator<Partition> it = EventSubscriberGroup.this.partitions.iterator();
            while (it.hasNext()) {
                EventSubscriber buildSubscriber = EventSubscriberGroup.this.buildSubscriber(it.next().getId());
                EventSubscriberGroup.this.subscribers.add(buildSubscriber);
                EventSubscriberGroup.this.acquisition.addSubscriber(buildSubscriber);
            }
            groupContext.take(0);
            return 1;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$OpenedState.class */
    class OpenedState implements State<GroupContext> {
        OpenedState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(GroupContext groupContext) throws Exception {
            if (EventSubscriberGroup.this.openFuture != null) {
                EventSubscriberGroup.this.openFuture.complete(EventSubscriberGroup.this);
                EventSubscriberGroup.this.openFuture = null;
            }
            boolean z = false;
            Iterator<T> it = EventSubscriberGroup.this.subscribers.iterator();
            while (it.hasNext()) {
                z |= it.next().isClosed();
            }
            if (!z) {
                return 0;
            }
            groupContext.setCloseReason("A subscriber closed unexpectedly.");
            EventSubscriberGroup.LOG.error("Subscriber Group {}: Closing unexpectedly", EventSubscriberGroup.this);
            groupContext.take(2);
            return 1;
        }
    }

    /* loaded from: input_file:io/zeebe/client/task/impl/subscription/EventSubscriberGroup$OpeningState.class */
    class OpeningState implements State<GroupContext> {
        OpeningState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(GroupContext groupContext) throws Exception {
            boolean z = true;
            boolean z2 = false;
            for (T t : EventSubscriberGroup.this.subscribers) {
                z &= t.isOpen();
                z2 |= t.isClosed();
            }
            if (z2) {
                groupContext.setCloseReason("A subscriber closed unexpectedly.");
                EventSubscriberGroup.LOG.error("Subscriber Group {}: Closing unexpectedly", EventSubscriberGroup.this);
                groupContext.take(2);
                return 1;
            }
            if (!z) {
                return 0;
            }
            EventSubscriberGroup.LOG.debug("Subscriber Group {}: All subscribers opened", EventSubscriberGroup.this);
            groupContext.take(0);
            return 1;
        }
    }

    public EventSubscriberGroup(EventAcquisition eventAcquisition, ZeebeClient zeebeClient, String str) {
        this.acquisition = eventAcquisition;
        this.client = zeebeClient;
        this.topic = str;
        eventAcquisition.registerSubscriptionAsync(this);
    }

    protected abstract T buildSubscriber(int i);

    public CompletableFuture<EventSubscriberGroup<T>> closeAsync() {
        CompletableFuture<EventSubscriberGroup<T>> completableFuture = new CompletableFuture<>();
        if (isClosed()) {
            completableFuture.complete(this);
            return completableFuture;
        }
        this.stateMachineAgent.addCommand(groupContext -> {
            if (!groupContext.tryTake(2)) {
                completableFuture.cancel(true);
            } else if (this.closeFuture == null) {
                this.closeFuture = completableFuture;
            } else {
                this.closeFuture.whenComplete((eventSubscriberGroup, th) -> {
                    if (th == null) {
                        completableFuture.complete(eventSubscriberGroup);
                    } else {
                        completableFuture.completeExceptionally(th);
                    }
                });
            }
        });
        return completableFuture;
    }

    public void close() {
        try {
            closeAsync().get();
        } catch (Exception e) {
            throw new ClientException("Exception while closing subscription", e);
        }
    }

    public void open() {
        try {
            openAsync().get();
        } catch (Exception e) {
            throw new ClientException("Could not open subscription: " + e.getMessage(), e);
        }
    }

    public CompletableFuture<EventSubscriberGroup<T>> openAsync() {
        CompletableFuture<EventSubscriberGroup<T>> completableFuture = new CompletableFuture<>();
        this.stateMachineAgent.addCommand(groupContext -> {
            if (groupContext.tryTake(1)) {
                this.openFuture = completableFuture;
            } else {
                completableFuture.cancel(true);
            }
        });
        return completableFuture;
    }

    public boolean isOpen() {
        return this.stateMachine.isInState(this.openedState);
    }

    public boolean isOpening() {
        return this.stateMachine.isInState(this.openingState);
    }

    public boolean isClosed() {
        return this.stateMachine.isInState(this.closedState);
    }

    public int maintainState() {
        int doWork = this.stateMachineAgent.doWork();
        Iterator<T> it = this.subscribers.iterator();
        while (it.hasNext()) {
            doWork += it.next().maintainState();
        }
        return doWork;
    }

    public abstract boolean isManagedGroup();

    public void reopenSubscribersForRemote(RemoteAddress remoteAddress) {
        for (T t : this.subscribers) {
            if (remoteAddress.equals(t.getEventSource())) {
                t.reopenAsync();
            }
        }
    }

    public abstract int poll();

    protected abstract String describeGroupSpec();

    public int pollEvents(CheckedConsumer<GeneralEventImpl> checkedConsumer) {
        int i = 0;
        Iterator<T> it = this.subscribers.iterator();
        while (it.hasNext()) {
            i += it.next().pollEvents(checkedConsumer);
        }
        return i;
    }

    public int size() {
        int i = 0;
        Iterator<T> it = this.subscribers.iterator();
        while (it.hasNext()) {
            i += it.next().size();
        }
        return i;
    }

    public List<T> getSubscribers() {
        CompletableFuture completableFuture = new CompletableFuture();
        this.stateMachineAgent.addCommand(groupContext -> {
            completableFuture.complete(new ArrayList(this.subscribers));
        });
        try {
            return (List) completableFuture.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public String toString() {
        return describeGroupSpec();
    }
}
