package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.config.ClusterConfig;
import com.couchbase.client.core.logging.RedactableArgument;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.transactions.atr.ATRIds;
import com.couchbase.transactions.components.ATREntry;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.components.CasMode;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ThreadStopRequested;
import com.couchbase.transactions.log.LostCleanupThreadEndedPrematurely;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionCleanupEndRunEvent;
import com.couchbase.transactions.log.TransactionCleanupStartRunEvent;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.util.DebugUtil;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuples;

@Stability.Internal
/* loaded from: input_file:com/couchbase/transactions/cleanup/LostCleanupDistributed.class */
public class LostCleanupDistributed {
    /** Created by the config's client record factory in the constructor; used to register and remove this client. */
    private final ClientRecord clientRecord;
    /** Logger constructed over the cluster environment's event bus with {@code TransactionsCleanup.LOST_CATEGORY}. */
    private final SimpleEventBusLogger LOGGER;
    /** Cluster handle used to resolve buckets/collections and to reach the event bus. */
    private final ClusterData clusterData;
    /** Transactions configuration (number of ATRs, cleanup window, metadata collection, ...). */
    private final TransactionConfig config;
    /** Supplies the {@code Cleaner} used for each ATR that is processed. */
    private final Supplier<Cleaner> cleanerSupplier;
    /**
     * Assigned by {@code stopThreads()} before {@code stop} is set to true, and counted down by
     * {@code checkIfThreadStopped} once a cleanup thread observes the stop request.
     */
    private CountDownLatch stopLatch;
    /** Copy of {@code config.cleanupWindow()}; the period over which the owned ATRs are spread. */
    private final Duration actualCleanupWindow;
    /** 1500 ms; passed as the safety-margin argument to {@code handleATRCleanup}, which forwards it to {@code ATREntry.hasExpired}. */
    private static final Duration DEFAULT_SAFETY_MARGIN = Duration.of(1500, ChronoUnit.MILLIS);
    /** Set to true by {@code stopThreads()}; volatile so the reactive cleanup threads see the change. */
    private volatile boolean stop = false;
    /** Random identifier for this client instance. */
    private final String clientUuid = UUID.randomUUID().toString();
    /** Names of the buckets for which {@code configureFromClusterConfig} has already started a cleanup thread. */
    private final Set<String> bucketThreads = ConcurrentHashMap.newKeySet();
    /** Log prefix: "Client " followed by the first five characters of {@code clientUuid}. */
    private final String bp = String.format("Client %s", this.clientUuid.substring(0, 5));

    /**
     * Wires up the lost-transactions cleanup for one client instance. Nothing is started here;
     * see {@code start()}.
     *
     * @param transactionConfig transactions configuration; also provides the client record factory
     *                          and the cleanup window
     * @param clusterData       cluster handle; its environment's event bus backs the logger
     * @param supplier          supplies the {@code Cleaner} used when processing ATRs
     */
    public LostCleanupDistributed(TransactionConfig transactionConfig, ClusterData clusterData, Supplier<Cleaner> supplier) {
        // Plain reference captures first.
        this.config = transactionConfig;
        this.clusterData = clusterData;
        this.cleanerSupplier = supplier;

        // Derived collaborators, evaluated in the same relative order as before.
        this.LOGGER = new SimpleEventBusLogger(
                clusterData.cluster().environment().eventBus(),
                TransactionsCleanup.LOST_CATEGORY);
        this.clientRecord = transactionConfig.clientRecordFactory().create(transactionConfig, clusterData);
        this.actualCleanupWindow = transactionConfig.cleanupWindow();
    }

    /**
     * @return the random UUID string generated for this client instance
     */
    public String clientUuid() {
        return clientUuid;
    }

    /**
     * Stops the cleanup threads, then removes this client from all buckets' client records.
     * A failure while removing the client is logged as a warning and otherwise ignored.
     * Blocks until the removal has finished.
     */
    public void stop() {
        stopThreads();

        clientRecord.removeClientFromAllBuckets(clientUuid)
                .onErrorResume(err -> {
                    // Best effort: report and carry on with shutdown.
                    LOGGER.warn(String.format("%s failed to remove from all buckets with err: %s", bp, err));
                    return Mono.empty();
                })
                .blockLast();

        LOGGER.info(String.format("%s stopped lost cleanup process", bp));
    }

    /**
     * Requests all cleanup threads to stop and waits up to 10 seconds for them to acknowledge.
     *
     * <p>The latch is sized from the number of entries in {@code bucketThreads} and is assigned
     * before the volatile {@code stop} flag is raised, so any thread that observes the flag also
     * observes the latch.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, a warning is
     * logged and the thread's interrupt status is restored so callers can react to it.
     */
    public void stopThreads() {
        int size = this.bucketThreads.size();
        this.LOGGER.info(String.format("%s stopping lost cleanup process, waiting for %d threads to end", this.bp, size));
        this.stopLatch = new CountDownLatch(size);
        this.stop = true;
        try {
            if (!this.stopLatch.await(10L, TimeUnit.SECONDS)) {
                this.LOGGER.info("Lost background cleanup threads did not stop in time");
            }
        } catch (InterruptedException e) {
            // Do not swallow the interrupt: restore the flag and report through the logger
            // instead of dumping a stack trace to stderr.
            Thread.currentThread().interrupt();
            this.LOGGER.warn(String.format("%s interrupted while waiting for lost cleanup threads to stop", this.bp));
        }
    }

    /**
     * Guard consulted before building the per-expired-attempt log message in
     * {@code handleATRCleanup}. Always returns true here.
     */
    private boolean logDebug() {
        return true;
    }

    /**
     * Guard consulted before building the per-ATR "checking for lost txns" log message in
     * {@code perBucketThread}. Always returns true here.
     */
    private boolean logVerbose() {
        return true;
    }

    /**
     * Selects the ATR ids this client is responsible for: every {@code i2}-th id of
     * {@code ATRIds.allAtrs(i3)}, starting at position {@code i}.
     *
     * @param i  zero-based position of the first ATR to take (callers pass the index of this client)
     * @param i2 step between taken positions (callers pass the number of active clients); must be positive
     * @param i3 value passed to {@code ATRIds.allAtrs} (callers pass the configured number of ATRs)
     * @return a new mutable list of the selected ATR ids; empty when {@code i} is at or past the
     *         end of the ATR list
     * @throws IllegalArgumentException if {@code i} is negative or {@code i2} is not positive
     */
    public static List<String> atrsToHandle(int i, int i2, int i3) {
        if (i < 0) {
            throw new IllegalArgumentException("start index must not be negative: " + i);
        }
        if (i2 <= 0) {
            // A zero step would never advance (unbounded loop); a negative one walks off the front.
            throw new IllegalArgumentException("step must be positive: " + i2);
        }
        List<String> allAtrs = ATRIds.allAtrs(i3);
        List<String> selected = new ArrayList<>();
        int total = allAtrs.size();
        // long cursor so that adding a very large step cannot wrap around to a negative index.
        for (long position = i; position < total; position += i2) {
            selected.add(allAtrs.get((int) position));
        }
        return selected;
    }

    /**
     * Fetches one ATR, finds the entries that have expired and runs cleanup on each of them in
     * sequence.
     *
     * <p>Errors never propagate to the subscriber: a failure while fetching the ATR is logged and
     * recorded in {@code aTRStats.errored}; a failure while cleaning a single entry increments
     * {@code aTRStats.expiredEntryCleanupFailedAttempts} and that entry is skipped. In both cases
     * the returned flux simply completes.
     *
     * @param str                log prefix identifying the calling cleanup thread
     * @param reactiveCollection collection the ATR is read from
     * @param str2               id of the ATR document
     * @param aTRStats           mutable stats object populated while processing
     * @param duration           safety margin; its millisecond value is passed to {@code ATREntry.hasExpired}
     * @return the cleanup attempts performed for the expired entries
     */
    public Flux<TransactionCleanupAttempt> handleATRCleanup(String str, ReactiveCollection reactiveCollection, String str2, ATRStats aTRStats, Duration duration) {
        return Flux.defer(() -> {
            long startNanos = System.nanoTime();
            // Seeded with the start time so the "fetching ATR" figure in the final log line can
            // never be computed from an unset (zero) timestamp.
            AtomicLong atrFetchedNanos = new AtomicLong(startNanos);
            AtomicReference<CasMode> casMode = new AtomicReference<>(CasMode.UNKNOWN);
            SpanWrapper start = SpanWrapper.create(this.config, "transaction_distclean_atr").start();
            Cleaner cleaner = this.cleanerSupplier.get();
            return cleaner.beforeAtrGet(str2).then(ActiveTransactionRecord.getAtr(reactiveCollection, str2, OptionsWrapperUtil.kvTimeoutNonMutating(this.config, reactiveCollection.core()), start)).flatMap(optional -> {
                atrFetchedNanos.set(System.nanoTime());
                return optional.isPresent() ? Mono.just(optional.get()) : Mono.empty();
            }).doOnError(th -> {
                // The fetch failed: still record when it ended so the timing log stays meaningful.
                atrFetchedNanos.set(System.nanoTime());
                this.LOGGER.debug(String.format("%s Got error '%s' while getting ATR %s/", str, th, ATRUtil.getAtrDebug(reactiveCollection, str2)));
                aTRStats.errored = Optional.of(th);
            }).flatMapMany(atr -> {
                casMode.set(atr.casMode());
                aTRStats.numEntries = atr.entries().size();
                aTRStats.exists = true;
                aTRStats.errored = Optional.empty();
                Collection<ATREntry> expired = atr.entries().stream()
                        .filter(aTREntry -> aTREntry.hasExpired(duration.toMillis(), this.config.transactionExpirationTime()))
                        .collect(Collectors.toList());
                aTRStats.expired = expired;
                return Flux.fromIterable(expired);
            }).concatMap(aTREntry -> {
                if (logDebug()) {
                    this.LOGGER.verbose(String.format("%s Found expired attempt %s, expires after %d, age %d (started %d, now %d)", str, aTREntry.attemptId(), aTREntry.expiresAfterMsecs().orElse(-1), Long.valueOf(aTREntry.ageMsecs()), aTREntry.timestampStartMsecs().orElse(0L), Long.valueOf(aTREntry.cas() / 1000000)));
                }
                aTRStats.expiredEntryCleanupTotalAttempts.incrementAndGet();
                return cleaner.performCleanup(CleanupRequest.fromAtrEntry(reactiveCollection, aTREntry), false, start).onErrorResume(th2 -> {
                    // One failed entry must not stop the remaining entries from being cleaned.
                    aTRStats.expiredEntryCleanupFailedAttempts.incrementAndGet();
                    return Mono.empty();
                });
            }).doFinally(signalType -> {
                long totalMillis = (System.nanoTime() - startNanos) / 1000000;
                long fetchMillis = (atrFetchedNanos.get() - startNanos) / 1000000;
                this.LOGGER.verbose(String.format("%s processed ATR %s after %d millis (%d fetching ATR), CAS=%s: %s", str, ATRUtil.getAtrDebug(reactiveCollection, str2), Long.valueOf(totalMillis), Long.valueOf(fetchMillis), casMode.get(), aTRStats));
                start.finish();
            }).onErrorResume(th2 -> {
                // Already logged/recorded above; the caller only sees completion.
                return Mono.empty();
            });
        });
    }

    /**
     * Starts a lost-transactions cleanup thread for every bucket in the given cluster config that
     * does not already have one (tracked by name in {@code bucketThreads}). Ignored once stopping
     * has been requested.
     *
     * <p>If a thread ends with an error other than {@code ThreadStopRequested}, a warning is
     * logged and a {@code LostCleanupThreadEndedPrematurely} event is published on the event bus.
     *
     * @param clusterConfig the cluster configuration whose buckets should be covered
     */
    void configureFromClusterConfig(ClusterConfig clusterConfig) {
        if (this.stop) {
            this.LOGGER.info(String.format("ignoring new cluster config with %d buckets as stopping", Integer.valueOf(clusterConfig.bucketConfigs().size())));
            return;
        }
        this.LOGGER.info(String.format("new cluster config with %d buckets", Integer.valueOf(clusterConfig.bucketConfigs().size())));
        Flux.fromIterable(clusterConfig.bucketConfigs().values()).flatMap(bucketConfig -> {
            String name = bucketConfig.name();
            // add() returns false when this bucket already has a cleanup thread.
            if (!this.bucketThreads.add(name)) {
                return Mono.empty();
            }
            this.LOGGER.info(String.format("will start cleaning lost transactions on bucket %s", RedactableArgument.redactMeta(name)));
            return this.clusterData.getBucketFromName(name).waitUntilReady(Duration.ofSeconds(2147483647L)).then(this.clusterData.getBucketDefaultCollection(name)).flatMap(reactiveCollection -> {
                this.LOGGER.info(String.format("%s %s/%s creating thread to handle lost transactions", this.bp, RedactableArgument.redactMeta(reactiveCollection.bucketName()), RedactableArgument.redactMeta(reactiveCollection.name())));
                return perBucketThread(reactiveCollection).doOnError(th -> {
                    if (th instanceof ThreadStopRequested) {
                        return;
                    }
                    this.LOGGER.warn(String.format("%s %s/%s lost transactions thread has ended on error %s", this.bp, RedactableArgument.redactMeta(reactiveCollection.bucketName()), RedactableArgument.redactMeta(reactiveCollection.name()), DebugUtil.dbg(th)));
                });
            });
        }).subscribe(ignored -> {
            this.LOGGER.warn(String.format("%s lost transactions cleanup thread(s) ending", this.bp));
        }, th -> {
            if (th instanceof ThreadStopRequested) {
                return;
            }
            // The throwable is passed as a format argument, never spliced into the format string:
            // a '%' in its message would otherwise make String.format throw here and the event
            // below would not be published.
            this.LOGGER.warn(String.format("%s lost transactions cleanup thread ended with exception %s", this.bp, th));
            this.clusterData.cluster().environment().eventBus().publish(new LostCleanupThreadEndedPrematurely(th));
        });
    }

    /**
     * Starts lost-transactions cleanup.
     *
     * <p>When the config names a metadata collection, a single cleanup thread is started on that
     * collection. Otherwise a thread is started per bucket of the current cluster config, and the
     * same is done again for every subsequent cluster config emitted by the configuration provider.
     *
     * <p>NOTE(review): the metadata-collection path does not add anything to
     * {@code bucketThreads}, which is what {@code stopThreads()} sizes its latch from - confirm
     * whether that is intended.
     */
    public void start() {
        if (!this.config.metadataCollection().isPresent()) {
            // No dedicated metadata collection: follow the cluster's bucket list.
            configureFromClusterConfig(this.clusterData.cluster().core().clusterConfig());
            this.clusterData.cluster().core().configurationProvider().configs().subscribe(this::configureFromClusterConfig);
            return;
        }

        perBucketThread(this.config.metadataCollection().get().reactive())
                .doOnError(err -> {
                    // A requested stop is the normal way for the thread to end; stay quiet.
                    if (!(err instanceof ThreadStopRequested)) {
                        this.LOGGER.warn(String.format("%s %s/%s lost transactions thread has ended on error %s", this.bp, RedactableArgument.redactMeta(this.config.metadataCollection().get().bucketName()), RedactableArgument.redactMeta(this.config.metadataCollection().get().name()), DebugUtil.dbg(err)));
                    }
                })
                .subscribe();
    }

    /**
     * Builds the endless cleanup loop for one collection.
     *
     * <p>Each run: registers/refreshes this client via {@code clientRecord.processClient}, works
     * out which ATRs this client owns, publishes a {@code TransactionCleanupStartRunEvent}, then
     * checks the owned ATRs one at a time, spaced evenly across the cleanup window. When the run
     * finishes a {@code TransactionCleanupEndRunEvent} is published and the run is repeated.
     *
     * <p>Errors inside a run other than {@code ThreadStopRequested} are logged and end that run
     * early; errors from the run as a whole are retried with exponential backoff. Only
     * {@code ThreadStopRequested} terminates the returned mono.
     *
     * @param reactiveCollection collection whose ATRs are checked
     * @return a mono that only terminates (with an error) once a stop has been requested
     */
    private Mono<Void> perBucketThread(ReactiveCollection reactiveCollection) {
        return Mono.defer(() -> {
            String str = "lost/" + reactiveCollection.bucketName() + "/" + reactiveCollection.name() + "/clientId=" + this.clientUuid.substring(0, 5);
            return this.clientRecord.processClient(this.clientUuid, reactiveCollection, this.config).flatMap(clientRecordDetails -> {
                long nanoTime = System.nanoTime();
                // Passed to the end-run event; nothing in this method adds entries to it.
                HashMap hashMap = new HashMap();
                List<String> ownedAtrs = atrsToHandle(clientRecordDetails.indexOfThisClient(), clientRecordDetails.numActiveClients(), this.config.numAtrs());
                // This client may own no ATRs at all (its index is past the end of the ATR list);
                // never divide by zero in that case.
                long max = Math.max(1L, this.actualCleanupWindow.toMillis() / Math.max(1, ownedAtrs.size()));
                if (ownedAtrs.size() >= this.config.numAtrs()) {
                    this.LOGGER.verbose(String.format("%s owns all %d ATRs and will check them over next %dmills, checking an ATR every %dmillis", str, Integer.valueOf(this.config.numAtrs()), Long.valueOf(this.actualCleanupWindow.toMillis()), Long.valueOf(max)));
                }
                TransactionCleanupStartRunEvent transactionCleanupStartRunEvent = new TransactionCleanupStartRunEvent(reactiveCollection.bucketName(), reactiveCollection.name(), this.clientUuid, clientRecordDetails, this.actualCleanupWindow, ownedAtrs.size(), this.config.numAtrs(), Duration.ofMillis(max));
                this.clusterData.cluster().environment().eventBus().publish(transactionCleanupStartRunEvent);
                // With nothing to check the run would finish instantly and repeat() would spin;
                // wait out one cleanup window instead before the next run.
                Mono<Void> idleWhenNothingOwned = ownedAtrs.isEmpty() ? Mono.delay(this.actualCleanupWindow).then() : Mono.empty();
                return Flux.fromIterable(ownedAtrs).delayElements(Duration.of(max, ChronoUnit.MILLIS)).concatMap(str3 -> {
                    if (logVerbose()) {
                        this.LOGGER.verbose(String.format("%s checking for lost txns in atr %s", str, ATRUtil.getAtrDebug(reactiveCollection, str3)));
                    }
                    ATRStats aTRStats = new ATRStats();
                    // The stop check runs first so a stop request aborts before touching the ATR.
                    return checkIfThreadStopped(reactiveCollection, str3).thenMany(handleATRCleanup(str, reactiveCollection, str3, aTRStats, DEFAULT_SAFETY_MARGIN)).then().thenReturn(str3);
                }).onErrorResume(th -> {
                    if (th instanceof ThreadStopRequested) {
                        // Must propagate so the loop actually ends.
                        return Mono.error(th);
                    }
                    this.LOGGER.debug(String.format("%s lost cleanup thread got error '%s', continuing", str, th));
                    return Mono.empty();
                }).then(idleWhenNothingOwned).thenReturn(Tuples.of(hashMap, transactionCleanupStartRunEvent, Long.valueOf(nanoTime)));
            }).doOnNext(tuple3 -> {
                this.clusterData.cluster().environment().eventBus().publish(new TransactionCleanupEndRunEvent((TransactionCleanupStartRunEvent) tuple3.getT2(), (Map) tuple3.getT1(), Duration.ofNanos(System.nanoTime() - ((Long) tuple3.getT3()).longValue())));
            }).retryWhen(Retry.allBut(new Class[]{ThreadStopRequested.class}).exponentialBackoff(Duration.ofSeconds(1L), this.config.cleanupWindow()).doOnRetry(retryContext -> {
                this.LOGGER.debug(String.format("%s retrying lost cleanup on error %s after %s", str, DebugUtil.dbg(retryContext.exception()), retryContext.backoff()));
            })).repeat().then();
        });
    }

    /**
     * Stop checkpoint evaluated at subscription time.
     *
     * @param reactiveCollection collection the calling cleanup thread works on (used for logging)
     * @param str                value to emit when no stop has been requested
     * @return a mono emitting {@code str}, or failing with {@code ThreadStopRequested} after
     *         logging and counting down {@code stopLatch} when {@code stop} is set
     */
    private Mono<String> checkIfThreadStopped(ReactiveCollection reactiveCollection, String str) {
        return Mono.defer(() -> {
            if (this.stop) {
                this.LOGGER.info(String.format("Stopping background cleanup thread for lost transactions on %s/%s", reactiveCollection.bucketName(), reactiveCollection.name()));
                // Lets stopThreads() know this thread has seen the request.
                this.stopLatch.countDown();
                return Mono.error(new ThreadStopRequested());
            }
            return Mono.just(str);
        });
    }
}
