package com.couchbase.client.dcp.buffer;

import com.couchbase.client.dcp.Client;
import com.couchbase.client.dcp.buffer.DcpOps;
import com.couchbase.client.dcp.conductor.BucketConfigSource;
import com.couchbase.client.dcp.conductor.DcpChannel;
import com.couchbase.client.dcp.config.HostAndPort;
import com.couchbase.client.dcp.core.state.NotConnectedException;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelHandlerContext;
import com.couchbase.client.dcp.deps.io.netty.channel.ChannelInboundHandlerAdapter;
import io.micrometer.core.instrument.Metrics;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.LongAdder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import rx.SingleSubscriber;
import rx.Subscription;
import rx.schedulers.Schedulers;

/* loaded from: input_file:com/couchbase/client/dcp/buffer/PersistencePollingHandler.class */
/**
 * Channel handler that polls the server for persisted sequence numbers.
 *
 * <p>When the channel becomes active the handler subscribes to bucket configurations.
 * For every configuration received it starts a new "polling group": each partition
 * instance hosted by this channel's node gets its failover log fetched once, and is then
 * polled with {@code observeSeqno} at a fixed interval. Observed values are recorded in
 * {@link PersistedSeqnos} and forwarded to the environment's stream event buffer.
 *
 * <p>A polling group is identified by the value of {@link #activeGroupId} at the time it
 * was started. Incrementing that field invalidates every callback and scheduled task that
 * belongs to an older group; they notice the mismatch and stop.
 *
 * <p>NOTE(review): {@code activeGroupId} is a plain field. Config notifications and
 * scheduled tasks run on the channel's executor; the thread on which {@link DcpOps}
 * results are delivered is not visible here - presumably the same executor; confirm.
 */
public class PersistencePollingHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(PersistencePollingHandler.class);

    /** Interval multiplier used for the regular polling cadence. */
    private static final int NORMAL_INTERVAL_MULTIPLIER = 1;

    /** Interval multiplier used when the server reported a temporary failure. */
    private static final int TEMPORARY_FAILURE_INTERVAL_MULTIPLIER = 10;

    /**
     * Backs the {@code dcp.scheduled.polling.tasks} gauge in the global Micrometer registry.
     * Incremented when a polling task is scheduled, decremented when the task starts running.
     */
    private static final LongAdder scheduledPollingTasks =
            Objects.requireNonNull(Metrics.globalRegistry.gauge("dcp.scheduled.polling.tasks", new LongAdder()));

    private final Client.Environment env;
    private final BucketConfigSource bucketConfigSource;
    private final DcpOps dcpOps;
    private final PersistedSeqnos persistedSeqnos;

    /** Ensures the "closing channel" warning is logged (and the channel closed) only once. */
    private final AtomicBoolean loggedClosureWarning = new AtomicBoolean();

    /** Subscription to bucket config updates; {@code null} until the channel becomes active. */
    private Subscription configSubscription;

    /** Identifier of the polling group currently allowed to run. */
    private int activeGroupId;

    /**
     * Creates a handler bound to the given environment and config source.
     *
     * @param environment supplies the persisted seqno tracker, the stream event buffer and
     *     the polling interval; must not be {@code null}
     * @param bucketConfigSource source of bucket configurations; must not be {@code null}
     * @param dcpRequestDispatcher dispatcher wrapped in a {@link DcpOpsImpl} for issuing requests
     * @throws NullPointerException if {@code environment}, {@code bucketConfigSource} or
     *     {@code environment.persistedSeqnos()} is {@code null}
     */
    public PersistencePollingHandler(Client.Environment environment, BucketConfigSource bucketConfigSource, DcpRequestDispatcher dcpRequestDispatcher) {
        this.env = Objects.requireNonNull(environment);
        this.bucketConfigSource = Objects.requireNonNull(bucketConfigSource);
        this.persistedSeqnos = Objects.requireNonNull(environment.persistedSeqnos());
        this.dcpOps = new DcpOpsImpl(dcpRequestDispatcher);
    }

    /**
     * Propagates the event, then stops listening for config changes and invalidates the
     * current polling group.
     *
     * @param ctx the handler context of the channel that became inactive
     * @throws Exception if propagating the event fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        try {
            super.channelInactive(ctx);
        } finally {
            // Cleanup must happen even if propagation throws; otherwise the config
            // subscription leaks and pollers keep running against a dead channel.
            if (configSubscription != null) {
                configSubscription.unsubscribe();
            }
            // Any poller still in flight belongs to the old group and will stop itself.
            activeGroupId++;
        }
    }

    /**
     * Propagates the event, then subscribes to bucket configurations. Each configuration is
     * handled on the channel's executor by {@link #reconfigure}.
     *
     * @param ctx the handler context of the channel that became active
     * @throws Exception if propagating the event fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        super.channelActive(ctx);
        configSubscription = bucketConfigSource.configs()
                .observeOn(Schedulers.from(ctx.executor()))
                .subscribe(config -> reconfigure(ctx, config));
    }

    /**
     * Starts a new polling group for the given bucket configuration, superseding any
     * previous group. Any failure closes the channel via {@link #logWarningAndClose}.
     *
     * @param ctx context of the channel whose node is being polled
     * @param bucketConfig the configuration to poll against
     */
    private void reconfigure(ChannelHandlerContext ctx, DcpBucketConfig bucketConfig) {
        LOGGER.debug("Reconfiguring persistence pollers.");

        // Bumping the id retires the previous group before the new one starts.
        final int groupId = ++activeGroupId;
        persistedSeqnos.reset(bucketConfig);

        LOGGER.debug("Starting persistence polling group {}", groupId);
        try {
            for (PartitionInstance absentInstance : bucketConfig.getAbsentPartitionInstances()) {
                LOGGER.debug("Partition instance {} is absent, will assume all seqnos persisted.", absentInstance);
                persistedSeqnos.markAsAbsent(absentInstance);
            }

            final HostAndPort nodeAddress = DcpChannel.getHostAndPort(ctx.channel());
            final List<PartitionInstance> hostedPartitions = bucketConfig.getHostedPartitions(nodeAddress);
            LOGGER.debug("Node {} hosts partitions {}", nodeAddress, hostedPartitions);

            for (PartitionInstance partitionInstance : hostedPartitions) {
                dcpOps.getFailoverLog(partitionInstance.partition()).subscribe(
                        failoverLog -> observeAndRepeat(ctx, partitionInstance, failoverLog.getCurrentVbuuid(), groupId),
                        failure -> onFailoverLogFailure(ctx, partitionInstance, groupId, failure));
            }
        } catch (Throwable t) {
            logWarningAndClose(ctx, "Failed to reconfigure persistence poller.", t);
        }
    }

    /**
     * Handles a failed failover log request.
     *
     * <p>Failures that belong to a superseded group, or that merely report a lost
     * connection, are ignored in the same way the {@code observeSeqno} error path ignores
     * them. Everything else closes the channel.
     */
    private void onFailoverLogFailure(ChannelHandlerContext ctx, PartitionInstance partitionInstance, int groupId, Throwable failure) {
        if (activeGroupId != groupId || failure instanceof NotConnectedException) {
            LOGGER.debug("Polling group {} is no longer active; ignoring failover log failure for {}", groupId, partitionInstance);
            return;
        }

        if (failure instanceof DcpOps.BadResponseStatusException) {
            // Only the message is logged for a bad server status; no stack trace.
            logWarningAndClose(ctx, "Failed to fetch failover log for {}. Server response: {}", partitionInstance, failure.getMessage());
        } else {
            logWarningAndClose(ctx, "Failed to fetch failover log for {}.", partitionInstance, failure);
        }
    }

    /**
     * Schedules the next {@link #observeAndRepeat} call on the channel's executor after
     * {@code persistencePollingIntervalMillis * intervalMultiplier} milliseconds.
     *
     * <p>Declared {@code public} in this source, but within this class it is only used
     * internally.
     *
     * @param ctx context of the channel being polled
     * @param partitionInstance the partition instance to poll
     * @param vbuuid the vbucket UUID to pass to the next observe request
     * @param groupId the polling group the task belongs to
     * @param intervalMultiplier factor applied to the configured polling interval; must be
     *     at least 1
     * @throws IllegalArgumentException if {@code intervalMultiplier} is less than 1
     */
    public void scheduleObserveAndRepeat(ChannelHandlerContext ctx, PartitionInstance partitionInstance, long vbuuid, int groupId, int intervalMultiplier) {
        if (intervalMultiplier < 1) {
            throw new IllegalArgumentException("Interval multiplier must be > 0");
        }

        try {
            ctx.executor().schedule(() -> {
                scheduledPollingTasks.decrement();
                observeAndRepeat(ctx, partitionInstance, vbuuid, groupId);
            }, env.persistencePollingIntervalMillis() * intervalMultiplier, TimeUnit.MILLISECONDS);
            // Counted only after scheduling succeeded, so a rejected task is never counted.
            scheduledPollingTasks.increment();
        } catch (Throwable t) {
            logWarningAndClose(ctx, "Failed to schedule observeSeqno.", t);
        }
    }

    /**
     * Performs one polling step for a partition instance and arranges the next one.
     *
     * <p>Does nothing if the group is no longer active. If the stream event buffer holds no
     * events for the partition, the server round trip is skipped and the next step is simply
     * scheduled. Otherwise an {@code observeSeqno} request is issued and its result handled
     * asynchronously.
     *
     * @param ctx context of the channel being polled
     * @param partitionInstance the partition instance to poll
     * @param vbuuid the vbucket UUID to send with the observe request
     * @param groupId the polling group this step belongs to
     */
    private void observeAndRepeat(final ChannelHandlerContext ctx, final PartitionInstance partitionInstance, final long vbuuid, final int groupId) {
        if (activeGroupId != groupId) {
            LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", groupId, partitionInstance);
            return;
        }

        if (!env.streamEventBuffer().hasBufferedEvents(partitionInstance.partition())) {
            LOGGER.trace("No buffered events; skipping observeSeqno for partition instance {}", partitionInstance);
            scheduleObserveAndRepeat(ctx, partitionInstance, vbuuid, groupId, NORMAL_INTERVAL_MULTIPLIER);
            return;
        }

        dcpOps.observeSeqno(partitionInstance.partition(), vbuuid).subscribe(new SingleSubscriber<ObserveSeqnoResponse>() {
            @Override
            public void onSuccess(ObserveSeqnoResponse response) {
                try {
                    // The group may have been retired while the request was in flight.
                    if (activeGroupId != groupId) {
                        LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", groupId, partitionInstance);
                        return;
                    }

                    final long currentVbuuid = response.vbuuid();
                    env.streamEventBuffer().onSeqnoPersisted(
                            response.vbid(),
                            persistedSeqnos.update(partitionInstance, currentVbuuid, response.persistSeqno()));

                    // Continue with the vbuuid the server just reported.
                    scheduleObserveAndRepeat(ctx, partitionInstance, currentVbuuid, groupId, NORMAL_INTERVAL_MULTIPLIER);
                } catch (Throwable t) {
                    logWarningAndClose(ctx, "Fatal error in observeAndRepeat handling observeSeqno response.", t);
                }
            }

            @Override
            public void onError(Throwable failure) {
                if (activeGroupId != groupId || failure instanceof NotConnectedException) {
                    LOGGER.debug("Polling group {} is no longer active; stopping polling for {}", groupId, partitionInstance);
                    return;
                }

                if (!(failure instanceof DcpOps.BadResponseStatusException)) {
                    logWarningAndClose(ctx, "observeSeqno failed.", failure);
                    return;
                }

                final DcpOps.BadResponseStatusException badStatus = (DcpOps.BadResponseStatusException) failure;
                if (badStatus.status().isTemporary()) {
                    // Temporary server-side condition: keep the old vbuuid and back off.
                    LOGGER.debug("observeSeqno failed with status code {} ; will retry after an extended delay.", badStatus.status());
                    scheduleObserveAndRepeat(ctx, partitionInstance, vbuuid, groupId, TEMPORARY_FAILURE_INTERVAL_MULTIPLIER);
                } else {
                    logWarningAndClose(ctx, "observeSeqno failed with status code {}", badStatus.status());
                }
            }
        });
    }

    /**
     * Logs the reason for closing the channel and closes it.
     *
     * <p>Only the first call per handler instance logs at WARN level and closes the
     * channel; later calls log at TRACE level and do nothing else.
     *
     * <p>Declared {@code public} in this source, but within this class it is only used
     * internally.
     *
     * @param ctx context of the channel to close
     * @param message SLF4J message pattern; it is prefixed with {@code "Closing channel; "}
     * @param params arguments for the message pattern (may end with a {@link Throwable})
     */
    public void logWarningAndClose(ChannelHandlerContext ctx, String message, Object... params) {
        if (loggedClosureWarning.compareAndSet(false, true)) {
            LOGGER.warn("Closing channel; " + message, params);
            ctx.close();
        } else {
            LOGGER.trace("Closing channel; " + message, params);
        }
    }
}
