package org.apache.hw_v4_0_0.hedwig.server.regions;

import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.hw_v4_0_0.hedwig.client.api.MessageHandler;
import org.apache.hw_v4_0_0.hedwig.client.netty.HedwigSubscriber;
import org.apache.hw_v4_0_0.hedwig.exceptions.PubSubException;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.common.TopicOpQueuer;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistRequest;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionEventListener;
import org.apache.hw_v4_0_0.hedwig.util.Callback;
import org.apache.hw_v4_0_0.hedwig.util.CallbackUtils;
import org.apache.hw_v4_0_0.hedwig.util.HedwigSocketAddress;
import org.apache.hw_v4_0_0.zookkeeper.ZooKeeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/regions/RegionManager.class */
public class RegionManager implements SubscriptionEventListener {
    protected static final Logger LOGGER = LoggerFactory.getLogger(RegionManager.class);
    private final ByteString mySubId;
    private final PersistenceManager pm;
    private final ArrayList<HedwigHubClient> clients = new ArrayList<>();
    private final TopicOpQueuer queue;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/regions/RegionManager$1.class */
    public class AnonymousClass1 extends TopicOpQueuer.AsynchronousOp<Void> {
        final /* synthetic */ boolean val$synchronous;

        /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
        AnonymousClass1(TopicOpQueuer topicOpQueuer, ByteString byteString, Callback callback, Object obj, boolean z) {
            super(byteString, callback, obj);
            this.val$synchronous = z;
            topicOpQueuer.getClass();
        }

        @Override // java.lang.Runnable
        public void run() {
            final Callback<Void> multiCallback = CallbackUtils.multiCallback(RegionManager.this.clients.size(), this.val$synchronous ? this.cb : CallbackUtils.logger(RegionManager.LOGGER, "all cross-region subscriptions succeeded", "at least one cross-region subscription failed"), this.ctx);
            Iterator it = RegionManager.this.clients.iterator();
            while (it.hasNext()) {
                final HedwigSubscriber subscriber = ((HedwigHubClient) it.next()).getSubscriber();
                subscriber.asyncSubscribe(this.topic, RegionManager.this.mySubId, PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH, new Callback<Void>() { // from class: org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager.1.1
                    @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                    public void operationFinished(Object obj, Void r9) {
                        if (RegionManager.LOGGER.isDebugEnabled()) {
                            RegionManager.LOGGER.debug("cross-region subscription done for topic " + AnonymousClass1.this.topic.toStringUtf8());
                        }
                        try {
                            subscriber.startDelivery(AnonymousClass1.this.topic, RegionManager.this.mySubId, new MessageHandler() { // from class: org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager.1.1.1
                                @Override // org.apache.hw_v4_0_0.hedwig.client.api.MessageHandler
                                public void deliver(final ByteString byteString, ByteString byteString2, PubSubProtocol.Message message, final Callback<Void> callback, final Object obj2) {
                                    if (message.hasSrcRegion()) {
                                        PubSubProtocol.Message.newBuilder(message).setMsgId(PubSubProtocol.MessageSeqId.newBuilder(message.getMsgId()).addRemoteComponents(PubSubProtocol.RegionSpecificSeqId.newBuilder().setRegion(message.getSrcRegion()).setSeqId(message.getMsgId().getLocalComponent())));
                                    }
                                    RegionManager.this.pm.persistMessage(new PersistRequest(byteString, message, new Callback<Long>() { // from class: org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager.1.1.1.1
                                        @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                                        public void operationFinished(Object obj3, Long l) {
                                            if (RegionManager.LOGGER.isDebugEnabled()) {
                                                RegionManager.LOGGER.debug("cross-region recv-fwd succeeded for topic " + byteString.toStringUtf8());
                                            }
                                            callback.operationFinished(obj2, null);
                                        }

                                        @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                                        public void operationFailed(Object obj3, PubSubException pubSubException) {
                                            if (RegionManager.LOGGER.isDebugEnabled()) {
                                                RegionManager.LOGGER.error("cross-region recv-fwd failed for topic " + byteString.toStringUtf8(), pubSubException);
                                            }
                                            callback.operationFailed(obj2, pubSubException);
                                        }
                                    }, null));
                                }
                            });
                            if (RegionManager.LOGGER.isDebugEnabled()) {
                                RegionManager.LOGGER.debug("cross-region start-delivery succeeded for topic " + AnonymousClass1.this.topic.toStringUtf8());
                            }
                            multiCallback.operationFinished(obj, null);
                        } catch (PubSubException e) {
                            if (RegionManager.LOGGER.isDebugEnabled()) {
                                RegionManager.LOGGER.error("cross-region start-delivery failed for topic " + AnonymousClass1.this.topic.toStringUtf8(), e);
                            }
                            multiCallback.operationFailed(obj, e);
                        }
                    }

                    @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
                    public void operationFailed(Object obj, PubSubException pubSubException) {
                        if (RegionManager.LOGGER.isDebugEnabled()) {
                            RegionManager.LOGGER.error("cross-region subscribe failed for topic " + AnonymousClass1.this.topic.toStringUtf8(), pubSubException);
                        }
                        multiCallback.operationFailed(obj, pubSubException);
                    }
                }, null);
            }
            if (this.val$synchronous) {
                return;
            }
            this.cb.operationFinished(null, null);
        }
    }

    public RegionManager(PersistenceManager persistenceManager, ServerConfiguration serverConfiguration, ZooKeeper zooKeeper, ScheduledExecutorService scheduledExecutorService, HedwigHubClientFactory hedwigHubClientFactory) {
        this.pm = persistenceManager;
        this.mySubId = ByteString.copyFromUtf8(SubscriptionStateUtils.HUB_SUBSCRIBER_PREFIX + serverConfiguration.getMyRegion());
        this.queue = new TopicOpQueuer(scheduledExecutorService);
        Iterator<String> it = serverConfiguration.getRegions().iterator();
        while (it.hasNext()) {
            this.clients.add(hedwigHubClientFactory.create(new HedwigSocketAddress(it.next())));
        }
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionEventListener
    public void onFirstLocalSubscribe(ByteString byteString, boolean z, Callback<Void> callback) {
        TopicOpQueuer topicOpQueuer = this.queue;
        TopicOpQueuer topicOpQueuer2 = this.queue;
        topicOpQueuer2.getClass();
        topicOpQueuer.pushAndMaybeRun(byteString, new AnonymousClass1(topicOpQueuer2, byteString, callback, null, z));
    }

    @Override // org.apache.hw_v4_0_0.hedwig.server.subscriptions.SubscriptionEventListener
    public void onLastLocalUnsubscribe(final ByteString byteString) {
        TopicOpQueuer topicOpQueuer = this.queue;
        TopicOpQueuer topicOpQueuer2 = this.queue;
        topicOpQueuer2.getClass();
        topicOpQueuer.pushAndMaybeRun(byteString, new TopicOpQueuer.AsynchronousOp<Void>(topicOpQueuer2, byteString, new Callback<Void>() { // from class: org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager.2
            @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
            public void operationFinished(Object obj, Void r6) {
                if (RegionManager.LOGGER.isDebugEnabled()) {
                    RegionManager.LOGGER.debug("cross-region unsubscribes succeeded for topic " + byteString.toStringUtf8());
                }
            }

            @Override // org.apache.hw_v4_0_0.hedwig.util.Callback
            public void operationFailed(Object obj, PubSubException pubSubException) {
                if (RegionManager.LOGGER.isDebugEnabled()) {
                    RegionManager.LOGGER.error("cross-region unsubscribes failed for topic " + byteString.toStringUtf8(), pubSubException);
                }
            }
        }, null) { // from class: org.apache.hw_v4_0_0.hedwig.server.regions.RegionManager.3
            /* JADX WARN: 'super' call moved to the top of the method (can break code semantics) */
            {
                super(byteString, r10, r11);
                topicOpQueuer2.getClass();
            }

            @Override // java.lang.Runnable
            public void run() {
                Callback<Void> multiCallback = CallbackUtils.multiCallback(RegionManager.this.clients.size(), this.cb, this.ctx);
                Iterator it = RegionManager.this.clients.iterator();
                while (it.hasNext()) {
                    ((HedwigHubClient) it.next()).getSubscriber().asyncUnsubscribe(this.topic, RegionManager.this.mySubId, multiCallback, null);
                }
            }
        });
    }

    public void stop() {
        Iterator<HedwigHubClient> it = this.clients.iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }
}
