package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ThreadStopRequested;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionLogEvent;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Coordinates cleanup of transaction attempts.
 *
 * <p>Holds a delay queue of {@link CleanupRequest}s created by this client, optionally runs a
 * background reactive pipeline that drains that queue ("regular" cleanup), and owns the
 * {@link LostCleanupDistributed} instance ("lost" cleanup). Which background processes are started
 * is decided by the {@link TransactionConfig} passed to the constructor.
 */
@Stability.Internal
public class TransactionsCleanup {
    // NOTE(review): these are mutable public statics; they are left non-final here only to keep
    // the existing public interface unchanged. Consider making them final.
    public static String CATEGORY = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup";
    public static String CATEGORY_STATS = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup.stats";
    public static String CATEGORY_CLIENT_RECORD = TransactionLogEvent.DEFAULT_CATEGORY + ".clientrecord";
    public static String LOST_CATEGORY = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup.lost";
    public static String REGULAR_CATEGORY = TransactionLogEvent.DEFAULT_CATEGORY + ".cleanup.regular";

    private final TransactionConfig config;
    /** Counted down by the regular cleanup pipeline when it observes {@link #stop}; 1 if that pipeline runs, else 0. */
    private final CountDownLatch stopLatch;
    private final ClusterData clusterData;
    private final LostCleanupDistributed lostCleanup;
    private final SimpleEventBusLogger LOGGER;
    private final SimpleEventBusLogger LOGGER_REGULAR;
    private final CleanerFactory cleanerFactory;
    /** Cleanup requests waiting for their delay to expire before the regular pipeline handles them. */
    private final DelayQueue<CleanupRequest> cleanupQueue = new DelayQueue<>();
    /** Set by {@link #stopBackgroundProcesses()}; polled by the regular cleanup pipeline on every tick. */
    private volatile boolean stop = false;

    /**
     * Creates the cleanup coordinator and starts whichever background processes the config enables.
     *
     * <p>Every field is assigned before any background work is started, so the background
     * pipeline never observes a partially constructed instance (in particular an unset
     * {@code stopLatch} or {@code cleanerFactory}).
     *
     * @param transactionConfig supplies the cleaner factory and the two "run cleanup thread" switches
     * @param clusterData supplies the cluster whose event bus is used for logging and events
     */
    public TransactionsCleanup(TransactionConfig transactionConfig, ClusterData clusterData) {
        this.LOGGER = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), CATEGORY);
        this.LOGGER_REGULAR = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), REGULAR_CATEGORY);
        this.config = transactionConfig;
        this.clusterData = clusterData;
        this.cleanerFactory = transactionConfig.cleanerFactory();

        boolean regularEnabled = transactionConfig.runRegularAttemptsCleanupThread();
        boolean lostEnabled = transactionConfig.runLostAttemptsCleanupThread();

        // Only the regular pipeline counts this latch down; lost cleanup is stopped explicitly.
        this.stopLatch = new CountDownLatch(regularEnabled ? 1 : 0);
        this.lostCleanup = new LostCleanupDistributed(transactionConfig, clusterData, () -> {
            return getCleaner();
        });

        this.LOGGER.info("Cleanup settings; regular cleanup thread enabled=" + regularEnabled + ", lost cleanup thread enabled=" + lostEnabled);

        if (regularEnabled) {
            runRegularAttemptsCleanupThread();
        }
        if (lostEnabled) {
            this.lostCleanup.start();
        }
    }

    /** Always returns {@code true}. */
    boolean logDebug() {
        return true;
    }

    /** Always returns {@code true}. */
    boolean logVerbose() {
        return true;
    }

    /** Returns the {@link ClusterData} this instance was constructed with. */
    public ClusterData clusterData() {
        return this.clusterData;
    }

    /**
     * Synchronously runs cleanup for every request currently in the queue, regardless of whether
     * its delay has expired, and blocks until all of them have finished.
     *
     * <p>Requests are read through {@code stream()} and are not removed from the queue by this
     * method. A failure for an individual request is logged and skipped.
     *
     * @return the cleanup attempts that completed without error
     */
    public List<TransactionCleanupAttempt> forceCleanupQueueEmpty() {
        this.LOGGER.info("Forcing synchronous cleanup of all " + this.cleanupQueue.size() + " cleanup requests");
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_force_cleanup_queue_empty");
        span.start();
        try {
            List<TransactionCleanupAttempt> attempts = Flux.fromStream(this.cleanupQueue.stream())
                .flatMap(request -> {
                    this.LOGGER_REGULAR.info("Forcing cleanup of " + request.attemptId());
                    return getCleaner().performCleanup(request, true, span).onErrorResume(err -> {
                        this.LOGGER_REGULAR.info(String.format("While trying to force cleanup for attempt %s got error '%s', continuing and leaving a lost txn", request.attemptId(), err.toString()));
                        return Mono.empty();
                    });
                })
                .collectList()
                .block();
            // The pipeline is lazy: nothing has run until block() returns, so only now is the
            // cleanup actually finished.
            this.LOGGER_REGULAR.info("Finished synchronous cleanup of all cleanup requests");
            return attempts;
        } finally {
            span.finish();
        }
    }

    /**
     * Signals the regular cleanup pipeline to stop, waits up to 10 seconds for it to acknowledge,
     * then stops lost cleanup.
     *
     * <p>If the calling thread is interrupted while waiting, the wait is abandoned, the thread's
     * interrupt status is restored, and lost cleanup is still stopped.
     */
    public void stopBackgroundProcesses() {
        // 'stop' is volatile, so a plain write is already visible to the pipeline's scheduler thread.
        this.stop = true;
        this.LOGGER.info(String.format("Waiting for %d regular background threads to exit", this.stopLatch.getCount()));
        try {
            if (!this.stopLatch.await(10L, TimeUnit.SECONDS)) {
                this.LOGGER.info("Background threads did not stop in expected time");
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for the caller instead of swallowing it.
            Thread.currentThread().interrupt();
            this.LOGGER.info("Interrupted while waiting for background threads to stop");
        }
        this.lostCleanup.stop();
        this.LOGGER.info("Background threads have exitted");
    }

    /** Returns the lost-cleanup component owned by this instance. */
    public LostCleanupDistributed lostCleanup() {
        return this.lostCleanup;
    }

    /**
     * Starts the regular cleanup pipeline: every 100ms it checks the stop flag, drains all
     * expired requests from the queue and runs cleanup for each.
     *
     * <p>When the stop flag is seen the pipeline counts down {@code stopLatch} and terminates with
     * {@link ThreadStopRequested}. Any other error restarts the pipeline with exponential backoff
     * (10ms to 2000ms, at most 100000 retries).
     */
    private void runRegularAttemptsCleanupThread() {
        this.LOGGER_REGULAR.info("Starting background cleanup thread to find transactions from this client");
        Flux.interval(Duration.of(100L, ChronoUnit.MILLIS), Schedulers.elastic())
            .flatMap(tick -> {
                if (!this.stop) {
                    return Mono.just(tick);
                }
                this.LOGGER_REGULAR.info("Stopping background cleanup thread for transactions from this client");
                this.stopLatch.countDown();
                return Mono.error(new ThreadStopRequested());
            })
            .flatMap(tick -> Flux.fromIterable(drainExpiredRequests()))
            .flatMap(this::cleanupSingleRequest)
            .retryWhen(Retry.allBut(ThreadStopRequested.class)
                .exponentialBackoff(Duration.of(10L, ChronoUnit.MILLIS), Duration.of(2000L, ChronoUnit.MILLIS))
                .doOnRetry(retryContext -> {
                    this.LOGGER_REGULAR.debug(String.format("retrying regular cleanup on error '%s'", retryContext.exception()));
                })
                .retryMax(100000L))
            .subscribe(attempt -> {
                // Individual results are already logged in cleanupSingleRequest.
            }, err -> {
                if (err instanceof ThreadStopRequested) {
                    // Expected termination path requested via stopBackgroundProcesses().
                    return;
                }
                this.LOGGER_REGULAR.warn("regular cleanup thread ended with exception " + err);
            }, () -> {
                this.LOGGER_REGULAR.warn("regular cleanup thread ending");
            });
    }

    /** Removes and returns every request whose delay has already expired (possibly none). */
    private List<CleanupRequest> drainExpiredRequests() {
        List<CleanupRequest> expired = new ArrayList<>();
        // DelayQueue.drainTo only transfers expired elements, matching a poll()-until-null loop.
        this.cleanupQueue.drainTo(expired);
        return expired;
    }

    /**
     * Runs cleanup for one request inside its own "transaction_cleanup" span. On error the
     * failure is published and the request is re-queued; the returned Mono then completes empty.
     */
    private Mono<TransactionCleanupAttempt> cleanupSingleRequest(CleanupRequest request) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_cleanup");
        Cleaner cleaner = getCleaner();
        return cleaner.performCleanup(request, true, span)
            .doOnSuccess(attempt -> {
                this.LOGGER_REGULAR.debug(String.format("result of cleanup request %s: success=%s", request, attempt.success()));
                if (!attempt.success()) {
                    // Surface the attempt's own logs only for unsuccessful attempts.
                    attempt.logs().forEach(logLine -> {
                        this.LOGGER_REGULAR.debug(String.format("result of cleanup request %s log: %s", request, logLine.toString()));
                    });
                }
            })
            .onErrorResume(err -> {
                requeueFailedRequest(cleaner, request, err);
                return Mono.empty();
            })
            .doOnSubscribe(subscription -> {
                span.start();
            })
            .doFinally(signalType -> {
                span.finish();
            });
    }

    /**
     * Publishes a {@link CleanupFailedEvent} for the failed request and adds a copy of it back to
     * the queue, delayed by the cleaner's {@code timeBeforeRehandlingFailedCleanup()}.
     */
    private void requeueFailedRequest(Cleaner cleaner, CleanupRequest request, Throwable err) {
        Duration retryDelay = cleaner.timeBeforeRehandlingFailedCleanup();
        this.clusterData.cluster().environment().eventBus().publish(new CleanupFailedEvent(request, err));
        this.LOGGER_REGULAR.debug(String.format("error while handling cleanup request %s, retrying in %dmsecs: '%s'", request, retryDelay.toMillis(), err));
        long elapsedMillis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - request.createdAt());
        this.cleanupQueue.add(new CleanupRequest(
            request.attemptId(),
            request.atrId(),
            request.atrCollection(),
            request.state(),
            request.stagedReplaces(),
            request.stagedRemoves(),
            request.stagedInserts(),
            retryDelay,
            request.forwardCompatibility(),
            elapsedMillis));
    }

    /** Creates a new {@link Cleaner} from the configured factory on every call. */
    public Cleaner getCleaner() {
        return this.cleanerFactory.create(this.config, this.clusterData);
    }

    /** Discards every pending cleanup request without running it. */
    public void clearCleanupQueue() {
        this.cleanupQueue.clear();
    }

    /** Returns the number of requests currently in the queue, expired or not. */
    public int cleanupQueueLength() {
        return this.cleanupQueue.size();
    }

    /**
     * Queues a cleanup request for the regular cleanup pipeline.
     *
     * @param cleanupRequest the request to queue
     */
    public void add(CleanupRequest cleanupRequest) {
        this.cleanupQueue.add(cleanupRequest);
    }

    /**
     * Fetches the given ATR and runs cleanup for every entry in it.
     *
     * <p>If the ATR is absent the returned Mono completes without doing anything. An error
     * fetching the ATR is logged and propagated.
     *
     * @param reactiveCollection the collection holding the ATR
     * @param str the ATR document id
     * @return a Mono that completes once all entries have been handled
     */
    public Mono<Void> forceATRCleanup(ReactiveCollection reactiveCollection, String str) {
        SpanWrapper span = SpanWrapper.create(this.config, "transaction_force_atr_cleanup");
        return ActiveTransactionRecord.getAtr(reactiveCollection, str, OptionsWrapperUtil.kvTimeoutNonMutating(this.config, reactiveCollection.core()), span)
            .flatMap(maybeAtr -> Mono.justOrEmpty(maybeAtr))
            .doOnError(err -> {
                this.LOGGER_REGULAR.debug(String.format("Got error '%s' while cleaning up ATR %s/%s", err, reactiveCollection.name(), str));
            })
            .flatMapMany(atr -> Flux.fromIterable(atr.entries()))
            .flatMap(entry -> getCleaner().performCleanup(CleanupRequest.fromAtrEntry(reactiveCollection, entry), false, span))
            .then()
            .doOnSubscribe(subscription -> {
                span.start();
            })
            .doFinally(signalType -> {
                span.finish();
            });
    }
}
