package com.google.cloud.spanner.r2dbc.client;

import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.spanner.r2dbc.StatementExecutionContext;
import com.google.cloud.spanner.r2dbc.util.Assert;
import com.google.cloud.spanner.r2dbc.util.ObservableReactiveUtil;
import com.google.cloud.spanner.r2dbc.util.SpannerExceptionUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsGrpc;
import com.google.protobuf.Struct;
import com.google.spanner.admin.database.v1.DatabaseAdminGrpc;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import com.google.spanner.v1.Type;
import io.grpc.CallCredentials;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/google/cloud/spanner/r2dbc/client/GrpcClient.class */
/**
 * gRPC-based implementation of {@link Client} that talks to Cloud Spanner through asynchronous
 * stubs and exposes the calls as Reactor {@link Mono}/{@link Flux} publishers.
 *
 * <p>Three stubs are used: {@link SpannerGrpc.SpannerStub} for sessions, transactions and SQL,
 * {@link DatabaseAdminGrpc.DatabaseAdminStub} for DDL, and {@link OperationsGrpc.OperationsStub}
 * for polling the long-running operation returned by a DDL update.
 */
public class GrpcClient implements Client {
    private static final String GRPC_TARGET = "dns:///spanner.googleapis.com:443";
    private static final String USER_AGENT_LIBRARY_NAME = "cloud-spanner-r2dbc";
    private static final String HEALTHCHECK_SQL = "SELECT 1";
    private static final String SESSION_NAME_MUST_NOT_BE_NULL = "Session name must not be null";

    /** Options used when a batch DML call has to begin its own read-write transaction. */
    private static final TransactionOptions READ_WRITE_TRANSACTION = TransactionOptions.newBuilder().setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()).build();

    // Package#getImplementationVersion() returns null when the version is not known, in which
    // case the user agent below ends in "/null".
    private static final String PACKAGE_VERSION = GrpcClient.class.getPackage().getImplementationVersion();
    private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);

    /** Channel owned by this client; {@code null} when the stubs were injected for testing. */
    private final ManagedChannel channel;
    private final SpannerGrpc.SpannerStub spanner;
    private final DatabaseAdminGrpc.DatabaseAdminStub databaseAdmin;
    private final OperationsGrpc.OperationsStub operations;

    /**
     * Opens a channel to {@code dns:///spanner.googleapis.com:443} and creates the three stubs on
     * it, each configured with call credentials derived from the given OAuth2 credentials.
     *
     * @param oAuth2Credentials credentials attached to every call made through the stubs
     */
    public GrpcClient(OAuth2Credentials oAuth2Credentials) {
        CallCredentials callCredentials = MoreCallCredentials.from(oAuth2Credentials);
        this.channel = ManagedChannelBuilder.forTarget(GRPC_TARGET)
                .userAgent(USER_AGENT_LIBRARY_NAME + "/" + PACKAGE_VERSION)
                .build();
        this.spanner = SpannerGrpc.newStub(this.channel).withCallCredentials(callCredentials);
        this.databaseAdmin = DatabaseAdminGrpc.newStub(this.channel).withCallCredentials(callCredentials);
        this.operations = OperationsGrpc.newStub(this.channel).withCallCredentials(callCredentials);
    }

    /**
     * Test-only constructor that uses the supplied stubs as-is. No channel is created, so
     * {@link #close()} has nothing to shut down.
     */
    @VisibleForTesting
    GrpcClient(SpannerGrpc.SpannerStub spannerStub, DatabaseAdminGrpc.DatabaseAdminStub databaseAdminStub, OperationsGrpc.OperationsStub operationsStub) {
        this.spanner = spannerStub;
        this.databaseAdmin = databaseAdminStub;
        this.operations = operationsStub;
        this.channel = null;
    }

    /**
     * Begins a transaction in the given session.
     *
     * @param str session name; validated on subscription
     * @param transactionOptions options placed on the begin-transaction request
     * @return a deferred publisher of the {@code beginTransaction} call's response
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Transaction> beginTransaction(String str, TransactionOptions transactionOptions) {
        return Mono.defer(() -> {
            Assert.requireNonNull(str, SESSION_NAME_MUST_NOT_BE_NULL);
            BeginTransactionRequest request = BeginTransactionRequest.newBuilder()
                    .setSession(str)
                    .setOptions(transactionOptions)
                    .build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.beginTransaction(request, obs));
        });
    }

    /**
     * Commits the given transaction in the given session.
     *
     * @param str session name; validated on subscription
     * @param transaction transaction whose id is sent in the commit request; the id must not be
     *     empty
     * @return a deferred publisher of the {@code commit} call's response
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<CommitResponse> commitTransaction(String str, Transaction transaction) {
        return Mono.defer(() -> {
            Assert.requireNonNull(str, SESSION_NAME_MUST_NOT_BE_NULL);
            Assert.requireNonEmpty(transaction.getId(), "Transaction ID must not be empty");
            CommitRequest request = CommitRequest.newBuilder()
                    .setSession(str)
                    .setTransactionId(transaction.getId())
                    .build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.commit(request, obs));
        });
    }

    /**
     * Rolls back the given transaction in the given session.
     *
     * @param str session name; validated on subscription
     * @param transaction transaction whose id is sent in the rollback request; the id must not be
     *     empty
     * @return a deferred publisher that discards the response of the {@code rollback} call
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Void> rollbackTransaction(String str, Transaction transaction) {
        return Mono.defer(() -> {
            Assert.requireNonNull(str, SESSION_NAME_MUST_NOT_BE_NULL);
            Assert.requireNonEmpty(transaction.getId(), "Transaction ID must not be empty");
            RollbackRequest request = RollbackRequest.newBuilder()
                    .setSession(str)
                    .setTransactionId(transaction.getId())
                    .build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.rollback(request, obs)).then();
        });
    }

    /**
     * Creates a session for the given database.
     *
     * @param str database name placed on the create-session request; must not be empty
     * @return a deferred publisher of the {@code createSession} call's response
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Session> createSession(String str) {
        return Mono.defer(() -> {
            Assert.requireNonEmpty(str, "Database name must not be empty");
            CreateSessionRequest request = CreateSessionRequest.newBuilder().setDatabase(str).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.createSession(request, obs));
        });
    }

    /**
     * Deletes the session with the given name.
     *
     * @param str session name; validated on subscription
     * @return a deferred publisher that discards the response of the {@code deleteSession} call
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Void> deleteSession(String str) {
        return Mono.defer(() -> {
            Assert.requireNonNull(str, SESSION_NAME_MUST_NOT_BE_NULL);
            DeleteSessionRequest request = DeleteSessionRequest.newBuilder().setName(str).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.deleteSession(request, obs)).then();
        });
    }

    /**
     * Executes a batch of DML statements.
     *
     * <p>If the context carries a transaction id, the batch runs in that transaction. Otherwise
     * the request asks the call to begin a read-write transaction, and that transaction is
     * committed before any result is emitted, provided the response contains at least one result
     * set (the transaction is read from the first result set's metadata).
     *
     * <p>All result sets of the response are emitted; if the response carries a non-OK status, the
     * stream then terminates with the exception built from that status.
     *
     * @param statementExecutionContext supplies session name, transaction id and sequence number
     * @param list statements to execute
     * @return the result sets of the batch, possibly followed by an error
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Flux<ResultSet> executeBatchDml(StatementExecutionContext statementExecutionContext, List<ExecuteBatchDmlRequest.Statement> list) {
        return Mono.defer(() -> {
            ExecuteBatchDmlRequest.Builder request = ExecuteBatchDmlRequest.newBuilder()
                    .setSession(statementExecutionContext.getSessionName())
                    .addAllStatements(list)
                    .setSeqno(statementExecutionContext.nextSeqNum());
            if (statementExecutionContext.getTransactionId() != null) {
                request.setTransaction(TransactionSelector.newBuilder().setId(statementExecutionContext.getTransactionId()));
                return ObservableReactiveUtil.unaryCall(obs -> this.spanner.executeBatchDml(request.build(), obs));
            }
            // No transaction on the context: let the call begin one and commit it afterwards.
            request.setTransaction(TransactionSelector.newBuilder().setBegin(READ_WRITE_TRANSACTION));
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.executeBatchDml(request.build(), obs))
                    .delayUntil(response -> {
                        if (response.getResultSetsList().isEmpty()) {
                            return Mono.empty();
                        }
                        Transaction started = response.getResultSets(0).getMetadata().getTransaction();
                        return commitTransaction(statementExecutionContext.getSessionName(), started);
                    });
        }).flatMapMany(response -> {
            Flux<ResultSet> results = Flux.fromIterable(response.getResultSetsList());
            if (response.hasStatus() && response.getStatus().getCode() != Status.Code.OK.value()) {
                // Emit the result sets that were returned first, then fail the stream.
                results = results.concatWith(Mono.error(SpannerExceptionUtil.createR2dbcException(
                        response.getStatus().getCode(), response.getStatus().getMessage())));
            }
            return results;
        });
    }

    /**
     * Executes a SQL statement as a streaming call.
     *
     * <p>Parameters and their types are attached only when {@code struct} is non-null. The
     * transaction selector and a sequence number are attached only when the context carries a
     * transaction id.
     *
     * @param statementExecutionContext supplies session name, transaction id and sequence number
     * @param str SQL text
     * @param struct statement parameters, or {@code null} for none
     * @param map parameter types; only read when {@code struct} is non-null
     * @return a deferred publisher of the partial result sets delivered by the call
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Flux<PartialResultSet> executeStreamingSql(StatementExecutionContext statementExecutionContext, String str, Struct struct, Map<String, Type> map) {
        return Flux.defer(() -> {
            Assert.requireNonNull(statementExecutionContext.getSessionName(), SESSION_NAME_MUST_NOT_BE_NULL);
            ExecuteSqlRequest.Builder request = buildSqlRequest(statementExecutionContext, str);
            if (struct != null) {
                request.setParams(struct).putAllParamTypes(map);
            }
            if (statementExecutionContext.getTransactionId() != null) {
                request.setTransaction(TransactionSelector.newBuilder().setId(statementExecutionContext.getTransactionId()).build());
                request.setSeqno(statementExecutionContext.nextSeqNum());
            }
            return ObservableReactiveUtil.streamingCall(obs -> this.spanner.executeStreamingSql(request.build(), obs));
        });
    }

    /**
     * Submits DDL statements and waits for the resulting long-running operation to finish.
     *
     * <p>The update request is built when this method is called, not on subscription. After the
     * update call returns an operation, that operation is polled by name until it reports done.
     *
     * @param str database name placed on the update request
     * @param list DDL statements
     * @param duration timeout applied to the polling phase
     * @param duration2 delay between successive polls
     * @return the finished operation, or an error signal with a
     *     {@link R2dbcNonTransientResourceException} if the finished operation reports an error
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Operation> executeDdl(String str, List<String> list, Duration duration, Duration duration2) {
        UpdateDatabaseDdlRequest ddlRequest = UpdateDatabaseDdlRequest.newBuilder()
                .setDatabase(str)
                .addAllStatements(list)
                .build();
        Mono<Operation> ddlResponse =
                ObservableReactiveUtil.unaryCall(obs -> this.databaseAdmin.updateDatabaseDdl(ddlRequest, obs));
        return ddlResponse.flatMap(started -> pollUntilDone(started.getName(), duration, duration2));
    }

    /**
     * Polls the named operation until it reports done, then turns an error carried by the
     * finished operation into an error signal.
     */
    private Mono<Operation> pollUntilDone(String operationName, Duration timeout, Duration pollInterval) {
        GetOperationRequest pollRequest = GetOperationRequest.newBuilder().setName(operationName).build();
        Mono<Operation> poll =
                ObservableReactiveUtil.unaryCall(obs -> this.operations.getOperation(pollRequest, obs));
        Mono<Operation> finished = poll
                .repeatWhen(completions -> completions.delayElements(pollInterval))
                .takeUntil(Operation::getDone)
                .last()
                .timeout(timeout);
        return finished.handle((polled, sink) -> {
            if (polled.hasError()) {
                sink.error(new R2dbcNonTransientResourceException(polled.getError().getMessage()));
            } else {
                sink.next(polled);
            }
        });
    }

    /**
     * Shuts the channel down immediately on subscription; does nothing when this client was
     * created without a channel.
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Void> close() {
        return Mono.fromRunnable(() -> {
            if (this.channel != null) {
                this.channel.shutdownNow();
            }
        });
    }

    /**
     * Checks connectivity by running {@code SELECT 1} in the context's session.
     *
     * @param statementExecutionContext supplies the session name
     * @return {@code false} when the context has no session name or the query fails (the failure
     *     is logged at WARN level), {@code true} when the query returns a result
     */
    @Override // com.google.cloud.spanner.r2dbc.client.Client
    public Mono<Boolean> healthcheck(StatementExecutionContext statementExecutionContext) {
        return Mono.defer(() -> {
            if (statementExecutionContext.getSessionName() == null) {
                return Mono.just(false);
            }
            Mono<ResultSet> probe = ObservableReactiveUtil.unaryCall(obs ->
                    this.spanner.executeSql(buildSqlRequest(statementExecutionContext, HEALTHCHECK_SQL).build(), obs));
            return probe
                    .map(resultSet -> Boolean.TRUE)
                    .onErrorResume(th -> {
                        LOGGER.warn("Cloud Spanner healthcheck failed", th);
                        return Mono.just(Boolean.FALSE);
                    });
        });
    }

    /** Starts an {@code ExecuteSqlRequest} with the SQL text and the context's session name. */
    private ExecuteSqlRequest.Builder buildSqlRequest(StatementExecutionContext statementExecutionContext, String str) {
        return ExecuteSqlRequest.newBuilder().setSql(str).setSession(statementExecutionContext.getSessionName());
    }

    /** Returns the Spanner stub used by this client. */
    @VisibleForTesting
    public SpannerGrpc.SpannerStub getSpanner() {
        return this.spanner;
    }
}
