package org.apache.pulsar.broker.service.persistent;

import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.prometheus.client.Gauge;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.PulsarMarkers;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/broker/service/persistent/ReplicatedSubscriptionsController.class */
public class ReplicatedSubscriptionsController implements AutoCloseable, Topic.PublishContext {
    private final PersistentTopic topic;
    private final String localCluster;
    private final ScheduledFuture<?> timer;
    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots = new ConcurrentHashMap();
    private static final Logger log = LoggerFactory.getLogger(ReplicatedSubscriptionsController.class);
    private static final Gauge pendingSnapshotsMetric = Gauge.build("pulsar_replicated_subscriptions_pending_snapshots", "Counter of currently pending snapshots").register();

    public ReplicatedSubscriptionsController(PersistentTopic persistentTopic, String str) {
        this.topic = persistentTopic;
        this.localCluster = str;
        this.timer = persistentTopic.getBrokerService().pulsar().getExecutor().scheduleAtFixedRate(this::startNewSnapshot, 0L, persistentTopic.getBrokerService().pulsar().getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(), TimeUnit.MILLISECONDS);
    }

    public void receivedReplicatedSubscriptionMarker(Position position, int i, ByteBuf byteBuf) {
        try {
            switch (i) {
                case 10:
                    receivedSnapshotRequest(Markers.parseReplicatedSubscriptionsSnapshotRequest(byteBuf));
                    break;
                case 11:
                    receivedSnapshotResponse(position, Markers.parseReplicatedSubscriptionsSnapshotResponse(byteBuf));
                    break;
                case 13:
                    receiveSubscriptionUpdated(Markers.parseReplicatedSubscriptionsUpdate(byteBuf));
                    break;
            }
        } catch (IOException e) {
            log.warn("[{}] Failed to parse marker: {}", this.topic.getName(), e);
        }
    }

    public void localSubscriptionUpdated(String str, PulsarMarkers.ReplicatedSubscriptionsSnapshot replicatedSubscriptionsSnapshot) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Updating subscription to snapshot {}", new Object[]{this.topic, str, replicatedSubscriptionsSnapshot.getClustersList().stream().map(clusterMessageId -> {
                return String.format("%s -> %d:%d", clusterMessageId.getCluster(), Long.valueOf(clusterMessageId.getMessageId().getLedgerId()), Long.valueOf(clusterMessageId.getMessageId().getEntryId()));
            }).collect(Collectors.toList())});
        }
        TreeMap treeMap = new TreeMap();
        int clustersCount = replicatedSubscriptionsSnapshot.getClustersCount();
        for (int i = 0; i < clustersCount; i++) {
            PulsarMarkers.ClusterMessageId clusters = replicatedSubscriptionsSnapshot.getClusters(i);
            treeMap.put(clusters.getCluster(), clusters.getMessageId());
        }
        this.topic.publishMessage(Markers.newReplicatedSubscriptionsUpdate(str, treeMap), this);
    }

    private void receivedSnapshotRequest(PulsarMarkers.ReplicatedSubscriptionsSnapshotRequest replicatedSubscriptionsSnapshotRequest) {
        PositionImpl positionImpl = (PositionImpl) this.topic.getLastPosition();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received snapshot request. Last msg id: {}", this.topic.getName(), positionImpl);
        }
        this.topic.publishMessage(Markers.newReplicatedSubscriptionsSnapshotResponse(replicatedSubscriptionsSnapshotRequest.getSnapshotId(), replicatedSubscriptionsSnapshotRequest.getSourceCluster(), this.localCluster, positionImpl.getLedgerId(), positionImpl.getEntryId()), this);
    }

    private void receivedSnapshotResponse(Position position, PulsarMarkers.ReplicatedSubscriptionsSnapshotResponse replicatedSubscriptionsSnapshotResponse) {
        String snapshotId = replicatedSubscriptionsSnapshotResponse.getSnapshotId();
        ReplicatedSubscriptionsSnapshotBuilder replicatedSubscriptionsSnapshotBuilder = this.pendingSnapshots.get(snapshotId);
        if (replicatedSubscriptionsSnapshotBuilder != null) {
            replicatedSubscriptionsSnapshotBuilder.receivedSnapshotResponse(position, replicatedSubscriptionsSnapshotResponse);
        } else if (log.isDebugEnabled()) {
            log.debug("[{}] Received late reply for timed-out snapshot {} from {}", new Object[]{this.topic.getName(), snapshotId, replicatedSubscriptionsSnapshotResponse.getCluster().getCluster()});
        }
    }

    private void receiveSubscriptionUpdated(PulsarMarkers.ReplicatedSubscriptionsUpdate replicatedSubscriptionsUpdate) {
        PulsarMarkers.MessageIdData messageIdData = null;
        int clustersCount = replicatedSubscriptionsUpdate.getClustersCount();
        for (int i = 0; i < clustersCount; i++) {
            PulsarMarkers.ClusterMessageId clusters = replicatedSubscriptionsUpdate.getClusters(i);
            if (this.localCluster.equals(clusters.getCluster())) {
                messageIdData = clusters.getMessageId();
            }
        }
        if (messageIdData == null) {
            return;
        }
        PositionImpl positionImpl = new PositionImpl(messageIdData.getLedgerId(), messageIdData.getEntryId());
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received update for subscription to {}", new Object[]{this.topic, replicatedSubscriptionsUpdate.getSubscriptionName(), positionImpl});
        }
        PersistentSubscription subscription = this.topic.getSubscription(replicatedSubscriptionsUpdate.getSubscriptionName());
        if (subscription != null) {
            subscription.acknowledgeMessage(Collections.singletonList(positionImpl), PulsarApi.CommandAck.AckType.Cumulative, Collections.emptyMap());
        } else {
            log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", new Object[]{this.topic, replicatedSubscriptionsUpdate.getSubscriptionName(), Long.valueOf(messageIdData.getLedgerId()), positionImpl});
            this.topic.createSubscription(replicatedSubscriptionsUpdate.getSubscriptionName(), PulsarApi.CommandSubscribe.InitialPosition.Latest, true);
        }
    }

    private void startNewSnapshot() {
        cleanupTimedOutSnapshots();
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        this.topic.getReplicators().forEach((str, replicator) -> {
            if (replicator.isConnected()) {
                return;
            }
            atomicBoolean.set(true);
        });
        if (atomicBoolean.get()) {
            return;
        }
        pendingSnapshotsMetric.inc();
        ReplicatedSubscriptionsSnapshotBuilder replicatedSubscriptionsSnapshotBuilder = new ReplicatedSubscriptionsSnapshotBuilder(this, this.topic.getReplicators().keys(), this.topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
        this.pendingSnapshots.put(replicatedSubscriptionsSnapshotBuilder.getSnapshotId(), replicatedSubscriptionsSnapshotBuilder);
        replicatedSubscriptionsSnapshotBuilder.start();
    }

    private void cleanupTimedOutSnapshots() {
        Iterator<Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder>> it = this.pendingSnapshots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder> next = it.next();
            if (next.getValue().isTimedOut()) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] Snapshot creation timed out for {}", this.topic.getName(), next.getKey());
                }
                pendingSnapshotsMetric.dec();
                it.remove();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void snapshotCompleted(String str) {
        this.pendingSnapshots.remove(str);
        pendingSnapshotsMetric.dec();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void writeMarker(ByteBuf byteBuf) {
        this.topic.publishMessage(byteBuf, this);
    }

    @Override // org.apache.pulsar.broker.service.Topic.PublishContext
    public void completed(Exception exc, long j, long j2) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Published marker at {}:{}. Exception: {}", new Object[]{this.topic.getName(), Long.valueOf(j), Long.valueOf(j2), exc});
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public PersistentTopic topic() {
        return this.topic;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String localCluster() {
        return this.localCluster;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        this.timer.cancel(true);
    }
}
