package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.cnc.Event;
import com.couchbase.client.core.cnc.EventBus;
import com.couchbase.client.core.cnc.RequestTracer;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.codec.Transcoder;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.InsertOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.RemoveOptions;
import com.couchbase.transactions.TransactionGetResult;
import com.couchbase.transactions.components.ATREntry;
import com.couchbase.transactions.components.ATRUtil;
import com.couchbase.transactions.components.DocRecord;
import com.couchbase.transactions.components.DocumentGetter;
import com.couchbase.transactions.config.MergedTransactionConfig;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.forwards.ForwardCompatibility;
import com.couchbase.transactions.forwards.ForwardCompatibilityStages;
import com.couchbase.transactions.forwards.Supported;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.log.TransactionCleanupAttempt;
import com.couchbase.transactions.log.TransactionLogger;
import com.couchbase.transactions.support.AttemptStates;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.support.SpanWrapperUtil;
import com.couchbase.transactions.support.TransactionFields;
import com.couchbase.transactions.util.DebugUtil;
import com.couchbase.transactions.util.TriFunction;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.annotation.Nullable;
import reactor.util.function.Tuple2;

@Stability.Internal
/* loaded from: input_file:com/couchbase/transactions/cleanup/Cleaner.class */
public class Cleaner {
    /** Transactions configuration; handed to {@code OptionsWrapperUtil.wrap} and {@code DocumentGetter.justGetDoc} for the KV operations issued by this class. */
    private final MergedTransactionConfig config;
    /** Gives access to the cluster environment (event bus, transcoder, request tracer) and resolves buckets by name. */
    private final ClusterData clusterData;
    /** Event-bus logger created in the constructor under the category {@code TransactionsCleanup.CATEGORY + ".cleaner"}. */
    private final SimpleEventBusLogger LOGGER;
    /** Event bus of the cluster environment; {@code performCleanup} publishes a {@code TransactionCleanupAttempt} on it for every attempt. */
    private final EventBus eventBus;
    /** Fallback returned by {@link #timeBeforeRehandlingFailedCleanup()}; set to 10 seconds in the static initializer. */
    private static final Duration TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT;
    /**
     * Optional override for {@link #TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT}; empty means "use the default".
     * NOTE(review): protected and non-final, so presumably set by subclasses or tests - confirm.
     */
    protected Optional<Duration> timeBeforeRehandlingFailedCleanupDefault = Optional.empty();
    /**
     * Age threshold in minutes (2880 minutes = 48 hours). {@code performCleanup} compares the request age
     * against the same value to decide when a failed cleanup is reported at WARN instead of DEBUG.
     * NOTE(review): "BEING" in the name is presumably a typo for "BEGIN".
     */
    private static final int BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES = 2880;
    /**
     * Decompiler rendering of the flag the compiler generates for {@code assert} statements.
     * Set in the static initializer and read by the assertion checks in {@code commitDocs}.
     */
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.couchbase.transactions.cleanup.Cleaner$1, reason: invalid class name */
    /* loaded from: input_file:com/couchbase/transactions/cleanup/Cleaner$1.class */
    /**
     * Decompiler rendering of the compiler-generated helper {@code Cleaner$1}, which backs the
     * {@code switch} over {@link AttemptStates} in {@code cleanupDocs}.
     *
     * <p>The array is indexed by {@code AttemptStates.ordinal()} and holds the case number used by
     * that switch: COMMITTED=1, ABORTED=2, PENDING=3, COMPLETED=4, ROLLED_BACK=5, NOT_STARTED=6.
     * Any state not listed here keeps the array default of 0 and therefore lands in the
     * {@code default} branch of the switch.
     */
    public static /* synthetic */ class AnonymousClass1 {
        /** Ordinal-indexed lookup table from attempt state to switch case number. */
        static final /* synthetic */ int[] $SwitchMap$com$couchbase$transactions$support$AttemptStates = new int[AttemptStates.values().length];

        // Each constant is mapped inside its own try/catch so that a NoSuchFieldError for one
        // constant leaves the remaining entries intact; the affected entry simply stays 0.
        static {
            try {
                $SwitchMap$com$couchbase$transactions$support$AttemptStates[AttemptStates.COMMITTED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // deliberately ignored: entry stays 0
            }
            try {
                $SwitchMap$com$couchbase$transactions$support$AttemptStates[AttemptStates.ABORTED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // deliberately ignored: entry stays 0
            }
            try {
                $SwitchMap$com$couchbase$transactions$support$AttemptStates[AttemptStates.PENDING.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // deliberately ignored: entry stays 0
            }
            try {
                $SwitchMap$com$couchbase$transactions$support$AttemptStates[AttemptStates.COMPLETED.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
                // deliberately ignored: entry stays 0
            }
            try {
                $SwitchMap$com$couchbase$transactions$support$AttemptStates[AttemptStates.ROLLED_BACK.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
                // deliberately ignored: entry stays 0
            }
            try {
                $SwitchMap$com$couchbase$transactions$support$AttemptStates[AttemptStates.NOT_STARTED.ordinal()] = 6;
            } catch (NoSuchFieldError e6) {
                // deliberately ignored: entry stays 0
            }
        }
    }

    public Cleaner(MergedTransactionConfig mergedTransactionConfig, ClusterData clusterData) {
        this.eventBus = clusterData.cluster().environment().eventBus();
        this.LOGGER = new SimpleEventBusLogger(this.eventBus, TransactionsCleanup.CATEGORY + ".cleaner");
        this.config = mergedTransactionConfig;
        this.clusterData = clusterData;
    }

    /**
     * Returns the transcoder configured on the cluster environment.
     * Used when fetching documents through {@code DocumentGetter.justGetDoc}.
     *
     * @return the environment's transcoder
     */
    private Transcoder getTranscoder() {
        return this.clusterData.cluster().environment().transcoder();
    }

    /**
     * Returns the rehandling delay for failed cleanups.
     *
     * @return the override held in {@code timeBeforeRehandlingFailedCleanupDefault} when one is present,
     *         otherwise {@code TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT}
     */
    public Duration timeBeforeRehandlingFailedCleanup() {
        final Optional<Duration> override = this.timeBeforeRehandlingFailedCleanupDefault;
        if (override.isPresent()) {
            return override.get();
        }
        return TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT;
    }

    /**
     * Builds the document-level cleanup for one attempt, chosen by the attempt's state.
     *
     * <ul>
     *   <li>COMMITTED: commit staged inserts, then staged replaces, then remove the documents staged for removal.</li>
     *   <li>ABORTED: remove staged inserts, then strip the transaction links from staged replaces and staged removes.</li>
     *   <li>PENDING and every other state: nothing is done to the documents; only a deferred debug line is logged.</li>
     * </ul>
     *
     * <p>The switch is made directly on the {@link AttemptStates} value. The decompiled form went through
     * a synthetic ordinal lookup table and used unrelated {@code TransactionGetResult.IDX_*} constants as
     * case labels, which only worked because their numeric values happened to coincide.
     *
     * @param transactionLogger logger collecting the messages of this cleanup attempt
     * @param cleanupRequest    the attempt to clean up; supplies state, attempt id and staged document lists
     * @param spanWrapper       tracing span passed down to the per-document operations
     * @return a Mono that completes once all per-document steps have run, or empty when there is nothing to do
     */
    Mono<Void> cleanupDocs(TransactionLogger transactionLogger, CleanupRequest cleanupRequest, SpanWrapper spanWrapper) {
        String attemptId = cleanupRequest.attemptId();
        switch (cleanupRequest.state()) {
            case COMMITTED: {
                // All three stages are assembled up front (as before) and then chained sequentially.
                Mono<Void> commitInserts = commitDocs(transactionLogger, attemptId, cleanupRequest.stagedInserts(), cleanupRequest, spanWrapper);
                Mono<Void> commitReplaces = commitDocs(transactionLogger, attemptId, cleanupRequest.stagedReplaces(), cleanupRequest, spanWrapper);
                Mono<Void> removeStagedRemoves = removeDocsStagedForRemoval(transactionLogger, attemptId, cleanupRequest.stagedRemoves(), cleanupRequest, spanWrapper);
                return commitInserts.then(commitReplaces).then(removeStagedRemoves);
            }
            case ABORTED: {
                Mono<Void> removeInserts = removeDocs(transactionLogger, attemptId, cleanupRequest.stagedInserts(), cleanupRequest, spanWrapper);
                Mono<Void> unlinkReplaces = removeTxnLinks(transactionLogger, attemptId, cleanupRequest.stagedReplaces(), cleanupRequest, spanWrapper);
                Mono<Void> unlinkRemoves = removeTxnLinks(transactionLogger, attemptId, cleanupRequest.stagedRemoves(), cleanupRequest, spanWrapper);
                return removeInserts.then(unlinkReplaces).then(unlinkRemoves);
            }
            case PENDING:
                transactionLogger.logDefer(attemptId, "No docs cleanup possible as txn in state %s, just removing", Event.Severity.DEBUG, cleanupRequest.state());
                return Mono.empty();
            case COMPLETED:
            case ROLLED_BACK:
            case NOT_STARTED:
            default:
                transactionLogger.logDefer(attemptId, "No docs cleanup to do as txn in state %s, just removing", Event.Severity.DEBUG, cleanupRequest.state());
                return Mono.empty();
        }
    }

    /**
     * Commits the staged content of each listed document that still belongs to this attempt.
     *
     * <p>Runs through {@code doPerDoc} with the "changed since staging" check enabled. For a tombstone
     * the staged content is inserted as a new document; otherwise the transaction xattrs are removed and
     * the body is replaced in a single CAS-guarded {@code mutateIn}.
     *
     * @param transactionLogger logger for this cleanup attempt
     * @param str               id of the attempt being cleaned up
     * @param list              documents whose staged content should be committed
     * @param cleanupRequest    supplies the durability level for the writes
     * @param spanWrapper       tracing span for the KV operations
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> commitDocs(TransactionLogger transactionLogger, String str, List<DocRecord> list, CleanupRequest cleanupRequest, SpanWrapper spanWrapper) {
        return doPerDoc(transactionLogger, str, list, spanWrapper, true, (collection, doc, lookup) -> {
            // Decompiled assertions: only checked when assertions are enabled for this class.
            if (!$assertionsDisabled && !doc.links().isDocumentInTransaction()) {
                throw new AssertionError();
            }
            if (!$assertionsDisabled && !doc.links().stagedContent().isPresent()) {
                throw new AssertionError();
            }
            JsonObject stagedBody = JsonObject.fromJson(doc.links().stagedContent().get());
            return beforeCommitDoc(doc.id())
                .then(Mono.defer(() -> {
                    if (lookup.isDeleted()) {
                        // Tombstone: write the staged content as a fresh document.
                        return collection.insert(
                            doc.id(),
                            stagedBody,
                            OptionsWrapperUtil.wrap(InsertOptions.insertOptions(), this.config, cleanupRequest.durabilityLevel(), collection.core(), spanWrapper)
                                .clientContext(OptionsWrapperUtil.createClientContext("Cleaner::commitDocs")));
                    }
                    // Live document: drop the transaction xattrs and replace the body, guarded by CAS.
                    return collection.mutateIn(
                        doc.id(),
                        Arrays.asList(
                            MutateInSpec.remove(TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY).xattr(),
                            MutateInSpec.replace("", stagedBody)),
                        OptionsWrapperUtil.wrap(
                            MutateInOptions.mutateInOptions()
                                .clientContext(OptionsWrapperUtil.createClientContext("Cleaner::commitDocs"))
                                .cas(doc.cas()),
                            this.config,
                            cleanupRequest.durabilityLevel(),
                            collection.core(),
                            spanWrapper));
                }))
                .doOnSubscribe(subscription -> {
                    transactionLogger.logDefer(str, "removing txn links and writing content to doc %s", Event.Severity.DEBUG, DebugUtil.docId(doc));
                })
                .then();
        });
    }

    /**
     * Strips the transaction xattrs from each listed document that still belongs to this attempt,
     * leaving the document body untouched.
     *
     * <p>Runs through {@code doPerDoc} with the "changed since staging" check disabled. The mutation is
     * CAS-guarded and accesses the tombstone when the lookup reported the document as deleted.
     *
     * @param transactionLogger logger for this cleanup attempt
     * @param str               id of the attempt being cleaned up
     * @param list              documents to unlink from the transaction
     * @param cleanupRequest    supplies the durability level for the writes
     * @param spanWrapper       tracing span for the KV operations
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> removeTxnLinks(TransactionLogger transactionLogger, String str, List<DocRecord> list, CleanupRequest cleanupRequest, SpanWrapper spanWrapper) {
        return doPerDoc(transactionLogger, str, list, spanWrapper, false, (collection, doc, lookup) -> {
            // The hook Mono is created first, exactly as in the chained form.
            Mono<Integer> hook = beforeRemoveLinks(doc.id());
            List<MutateInSpec> dropTxnXattrs = Arrays.asList(MutateInSpec.remove(TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY).xattr());
            return hook
                .then(collection.mutateIn(
                    doc.id(),
                    dropTxnXattrs,
                    OptionsWrapperUtil.wrap(
                        MutateInOptions.mutateInOptions()
                            .clientContext(OptionsWrapperUtil.createClientContext("Cleaner::removeTxnLinks"))
                            .accessDeleted(lookup.isDeleted())
                            .cas(doc.cas()),
                        this.config,
                        cleanupRequest.durabilityLevel(),
                        collection.core(),
                        spanWrapper)))
                .doOnSubscribe(subscription -> {
                    transactionLogger.logDefer(str, "removing txn links from doc %s", Event.Severity.DEBUG, DebugUtil.docId(doc));
                })
                .then();
        });
    }

    /**
     * Removes each listed document that is marked as staged for removal by this attempt.
     *
     * <p>Runs through {@code doPerDoc} with the "changed since staging" check enabled. Documents that
     * do not carry the remove indication are skipped with a debug message.
     *
     * @param transactionLogger logger for this cleanup attempt
     * @param str               id of the attempt being cleaned up
     * @param list              documents staged for removal
     * @param cleanupRequest    supplies the durability level for the removes
     * @param spanWrapper       tracing span for the KV operations
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> removeDocsStagedForRemoval(TransactionLogger transactionLogger, String str, List<DocRecord> list, CleanupRequest cleanupRequest, SpanWrapper spanWrapper) {
        return doPerDoc(transactionLogger, str, list, spanWrapper, true, (collection, doc, lookup) -> {
            if (!doc.links().isDocumentBeingRemoved()) {
                // Nothing to remove: log on subscription and complete.
                return Mono.create(sink -> {
                    transactionLogger.debug(str, "doc %s does not have expected remove indication, skipping", DebugUtil.docId(doc));
                    sink.success();
                });
            }
            return beforeRemoveDocStagedForRemoval(doc.id())
                .then(collection.remove(
                    doc.id(),
                    OptionsWrapperUtil.wrap(RemoveOptions.removeOptions(), this.config, cleanupRequest.durabilityLevel(), collection.core(), spanWrapper)
                        .cas(doc.cas())))
                .doOnSubscribe(subscription -> {
                    transactionLogger.debug(str, "removing doc %s", doc.id());
                })
                .then();
        });
    }

    /**
     * Undoes staged inserts: removes each listed document that still belongs to this attempt.
     *
     * <p>Runs through {@code doPerDoc} with the "changed since staging" check disabled. A tombstone only
     * has its transaction xattrs removed (with {@code accessDeleted(true)}); a live document is removed
     * outright. Both operations are CAS-guarded.
     *
     * @param transactionLogger logger for this cleanup attempt
     * @param str               id of the attempt being cleaned up
     * @param list              documents inserted by the attempt
     * @param cleanupRequest    supplies the durability level for the writes
     * @param spanWrapper       tracing span for the KV operations
     * @return a Mono completing when every document has been handled
     */
    private Mono<Void> removeDocs(TransactionLogger transactionLogger, String str, List<DocRecord> list, CleanupRequest cleanupRequest, SpanWrapper spanWrapper) {
        return doPerDoc(transactionLogger, str, list, spanWrapper, false, (collection, doc, lookup) -> {
            return beforeRemoveDoc(doc.id())
                .then(Mono.defer(() -> {
                    if (lookup.isDeleted()) {
                        // Tombstone: there is no body to remove, so just clear the transaction xattrs.
                        return doc.collection().mutateIn(
                            doc.id(),
                            Collections.singletonList(MutateInSpec.remove(TransactionFields.TRANSACTION_INTERFACE_PREFIX_ONLY).xattr()),
                            OptionsWrapperUtil.wrap(MutateInOptions.mutateInOptions(), this.config, cleanupRequest.durabilityLevel(), doc.collection().core(), spanWrapper)
                                .cas(doc.cas())
                                .accessDeleted(true));
                    }
                    return collection.remove(
                        doc.id(),
                        OptionsWrapperUtil.wrap(RemoveOptions.removeOptions(), this.config, cleanupRequest.durabilityLevel(), doc.collection().core(), spanWrapper)
                            .cas(doc.cas()));
                }))
                .doOnSubscribe(subscription -> {
                    transactionLogger.debug(str, "removing doc %s", DebugUtil.docId(doc));
                })
                .then();
        });
    }

    /**
     * Applies {@code triFunction} to every listed document, one at a time and in list order.
     *
     * <p>For each record the {@code beforeDocGet} hook runs first, then the document is fetched and
     * vetted by {@code doPerDocGotDoc}, which decides whether the action is actually invoked.
     *
     * @param transactionLogger logger for this cleanup attempt
     * @param str               id of the attempt being cleaned up
     * @param list              records identifying the documents to process
     * @param spanWrapper       tracing span for the KV operations
     * @param z                 whether to skip documents whose CRC32 changed since staging
     * @param triFunction       action to run on each document that passes the checks
     * @return a Mono completing after the last document has been processed
     */
    private Mono<Void> doPerDoc(TransactionLogger transactionLogger, String str, List<DocRecord> list, SpanWrapper spanWrapper, boolean z, TriFunction<ReactiveCollection, TransactionGetResult, LookupInResult, Mono<Void>> triFunction) {
        return Flux.fromIterable(list)
            .concatMap(entry -> {
                Mono<Integer> hook = beforeDocGet(entry.id());
                ReactiveCollection target = this.clusterData
                    .getBucketFromName(entry.bucketName())
                    .scope(entry.scopeName())
                    .collection(entry.collectionName());
                return hook.then(doPerDocGotDoc(transactionLogger, str, spanWrapper, z, triFunction, entry, target));
            })
            .then();
    }

    /**
     * Fetches one document and, if it is still staged by this attempt, hands it to {@code triFunction}.
     *
     * <p>The document is skipped (empty result) when it cannot be fetched, when it is no longer in a
     * transaction, when its staged attempt id differs from {@code str}, or - if {@code z} is set and a
     * staging CRC32 was recorded - when its current CRC32 differs from the one recorded at staging.
     * Any error is logged at debug level and then propagated unchanged; a CAS mismatch gets an extra
     * log line noting that the cleanup will be retried.
     *
     * @param transactionLogger  logger for this cleanup attempt
     * @param str                id of the attempt being cleaned up
     * @param spanWrapper        tracing span for the fetch
     * @param z                  whether to apply the CRC32 "changed since staging" check
     * @param triFunction        action to run when all checks pass
     * @param docRecord          record identifying the document
     * @param reactiveCollection collection holding the document
     * @return the action's Mono, an empty Mono when the document is skipped, or an error Mono
     */
    private Mono<Void> doPerDocGotDoc(TransactionLogger transactionLogger, String str, SpanWrapper spanWrapper, boolean z, TriFunction<ReactiveCollection, TransactionGetResult, LookupInResult, Mono<Void>> triFunction, DocRecord docRecord, ReactiveCollection reactiveCollection) {
        return DocumentGetter.justGetDoc(reactiveCollection, this.config, docRecord.id(), spanWrapper, getTranscoder(), true, transactionLogger).flatMap(fetched -> {
            if (!fetched.isPresent()) {
                transactionLogger.debug(str, "could not get doc %s, skipping", DebugUtil.docId(reactiveCollection, docRecord.id()));
                return Mono.empty();
            }

            TransactionGetResult doc = (TransactionGetResult) ((Tuple2) fetched.get()).getT1();
            LookupInResult lookup = (LookupInResult) ((Tuple2) fetched.get()).getT2();
            transactionLogger.debug(str, "handling doc %s with cas %d and links %s, isTombstone=%s", DebugUtil.docId(doc), Long.valueOf(doc.cas()), doc.links(), Boolean.valueOf(lookup.isDeleted()));

            if (!doc.links().isDocumentInTransaction()) {
                transactionLogger.debug(str, "no staged content for doc %s, assuming it was committed and skipping", DebugUtil.docId(doc));
                return Mono.empty();
            }

            if (!doc.links().stagedAttemptId().get().equals(str)) {
                transactionLogger.debug(str, "for doc %s, staged version is for a different attempt %s, skipping", DebugUtil.docId(doc), doc.links().stagedAttemptId().get());
                return Mono.empty();
            }

            boolean canCompareCrc = z && doc.links().crc32OfStaging().isPresent();
            if (canCompareCrc) {
                Object crcAtStaging = (String) doc.links().crc32OfStaging().get();
                String crcNow = doc.documentMetadata().get().crc32().get();
                transactionLogger.debug(str, "checking whether document %s has changed since staging, crc32 then %s now %s", DebugUtil.docId(doc), crcAtStaging, crcNow);
                if (!crcNow.equals(crcAtStaging)) {
                    transactionLogger.warn(str, "document %s has changed since staging, ignoring it to avoid data loss", DebugUtil.docId(doc));
                    return Mono.empty();
                }
            }

            return (Mono) triFunction.apply(reactiveCollection, doc, lookup);
        }).onErrorResume(err -> {
            ErrorClasses errorClass = ErrorClasses.classify(err);
            transactionLogger.debug(str, "got exception while handling doc %s: %s", DebugUtil.docId(reactiveCollection, docRecord.id()), DebugUtil.dbg(err));
            if (errorClass == ErrorClasses.FAIL_CAS_MISMATCH) {
                transactionLogger.debug(str, "got CAS mismatch while cleaning up doc %s, failing this cleanup attempt (it will be retried)", DebugUtil.docId(reactiveCollection, docRecord.id()));
            }
            // Every error, CAS mismatch included, is passed on to the caller.
            return Mono.error(err);
        });
    }

    /**
     * Returns the request tracer configured on the cluster environment.
     * Used by {@code performCleanup} to create the "cleanup.req" span.
     *
     * @return the environment's request tracer
     */
    private RequestTracer tracer() {
        return this.clusterData.cluster().environment().requestTracer();
    }

    /**
     * Cleans up a single ATR entry by converting it into a {@code CleanupRequest} and delegating to
     * {@link #performCleanup}, without a parent span.
     *
     * @param reactiveCollection collection holding the ATR document
     * @param str                not used by this method
     * @param str2               not used by this method
     * @param aTREntry           the ATR entry to clean up
     * @param z                  forwarded unchanged as the second argument of {@code performCleanup}
     * @return the result of the cleanup attempt
     */
    public Mono<TransactionCleanupAttempt> cleanupATREntry(ReactiveCollection reactiveCollection, String str, String str2, ATREntry aTREntry, boolean z) {
        final CleanupRequest request = CleanupRequest.fromAtrEntry(reactiveCollection, aTREntry);
        final SpanWrapper noParentSpan = null;
        return performCleanup(request, z, noParentSpan);
    }

    /**
     * Runs a complete cleanup for one attempt: forward-compatibility check, document cleanup,
     * ATR entry removal, and finally publication of a {@link TransactionCleanupAttempt} event.
     *
     * <p>Failures do not surface as errors: they are logged, converted into an unsuccessful
     * {@code TransactionCleanupAttempt}, published on the event bus and emitted as the result. The
     * failure event is raised at DEBUG severity unless the request is at least
     * {@code BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES} minutes old, in which case it is
     * raised at WARN with an explanatory prefix.
     *
     * <p>The tracing span is created when this method is called, not on subscription, and is
     * finished when the returned Mono terminates.
     *
     * @param cleanupRequest the attempt to clean up
     * @param z              recorded on the span as "initiated_from_queue" and passed to the resulting event
     * @param spanWrapper    optional parent span; may be {@code null}
     * @return a Mono emitting the outcome of this cleanup attempt
     */
    public Mono<TransactionCleanupAttempt> performCleanup(CleanupRequest cleanupRequest, boolean z, @Nullable SpanWrapper spanWrapper) {
        SpanWrapper attribute = SpanWrapperUtil.createOp(null, tracer(), cleanupRequest.atrCollection(), cleanupRequest.atrId(), "cleanup.req", spanWrapper)
            .attribute("db.couchbase.transactions.cleanup.req.attempt_id", cleanupRequest.attemptId())
            .attribute("db.couchbase.transactions.cleanup.req.age_ms", Long.valueOf(cleanupRequest.ageMillis()))
            .attribute("db.couchbase.transactions.cleanup.req.durability", cleanupRequest.durabilityLevel())
            .attribute("db.couchbase.transactions.cleanup.req.state", cleanupRequest.state())
            .attribute("db.couchbase.transactions.cleanup.req.initiated_from_queue", Boolean.valueOf(z));
        return Mono.defer(() -> {
            ReactiveCollection atrCollection = cleanupRequest.atrCollection();
            String atrId = cleanupRequest.atrId();
            String attemptId = cleanupRequest.attemptId();
            TransactionLogger transactionLogger = new TransactionLogger(this.clusterData.cluster().environment().eventBus(), ATRUtil.getAtrDebug(atrCollection, atrId).toString(), false, Event.Severity.INFO);
            transactionLogger.logDefer(attemptId, "Cleaning up ATR entry (isRegular=%s) %s", Event.Severity.DEBUG, Boolean.valueOf(z), cleanupRequest);
            return ForwardCompatibility.check(ForwardCompatibilityStages.CLEANUP_ENTRY, cleanupRequest.forwardCompatibility(), transactionLogger, Supported.SUPPORTED)
                .then(cleanupDocs(transactionLogger, cleanupRequest, attribute))
                .doOnSuccess(ignored -> {
                    onCleanupDocsCompleted();
                })
                .then(removeATREntry(cleanupRequest.state(), atrCollection, atrId, attemptId, transactionLogger, attribute, cleanupRequest))
                .then(Mono.fromCallable(() -> {
                    onCleanupCompleted();
                    TransactionCleanupAttempt succeeded = new TransactionCleanupAttempt(Event.Severity.DEBUG, true, z, transactionLogger.logs(), attemptId, atrId, atrCollection, cleanupRequest, "");
                    this.eventBus.publish(succeeded);
                    return succeeded;
                }))
                .onErrorResume(th -> {
                    long minutes = TimeUnit.MILLISECONDS.toMinutes(cleanupRequest.ageMillis());
                    transactionLogger.logDefer(attemptId, "error while attempting to cleanup ATR entry %s, entry is %d mins old, cleanup will retry later: %s", Event.Severity.WARN, ATRUtil.getAtrDebug(atrCollection, atrId), Long.valueOf(minutes), DebugUtil.dbg(th));
                    Event.Severity severity = Event.Severity.DEBUG;
                    String prefix = "";
                    // Use the named threshold instead of repeating its value as a literal.
                    if (minutes >= BEING_LOGGING_FAILED_CLEANUPS_AT_WARN_AFTER_X_MINUTES) {
                        severity = Event.Severity.WARN;
                        prefix = "despite being " + minutes + " mins old which could indicate a serious error - please raise with support.  Diagnostics: ";
                    }
                    TransactionCleanupAttempt failed = new TransactionCleanupAttempt(severity, false, z, transactionLogger.logs(), attemptId, atrId, atrCollection, cleanupRequest, prefix);
                    this.eventBus.publish(failed);
                    return Mono.just(failed);
                })
                // NOTE(review): upstream errors are already converted to a value by onErrorResume above,
                // so this only fires if that handler itself fails - confirm this is intended.
                .doOnError(th2 -> {
                    attribute.failWith(th2);
                })
                .doFinally(signalType -> {
                    attribute.finish();
                });
        });
    }

    /**
     * Removes the attempt's entry from the ATR document with a single xattr {@code mutateIn}.
     *
     * <p>For a PENDING attempt an xattr insert of {@code ATR_FIELD_COMMIT_ONLY_IF_NOT_ABORTED} is placed
     * before the remove. If that path already exists the whole mutation fails with
     * {@code FAIL_PATH_ALREADY_EXISTS}, which is logged as the entry having moved from PENDING to
     * COMMITTED and then propagated.
     *
     * <p>{@code FAIL_PATH_NOT_FOUND} is treated as success (the entry is already gone, likely due to
     * concurrent cleanup) and yields an empty Mono. Every other error is propagated.
     *
     * <p>The spec list is typed as {@code List<MutateInSpec>}. A raw list would force an unchecked
     * conversion at the {@code mutateIn} call, erasing the generic types of the rest of the chain.
     *
     * @param attemptStates      state of the attempt whose entry is being removed
     * @param reactiveCollection collection holding the ATR document
     * @param str                id of the ATR document
     * @param str2               id of the attempt whose entry is removed
     * @param transactionLogger  logger for this cleanup attempt
     * @param spanWrapper        tracing span for the KV operation
     * @param cleanupRequest     supplies the durability level for the write
     * @return a Mono emitting the mutation result, or empty if the entry was already absent
     */
    Mono<Object> removeATREntry(AttemptStates attemptStates, ReactiveCollection reactiveCollection, String str, String str2, TransactionLogger transactionLogger, SpanWrapper spanWrapper, CleanupRequest cleanupRequest) {
        List<MutateInSpec> specs = new ArrayList<>();
        if (attemptStates == AttemptStates.PENDING) {
            specs.add(MutateInSpec.insert("attempts." + str2 + "." + TransactionFields.ATR_FIELD_COMMIT_ONLY_IF_NOT_ABORTED, 0).xattr());
        }
        specs.add(MutateInSpec.remove("attempts." + str2).xattr());
        return beforeAtrRemove()
            .then(reactiveCollection.mutateIn(
                str,
                specs,
                OptionsWrapperUtil.wrap(
                    MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("Cleaner::removeATREntry")),
                    this.config,
                    cleanupRequest.durabilityLevel(),
                    reactiveCollection.core(),
                    spanWrapper)))
            .doOnNext(result -> {
                transactionLogger.debug(str2, "successfully removed ATR entry");
            })
            .onErrorResume(th -> {
                ErrorClasses classify = ErrorClasses.classify(th);
                transactionLogger.debug(str2, "got exception while removing ATR entry %s: %s", str, DebugUtil.dbg(th));
                if (classify == ErrorClasses.FAIL_PATH_NOT_FOUND) {
                    transactionLogger.logDefer(str2, "failed to remove %s as entry isn't there, likely due to concurrent cleanup", Event.Severity.DEBUG, ATRUtil.getAtrDebug(reactiveCollection, str));
                    return Mono.empty();
                }
                if (classify == ErrorClasses.FAIL_PATH_ALREADY_EXISTS) {
                    transactionLogger.logDefer(str2, "not removing %s as it has changed from PENDING to COMMITTED", Event.Severity.DEBUG, ATRUtil.getAtrDebug(reactiveCollection, str));
                }
                return Mono.error(th);
            })
            // Widen the element type to match the declared Mono<Object> return type.
            .<Object>map(result -> result);
    }

    /**
     * Hook that {@code commitDocs} chains in front of writing a document's staged content.
     * This implementation does nothing and emits the constant 1.
     * NOTE(review): protected, so presumably overridden (e.g. in tests) to inject behaviour - confirm.
     *
     * @param str id of the document about to be committed
     * @return a Mono emitting 1
     */
    protected Mono<Integer> beforeCommitDoc(String str) {
        return Mono.just(1);
    }

    /**
     * Hook that {@code doPerDoc} chains in front of fetching each document.
     * This implementation does nothing and emits the constant 1.
     * NOTE(review): protected, so presumably overridden (e.g. in tests) to inject behaviour - confirm.
     *
     * @param str id of the document about to be fetched
     * @return a Mono emitting 1
     */
    protected Mono<Integer> beforeDocGet(String str) {
        return Mono.just(1);
    }

    /**
     * Hook that {@code removeDocsStagedForRemoval} chains in front of removing a document.
     * This implementation does nothing and emits the constant 1.
     * NOTE(review): protected, so presumably overridden (e.g. in tests) to inject behaviour - confirm.
     *
     * @param str id of the document about to be removed
     * @return a Mono emitting 1
     */
    protected Mono<Integer> beforeRemoveDocStagedForRemoval(String str) {
        return Mono.just(1);
    }

    /**
     * Hook that {@code removeDocs} chains in front of removing a staged insert.
     * This implementation does nothing and emits the constant 1.
     * NOTE(review): protected, so presumably overridden (e.g. in tests) to inject behaviour - confirm.
     *
     * @param str id of the document about to be removed
     * @return a Mono emitting 1
     */
    protected Mono<Integer> beforeRemoveDoc(String str) {
        return Mono.just(1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Hook that does nothing and emits the constant 1. It is not invoked anywhere in this class.
     * NOTE(review): the name suggests callers chain it in front of reading an ATR document - confirm.
     *
     * @param str NOTE(review): presumably the ATR document id - confirm against callers
     * @return a Mono emitting 1
     */
    public Mono<Integer> beforeAtrGet(String str) {
        return Mono.just(1);
    }

    /**
     * Hook that {@code removeATREntry} chains in front of the mutation that removes the ATR entry.
     * This implementation does nothing and emits the constant 1.
     * NOTE(review): protected, so presumably overridden (e.g. in tests) to inject behaviour - confirm.
     *
     * @return a Mono emitting 1
     */
    protected Mono<Integer> beforeAtrRemove() {
        return Mono.just(1);
    }

    /**
     * Hook that {@code removeTxnLinks} chains in front of stripping a document's transaction xattrs.
     * This implementation does nothing and emits the constant 1.
     * NOTE(review): protected, so presumably overridden (e.g. in tests) to inject behaviour - confirm.
     *
     * @param str id of the document about to be unlinked
     * @return a Mono emitting 1
     */
    protected Mono<Integer> beforeRemoveLinks(String str) {
        return Mono.just(1);
    }

    /**
     * Callback invoked by {@code performCleanup} once the document cleanup stage has completed
     * successfully, before the ATR entry is removed. Intentionally empty here.
     */
    void onCleanupDocsCompleted() {
    }

    /**
     * Callback invoked by {@code performCleanup} after the ATR entry has been removed, just before
     * the successful {@code TransactionCleanupAttempt} is built and published. Intentionally empty here.
     */
    void onCleanupCompleted() {
    }

    // Class initialisation: records whether assertions are disabled for this class (decompiled form of
    // the flag the compiler generates for `assert`) and sets the default rehandling delay to 10 seconds.
    static {
        $assertionsDisabled = !Cleaner.class.desiredAssertionStatus();
        TIME_BEFORE_REHANDLING_FAILED_CLEANUP_DEFAULT = Duration.ofSeconds(10L);
    }
}
