package com.couchbase.transactions.cleanup;

import com.couchbase.client.core.annotation.Stability;
import com.couchbase.client.core.retry.reactor.Retry;
import com.couchbase.client.java.ReactiveCollection;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.LookupInOptions;
import com.couchbase.client.java.kv.LookupInResult;
import com.couchbase.client.java.kv.LookupInSpec;
import com.couchbase.client.java.kv.MutateInMacro;
import com.couchbase.client.java.kv.MutateInOptions;
import com.couchbase.client.java.kv.MutateInSpec;
import com.couchbase.client.java.kv.StoreSemantics;
import com.couchbase.transactions.components.ActiveTransactionRecord;
import com.couchbase.transactions.config.TransactionConfig;
import com.couchbase.transactions.error.internal.ErrorClasses;
import com.couchbase.transactions.log.SimpleEventBusLogger;
import com.couchbase.transactions.support.OptionsWrapperUtil;
import com.couchbase.transactions.support.SpanWrapper;
import com.couchbase.transactions.util.DebugUtil;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

/**
 * Maintains the per-bucket client record document ({@code _txn:client-record}).
 *
 * <p>Each client registers itself under the {@code records.clients.<clientUuid>} extended
 * attribute with a heartbeat, an expiry interval and its ATR count. While refreshing its own
 * entry a client also removes entries whose heartbeat is older than their expiry interval.
 */
@Stability.Internal
public class ClientRecord {
    private static final String FIELD_HEARTBEAT = "heartbeat_ms";
    private static final String FIELD_EXPIRES = "expires_ms";
    private static final String FIELD_NUM_ATRS = "num_atrs";
    public static final String FIELD_RECORDS = "records";
    public static final String FIELD_CLIENTS = "clients";
    public static final String FIELD_OVERRIDE = "override";
    public static final String FIELD_OVERRIDE_ENABLED = "enabled";
    public static final String FIELD_OVERRIDE_EXPIRES = "expires";
    public static String CLIENT_RECORD_DOC_ID = "_txn:client-record";

    /** Xattr path of the object holding one entry per client ({@code records.clients}). */
    private static final String PATH_CLIENTS = FIELD_RECORDS + "." + FIELD_CLIENTS;

    /** Extra milliseconds added on top of the cleanup window when writing {@code expires_ms}. */
    private static final int SAFETY_MARGIN_EXPIRY_MSECS = 2000;

    /** Maximum number of specs Couchbase accepts in a single sub-document mutation. */
    private static final int MAX_SUBDOC_SPECS = 16;

    private static final Duration TIMEOUT = Duration.ofMillis(500);
    private static final Duration BACKOFF_START = Duration.of(10, ChronoUnit.MILLIS);
    private static final Duration BACKOFF_END = Duration.of(250, ChronoUnit.MILLIS);

    private final ClusterData clusterData;
    private final SimpleEventBusLogger LOGGER;
    private final TransactionConfig config;

    /**
     * Creates a client record helper that logs to the event bus of the given cluster.
     *
     * @param clusterData access to the cluster, its bucket names and default collections
     * @param transactionConfig the transactions configuration, stored for later use
     */
    public ClientRecord(ClusterData clusterData, TransactionConfig transactionConfig) {
        this.LOGGER = new SimpleEventBusLogger(clusterData.cluster().environment().eventBus(), TransactionsCleanup.CATEGORY_CLIENT_RECORD);
        this.clusterData = clusterData;
        this.config = transactionConfig;
    }

    /**
     * Removes the client's entry from the client record of every known bucket, using the
     * default per-bucket timeout of 500 milliseconds.
     *
     * @param str the UUID of the client to remove
     * @return a flux that emits no items and completes once every bucket has been processed
     */
    public Flux<Void> removeClientFromAllBuckets(String str) {
        return removeClientFromAllBuckets(str, TIMEOUT);
    }

    /**
     * Removes the client's entry from the client record of every known bucket.
     *
     * <p>A missing client record document or a missing entry is treated as success. Any other
     * failure is retried with exponential backoff until {@code duration} elapses for that bucket.
     *
     * @param str the UUID of the client to remove
     * @param duration the maximum time to spend on each bucket, retries included
     * @return a flux that emits no items and completes once every bucket has been processed
     */
    public Flux<Void> removeClientFromAllBuckets(String str, Duration duration) {
        return Flux.fromIterable(this.clusterData.bucketNames())
            .subscribeOn(Schedulers.elastic())
            .flatMap(bucketName -> this.clusterData.getBucketDefaultCollection(bucketName)
                .flatMap(collection -> beforeRemoveClient(this).then(collection.mutateIn(
                    CLIENT_RECORD_DOC_ID,
                    Arrays.asList(MutateInSpec.remove(PATH_CLIENTS + "." + str).xattr()),
                    MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::removeClientFromAllBuckets")))))
                .onErrorResume(err -> {
                    switch (ErrorClasses.classify(err)) {
                        case FAIL_DOC_NOT_FOUND:
                            // Nothing to remove: the bucket has no client record at all.
                            this.LOGGER.debug(String.format("%s/%s remove skipped as client record does not exist", bucketName, str));
                            return Mono.empty();
                        case FAIL_PATH_NOT_FOUND:
                            // Nothing to remove: this client never registered, or was already removed.
                            this.LOGGER.debug(String.format("%s/%s remove skipped as client record entry does not exist", bucketName, str));
                            return Mono.empty();
                        default:
                            this.LOGGER.debug(String.format("%s/%s got error while removing client from client record: %s", bucketName, str, DebugUtil.dbg(err)));
                            return Mono.error(err);
                    }
                })
                .retryWhen(Retry.any().exponentialBackoff(BACKOFF_START, BACKOFF_END).doOnRetry(ctx -> {
                    this.LOGGER.info(String.format("%s/%s retrying removing client from record on error %s", bucketName, str, DebugUtil.dbg(ctx.exception())));
                }))
                .timeout(duration)
                .doOnNext(ignored -> {
                    this.LOGGER.info(String.format("%s/%s removed from client record", bucketName, str));
                })
                .doOnError(err -> {
                    this.LOGGER.info(String.format("got error while removing client record '%s'", err));
                })
                .then());
    }

    /**
     * Performs a dummy xattr upsert on the client record so that the document receives a fresh
     * CAS. If the document does not exist it is created via {@link #createClientRecord}.
     */
    private Mono<Void> updateClientRecordCAS(String clientUuid, ReactiveCollection collection, SpanWrapper span, TransactionConfig cfg) {
        return beforeUpdateCAS(this)
            .then(collection.mutateIn(
                CLIENT_RECORD_DOC_ID,
                Arrays.asList(MutateInSpec.upsert("dummy", (Object) null).xattr()),
                MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::updateClientRecordCAS"))))
            .then()
            .onErrorResume(err -> {
                this.LOGGER.debug(String.format("got error while updating client record CAS '%s'", err));
                if (ErrorClasses.classify(err) == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return createClientRecord(clientUuid, cfg, collection, span).then();
                }
                return Mono.error(err);
            });
    }

    /**
     * Interprets the {@code records} xattr of a client record lookup.
     *
     * <p>A client counts as expired when the age of its heartbeat, measured against the CAS of
     * the lookup result, is at least its {@code expires_ms}. The client identified by
     * {@code str} is never counted as expired and is always included among the active clients,
     * even when it has no entry in the record yet.
     *
     * @param lookupInResult the lookup result whose first spec holds the {@code records} object
     * @param str the UUID of the calling client
     * @return the parsed details: client counts, the index of this client among the sorted
     *     active clients, the expired client ids and the override settings
     */
    public static ClientRecordDetails parseClientRecord(LookupInResult lookupInResult, String str) {
        JsonObject records = lookupInResult.contentAs(0, JsonObject.class);
        JsonObject clients = records.getObject(FIELD_CLIENTS);

        List<String> expiredClientIds = new ArrayList<>();
        List<String> activeClientIds = new ArrayList<>();

        // Presumably the CAS is in nanoseconds, so this yields milliseconds comparable with
        // the stored heartbeat_ms value - TODO confirm.
        long casMs = lookupInResult.cas() / 1000000;

        for (String clientId : clients.getNames()) {
            JsonObject client = clients.getObject(clientId);
            long heartbeatMs = ActiveTransactionRecord.parseMutationCAS(client.getString(FIELD_HEARTBEAT));
            long expiresMs = client.getInt(FIELD_EXPIRES).intValue();
            long heartbeatAgeMs = casMs - heartbeatMs;

            if (heartbeatAgeMs >= expiresMs && !clientId.equals(str)) {
                expiredClientIds.add(clientId);
            } else {
                activeClientIds.add(clientId);
            }
        }

        // A client that has not registered yet still takes part in the active set.
        if (!activeClientIds.contains(str)) {
            activeClientIds.add(str);
        }

        List<String> sortedActiveIds = activeClientIds.stream().sorted().collect(Collectors.toList());
        int indexOfThisClient = sortedActiveIds.indexOf(str);
        int numExpiredClients = expiredClientIds.size();
        int numActiveClients = sortedActiveIds.size();
        int numExistingClients = numExpiredClients + numActiveClients;
        boolean clientIsNew = !clients.containsKey(str);

        boolean overrideEnabled = false;
        long overrideExpires = 0;
        JsonObject override = records.getObject(FIELD_OVERRIDE);
        if (override != null) {
            overrideEnabled = override.getBoolean(FIELD_OVERRIDE_ENABLED).booleanValue();
            overrideExpires = override.getLong(FIELD_OVERRIDE_EXPIRES).longValue();
        }

        return new ClientRecordDetails(numActiveClients, indexOfThisClient, clientIsNew, expiredClientIds,
            numExistingClients, numExpiredClients, overrideEnabled, overrideExpires, lookupInResult.cas());
    }

    /**
     * Reads and parses the client record of a bucket without modifying it.
     *
     * <p>The record is parsed on behalf of the placeholder client id {@code "not_client"};
     * {@link #parseClientRecord} always counts the calling client as active, so the returned
     * active-client count includes that placeholder.
     *
     * @param str the name of the bucket whose default collection holds the client record
     * @return the parsed client record details
     */
    public Mono<ClientRecordDetails> getClientRecord(String str) {
        return this.clusterData.getBucketDefaultCollection(str)
            .flatMap(collection -> collection.lookupIn(
                CLIENT_RECORD_DOC_ID,
                Arrays.asList(LookupInSpec.get(FIELD_RECORDS).xattr()),
                LookupInOptions.lookupInOptions().clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::getClientRecord"))))
            .map(result -> parseClientRecord(result, "not_client"));
    }

    /**
     * Refreshes this client's entry in the client record and removes expired clients.
     *
     * <p>The sequence is: bump the document CAS, read the record, then (unless an override is
     * active) upsert this client's heartbeat, expiry and ATR count and remove as many expired
     * clients as fit into the same mutation. A missing document is created and the whole
     * operation repeated. Failures are retried with exponential backoff, bounded by an overall
     * timeout of 500 milliseconds.
     *
     * @param str the UUID of this client
     * @param reactiveCollection the collection holding the client record document
     * @param transactionConfig supplies the cleanup window and ATR count written to the record
     * @return the details parsed from the record as read before this client's update
     */
    public Mono<ClientRecordDetails> processClient(String str, ReactiveCollection reactiveCollection, TransactionConfig transactionConfig) {
        SpanWrapper span = SpanWrapper.create(transactionConfig, "transaction_client_record_process")
            .withTag("client_uuid", str)
            .withTag("bucket_name", reactiveCollection.bucketName())
            .withTag("collection_name", reactiveCollection.name())
            .start();
        String logPrefix = reactiveCollection.bucketName() + "/" + reactiveCollection.name() + "/" + str;

        return updateClientRecordCAS(str, reactiveCollection, span, transactionConfig)
            .then(beforeGetRecord(this))
            .then(reactiveCollection.lookupIn(
                CLIENT_RECORD_DOC_ID,
                Arrays.asList(LookupInSpec.get(FIELD_RECORDS).xattr()),
                LookupInOptions.lookupInOptions().clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::processClient"))))
            .flatMap(result -> {
                ClientRecordDetails details = parseClientRecord(result, str);
                this.LOGGER.debug(String.format(
                    "%s found %d existing clients including this (%s active, %d expired), included this=%s, index of this=%d, override={enabled=%s,expires=%d,now=%d,active=%s}",
                    logPrefix,
                    details.numExistingClients(),
                    details.numActiveClients(),
                    details.numExpiredClients(),
                    !details.clientIsNew(),
                    details.indexOfThisClient(),
                    details.overrideEnabled(),
                    details.overrideExpires(),
                    details.casNow(),
                    details.overrideActive()));

                // While an override is active the record is left untouched.
                if (details.overrideActive()) {
                    return Mono.just(details);
                }

                List<MutateInSpec> specs = new ArrayList<>();
                String clientPath = PATH_CLIENTS + "." + str + ".";
                // All three fields live in the xattr that the lookups above read back, so each
                // spec must carry the xattr flag.
                specs.add(MutateInSpec.upsert(clientPath + FIELD_HEARTBEAT, MutateInMacro.CAS).xattr().createPath());
                specs.add(MutateInSpec.upsert(clientPath + FIELD_EXPIRES, transactionConfig.cleanupWindow().toMillis() + SAFETY_MARGIN_EXPIRY_MSECS).xattr().createPath());
                specs.add(MutateInSpec.upsert(clientPath + FIELD_NUM_ATRS, transactionConfig.numAtrs()).xattr());

                // Only remove as many expired clients as still fit into this mutation; any
                // remaining ones are left in place for a later call.
                int removableSlots = MAX_SUBDOC_SPECS - specs.size();
                details.expiredClientIds().stream().limit(removableSlots).forEach(expiredId -> {
                    this.LOGGER.debug(String.format("%s removing expired client %s", logPrefix, expiredId));
                    specs.add(MutateInSpec.remove(PATH_CLIENTS + "." + expiredId).xattr());
                });

                return beforeUpdateRecord(this)
                    .then(reactiveCollection.mutateIn(
                        CLIENT_RECORD_DOC_ID,
                        specs,
                        MutateInOptions.mutateInOptions().clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::processClient"))))
                    .thenReturn(details);
            })
            .onErrorResume(err -> {
                ErrorClasses errorClass = ErrorClasses.classify(err);
                this.LOGGER.debug(String.format("%s got error processing client record: %s", logPrefix, DebugUtil.dbg(err)));
                if (errorClass == ErrorClasses.FAIL_DOC_NOT_FOUND) {
                    return createClientRecord(str, transactionConfig, reactiveCollection, span);
                }
                return Mono.error(err);
            })
            .retryWhen(Retry.any().exponentialBackoff(BACKOFF_START, BACKOFF_END).doOnRetry(ctx -> {
                this.LOGGER.info(String.format("%s retrying processing client record on error '%s'", str, ctx.exception()));
            }))
            .timeout(TIMEOUT);
    }

    /**
     * Creates the client record document with an empty {@code records.clients} xattr and then
     * runs {@link #processClient} again. A document-already-exists failure means another client
     * created the record first and is not treated as an error.
     */
    private Mono<ClientRecordDetails> createClientRecord(String clientUuid, TransactionConfig cfg, ReactiveCollection collection, SpanWrapper span) {
        String logPrefix = collection.bucketName() + "/" + collection.name() + "/" + clientUuid;

        return beforeCreateRecord(this)
            .then(collection.mutateIn(
                CLIENT_RECORD_DOC_ID,
                Arrays.asList(MutateInSpec.insert(PATH_CLIENTS, JsonObject.create()).xattr()),
                OptionsWrapperUtil.wrap(
                    MutateInOptions.mutateInOptions()
                        .clientContext(OptionsWrapperUtil.createClientContext("ClientRecord::createClientRecord"))
                        .storeSemantics(StoreSemantics.INSERT),
                    span, cfg, collection.core())))
            .doOnSubscribe(subscription -> {
                this.LOGGER.debug(String.format("%s found client record does not exist, creating and retrying", logPrefix));
            })
            .onErrorResume(err -> {
                this.LOGGER.info(String.format("got error while creating client record '%s'", err));
                if (ErrorClasses.FAIL_DOC_ALREADY_EXISTS != ErrorClasses.classify(err)) {
                    return Mono.error(err);
                }
                this.LOGGER.debug(String.format("%s found client record exists after retry, another client must have created it, continuing", logPrefix));
                return Mono.empty();
            })
            .then(processClient(clientUuid, collection, cfg));
    }

    /**
     * Hook run before the client record document is created. The default emits {@code 1} and
     * has no other effect; subclasses may override it (presumably for test injection).
     *
     * @param clientRecord this instance
     * @return a mono that must complete before the create is issued
     */
    protected Mono<Integer> beforeCreateRecord(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /**
     * Hook run before a client entry is removed from a bucket's client record. The default
     * emits {@code 1} and has no other effect.
     *
     * @param clientRecord this instance
     * @return a mono that must complete before the remove is issued
     */
    protected Mono<Integer> beforeRemoveClient(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /**
     * Hook run before the dummy mutation that refreshes the client record's CAS. The default
     * emits {@code 1} and has no other effect.
     *
     * @param clientRecord this instance
     * @return a mono that must complete before the CAS update is issued
     */
    protected Mono<Integer> beforeUpdateCAS(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /**
     * Hook run before the client record is read during {@link #processClient}. The default
     * emits {@code 1} and has no other effect.
     *
     * @param clientRecord this instance
     * @return a mono that must complete before the lookup is issued
     */
    protected Mono<Integer> beforeGetRecord(ClientRecord clientRecord) {
        return Mono.just(1);
    }

    /**
     * Hook run before this client's entry is written during {@link #processClient}. The default
     * emits {@code 1} and has no other effect.
     *
     * @param clientRecord this instance
     * @return a mono that must complete before the update is issued
     */
    protected Mono<Integer> beforeUpdateRecord(ClientRecord clientRecord) {
        return Mono.just(1);
    }
}
