/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.service.persistent;

import java.time.Clock;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.ReplicatedSubscriptionsController;
import org.apache.pulsar.shade.io.prometheus.client.Summary;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.MarkersMessageIdData;
import org.apache.pulsar.shade.org.apache.pulsar.common.api.proto.ReplicatedSubscriptionsSnapshotResponse;
import org.apache.pulsar.shade.org.apache.pulsar.common.protocol.Markers;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ReplicatedSubscriptionsSnapshotBuilder {
    private static final Logger log = LoggerFactory.getLogger(ReplicatedSubscriptionsSnapshotBuilder.class);
    private final String snapshotId;
    private final ReplicatedSubscriptionsController controller;
    private final Map<String, MarkersMessageIdData> responses = new TreeMap<String, MarkersMessageIdData>();
    private final List<String> remoteClusters;
    private final Set<String> missingClusters;
    private final boolean needTwoRounds;
    private boolean firstRoundComplete;
    private long startTimeMillis;
    private final long timeoutMillis;
    private final Clock clock;
    private static final Summary snapshotMetric = (Summary)Summary.build("pulsar_replicated_subscriptions_snapshot_ms", "Time taken to create a consistent snapshot across clusters").register();

    public ReplicatedSubscriptionsSnapshotBuilder(ReplicatedSubscriptionsController controller, List<String> remoteClusters, ServiceConfiguration conf, Clock clock) {
        this.snapshotId = UUID.randomUUID().toString();
        this.controller = controller;
        this.remoteClusters = remoteClusters;
        this.missingClusters = new TreeSet<String>(remoteClusters);
        this.clock = clock;
        this.timeoutMillis = TimeUnit.SECONDS.toMillis(conf.getReplicatedSubscriptionsSnapshotTimeoutSeconds());
        this.needTwoRounds = remoteClusters.size() > 1;
    }

    String getSnapshotId() {
        return this.snapshotId;
    }

    void start() {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Starting new snapshot {} - Clusters: {}", new Object[]{this.controller.topic().getName(), this.snapshotId, this.missingClusters});
        }
        this.startTimeMillis = this.clock.millis();
        this.controller.writeMarker(Markers.newReplicatedSubscriptionsSnapshotRequest(this.snapshotId, this.controller.localCluster()));
    }

    synchronized void receivedSnapshotResponse(Position position, ReplicatedSubscriptionsSnapshotResponse response) {
        if (log.isDebugEnabled()) {
            log.debug("[{}] Received response from {}", (Object)this.controller.topic().getName(), (Object)response.getCluster().getCluster());
        }
        String cluster = response.getCluster().getCluster();
        this.responses.putIfAbsent(cluster, new MarkersMessageIdData().copyFrom(response.getCluster().getMessageId()));
        this.missingClusters.remove(cluster);
        if (log.isDebugEnabled()) {
            log.debug("[{}] Missing clusters {}", (Object)this.controller.topic().getName(), this.missingClusters);
        }
        if (!this.missingClusters.isEmpty()) {
            return;
        }
        if (this.needTwoRounds && !this.firstRoundComplete) {
            this.firstRoundComplete = true;
            this.missingClusters.addAll(this.remoteClusters);
            this.controller.writeMarker(Markers.newReplicatedSubscriptionsSnapshotRequest(this.snapshotId, this.controller.localCluster()));
            return;
        }
        if (log.isDebugEnabled()) {
            log.debug("[{}] Snapshot is complete {}", (Object)this.controller.topic().getName(), (Object)this.snapshotId);
        }
        PositionImpl p = (PositionImpl)position;
        this.controller.writeMarker(Markers.newReplicatedSubscriptionsSnapshot(this.snapshotId, this.controller.localCluster(), p.getLedgerId(), p.getEntryId(), this.responses));
        this.controller.snapshotCompleted(this.snapshotId);
        double latencyMillis = this.clock.millis() - this.startTimeMillis;
        snapshotMetric.observe(latencyMillis);
    }

    boolean isTimedOut() {
        return this.startTimeMillis + this.timeoutMillis < this.clock.millis();
    }

    long getStartTimeMillis() {
        return this.startTimeMillis;
    }
}

