package org.neo4j.driver.internal.reactive;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.neo4j.driver.AccessMode;
import org.neo4j.driver.Bookmark;
import org.neo4j.driver.Query;
import org.neo4j.driver.TransactionConfig;
import org.neo4j.driver.exceptions.ClientException;
import org.neo4j.driver.exceptions.TransactionNestingException;
import org.neo4j.driver.internal.async.NetworkSession;
import org.neo4j.driver.internal.async.UnmanagedTransaction;
import org.neo4j.driver.internal.cursor.RxResultCursor;
import org.neo4j.driver.internal.telemetry.ApiTelemetryWork;
import org.neo4j.driver.internal.telemetry.TelemetryApi;
import org.neo4j.driver.internal.util.Futures;
import org.neo4j.driver.reactive.RxResult;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:org/neo4j/driver/internal/reactive/AbstractReactiveSession.class */
/**
 * Shared plumbing for reactive session implementations built on top of a {@link NetworkSession}.
 *
 * <p>The class adapts the {@link CompletionStage}-based calls of the wrapped session into
 * Reactive Streams {@link Publisher}s. Subclasses decide which transaction type {@code S} is
 * exposed and how it is closed by implementing {@link #createTransaction(UnmanagedTransaction)}
 * and {@link #closeTransaction(Object, boolean)}.
 *
 * <p>NOTE(review): several members are {@code public} only because the decompiler widened them
 * (the original modifiers were protected or package-private according to the decompiler notes).
 * They are left as they are so that existing callers keep compiling.
 *
 * @param <S> the transaction type handed out to callers
 */
public abstract class AbstractReactiveSession<S> {
    private static final String NULL_TRANSACTION_MESSAGE =
            "Unexpected condition, begin transaction call has completed successfully with transaction being null";
    private static final String NULL_RUN_RESULT_MESSAGE =
            "Unexpected condition, run call has completed successfully with result being null";
    private static final String UNCONSUMED_RESULT_MESSAGE =
            "%s is not a valid return value, it should be consumed before producing a return value";

    /** The wrapped session that every operation of this class delegates to. */
    protected final NetworkSession session;

    /**
     * Creates a reactive view over the given session.
     *
     * @param networkSession the session to delegate to; stored as is, without a null check
     */
    public AbstractReactiveSession(NetworkSession networkSession) {
        this.session = networkSession;
    }

    /**
     * Wraps a freshly begun unmanaged transaction into the transaction type exposed by the subclass.
     *
     * @param unmanagedTransaction the transaction returned by the wrapped session
     * @return the transaction object published to callers
     */
    protected abstract S createTransaction(UnmanagedTransaction unmanagedTransaction);

    /**
     * Closes a transaction previously produced by {@link #createTransaction(UnmanagedTransaction)}.
     *
     * @param s the transaction to close
     * @param z {@code true} is passed by {@code runTransaction} when the work completed, {@code false}
     *          on error and on cancellation (presumably "commit" vs "roll back"; confirm against the
     *          implementations)
     * @return a publisher signalling when closing has finished
     */
    protected abstract Publisher<Void> closeTransaction(S s, boolean z);

    /**
     * Begins a transaction without an explicit transaction type.
     *
     * <p>Equivalent to {@link #doBeginTransaction(TransactionConfig, String, ApiTelemetryWork)} with a
     * {@code null} type. The decompiler notes the original modifier was package-private.
     *
     * @param transactionConfig the configuration forwarded to the wrapped session
     * @param apiTelemetryWork  the telemetry work forwarded to the wrapped session
     * @return a publisher of the new transaction
     */
    public Publisher<S> doBeginTransaction(TransactionConfig transactionConfig, ApiTelemetryWork apiTelemetryWork) {
        return doBeginTransaction(transactionConfig, null, apiTelemetryWork);
    }

    /**
     * Begins a transaction of the given type on the wrapped session.
     *
     * <p>The decompiler notes the original modifier was protected.
     *
     * @param transactionConfig the configuration forwarded to the wrapped session
     * @param str               the transaction type forwarded to the wrapped session, may be {@code null}
     * @param apiTelemetryWork  the telemetry work forwarded to the wrapped session
     * @return a publisher of the new transaction
     */
    public Publisher<S> doBeginTransaction(TransactionConfig transactionConfig, String str, ApiTelemetryWork apiTelemetryWork) {
        return RxUtils.createSingleItemPublisher(
                () -> toTransactionStage(this.session.beginTransactionAsync(transactionConfig, str, apiTelemetryWork)),
                () -> new IllegalStateException(NULL_TRANSACTION_MESSAGE),
                this::closeOnCancel);
    }

    /**
     * Begins a transaction with an explicit access mode; used by {@code runTransaction}.
     */
    private Publisher<S> beginTransaction(AccessMode accessMode, TransactionConfig transactionConfig, ApiTelemetryWork apiTelemetryWork) {
        return RxUtils.createSingleItemPublisher(
                () -> toTransactionStage(this.session.beginTransactionAsync(accessMode, transactionConfig, apiTelemetryWork)),
                () -> new IllegalStateException(NULL_TRANSACTION_MESSAGE),
                this::closeOnCancel);
    }

    /**
     * Converts the stage returned by a begin-transaction call into a stage of the exposed
     * transaction type, releasing the connection first when no transaction was produced.
     */
    private CompletionStage<S> toTransactionStage(CompletionStage<? extends UnmanagedTransaction> beginStage) {
        CompletableFuture<S> transactionFuture = new CompletableFuture<>();
        beginStage.whenComplete((unmanagedTransaction, error) -> {
            if (unmanagedTransaction != null) {
                transactionFuture.complete(createTransaction(unmanagedTransaction));
            } else {
                releaseConnectionBeforeReturning(transactionFuture, error);
            }
        });
        return transactionFuture;
    }

    /**
     * Third argument of the begin-transaction publishers (presumably invoked on cancellation):
     * closes the transaction with the flag set to {@code false} and subscribes so that the close
     * actually runs.
     */
    private void closeOnCancel(S transaction) {
        Mono.fromDirect(closeTransaction(transaction, false)).subscribe();
    }

    /**
     * Runs the given work inside a transaction, driven by the retry logic of the wrapped session.
     *
     * <p>The transaction is closed with {@code true} when the work completes and with {@code false}
     * when it fails or is cancelled. Values emitted by the work must not be result objects
     * ({@link ReactiveResult}, {@link org.neo4j.driver.reactive.ReactiveResult} or {@link RxResult});
     * such a value is turned into a {@link ClientException} error signal.
     *
     * <p>The decompiler notes the original modifier was protected.
     *
     * @param accessMode        the access mode the transaction is begun with
     * @param function          the work to run; receives the transaction and returns its output publisher
     * @param transactionConfig the configuration the transaction is begun with
     * @param <T>               the type of the values produced by the work
     * @return the publisher returned by the retry logic for the assembled work
     */
    public <T> Publisher<T> runTransaction(AccessMode accessMode, Function<S, ? extends Publisher<T>> function, TransactionConfig transactionConfig) {
        Publisher<S> transaction =
                beginTransaction(accessMode, transactionConfig, new ApiTelemetryWork(TelemetryApi.MANAGED_TRANSACTION));
        Function<S, ? extends Publisher<T>> guardedWork =
                function.andThen(publisher -> rejectUnconsumedResults(publisher));
        Flux<T> attempt = Flux.usingWhen(
                transaction,
                guardedWork,
                tx -> closeTransaction(tx, true),
                (tx, error) -> closeTransaction(tx, false),
                tx -> closeTransaction(tx, false));
        return this.session.retryLogic().retryRx(attempt);
    }

    /**
     * Passes values through unchanged, except that a value which is itself a result object is
     * replaced by a {@link ClientException} error signal.
     */
    private static <T> Publisher<T> rejectUnconsumedResults(Publisher<T> publisher) {
        return Flux.from(publisher).<T>handle((value, sink) -> {
            Class<?> resultType = unconsumedResultType(value);
            if (resultType != null) {
                sink.error(new ClientException(String.format(UNCONSUMED_RESULT_MESSAGE, resultType.getName())));
            } else {
                sink.next(value);
            }
        });
    }

    /**
     * Returns the result interface the value implements, or {@code null} when it is not a result.
     * The check order decides which name ends up in the error message, so it must stay as is.
     */
    private static Class<?> unconsumedResultType(Object value) {
        if (value instanceof ReactiveResult) {
            return ReactiveResult.class;
        }
        if (value instanceof org.neo4j.driver.reactive.ReactiveResult) {
            return org.neo4j.driver.reactive.ReactiveResult.class;
        }
        if (value instanceof RxResult) {
            return RxResult.class;
        }
        return null;
    }

    /**
     * Fails the given future with the begin-transaction error. Unless the error is a
     * {@link TransactionNestingException}, the connection is released first and any release error
     * is combined with the original one.
     */
    private <T> void releaseConnectionBeforeReturning(CompletableFuture<T> completableFuture, Throwable th) {
        Throwable cause = Futures.completionExceptionCause(th);
        if (cause instanceof TransactionNestingException) {
            completableFuture.completeExceptionally(cause);
            return;
        }
        this.session.releaseConnectionAsync().whenComplete((ignored, releaseError) ->
                completableFuture.completeExceptionally(Futures.combineErrors(cause, releaseError)));
    }

    /**
     * Returns the bookmarks reported by the wrapped session.
     *
     * @return the result of {@code lastBookmarks()} on the wrapped session
     */
    public Set<Bookmark> lastBookmarks() {
        return this.session.lastBookmarks();
    }

    /**
     * Runs a query on the wrapped session and publishes the mapped result cursor.
     *
     * <p>A future is handed to the wrapped session together with the query. It is completed with the
     * cursor once the mapped value has been emitted, completed exceptionally when the publisher
     * fails, and, when the third handler runs with a non-null value (presumably on cancellation),
     * completed with {@code null} or with the rollback error after the cursor has been rolled back.
     *
     * <p>The decompiler notes the original modifier was protected.
     *
     * @param query             the query to run
     * @param transactionConfig the configuration forwarded to the wrapped session
     * @param function          maps the cursor to the value that is published
     * @param <T>               the type of the published value
     * @return a publisher of the mapped result
     */
    public <T> Publisher<T> run(Query query, TransactionConfig transactionConfig, Function<RxResultCursor, T> function) {
        CompletableFuture<RxResultCursor> cursorPublished = new CompletableFuture<>();
        AtomicReference<RxResultCursor> cursorRef = new AtomicReference<>();
        Mono<T> result = RxUtils.createSingleItemPublisher(
                () -> runAsStage(query, transactionConfig, cursorPublished)
                        .thenApply(cursor -> {
                            // Remember the cursor so the handlers below can reach it.
                            cursorRef.set(cursor);
                            return cursor;
                        })
                        .thenApply(function),
                () -> new IllegalStateException(NULL_RUN_RESULT_MESSAGE),
                value -> {
                    if (value != null) {
                        cursorRef.get().rollback().whenComplete((ignored, rollbackError) -> {
                            if (rollbackError != null) {
                                cursorPublished.completeExceptionally(rollbackError);
                            } else {
                                cursorPublished.complete(null);
                            }
                        });
                    }
                });
        return result
                .doOnNext(value -> cursorPublished.complete(cursorRef.get()))
                .doOnError(cursorPublished::completeExceptionally);
    }

    /**
     * Starts the query and returns a stage of its cursor. If starting the query throws, the stage
     * fails, or the cursor reports a run error, the connection is released and the returned stage
     * fails with that error.
     */
    private CompletionStage<RxResultCursor> runAsStage(Query query, TransactionConfig transactionConfig, CompletionStage<RxResultCursor> completionStage) {
        CompletionStage<RxResultCursor> cursorStage;
        try {
            cursorStage = this.session.runRx(query, transactionConfig, completionStage);
        } catch (Throwable th) {
            // A synchronous failure is routed through the same error path as an asynchronous one.
            cursorStage = Futures.failedFuture(th);
        }
        CompletionStage<CompletionStage<RxResultCursor>> nested = cursorStage.handle((cursor, error) -> {
            if (error != null) {
                return this.<RxResultCursor>releaseConnectionAndRethrow(error);
            }
            Throwable runError = cursor.getRunError();
            if (runError != null) {
                return this.<RxResultCursor>releaseConnectionAndRethrow(runError);
            }
            return CompletableFuture.completedFuture(cursor);
        });
        return nested.thenCompose(stage -> stage);
    }

    /**
     * Releases the connection and returns a stage that always fails: with the combined error when
     * the release itself failed, otherwise with the given error (wrapped in a
     * {@link CompletionException} unless it is a {@link RuntimeException}).
     */
    private <T> CompletionStage<T> releaseConnectionAndRethrow(Throwable th) {
        return this.session.releaseConnectionAsync().handle((ignored, releaseError) -> {
            if (releaseError != null) {
                throw Futures.combineErrors(th, releaseError);
            }
            if (th instanceof RuntimeException) {
                throw (RuntimeException) th;
            }
            throw new CompletionException(th);
        });
    }

    /**
     * Builds a publisher around {@code closeAsync()} of the wrapped session.
     *
     * <p>The decompiler notes the original modifier was protected.
     *
     * @param <T> the element type of the returned publisher
     * @return the publisher created by {@code RxUtils.createEmptyPublisher} for the close call
     */
    public <T> Publisher<T> doClose() {
        return RxUtils.createEmptyPublisher(this.session::closeAsync);
    }
}
