package org.apache.zeppelin.shaded.io.atomix.protocols.backup.session;

import java.net.ConnectException;
import java.time.Duration;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import org.apache.zeppelin.shaded.com.google.common.base.Preconditions;
import org.apache.zeppelin.shaded.com.google.common.base.Throwables;
import org.apache.zeppelin.shaded.com.google.common.collect.Maps;
import org.apache.zeppelin.shaded.com.google.common.collect.Sets;
import org.apache.zeppelin.shaded.io.atomix.cluster.ClusterMembershipEvent;
import org.apache.zeppelin.shaded.io.atomix.cluster.ClusterMembershipEventListener;
import org.apache.zeppelin.shaded.io.atomix.cluster.ClusterMembershipService;
import org.apache.zeppelin.shaded.io.atomix.primitive.Consistency;
import org.apache.zeppelin.shaded.io.atomix.primitive.PrimitiveException;
import org.apache.zeppelin.shaded.io.atomix.primitive.PrimitiveState;
import org.apache.zeppelin.shaded.io.atomix.primitive.PrimitiveType;
import org.apache.zeppelin.shaded.io.atomix.primitive.Recovery;
import org.apache.zeppelin.shaded.io.atomix.primitive.Replication;
import org.apache.zeppelin.shaded.io.atomix.primitive.event.EventType;
import org.apache.zeppelin.shaded.io.atomix.primitive.event.PrimitiveEvent;
import org.apache.zeppelin.shaded.io.atomix.primitive.operation.PrimitiveOperation;
import org.apache.zeppelin.shaded.io.atomix.primitive.partition.PartitionId;
import org.apache.zeppelin.shaded.io.atomix.primitive.partition.PrimaryElection;
import org.apache.zeppelin.shaded.io.atomix.primitive.partition.PrimaryElectionEventListener;
import org.apache.zeppelin.shaded.io.atomix.primitive.partition.PrimaryTerm;
import org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient;
import org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionId;
import org.apache.zeppelin.shaded.io.atomix.protocols.backup.protocol.CloseRequest;
import org.apache.zeppelin.shaded.io.atomix.protocols.backup.protocol.ExecuteRequest;
import org.apache.zeppelin.shaded.io.atomix.protocols.backup.protocol.PrimaryBackupClientProtocol;
import org.apache.zeppelin.shaded.io.atomix.protocols.backup.protocol.PrimaryBackupResponse;
import org.apache.zeppelin.shaded.io.atomix.protocols.backup.protocol.PrimitiveDescriptor;
import org.apache.zeppelin.shaded.io.atomix.utils.concurrent.ComposableFuture;
import org.apache.zeppelin.shaded.io.atomix.utils.concurrent.ThreadContext;
import org.apache.zeppelin.shaded.io.atomix.utils.logging.ContextualLoggerFactory;
import org.apache.zeppelin.shaded.io.atomix.utils.logging.LoggerContext;
import org.apache.zeppelin.shaded.org.eclipse.aether.artifact.ArtifactProperties;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/zeppelin/shaded/io/atomix/protocols/backup/session/PrimaryBackupSessionClient.class */
public class PrimaryBackupSessionClient implements SessionClient {
    private static final int MAX_ATTEMPTS = 50;
    private static final int RETRY_DELAY = 100;
    private Logger log;
    private final PrimitiveType primitiveType;
    private final PrimitiveDescriptor descriptor;
    private final ClusterMembershipService clusterMembershipService;
    private final PrimaryBackupClientProtocol protocol;
    private final PartitionId partitionId;
    private final SessionId sessionId;
    private final PrimaryElection primaryElection;
    private final ThreadContext threadContext;
    private PrimaryTerm term;
    private final Set<Consumer<PrimitiveState>> stateChangeListeners = Sets.newIdentityHashSet();
    private final Map<EventType, Set<Consumer<PrimitiveEvent>>> eventListeners = Maps.newHashMap();
    private final PrimaryElectionEventListener primaryElectionListener = primaryElectionEvent -> {
        changeReplicas(primaryElectionEvent.term());
    };
    private final ClusterMembershipEventListener membershipEventListener = this::handleClusterEvent;
    private volatile PrimitiveState state = PrimitiveState.CLOSED;

    /* loaded from: input_file:org/apache/zeppelin/shaded/io/atomix/protocols/backup/session/PrimaryBackupSessionClient$Builder.class */
    public static abstract class Builder extends SessionClient.Builder {
        protected Consistency consistency = Consistency.SEQUENTIAL;
        protected Replication replication = Replication.ASYNCHRONOUS;
        protected Recovery recovery = Recovery.RECOVER;
        protected int numBackups = 1;
        protected int maxRetries = 0;
        protected Duration retryDelay = Duration.ofMillis(100);

        public Builder withConsistency(Consistency consistency) {
            this.consistency = (Consistency) Preconditions.checkNotNull(consistency, "consistency cannot be null");
            return this;
        }

        public Builder withReplication(Replication replication) {
            this.replication = (Replication) Preconditions.checkNotNull(replication, "replication cannot be null");
            return this;
        }

        public Builder withRecovery(Recovery recovery) {
            this.recovery = (Recovery) Preconditions.checkNotNull(recovery, "recovery cannot be null");
            return this;
        }

        public Builder withNumBackups(int i) {
            Preconditions.checkArgument(i >= 0, "numBackups must be positive");
            this.numBackups = i;
            return this;
        }

        public Builder withMaxRetries(int i) {
            Preconditions.checkArgument(i >= 0, "maxRetries must be positive");
            this.maxRetries = i;
            return this;
        }

        public Builder withRetryDelayMillis(long j) {
            return withRetryDelay(Duration.ofMillis(j));
        }

        public Builder withRetryDelay(long j, TimeUnit timeUnit) {
            return withRetryDelay(Duration.ofMillis(timeUnit.toMillis(j)));
        }

        public Builder withRetryDelay(Duration duration) {
            this.retryDelay = (Duration) Preconditions.checkNotNull(duration, "retryDelay cannot be null");
            return this;
        }
    }

    public PrimaryBackupSessionClient(String str, PartitionId partitionId, SessionId sessionId, PrimitiveType primitiveType, PrimitiveDescriptor primitiveDescriptor, ClusterMembershipService clusterMembershipService, PrimaryBackupClientProtocol primaryBackupClientProtocol, PrimaryElection primaryElection, ThreadContext threadContext) {
        this.partitionId = (PartitionId) Preconditions.checkNotNull(partitionId);
        this.sessionId = (SessionId) Preconditions.checkNotNull(sessionId);
        this.primitiveType = primitiveType;
        this.descriptor = primitiveDescriptor;
        this.clusterMembershipService = clusterMembershipService;
        this.protocol = primaryBackupClientProtocol;
        this.primaryElection = primaryElection;
        this.threadContext = threadContext;
        clusterMembershipService.addListener(this.membershipEventListener);
        primaryElection.addListener(this.primaryElectionListener);
        this.log = ContextualLoggerFactory.getLogger(getClass(), LoggerContext.builder(SessionClient.class).addValue(str).add(ArtifactProperties.TYPE, primitiveType.name()).add("name", primitiveDescriptor.name()).build2());
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public String name() {
        return this.descriptor.name();
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public PrimitiveType type() {
        return this.primitiveType;
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public ThreadContext context() {
        return this.threadContext;
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public PrimitiveState getState() {
        return this.state;
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public PartitionId partitionId() {
        return this.partitionId;
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public SessionId sessionId() {
        return this.sessionId;
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public void addStateChangeListener(Consumer<PrimitiveState> consumer) {
        this.stateChangeListeners.add(consumer);
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public void removeStateChangeListener(Consumer<PrimitiveState> consumer) {
        this.stateChangeListeners.remove(consumer);
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public CompletableFuture<byte[]> execute(PrimitiveOperation primitiveOperation) {
        ComposableFuture composableFuture = new ComposableFuture();
        this.threadContext.execute(() -> {
            if (this.term.primary() == null) {
                this.primaryElection.getTerm().whenCompleteAsync((primaryTerm, th) -> {
                    if (th != null) {
                        composableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                    } else if (primaryTerm.term() <= this.term.term() || primaryTerm.primary() == null) {
                        composableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                    } else {
                        this.term = primaryTerm;
                        execute(primitiveOperation, 1, composableFuture);
                    }
                }, (Executor) this.threadContext);
            } else {
                execute(primitiveOperation, 1, composableFuture);
            }
        });
        return composableFuture;
    }

    private void execute(PrimitiveOperation primitiveOperation, int i, ComposableFuture<byte[]> composableFuture) {
        if (i > 50) {
            composableFuture.completeExceptionally(new PrimitiveException.Unavailable());
            return;
        }
        ExecuteRequest request = ExecuteRequest.request(this.descriptor, this.sessionId.id().longValue(), this.clusterMembershipService.getLocalMember().id(), primitiveOperation);
        this.log.trace("Sending {} to {}", request, this.term.primary());
        PrimaryTerm primaryTerm = this.term;
        if (primaryTerm.primary() != null) {
            this.protocol.execute(primaryTerm.primary().memberId(), request).whenCompleteAsync((executeResponse, th) -> {
                if (th == null) {
                    this.log.trace("Received {}", executeResponse);
                    if (executeResponse.status() == PrimaryBackupResponse.Status.OK) {
                        composableFuture.complete(executeResponse.result());
                        return;
                    } else if (this.term.term() > primaryTerm.term()) {
                        execute(primitiveOperation).whenComplete((BiConsumer<? super byte[], ? super Throwable>) composableFuture);
                        return;
                    } else {
                        this.primaryElection.getTerm().whenComplete((primaryTerm2, th) -> {
                            if (th == null) {
                                if (primaryTerm2.term() <= primaryTerm.term() || primaryTerm2.primary() == null) {
                                    this.threadContext.schedule(Duration.ofMillis(100L), () -> {
                                        execute(primitiveOperation, i + 1, composableFuture);
                                    });
                                    return;
                                } else {
                                    execute(primitiveOperation).whenComplete((BiConsumer<? super byte[], ? super Throwable>) composableFuture);
                                    return;
                                }
                            }
                            Throwable rootCause = Throwables.getRootCause(th);
                            if ((rootCause instanceof PrimitiveException.Unavailable) || (rootCause instanceof TimeoutException)) {
                                this.threadContext.schedule(Duration.ofMillis(100L), () -> {
                                    execute(primitiveOperation, i + 1, composableFuture);
                                });
                            } else {
                                composableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                            }
                        });
                        return;
                    }
                }
                if (this.term.term() > primaryTerm.term()) {
                    execute(primitiveOperation).whenComplete((BiConsumer<? super byte[], ? super Throwable>) composableFuture);
                    return;
                }
                Throwable rootCause = Throwables.getRootCause(th);
                if ((rootCause instanceof PrimitiveException.Unavailable) || (rootCause instanceof TimeoutException)) {
                    this.threadContext.schedule(Duration.ofMillis(100L), () -> {
                        execute(primitiveOperation, i + 1, composableFuture);
                    });
                } else {
                    composableFuture.completeExceptionally(th);
                }
            }, (Executor) this.threadContext);
        } else {
            composableFuture.completeExceptionally(new ConnectException());
        }
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public void addEventListener(EventType eventType, Consumer<PrimitiveEvent> consumer) {
        this.eventListeners.computeIfAbsent(eventType.canonicalize(), eventType2 -> {
            return Sets.newLinkedHashSet();
        }).add(consumer);
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public void removeEventListener(EventType eventType, Consumer<PrimitiveEvent> consumer) {
        this.eventListeners.computeIfAbsent(eventType.canonicalize(), eventType2 -> {
            return Sets.newLinkedHashSet();
        }).remove(consumer);
    }

    private void changeReplicas(PrimaryTerm primaryTerm) {
        this.threadContext.execute(() -> {
            if (this.term == null || primaryTerm.term() > this.term.term()) {
                this.term = primaryTerm;
            }
        });
    }

    private void handleClusterEvent(ClusterMembershipEvent clusterMembershipEvent) {
        PrimaryTerm primaryTerm = this.term;
        if (primaryTerm != null && clusterMembershipEvent.type() == ClusterMembershipEvent.Type.MEMBER_REMOVED && clusterMembershipEvent.subject().id().equals(primaryTerm.primary().memberId())) {
            this.threadContext.execute(() -> {
                this.state = PrimitiveState.SUSPENDED;
                this.stateChangeListeners.forEach(consumer -> {
                    consumer.accept(this.state);
                });
            });
        }
    }

    private void handleEvent(PrimitiveEvent primitiveEvent) {
        this.log.trace("Received {}", primitiveEvent);
        Set<Consumer<PrimitiveEvent>> set = this.eventListeners.get(primitiveEvent.type());
        if (set != null) {
            set.forEach(consumer -> {
                consumer.accept(primitiveEvent);
            });
        }
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public CompletableFuture<SessionClient> connect() {
        CompletableFuture<SessionClient> completableFuture = new CompletableFuture<>();
        this.threadContext.execute(() -> {
            connect(1, completableFuture);
        });
        return completableFuture;
    }

    private void connect(int i, CompletableFuture<SessionClient> completableFuture) {
        if (i > 50) {
            completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
        } else {
            this.primaryElection.getTerm().whenCompleteAsync((primaryTerm, th) -> {
                if (th == null) {
                    if (primaryTerm.primary() == null) {
                        completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                        return;
                    }
                    this.term = primaryTerm;
                    this.protocol.registerEventListener(this.sessionId, this::handleEvent, this.threadContext);
                    completableFuture.complete(this);
                    return;
                }
                Throwable rootCause = Throwables.getRootCause(th);
                if ((rootCause instanceof PrimitiveException.Unavailable) || (rootCause instanceof TimeoutException)) {
                    this.threadContext.schedule(Duration.ofMillis(100L), () -> {
                        connect(i + 1, completableFuture);
                    });
                } else {
                    completableFuture.completeExceptionally(new PrimitiveException.Unavailable());
                }
            }, (Executor) this.threadContext);
        }
    }

    @Override // org.apache.zeppelin.shaded.io.atomix.primitive.session.SessionClient
    public CompletableFuture<Void> close() {
        CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        PrimaryTerm primaryTerm = this.term;
        if (primaryTerm.primary() != null) {
            this.protocol.close(primaryTerm.primary().memberId(), new CloseRequest(this.descriptor, this.sessionId.id().longValue())).whenCompleteAsync((closeResponse, th) -> {
                try {
                    this.protocol.unregisterEventListener(this.sessionId);
                    this.clusterMembershipService.removeListener(this.membershipEventListener);
                    this.primaryElection.removeListener(this.primaryElectionListener);
                    completableFuture.complete(null);
                } catch (Throwable th) {
                    completableFuture.complete(null);
                    throw th;
                }
            }, (Executor) this.threadContext);
        } else {
            completableFuture.complete(null);
        }
        return completableFuture;
    }
}
