/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.io.IOException;
import java.time.Clock;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsSnapshotBuilder;
import org.apache.pulsar.shade.io.netty.buffer.ByteBuf;
import org.apache.pulsar.shade.io.prometheus.client.Gauge;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ClusterMessageId;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshot;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotRequest;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsUpdate;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.Runnables;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Drives the replicated-subscription marker exchange for a single {@link PersistentTopic}.
 *
 * <p>The controller schedules {@link #startNewSnapshot()} at a fixed rate, dispatches incoming
 * marker payloads to the matching handler, and publishes its own markers to the topic, acting as
 * the {@link Topic.PublishContext} for those publishes.
 */
public class ReplicatedSubscriptionsController
implements AutoCloseable,
Topic.PublishContext {
    private static final Logger log = LoggerFactory.getLogger(ReplicatedSubscriptionsController.class);

    // Marker type codes handled by receivedReplicatedSubscriptionMarker; each name reflects the
    // parser that the corresponding switch case dispatches to.
    private static final int MARKER_TYPE_SNAPSHOT_REQUEST = 10;
    private static final int MARKER_TYPE_SNAPSHOT_RESPONSE = 11;
    private static final int MARKER_TYPE_SUBSCRIPTION_UPDATE = 13;

    private final PersistentTopic topic;
    private final String localCluster;
    private long lastCompletedSnapshotStartTime = 0L;
    private String lastCompletedSnapshotId;
    private volatile Position positionOfLastLocalMarker;
    private final ScheduledFuture<?> timer;
    // Snapshots that have been started and have neither completed nor been cleaned up, keyed by snapshot id.
    private final ConcurrentMap<String, ReplicatedSubscriptionsSnapshotBuilder> pendingSnapshots = new ConcurrentHashMap<String, ReplicatedSubscriptionsSnapshotBuilder>();
    // Static, hence shared by every controller instance in the process.
    private static final Gauge pendingSnapshotsMetric = (Gauge)Gauge.build("pulsar_replicated_subscriptions_pending_snapshots", "Counter of currently pending snapshots").register();

    /**
     * Creates the controller and immediately schedules periodic snapshot creation on the broker
     * executor, using the configured replicated-subscriptions snapshot frequency as the period.
     *
     * @param topic the topic this controller publishes markers to and reads replicators from
     * @param localCluster name of the local cluster, used to pick this cluster's entry out of updates
     */
    public ReplicatedSubscriptionsController(PersistentTopic topic, String localCluster) {
        this.topic = topic;
        this.localCluster = localCluster;
        this.timer = topic.getBrokerService().pulsar().getExecutor().scheduleAtFixedRate(Runnables.catchingAndLoggingThrowables(this::startNewSnapshot), 0L, topic.getBrokerService().pulsar().getConfiguration().getReplicatedSubscriptionsSnapshotFrequencyMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Parses a marker payload according to {@code markerType} and dispatches it to the matching
     * handler. Marker types other than the three handled here are ignored; parse failures are
     * logged and not propagated.
     *
     * @param position position of the marker entry, forwarded to the snapshot-response handler
     * @param markerType numeric marker type code
     * @param payload serialized marker body
     */
    public void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
        try {
            switch (markerType) {
                case MARKER_TYPE_SNAPSHOT_REQUEST: {
                    this.receivedSnapshotRequest(Markers.parseReplicatedSubscriptionsSnapshotRequest(payload));
                    break;
                }
                case MARKER_TYPE_SNAPSHOT_RESPONSE: {
                    this.receivedSnapshotResponse(position, Markers.parseReplicatedSubscriptionsSnapshotResponse(payload));
                    break;
                }
                case MARKER_TYPE_SUBSCRIPTION_UPDATE: {
                    this.receiveSubscriptionUpdated(Markers.parseReplicatedSubscriptionsUpdate(payload));
                    break;
                }
                default: {
                    // Not a marker this controller handles.
                    break;
                }
            }
        }
        catch (IOException e) {
            log.warn("[{}] Failed to parse marker: {}", this.topic.getName(), e);
        }
    }

    /**
     * Publishes a subscription-update marker carrying the per-cluster message ids of the given
     * snapshot.
     *
     * @param subscriptionName name of the subscription the update is for
     * @param snapshot snapshot whose cluster/message-id pairs are copied into the marker
     */
    public void localSubscriptionUpdated(String subscriptionName, ReplicatedSubscriptionsSnapshot snapshot) {
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Updating subscription to snapshot {}", this.topic, subscriptionName, snapshot.getClustersList().stream().map(cmid -> String.format("%s -> %d:%d", cmid.getCluster(), cmid.getMessageId().getLedgerId(), cmid.getMessageId().getEntryId())).collect(Collectors.toList()));
        }
        // TreeMap: entries are handed to the marker factory ordered by cluster name.
        Map<String, MarkersMessageIdData> clusterIds = new TreeMap<String, MarkersMessageIdData>();
        int size = snapshot.getClustersCount();
        for (int i = 0; i < size; ++i) {
            ClusterMessageId clusterMessageId = snapshot.getClusterAt(i);
            clusterIds.put(clusterMessageId.getCluster(), clusterMessageId.getMessageId());
        }
        ByteBuf subscriptionUpdate = Markers.newReplicatedSubscriptionsUpdate(subscriptionName, clusterIds);
        this.writeMarker(subscriptionUpdate);
    }

    /**
     * Answers a snapshot request by publishing a response marker that carries the topic's current
     * last position. If the replicator for the requesting cluster exists but is not connected,
     * replication producers are restarted first.
     */
    private void receivedSnapshotRequest(ReplicatedSubscriptionsSnapshotRequest request) {
        Replicator replicator = this.topic.getReplicators().get(request.getSourceCluster());
        // The lookup yields null when there is no replicator for the source cluster; guard the
        // dereference instead of failing with a NullPointerException.
        if (replicator != null && !replicator.isConnected()) {
            this.topic.startReplProducers();
        }
        PositionImpl lastMsgId = (PositionImpl)this.topic.getLastPosition();
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received snapshot request. Last msg id: {}", this.topic.getName(), lastMsgId);
        }
        ByteBuf marker = Markers.newReplicatedSubscriptionsSnapshotResponse(request.getSnapshotId(), request.getSourceCluster(), this.localCluster, lastMsgId.getLedgerId(), lastMsgId.getEntryId());
        this.writeMarker(marker);
    }

    /**
     * Forwards a snapshot response to the pending builder with the same snapshot id. Responses
     * for ids that are no longer pending are dropped.
     */
    private void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
        String snapshotId = response.getSnapshotId();
        ReplicatedSubscriptionsSnapshotBuilder builder = this.pendingSnapshots.get(snapshotId);
        if (builder == null) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Received late reply for timed-out snapshot {} from {}", this.topic.getName(), snapshotId, response.getCluster().getCluster());
            }
            return;
        }
        builder.receivedSnapshotResponse(position, response);
    }

    /**
     * Applies a subscription update: if the update has an entry for the local cluster, the named
     * subscription is cumulatively acknowledged up to that position, or created when it does not
     * exist yet. Updates without a local-cluster entry are ignored.
     */
    private void receiveSubscriptionUpdated(ReplicatedSubscriptionsUpdate update) {
        MarkersMessageIdData updatedMessageId = null;
        int size = update.getClustersCount();
        for (int i = 0; i < size; ++i) {
            ClusterMessageId cmid = update.getClusterAt(i);
            // No early exit: if the local cluster appears more than once, the last entry wins.
            if (this.localCluster.equals(cmid.getCluster())) {
                updatedMessageId = cmid.getMessageId();
            }
        }
        if (updatedMessageId == null) {
            return;
        }
        PositionImpl pos = new PositionImpl(updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
        if (log.isDebugEnabled()) {
            log.debug("[{}][{}] Received update for subscription to {}", this.topic, update.getSubscriptionName(), pos);
        }
        PersistentSubscription sub = this.topic.getSubscription(update.getSubscriptionName());
        if (sub != null) {
            sub.acknowledgeMessage(Collections.singletonList(pos), CommandAck.AckType.Cumulative, Collections.emptyMap());
        } else {
            // The pattern is "{}:{}", i.e. ledgerId:entryId.
            log.info("[{}][{}] Creating subscription at {}:{} after receiving update from replicated subcription", this.topic, update.getSubscriptionName(), updatedMessageId.getLedgerId(), updatedMessageId.getEntryId());
            this.topic.createSubscription(update.getSubscriptionName(), CommandSubscribe.InitialPosition.Latest, true, null);
        }
    }

    /**
     * Timer task: drops timed-out snapshots, then starts a new snapshot unless the topic has no
     * data published since the last completed snapshot started or any replicator is disconnected.
     */
    private void startNewSnapshot() {
        this.cleanupTimedOutSnapshots();
        // Read once so both comparisons below see the same value.
        long lastDataPublishTimestamp = this.topic.getLastDataMessagePublishedTimestamp();
        if (lastDataPublishTimestamp < this.lastCompletedSnapshotStartTime || lastDataPublishTimestamp == 0L) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] There is no new data in topic. Skipping snapshot creation.", this.topic.getName());
            }
            return;
        }
        MutableBoolean anyReplicatorDisconnected = new MutableBoolean();
        this.topic.getReplicators().forEach((cluster, replicator) -> {
            if (!replicator.isConnected()) {
                anyReplicatorDisconnected.setTrue();
            }
        });
        if (anyReplicatorDisconnected.isTrue()) {
            if (log.isDebugEnabled()) {
                log.debug("[{}] Do not attempt to create snapshot when some of the clusters are not reachable.", this.topic.getName());
            }
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Starting snapshot creation.", this.topic.getName());
        }
        ReplicatedSubscriptionsSnapshotBuilder builder = new ReplicatedSubscriptionsSnapshotBuilder(this, this.topic.getReplicators().keys(), this.topic.getBrokerService().pulsar().getConfiguration(), Clock.systemUTC());
        this.pendingSnapshots.put(builder.getSnapshotId(), builder);
        // Count the snapshot only once it is tracked, so a failure while constructing the builder
        // cannot leave the gauge incremented with no entry that would ever decrement it.
        pendingSnapshotsMetric.inc();
        builder.start();
    }

    /**
     * @return the id of the most recently completed snapshot, or empty if none has completed
     */
    public Optional<String> getLastCompletedSnapshotId() {
        return Optional.ofNullable(this.lastCompletedSnapshotId);
    }

    /** Removes every pending snapshot whose builder reports a timeout and decrements the gauge for each. */
    private void cleanupTimedOutSnapshots() {
        Iterator<Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder>> it = this.pendingSnapshots.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, ReplicatedSubscriptionsSnapshotBuilder> entry = it.next();
            if (!entry.getValue().isTimedOut()) {
                continue;
            }
            if (log.isDebugEnabled()) {
                log.debug("[{}] Snapshot creation timed out for {}", this.topic.getName(), entry.getKey());
            }
            pendingSnapshotsMetric.dec();
            it.remove();
        }
    }

    /**
     * Records completion of the given snapshot and stops tracking it as pending.
     *
     * @param snapshotId id of the snapshot that completed
     */
    void snapshotCompleted(String snapshotId) {
        ReplicatedSubscriptionsSnapshotBuilder snapshot = this.pendingSnapshots.remove(snapshotId);
        this.lastCompletedSnapshotId = snapshotId;
        if (snapshot != null) {
            // Decrement only when this call removed the entry: a snapshot that was already
            // removed by the timeout cleanup has been decremented there.
            pendingSnapshotsMetric.dec();
            this.lastCompletedSnapshotStartTime = snapshot.getStartTimeMillis();
        }
    }

    /**
     * Publishes the marker to the topic with this controller as publish context, and releases
     * the buffer afterwards whether or not the publish call throws.
     *
     * @param marker serialized marker; released by this method
     */
    void writeMarker(ByteBuf marker) {
        try {
            this.topic.publishMessage(marker, this);
        }
        finally {
            marker.release();
        }
    }

    /**
     * Publish callback for markers written by this controller; records the reported position as
     * the last local marker position.
     */
    @Override
    public void completed(Exception e, long ledgerId, long entryId) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Published marker at {}:{}. Exception: {}", this.topic.getName(), ledgerId, entryId, e);
        }
        // NOTE(review): the position is recorded even when e is non-null — confirm that is intended.
        this.positionOfLastLocalMarker = new PositionImpl(ledgerId, entryId);
    }

    PersistentTopic topic() {
        return this.topic;
    }

    String localCluster() {
        return this.localCluster;
    }

    @Override
    public boolean isMarkerMessage() {
        return true;
    }

    /**
     * Cancels the periodic snapshot task and stops tracking any still-pending snapshots,
     * decrementing the shared gauge for each one removed.
     */
    @Override
    public void close() {
        this.timer.cancel(true);
        // With the timer cancelled nothing would time these out any more, and the gauge is static,
        // so entries left behind would stay counted after this controller is gone.
        for (String snapshotId : this.pendingSnapshots.keySet()) {
            if (this.pendingSnapshots.remove(snapshotId) != null) {
                pendingSnapshotsMetric.dec();
            }
        }
    }
}

