package org.apache.hw_v4_0_0.hedwig.server.delivery;

import com.google.protobuf.ByteString;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.hw_v4_0_0.hedwig.client.data.TopicSubscriber;
import org.apache.hw_v4_0_0.hedwig.protocol.PubSubProtocol;
import org.apache.hw_v4_0_0.hedwig.server.common.ServerConfiguration;
import org.apache.hw_v4_0_0.hedwig.server.common.UnexpectedError;
import org.apache.hw_v4_0_0.hedwig.server.persistence.Factory;
import org.apache.hw_v4_0_0.hedwig.server.persistence.MapMethods;
import org.apache.hw_v4_0_0.hedwig.server.persistence.PersistenceManager;
import org.apache.hw_v4_0_0.hedwig.server.persistence.ScanCallback;
import org.apache.hw_v4_0_0.hedwig.server.persistence.ScanRequest;
import org.apache.hw_v4_0_0.hedwig.server.subscriptions.MessageFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager.class */
public class FIFODeliveryManager implements Runnable, DeliveryManager {
    /** Class logger; assigned in the static initializer at the bottom of this class. */
    protected static final Logger logger;
    /** Persistence layer used for single-message scans, seq-id skipping and delivered-until updates. */
    private PersistenceManager persistenceMgr;
    /** Server configuration; read for the local region id and the scan backoff period. */
    private ServerConfiguration cfg;
    /** Assertion flag as surfaced by the decompiler; assigned in the static initializer. */
    static final /* synthetic */ boolean $assertionsDisabled;
    /** Requests waiting to be executed by {@link #run()}. */
    BlockingQueue<DeliveryManagerRequest> requestQueue = new LinkedBlockingQueue<DeliveryManagerRequest>();
    /** Subscribers whose scan or send failed and that are waiting for the backoff period to elapse. */
    Queue<ActiveSubscriberState> retryQueue = new ConcurrentLinkedQueue<ActiveSubscriberState>();
    /** Loop condition of {@link #run()}; cleared by {@link ShutdownDeliveryManagerRequest}. */
    protected boolean keepRunning = true;
    /** Per topic: delivery pointer (seq-id) mapped to the set of subscribers registered at that pointer. */
    Map<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs =
            new HashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
    /** Currently registered subscriber state, keyed by (topic, subscriber id). */
    Map<TopicSubscriber, ActiveSubscriberState> subscriberStates =
            new HashMap<TopicSubscriber, ActiveSubscriberState>();

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$ActiveSubscriberState.class */
    /**
     * Delivery state of one subscriber on one topic.
     *
     * <p>An instance is itself a {@link DeliveryManagerRequest}: performing it registers the
     * subscriber with the enclosing manager and starts delivery. It is also the callback for
     * the scans it issues ({@link ScanCallback}) and for the sends it hands to its
     * {@link DeliveryEndPoint} ({@link DeliveryCallback}).
     *
     * <p>NOTE(review): which threads invoke the scan/delivery callbacks is not visible in this
     * file; the fields below are plain (non-volatile) fields.
     */
    public class ActiveSubscriberState implements ScanCallback, DeliveryCallback, DeliveryManagerRequest {
        /**
         * How far {@link #lastLocalSeqIdDelivered} may run ahead of
         * {@link #lastSeqIdCommunicatedExternally} before a delivery-pointer move is requested.
         */
        static final int SEQ_ID_SLACK = 10;

        ByteString topic;
        ByteString subscriberId;
        /** Seq-id of the last message whose delivery completed (or was skipped). */
        long lastLocalSeqIdDelivered;
        DeliveryEndPoint deliveryEndPoint;
        /** Seq-id of the message currently being scanned/sent. */
        long localSeqIdDeliveringNow;
        /** Seq-id under which this subscriber was last registered in the delivery-pointer map. */
        long lastSeqIdCommunicatedExternally;
        /** Stored at construction; not consulted anywhere in this class. */
        MessageFilter filter;
        boolean isHubSubscriber;
        boolean connected = true;
        /** Wall-clock millis of the last recorded scan/send error, or -1 if none was recorded. */
        long lastScanErrorTime = -1;

        /**
         * @param byteString       topic being delivered
         * @param byteString2      subscriber id
         * @param j                seq-id treated as already delivered; delivery resumes after it
         * @param deliveryEndPoint endpoint messages are sent to
         * @param messageFilter    filter associated with the subscription
         * @param z                whether this subscriber is a hub subscriber
         */
        public ActiveSubscriberState(ByteString byteString, ByteString byteString2, long j, DeliveryEndPoint deliveryEndPoint, MessageFilter messageFilter, boolean z) {
            this.topic = byteString;
            this.subscriberId = byteString2;
            this.lastLocalSeqIdDelivered = j;
            this.deliveryEndPoint = deliveryEndPoint;
            this.filter = messageFilter;
            this.isHubSubscriber = z;
        }

        /** Marks this subscriber as disconnected and closes its delivery endpoint. */
        public void setNotConnected() {
            this.connected = false;
            this.deliveryEndPoint.close();
        }

        public ByteString getTopic() {
            return this.topic;
        }

        public long getLastLocalSeqIdDelivered() {
            return this.lastLocalSeqIdDelivered;
        }

        public long getLastScanErrorTime() {
            return this.lastScanErrorTime;
        }

        public void setLastScanErrorTime(long j) {
            this.lastScanErrorTime = j;
        }

        protected boolean isConnected() {
            return this.connected;
        }

        /**
         * Asks the persistence manager for the seq-id following the last delivered one and
         * issues a single-message scan for it, with this object as the callback. Does nothing
         * when the subscriber is no longer connected.
         */
        public void deliverNextMessage() {
            if (!isConnected()) {
                return;
            }
            PersistenceManager persistence = FIFODeliveryManager.this.persistenceMgr;
            this.localSeqIdDeliveringNow = persistence.getSeqIdAfterSkipping(this.topic, this.lastLocalSeqIdDelivered, 1);
            ScanRequest nextScan = new ScanRequest(this.topic, this.localSeqIdDeliveringNow, this, null);
            persistence.scanSingleMessage(nextScan);
        }

        /**
         * Scan result: either skips the message (hub subscriber and the message's source region
         * differs from this server's region) or sends it to the endpoint wrapped in a SUCCESS
         * response. Ignored when disconnected.
         */
        @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.ScanCallback
        public void messageScanned(Object obj, PubSubProtocol.Message message) {
            if (!this.connected) {
                return;
            }
            boolean skipForHub = this.isHubSubscriber
                    && !message.getSrcRegion().equals(FIFODeliveryManager.this.cfg.getMyRegionByteString());
            if (skipForHub) {
                // Treat the message as delivered so the pointer advances past it.
                sendingFinished();
                return;
            }
            PubSubProtocol.PubSubResponse response = PubSubProtocol.PubSubResponse.newBuilder()
                    .setProtocolVersion(PubSubProtocol.ProtocolVersion.VERSION_ONE)
                    .setStatusCode(PubSubProtocol.StatusCode.SUCCESS)
                    .setTxnId(0L)
                    .setMessage(message)
                    .build();
            this.deliveryEndPoint.send(response, this);
        }

        /** Scan error: schedules this subscriber for a delayed retry if it is still connected. */
        @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.ScanCallback
        public void scanFailed(Object obj, Exception exc) {
            if (!this.connected) {
                return;
            }
            FIFODeliveryManager.this.retryErroredSubscriberAfterDelay(this);
        }

        /** Intentionally empty: nothing is done when a scan finishes. */
        @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.ScanCallback
        public void scanFinished(Object obj, ScanCallback.ReasonForFinish reasonForFinish) {
        }

        /**
         * Records the in-flight seq-id as delivered, requests a delivery-pointer move once the
         * subscriber is more than {@link #SEQ_ID_SLACK} ahead of its registered pointer, and
         * then starts delivery of the next message. Ignored when disconnected.
         */
        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryCallback
        public void sendingFinished() {
            if (!this.connected) {
                return;
            }
            this.lastLocalSeqIdDelivered = this.localSeqIdDeliveringNow;
            if (this.lastLocalSeqIdDelivered > this.lastSeqIdCommunicatedExternally + SEQ_ID_SLACK) {
                // Update the registered pointer first, then enqueue the move request.
                long previousPtr = this.lastSeqIdCommunicatedExternally;
                this.lastSeqIdCommunicatedExternally = this.lastLocalSeqIdDelivered;
                FIFODeliveryManager.this.moveDeliveryPtrForward(this, previousPtr, this.lastLocalSeqIdDelivered);
            }
            deliverNextMessage();
        }

        public long getLastSeqIdCommunicatedExternally() {
            return this.lastSeqIdCommunicatedExternally;
        }

        /** Permanent send failure: enqueues a request to stop serving this subscriber. */
        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryCallback
        public void permanentErrorOnSend() {
            FIFODeliveryManager.this.stopServingSubscriber(this);
        }

        /** Transient send failure: schedules this subscriber for a delayed retry. */
        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryCallback
        public void transientErrorOnSend() {
            FIFODeliveryManager.this.retryErroredSubscriberAfterDelay(this);
        }

        /**
         * Registers this state under its (topic, subscriber id) key, enqueues a stop request for
         * any state it replaced, records its initial delivery pointer and starts delivery.
         */
        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            TopicSubscriber key = new TopicSubscriber(this.topic, this.subscriberId);
            ActiveSubscriberState replaced = FIFODeliveryManager.this.subscriberStates.put(key, this);
            if (replaced != null) {
                FIFODeliveryManager.this.stopServingSubscriber(replaced);
            }
            this.lastSeqIdCommunicatedExternally = this.lastLocalSeqIdDelivered;
            FIFODeliveryManager.this.addDeliveryPtr(this, Long.valueOf(this.lastLocalSeqIdDelivered));
            deliverNextMessage();
        }

        public String toString() {
            return "Topic: " + this.topic.toStringUtf8() + "DeliveryPtr: " + this.lastLocalSeqIdDelivered;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$DeliveryManagerRequest.class */
    /**
     * A unit of work placed on the manager's request queue.
     *
     * <p>{@link FIFODeliveryManager#run()} dequeues requests one at a time and calls
     * {@link #performRequest()} on each.
     */
    public interface DeliveryManagerRequest {
        /** Executes this request. */
        void performRequest();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$DeliveryPtrMove.class */
    /**
     * Request that moves a subscriber's entry in the per-topic delivery-pointer map from one
     * seq-id to another, and reports to the persistence manager when the topic's minimum
     * delivery pointer has increased as a result.
     */
    public class DeliveryPtrMove implements DeliveryManagerRequest {
        ActiveSubscriberState subscriber;
        Long oldSeqId;
        Long newSeqId;

        /**
         * @param activeSubscriberState subscriber whose pointer moves
         * @param l                     seq-id the subscriber is currently registered under
         * @param l2                    seq-id to register the subscriber under
         */
        public DeliveryPtrMove(ActiveSubscriberState activeSubscriberState, Long l, Long l2) {
            this.subscriber = activeSubscriberState;
            this.oldSeqId = l;
            this.newSeqId = l2;
        }

        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            final ByteString topic = this.subscriber.getTopic();
            final long minBefore = FIFODeliveryManager.this.getMinimumSeqId(topic);

            if (!this.subscriber.isConnected()) {
                // Disconnected: only drop the old pointer; a missing entry is tolerated and an
                // emptied topic entry is pruned.
                FIFODeliveryManager.this.removeDeliveryPtr(this.subscriber, this.oldSeqId, true, true);
            } else {
                // Connected: the old pointer must exist; re-register under the new seq-id.
                FIFODeliveryManager.this.removeDeliveryPtr(this.subscriber, this.oldSeqId, false, false);
                FIFODeliveryManager.this.addDeliveryPtr(this.subscriber, this.newSeqId);
            }

            final long minAfter = FIFODeliveryManager.this.getMinimumSeqId(topic);
            if (minAfter > minBefore) {
                FIFODeliveryManager.this.persistenceMgr.deliveredUntil(topic, Long.valueOf(minAfter));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$HashMapSubscriberFactory.class */
    /**
     * Factory producing the empty subscriber sets stored as values in the delivery-pointer
     * multimap. A shared instance is available through {@link #instance}.
     */
    public static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> {
        static HashMapSubscriberFactory instance = new HashMapSubscriberFactory();

        protected HashMapSubscriberFactory() {
        }

        /**
         * @return a new, empty, mutable {@link HashSet}
         */
        @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.Factory
        public Set<ActiveSubscriberState> newInstance() {
            // Explicit type argument instead of the raw HashSet the decompiler emitted.
            return new HashSet<ActiveSubscriberState>();
        }
    }

    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$ShutdownDeliveryManagerRequest.class */
    /**
     * Request that ends the {@link FIFODeliveryManager#run()} loop by clearing
     * {@code keepRunning}. It takes effect when the loop next evaluates its condition.
     */
    protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest {
        protected ShutdownDeliveryManagerRequest() {
        }

        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            // run() re-checks this flag at the top of each iteration.
            FIFODeliveryManager.this.keepRunning = false;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$StopServingSubscriber.class */
    /**
     * Request that disconnects a subscriber and removes its entry from the per-topic
     * delivery-pointer map.
     */
    public class StopServingSubscriber implements DeliveryManagerRequest {
        ActiveSubscriberState subscriber;

        /**
         * @param activeSubscriberState subscriber to stop serving
         */
        public StopServingSubscriber(ActiveSubscriberState activeSubscriberState) {
            this.subscriber = activeSubscriberState;
        }

        @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.FIFODeliveryManager.DeliveryManagerRequest
        public void performRequest() {
            // Disconnect first: this also closes the subscriber's delivery endpoint.
            this.subscriber.setNotConnected();

            // Remove the pointer under the seq-id the subscriber is registered at. Absence is
            // tolerated and an emptied topic entry is pruned.
            Long registeredPtr = Long.valueOf(this.subscriber.getLastSeqIdCommunicatedExternally());
            FIFODeliveryManager.this.removeDeliveryPtr(this.subscriber, registeredPtr, true, true);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/hw_v4_0_0/hedwig/server/delivery/FIFODeliveryManager$TreeMapLongToSetSubscriberFactory.class */
    /**
     * Factory producing the empty per-topic sorted maps (seq-id to subscriber set) stored in
     * {@code perTopicDeliveryPtrs}. A shared instance is available through {@link #instance}.
     */
    public static class TreeMapLongToSetSubscriberFactory implements Factory<SortedMap<Long, Set<ActiveSubscriberState>>> {
        static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory();

        protected TreeMapLongToSetSubscriberFactory() {
        }

        /**
         * @return a new, empty {@link TreeMap} ordered by the natural ordering of its keys
         */
        @Override // org.apache.hw_v4_0_0.hedwig.server.persistence.Factory
        public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() {
            // Explicit type arguments instead of the raw TreeMap the decompiler emitted.
            return new TreeMap<Long, Set<ActiveSubscriberState>>();
        }
    }

    /**
     * Creates the delivery manager and starts its worker thread
     * ({@code "DeliveryManagerThread"}), which executes {@link #run()}.
     *
     * @param persistenceManager  persistence layer used for scans and delivered-until updates
     * @param serverConfiguration server configuration (local region, scan backoff period)
     */
    public FIFODeliveryManager(PersistenceManager persistenceManager, ServerConfiguration serverConfiguration) {
        this.persistenceMgr = persistenceManager;
        // Assign every field before the thread starts: run() calls retryErroredSubscribers(),
        // which dereferences cfg. Thread.start() then publishes both writes to the new thread.
        this.cfg = serverConfiguration;
        new Thread(this, "DeliveryManagerThread").start();
    }

    /**
     * Offers a request to the request queue and treats rejection as a fatal condition.
     *
     * @param deliveryManagerRequest request to enqueue
     * @throws UnexpectedError if the queue does not accept the request
     */
    protected void enqueueWithoutFailure(DeliveryManagerRequest deliveryManagerRequest) {
        boolean accepted = this.requestQueue.offer(deliveryManagerRequest);
        if (accepted) {
            return;
        }
        throw new UnexpectedError("Could not enqueue object: " + deliveryManagerRequest + " to delivery manager request queue.");
    }

    /**
     * Enqueues a new {@link ActiveSubscriberState} for the given subscription. The state's
     * "last delivered" seq-id is initialised to the local component of {@code messageSeqId}
     * minus one.
     *
     * @param byteString       topic
     * @param byteString2      subscriber id
     * @param messageSeqId     seq-id whose local component determines where delivery starts
     * @param deliveryEndPoint endpoint messages are sent to
     * @param messageFilter    filter associated with the subscription
     * @param z                whether this is a hub subscriber
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryManager
    public void startServingSubscription(ByteString byteString, ByteString byteString2, PubSubProtocol.MessageSeqId messageSeqId, DeliveryEndPoint deliveryEndPoint, MessageFilter messageFilter, boolean z) {
        long lastDelivered = messageSeqId.getLocalComponent() - 1;
        ActiveSubscriberState newState =
                new ActiveSubscriberState(byteString, byteString2, lastDelivered, deliveryEndPoint, messageFilter, z);
        enqueueWithoutFailure(newState);
    }

    /**
     * Looks up the registered state for (topic, subscriber id) and, if one exists, enqueues a
     * request to stop serving it. Unknown subscribers are ignored.
     *
     * <p>NOTE(review): this reads {@code subscriberStates}, a plain {@code HashMap} that
     * request execution also writes to; confirm callers cannot race with the worker thread.
     *
     * @param byteString  topic
     * @param byteString2 subscriber id
     */
    @Override // org.apache.hw_v4_0_0.hedwig.server.delivery.DeliveryManager
    public void stopServingSubscriber(ByteString byteString, ByteString byteString2) {
        TopicSubscriber key = new TopicSubscriber(byteString, byteString2);
        ActiveSubscriberState registered = this.subscriberStates.get(key);
        if (registered == null) {
            return;
        }
        stopServingSubscriber(registered);
    }

    /**
     * Enqueues a {@link StopServingSubscriber} request for the given subscriber state.
     *
     * @param activeSubscriberState subscriber to stop serving
     */
    protected void stopServingSubscriber(ActiveSubscriberState activeSubscriberState) {
        StopServingSubscriber stopRequest = new StopServingSubscriber(activeSubscriberState);
        enqueueWithoutFailure(stopRequest);
    }

    /**
     * Stamps the subscriber with the current time as its last error time and appends it to the
     * retry queue, from which {@link #retryErroredSubscribers()} later resumes it.
     *
     * @param activeSubscriberState subscriber whose scan or send failed
     * @throws UnexpectedError if the retry queue does not accept the subscriber
     */
    public void retryErroredSubscriberAfterDelay(ActiveSubscriberState activeSubscriberState) {
        long failedAt = System.currentTimeMillis();
        activeSubscriberState.setLastScanErrorTime(failedAt);
        boolean queued = this.retryQueue.offer(activeSubscriberState);
        if (queued) {
            return;
        }
        throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
    }

    /**
     * Enqueues a {@link DeliveryPtrMove} request for the subscriber.
     *
     * @param activeSubscriberState subscriber whose delivery pointer moves
     * @param j                     seq-id the subscriber is currently registered under
     * @param j2                    seq-id to register the subscriber under
     */
    public void moveDeliveryPtrForward(ActiveSubscriberState activeSubscriberState, long j, long j2) {
        Long fromSeqId = Long.valueOf(j);
        Long toSeqId = Long.valueOf(j2);
        enqueueWithoutFailure(new DeliveryPtrMove(activeSubscriberState, fromSeqId, toSeqId));
    }

    /**
     * Worker loop: while {@code keepRunning} is set, waits up to one second for a request,
     * retries any errored subscribers whose backoff has elapsed, then performs the request if
     * one was dequeued. The one-second timeout ensures the retry queue is checked even when no
     * requests arrive.
     *
     * <p>If the thread is interrupted while waiting, the interrupt status is restored and the
     * loop exits. Continuing would be futile: with the interrupt flag set every subsequent
     * {@code poll} throws immediately, so the loop would spin without ever dequeuing another
     * request.
     */
    @Override // java.lang.Runnable
    public void run() {
        while (this.keepRunning) {
            DeliveryManagerRequest deliveryManagerRequest;
            try {
                deliveryManagerRequest = this.requestQueue.poll(1L, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for whoever owns this thread and stop serving.
                Thread.currentThread().interrupt();
                this.keepRunning = false;
                break;
            }
            retryErroredSubscribers();
            if (deliveryManagerRequest != null) {
                deliveryManagerRequest.performRequest();
            }
        }
    }

    /**
     * Enqueues a shutdown request. The worker loop ends once that request has been performed,
     * i.e. after requests that were enqueued ahead of it.
     */
    public void stop() {
        ShutdownDeliveryManagerRequest shutdown = new ShutdownDeliveryManagerRequest();
        enqueueWithoutFailure(shutdown);
    }

    /**
     * Resumes delivery for subscribers at the head of the retry queue whose last error is at
     * least the configured scan backoff period old. Stops at the first subscriber that has not
     * waited long enough, or when the queue is empty.
     */
    protected void retryErroredSubscribers() {
        final long retryCutoff = System.currentTimeMillis() - this.cfg.getScanBackoffPeriodMs();
        for (ActiveSubscriberState head = this.retryQueue.peek();
                head != null && head.getLastScanErrorTime() <= retryCutoff;
                head = this.retryQueue.peek()) {
            // Resume delivery first, then drop the head entry from the queue.
            head.deliverNextMessage();
            this.retryQueue.poll();
        }
    }

    /**
     * Removes a subscriber from the per-topic delivery-pointer map at the given seq-id.
     *
     * @param activeSubscriberState subscriber to remove
     * @param l                     seq-id the subscriber is expected to be registered under;
     *                              must not be null (checked only when assertions are enabled)
     * @param z                     when true, a missing topic entry or a missing subscriber
     *                              entry is silently tolerated
     * @param z2                    when true, the topic's entry is dropped from the map if it
     *                              is empty after the removal
     * @throws UnexpectedError if {@code z} is false and the topic or the subscriber entry is
     *                         not found
     */
    protected void removeDeliveryPtr(ActiveSubscriberState activeSubscriberState, Long l, boolean z, boolean z2) {
        if (!$assertionsDisabled && l == null) {
            throw new AssertionError();
        }
        ByteString topic = activeSubscriberState.getTopic();
        SortedMap<Long, Set<ActiveSubscriberState>> topicPtrs = this.perTopicDeliveryPtrs.get(topic);
        if (topicPtrs == null) {
            if (z) {
                return;
            }
            throw new UnexpectedError("No delivery pointers found while disconnecting channel for topic:" + topic);
        }
        boolean removed = MapMethods.removeFromMultiMap(topicPtrs, l, activeSubscriberState);
        if (!removed && !z) {
            throw new UnexpectedError("Could not find subscriber:" + activeSubscriberState + " at the expected delivery pointer");
        }
        if (z2 && topicPtrs.isEmpty()) {
            this.perTopicDeliveryPtrs.remove(topic);
        }
    }

    /**
     * Returns the smallest delivery pointer registered for a topic.
     *
     * @param byteString topic to look up
     * @return the first key of the topic's pointer map, or {@code Long.MAX_VALUE - 1} when the
     *         topic has no registered pointers
     */
    protected long getMinimumSeqId(ByteString byteString) {
        SortedMap<Long, Set<ActiveSubscriberState>> topicPtrs = this.perTopicDeliveryPtrs.get(byteString);
        boolean hasPointers = topicPtrs != null && !topicPtrs.isEmpty();
        if (hasPointers) {
            return topicPtrs.firstKey().longValue();
        }
        return Long.MAX_VALUE - 1;
    }

    /**
     * Registers a subscriber under the given seq-id in its topic's delivery-pointer map. The
     * topic's map and the seq-id's subscriber set are obtained through the
     * {@code MapMethods} helpers, which are handed factories for creating them.
     *
     * @param activeSubscriberState subscriber to register
     * @param l                     seq-id to register the subscriber under
     */
    protected void addDeliveryPtr(ActiveSubscriberState activeSubscriberState, Long l) {
        // Raw type kept: the generic signatures of the MapMethods helpers are not visible here.
        SortedMap topicPtrs = (SortedMap) MapMethods.getAfterInsertingIfAbsent(
                this.perTopicDeliveryPtrs, activeSubscriberState.getTopic(), TreeMapLongToSetSubscriberFactory.instance);
        MapMethods.addToMultiMap(topicPtrs, l, activeSubscriberState, HashMapSubscriberFactory.instance);
    }

    // Static initializer: assigns the two blank static finals declared at the top of the class.
    static {
        // True when assertions are NOT enabled for this class; guards the explicit
        // AssertionError check in removeDeliveryPtr.
        $assertionsDisabled = !FIFODeliveryManager.class.desiredAssertionStatus();
        logger = LoggerFactory.getLogger(FIFODeliveryManager.class);
    }
}
