/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.spanner.r2dbc.client;

import com.google.auth.Credentials;
import com.google.auth.oauth2.OAuth2Credentials;
import com.google.cloud.spanner.r2dbc.StatementExecutionContext;
import com.google.cloud.spanner.r2dbc.client.Client;
import com.google.cloud.spanner.r2dbc.util.Assert;
import com.google.cloud.spanner.r2dbc.util.ObservableReactiveUtil;
import com.google.cloud.spanner.r2dbc.util.SpannerExceptionUtil;
import com.google.common.annotations.VisibleForTesting;
import com.google.longrunning.GetOperationRequest;
import com.google.longrunning.Operation;
import com.google.longrunning.OperationsGrpc;
import com.google.protobuf.Struct;
import com.google.spanner.admin.database.v1.DatabaseAdminGrpc;
import com.google.spanner.admin.database.v1.UpdateDatabaseDdlRequest;
import com.google.spanner.v1.BeginTransactionRequest;
import com.google.spanner.v1.CommitRequest;
import com.google.spanner.v1.CommitResponse;
import com.google.spanner.v1.CreateSessionRequest;
import com.google.spanner.v1.DeleteSessionRequest;
import com.google.spanner.v1.ExecuteBatchDmlRequest;
import com.google.spanner.v1.ExecuteSqlRequest;
import com.google.spanner.v1.PartialResultSet;
import com.google.spanner.v1.ResultSet;
import com.google.spanner.v1.RollbackRequest;
import com.google.spanner.v1.Session;
import com.google.spanner.v1.SpannerGrpc;
import com.google.spanner.v1.Transaction;
import com.google.spanner.v1.TransactionOptions;
import com.google.spanner.v1.TransactionSelector;
import com.google.spanner.v1.Type;
import io.grpc.CallCredentials;
import io.grpc.Channel;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Status;
import io.grpc.auth.MoreCallCredentials;
import io.r2dbc.spi.R2dbcException;
import io.r2dbc.spi.R2dbcNonTransientResourceException;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class GrpcClient
implements Client {
    private static final TransactionOptions READ_WRITE_TRANSACTION = TransactionOptions.newBuilder().setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()).build();
    private static final String GRPC_TARGET = "dns:///spanner.googleapis.com:443";
    private static final String PACKAGE_VERSION = GrpcClient.class.getPackage().getImplementationVersion();
    private static final String USER_AGENT_LIBRARY_NAME = "cloud-spanner-r2dbc";
    private static final String HEALTHCHECK_SQL = "SELECT 1";
    private static final Logger LOGGER = LoggerFactory.getLogger(GrpcClient.class);
    private static final String SESSION_NAME_MUST_NOT_BE_NULL = "Session name must not be null";
    private final ManagedChannel channel;
    private final SpannerGrpc.SpannerStub spanner;
    private final DatabaseAdminGrpc.DatabaseAdminStub databaseAdmin;
    private final OperationsGrpc.OperationsStub operations;

    public GrpcClient(OAuth2Credentials credentials) {
        CallCredentials callCredentials = MoreCallCredentials.from((Credentials)credentials);
        this.channel = ManagedChannelBuilder.forTarget((String)GRPC_TARGET).userAgent("cloud-spanner-r2dbc/" + PACKAGE_VERSION).build();
        this.spanner = (SpannerGrpc.SpannerStub)SpannerGrpc.newStub((Channel)this.channel).withCallCredentials(callCredentials);
        this.databaseAdmin = (DatabaseAdminGrpc.DatabaseAdminStub)DatabaseAdminGrpc.newStub((Channel)this.channel).withCallCredentials(callCredentials);
        this.operations = (OperationsGrpc.OperationsStub)OperationsGrpc.newStub((Channel)this.channel).withCallCredentials(callCredentials);
    }

    @VisibleForTesting
    GrpcClient(SpannerGrpc.SpannerStub spanner, DatabaseAdminGrpc.DatabaseAdminStub databaseAdmin, OperationsGrpc.OperationsStub operations) {
        this.spanner = spanner;
        this.databaseAdmin = databaseAdmin;
        this.operations = operations;
        this.channel = null;
    }

    @Override
    public Mono<Transaction> beginTransaction(String sessionName, TransactionOptions transactionOptions) {
        return Mono.defer(() -> {
            Assert.requireNonNull(sessionName, SESSION_NAME_MUST_NOT_BE_NULL);
            BeginTransactionRequest beginTransactionRequest = BeginTransactionRequest.newBuilder().setSession(sessionName).setOptions(transactionOptions).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.beginTransaction(beginTransactionRequest, obs));
        });
    }

    @Override
    public Mono<CommitResponse> commitTransaction(String sessionName, Transaction transaction) {
        return Mono.defer(() -> {
            Assert.requireNonNull(sessionName, SESSION_NAME_MUST_NOT_BE_NULL);
            Assert.requireNonEmpty(transaction.getId(), "Transaction ID must not be empty");
            CommitRequest commitRequest = CommitRequest.newBuilder().setSession(sessionName).setTransactionId(transaction.getId()).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.commit(commitRequest, obs));
        });
    }

    @Override
    public Mono<Void> rollbackTransaction(String sessionName, Transaction transaction) {
        return Mono.defer(() -> {
            Assert.requireNonNull(sessionName, SESSION_NAME_MUST_NOT_BE_NULL);
            Assert.requireNonEmpty(transaction.getId(), "Transaction ID must not be empty");
            RollbackRequest rollbackRequest = RollbackRequest.newBuilder().setSession(sessionName).setTransactionId(transaction.getId()).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.rollback(rollbackRequest, obs)).then();
        });
    }

    @Override
    public Mono<Session> createSession(String databaseName) {
        return Mono.defer(() -> {
            Assert.requireNonEmpty(databaseName, "Database name must not be empty");
            CreateSessionRequest request = CreateSessionRequest.newBuilder().setDatabase(databaseName).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.createSession(request, obs));
        });
    }

    @Override
    public Mono<Void> deleteSession(String sessionName) {
        return Mono.defer(() -> {
            Assert.requireNonNull(sessionName, SESSION_NAME_MUST_NOT_BE_NULL);
            DeleteSessionRequest deleteSessionRequest = DeleteSessionRequest.newBuilder().setName(sessionName).build();
            return ObservableReactiveUtil.unaryCall(observer -> this.spanner.deleteSession(deleteSessionRequest, observer)).then();
        });
    }

    @Override
    public Flux<ResultSet> executeBatchDml(StatementExecutionContext ctx, List<ExecuteBatchDmlRequest.Statement> statements) {
        return Mono.defer(() -> {
            ExecuteBatchDmlRequest.Builder request = ExecuteBatchDmlRequest.newBuilder().setSession(ctx.getSessionName()).addAllStatements((Iterable)statements).setSeqno(ctx.nextSeqNum());
            if (ctx.getTransactionId() != null) {
                request.setTransaction(TransactionSelector.newBuilder().setId(ctx.getTransactionId()));
                return ObservableReactiveUtil.unaryCall(obs -> this.spanner.executeBatchDml(request.build(), obs));
            }
            request.setTransaction(TransactionSelector.newBuilder().setBegin(READ_WRITE_TRANSACTION));
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.executeBatchDml(request.build(), obs)).delayUntil(response -> {
                if (response.getResultSetsList().size() > 0) {
                    Transaction transaction = response.getResultSets(0).getMetadata().getTransaction();
                    return this.commitTransaction(ctx.getSessionName(), transaction);
                }
                return Mono.empty();
            });
        }).flatMapMany(response -> {
            Flux results = Flux.fromIterable((Iterable)response.getResultSetsList());
            if (response.hasStatus() && response.getStatus().getCode() != Status.Code.OK.value()) {
                R2dbcException exception = SpannerExceptionUtil.createR2dbcException(response.getStatus().getCode(), response.getStatus().getMessage());
                results = results.concatWith((Publisher)Mono.error((Throwable)exception));
            }
            return results;
        });
    }

    @Override
    public Flux<PartialResultSet> executeStreamingSql(StatementExecutionContext ctx, String sql, Struct params, Map<String, Type> types) {
        return Flux.defer(() -> {
            Assert.requireNonNull(ctx.getSessionName(), SESSION_NAME_MUST_NOT_BE_NULL);
            ExecuteSqlRequest.Builder executeSqlRequest = this.buildSqlRequest(ctx, sql);
            if (params != null) {
                executeSqlRequest.setParams(params).putAllParamTypes(types);
            }
            if (ctx.getTransactionId() != null) {
                executeSqlRequest.setTransaction(TransactionSelector.newBuilder().setId(ctx.getTransactionId()).build());
                executeSqlRequest.setSeqno(ctx.nextSeqNum());
            }
            return ObservableReactiveUtil.streamingCall(obs -> this.spanner.executeStreamingSql(executeSqlRequest.build(), obs));
        });
    }

    @Override
    public Mono<Operation> executeDdl(String fullyQualifiedDatabaseName, List<String> ddlStatements, Duration ddlOperationTimeout, Duration ddlPollInterval) {
        UpdateDatabaseDdlRequest ddlRequest = UpdateDatabaseDdlRequest.newBuilder().setDatabase(fullyQualifiedDatabaseName).addAllStatements(ddlStatements).build();
        Mono ddlResponse = ObservableReactiveUtil.unaryCall(obs -> this.databaseAdmin.updateDatabaseDdl(ddlRequest, obs));
        return ddlResponse.flatMap(ddlOperation -> {
            GetOperationRequest getRequest = GetOperationRequest.newBuilder().setName(ddlOperation.getName()).build();
            return ObservableReactiveUtil.unaryCall(obs -> this.operations.getOperation(getRequest, obs)).repeatWhen(completed -> completed.delayElements(ddlPollInterval)).takeUntil(Operation::getDone).last().timeout(ddlOperationTimeout).handle((operation, sink) -> {
                if (operation.hasError()) {
                    sink.error((Throwable)new R2dbcNonTransientResourceException(operation.getError().getMessage()));
                } else {
                    sink.next(operation);
                }
            });
        });
    }

    @Override
    public Mono<Void> close() {
        return Mono.fromRunnable(() -> {
            if (this.channel != null) {
                this.channel.shutdownNow();
            }
        });
    }

    @Override
    public Mono<Boolean> healthcheck(StatementExecutionContext ctx) {
        return Mono.defer(() -> {
            if (ctx.getSessionName() == null) {
                return Mono.just((Object)false);
            }
            return ObservableReactiveUtil.unaryCall(obs -> this.spanner.executeSql(this.buildSqlRequest(ctx, HEALTHCHECK_SQL).build(), obs)).map(rs -> Boolean.TRUE).onErrorResume(error -> {
                LOGGER.warn("Cloud Spanner healthcheck failed", error);
                return Mono.just((Object)Boolean.FALSE);
            });
        });
    }

    private ExecuteSqlRequest.Builder buildSqlRequest(StatementExecutionContext ctx, String sql) {
        return ExecuteSqlRequest.newBuilder().setSql(sql).setSession(ctx.getSessionName());
    }

    @VisibleForTesting
    public SpannerGrpc.SpannerStub getSpanner() {
        return this.spanner;
    }
}

