/*
 * Decompiled with CFR 0.152.
 */
package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.HeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.bigquery.storage.util.Errors;
import com.google.cloud.bigquery.storage.v1.AppendRowsRequest;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamConnection;
import com.google.cloud.bigquery.storage.v1.TableSchema;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.Int64Value;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

public class StreamWriter
implements AutoCloseable {
    private static final Logger log = Logger.getLogger(StreamWriter.class.getName());
    // Lock protecting every field annotated @GuardedBy("lock"); created as a ReentrantLock in the constructor.
    private Lock lock;
    // Signalled by appendInternal after enqueuing a request; awaited (with a timeout) by appendLoop.
    private Condition hasMessageInWaitingQueue;
    // Signalled by pollInflightRequestQueue when a request leaves the inflight queue;
    // awaited by maybeWaitForInflightQuota.
    private Condition inflightReduced;
    // Name of the write stream; sent on the first request of each connection and used in log messages.
    private final String streamName;
    // Proto schema attached to the first request of each connection; the constructor rejects null.
    private final ProtoSchema writerSchema;
    // Flow-control limits compared against inflightRequests / inflightBytes.
    private final long maxInflightRequests;
    private final long maxInflightBytes;
    // When ThrowException, appendInternal throws RESOURCE_EXHAUSTED instead of blocking for quota.
    private final FlowController.LimitExceededBehavior limitExceededBehavior;
    // Optional trace id, set on the first request of each connection when non-null.
    private final String traceId;
    // Number of accepted requests not yet removed via pollInflightRequestQueue
    // (incremented in appendInternal, so it covers both the waiting and inflight queues).
    @GuardedBy(value="lock")
    private long inflightRequests = 0L;
    // Sum of messageSize over the requests counted by inflightRequests.
    @GuardedBy(value="lock")
    private long inflightBytes = 0L;
    // Incremented by doneCallback on a retriable error, reset to 0 by requestCallback.
    // (The "conection" spelling is part of the identifier.)
    @GuardedBy(value="lock")
    private long conectionRetryCountWithoutCallback = 0L;
    // Set to true by appendLoop right before it creates a new StreamConnection; cleared by doneCallback.
    @GuardedBy(value="lock")
    private boolean streamConnectionIsConnected = false;
    // Set to true once cleanupInflightRequests has drained the inflight queue.
    @GuardedBy(value="lock")
    private boolean inflightCleanuped = false;
    // Set to true by close(); later appends are failed with StreamWriterClosedException.
    @GuardedBy(value="lock")
    private boolean userClosed = false;
    // Terminal error of the connection; while null the append loop may (re)connect.
    // Written by doneCallback, waitForDoneCallback (timeout) and requestCallback (unexpected empty queue).
    @GuardedBy(value="lock")
    private Throwable connectionFinalStatus = null;
    // Requests accepted by appendInternal that appendLoop has not yet picked up.
    @GuardedBy(value="lock")
    private final Deque<AppendRequestAndResponse> waitingRequestQueue;
    // Requests picked up by appendLoop whose response callback has not yet arrived.
    @GuardedBy(value="lock")
    private final Deque<AppendRequestAndResponse> inflightRequestQueue;
    // Most recent updated schema carried by an AppendRowsResponse; null until one arrives.
    @GuardedBy(value="lock")
    private TableSchema updatedSchema;
    // Client used to open stream connections; either supplied by the builder or created by the constructor.
    private BigQueryWriteClient client;
    // True when the constructor created the client, in which case close() also closes it.
    private boolean ownsBigQueryWriteClient = false;
    // Current connection; replaced by resetConnection(). Null until the first request is sent.
    private StreamConnection streamConnection;
    // Background thread running appendLoop(); started in the constructor and joined by close().
    private Thread appendThread;
    // Whole seconds the most recent completed maybeWaitForInflightQuota call spent waiting.
    private final AtomicLong inflightWaitSec = new AtomicLong(0L);

    /**
     * Returns the largest request size, in bytes, that this writer accepts for a single append.
     *
     * @return the fixed limit of 10,000,000 bytes
     */
    public static long getApiMaxRequestBytes() {
        final long tenMillionBytes = 10L * 1000L * 1000L;
        return tenMillionBytes;
    }

    /**
     * Creates a writer from the given builder and starts its background append thread.
     *
     * <p>If the builder carries no client, a dedicated {@link BigQueryWriteClient} is created from
     * the builder's credentials, channel provider and endpoint, and this writer takes ownership of it.
     *
     * @param builder configuration for the writer
     * @throws IOException if creating the dedicated client fails
     * @throws StatusRuntimeException with {@code INVALID_ARGUMENT} if no writer schema was set
     */
    private StreamWriter(Builder builder) throws IOException {
        this.lock = new ReentrantLock();
        this.hasMessageInWaitingQueue = this.lock.newCondition();
        this.inflightReduced = this.lock.newCondition();
        this.streamName = builder.streamName;
        if (builder.writerSchema == null) {
            Status missingSchema = Status.fromCode(Status.Code.INVALID_ARGUMENT)
                .withDescription("Writer schema must be provided when building this writer.");
            throw new StatusRuntimeException(missingSchema);
        }
        this.writerSchema = builder.writerSchema;
        this.maxInflightRequests = builder.maxInflightRequest;
        this.maxInflightBytes = builder.maxInflightBytes;
        this.limitExceededBehavior = builder.limitExceededBehavior;
        this.traceId = builder.traceId;
        this.waitingRequestQueue = new LinkedList<AppendRequestAndResponse>();
        this.inflightRequestQueue = new LinkedList<AppendRequestAndResponse>();

        if (builder.client != null) {
            // Caller-supplied client: use it, but never close it on the caller's behalf.
            this.client = builder.client;
            this.ownsBigQueryWriteClient = false;
        } else {
            // No client supplied: build one that sends the stream name as a routing header.
            HeaderProvider routingHeader =
                FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + this.streamName);
            BigQueryWriteSettings settings = BigQueryWriteSettings.newBuilder()
                .setCredentialsProvider(builder.credentialsProvider)
                .setTransportChannelProvider(builder.channelProvider)
                .setEndpoint(builder.endpoint)
                .setHeaderProvider(routingHeader)
                .build();
            this.client = BigQueryWriteClient.create(settings);
            this.ownsBigQueryWriteClient = true;
        }

        Runnable appendTask = new Runnable() {
            @Override
            public void run() {
                StreamWriter.this.appendLoop();
            }
        };
        this.appendThread = new Thread(appendTask);
        this.appendThread.start();
    }

    /**
     * Replaces {@code streamConnection} with a fresh connection on {@code client} whose response and
     * completion callbacks are routed to {@link #requestCallback} and {@link #doneCallback}.
     */
    private void resetConnection() {
        StreamConnection.RequestCallback onResponse = new StreamConnection.RequestCallback() {
            @Override
            public void run(AppendRowsResponse response) {
                StreamWriter.this.requestCallback(response);
            }
        };
        StreamConnection.DoneCallback onDone = new StreamConnection.DoneCallback() {
            @Override
            public void run(Throwable finalStatus) {
                StreamWriter.this.doneCallback(finalStatus);
            }
        };
        this.streamConnection = new StreamConnection(this.client, onResponse, onDone);
    }

    /**
     * Schedules the given rows to be appended without specifying an offset.
     *
     * @param rows the rows to append
     * @return a future that completes with the server response or fails with the append error
     */
    public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
        // A negative offset makes the two-argument overload leave the request's offset unset.
        final long unspecifiedOffset = -1L;
        return this.append(rows, unspecifiedOffset);
    }

    /**
     * Schedules the given rows to be appended, optionally at an explicit offset.
     *
     * @param rows the rows to append
     * @param offset the offset to set on the request; ignored (left unset) when negative
     * @return a future that completes with the server response or fails with the append error
     */
    public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
        AppendRowsRequest.ProtoData payload = AppendRowsRequest.ProtoData.newBuilder().setRows(rows).build();
        AppendRowsRequest.Builder request = AppendRowsRequest.newBuilder();
        request.setProtoRows(payload);
        boolean hasExplicitOffset = offset >= 0L;
        if (hasExplicitOffset) {
            request.setOffset(Int64Value.of(offset));
        }
        return this.appendInternal(request.build());
    }

    /**
     * Validates a request, enqueues it for the append thread and returns its result future.
     *
     * <p>Oversized requests, appends after {@link #close()} and appends after the connection has
     * reached a final status are reported through a failed future. Exceeding the inflight limits
     * while the behavior is {@code ThrowException} is reported by throwing instead.
     *
     * <p>After enqueuing, this call may block (holding the lock's condition wait) until the inflight
     * counters drop below their limits.
     *
     * @param message the fully built append request
     * @return the future tied to this request
     * @throws StatusRuntimeException {@code RESOURCE_EXHAUSTED} when limits are exceeded under
     *     {@code ThrowException}, or {@code CANCELLED} if interrupted while waiting for quota
     */
    private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
        final AppendRequestAndResponse pending = new AppendRequestAndResponse(message);
        final SettableApiFuture<AppendRowsResponse> result = pending.appendResult;

        // Size check needs no shared state, so it is done before taking the lock.
        if (pending.messageSize > StreamWriter.getApiMaxRequestBytes()) {
            Status tooLarge = Status.fromCode(Status.Code.INVALID_ARGUMENT)
                .withDescription("MessageSize is too large. Max allow: " + StreamWriter.getApiMaxRequestBytes() + " Actual: " + pending.messageSize);
            result.setException(new StatusRuntimeException(tooLarge));
            return result;
        }

        this.lock.lock();
        try {
            if (this.userClosed) {
                Status closed = Status.fromCode(Status.Code.FAILED_PRECONDITION)
                    .withDescription("Connection is already closed");
                result.setException(new Exceptions.StreamWriterClosedException(closed, this.streamName));
                return result;
            }

            boolean wouldHitRequestLimit = this.inflightRequests + 1L >= this.maxInflightRequests;
            boolean wouldHitByteLimit = this.inflightBytes + pending.messageSize >= this.maxInflightBytes;
            if (this.limitExceededBehavior == FlowController.LimitExceededBehavior.ThrowException
                    && (wouldHitRequestLimit || wouldHitByteLimit)) {
                throw new StatusRuntimeException(Status.fromCode(Status.Code.RESOURCE_EXHAUSTED)
                    .withDescription("Exceeds client side inflight buffer, consider add more buffer or open more connections."));
            }

            if (this.connectionFinalStatus != null) {
                Status failed = Status.fromCode(Status.Code.FAILED_PRECONDITION)
                    .withDescription("Connection is closed due to " + this.connectionFinalStatus.toString());
                result.setException(new Exceptions.StreamWriterClosedException(failed, this.streamName));
                return result;
            }

            // Account for the request, hand it to the append thread, then apply back-pressure.
            this.inflightRequests += 1L;
            this.inflightBytes += pending.messageSize;
            this.waitingRequestQueue.addLast(pending);
            this.hasMessageInWaitingQueue.signal();
            this.maybeWaitForInflightQuota();
            return result;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Blocks the calling thread while either inflight counter is at or above its limit, then records
     * how long the wait took (in whole seconds) in {@code inflightWaitSec}.
     *
     * <p>Must be called with {@code lock} held; the lock is released while waiting on
     * {@code inflightReduced}. The wait is re-checked at least every 100 ms.
     *
     * @throws StatusRuntimeException with {@code CANCELLED} if the thread is interrupted while
     *     waiting; the thread's interrupt flag is restored before throwing
     */
    @GuardedBy(value="lock")
    private void maybeWaitForInflightQuota() {
        long startTimeMillis = System.currentTimeMillis();
        while (this.inflightRequests >= this.maxInflightRequests || this.inflightBytes >= this.maxInflightBytes) {
            try {
                this.inflightReduced.await(100L, TimeUnit.MILLISECONDS);
            }
            catch (InterruptedException e) {
                // The InterruptedException is translated to an unchecked exception, so restore the
                // flag; otherwise callers further up the stack cannot observe the interruption.
                Thread.currentThread().interrupt();
                log.warning("Interrupted while waiting for inflight quota. Stream: " + this.streamName + " Error: " + e.toString());
                throw new StatusRuntimeException(Status.fromCode(Status.Code.CANCELLED).withCause(e).withDescription("Interrupted while waiting for quota."));
            }
        }
        this.inflightWaitSec.set((System.currentTimeMillis() - startTimeMillis) / 1000L);
    }

    /**
     * Returns the duration, in whole seconds, recorded by the most recent completed wait for
     * inflight quota.
     *
     * @return the last recorded wait in seconds; 0 if no wait has been recorded yet
     */
    public long getInflightWaitSeconds() {
        final long lastWaitSeconds = this.inflightWaitSec.get();
        return lastWaitSeconds;
    }

    /**
     * Marks the writer as closed, waits for the append thread to finish, and, when this writer
     * created its own client, closes that client and waits up to 150 seconds for it to terminate.
     *
     * <p>If the calling thread is interrupted during either wait, the wait is abandoned, a warning
     * is logged, and the thread's interrupt flag is restored before this method returns. The flag is
     * restored only at the end so that an interrupt during the join does not cut the client
     * termination wait short.
     */
    @Override
    public void close() {
        log.info("User closing stream: " + this.streamName);
        this.lock.lock();
        try {
            this.userClosed = true;
        }
        finally {
            this.lock.unlock();
        }
        boolean interrupted = false;
        try {
            log.fine("Waiting for append thread to finish. Stream: " + this.streamName);
            try {
                this.appendThread.join();
                log.info("User close complete. Stream: " + this.streamName);
            }
            catch (InterruptedException e) {
                interrupted = true;
                log.warning("Append handler join is interrupted. Stream: " + this.streamName + " Error: " + e.toString());
            }
            if (this.ownsBigQueryWriteClient) {
                this.client.close();
                try {
                    this.client.awaitTermination(150L, TimeUnit.SECONDS);
                }
                catch (InterruptedException e) {
                    interrupted = true;
                    log.warning("Interrupted while waiting for client termination. Stream: " + this.streamName + " Error: " + e.toString());
                }
            }
        }
        finally {
            if (interrupted) {
                // Hand the interruption back to the caller instead of silently consuming it.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Body of the background append thread.
     *
     * <p>Repeatedly moves requests from the waiting queue to the inflight queue under the lock, then
     * sends them on the stream connection outside the lock. The loop ends once
     * {@link #waitingQueueDrained()} reports true; afterwards the connection (if any) is closed, the
     * done callback is awaited for up to 5 minutes, and all remaining inflight requests are failed.
     */
    private void appendLoop() {
        // Requests claimed during the locked section, to be sent after the lock is released.
        LinkedList<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
        boolean streamNeedsConnecting = false;
        boolean isFirstRequestInConnection = true;
        while (!this.waitingQueueDrained()) {
            this.lock.lock();
            try {
                // Wake up when appendInternal signals, or after 100 ms to re-check state.
                this.hasMessageInWaitingQueue.await(100L, TimeUnit.MILLISECONDS);
                // A (re)connect is needed when not connected and no final status has been recorded.
                boolean bl = streamNeedsConnecting = !this.streamConnectionIsConnected && this.connectionFinalStatus == null;
                if (streamNeedsConnecting) {
                    // Put unanswered inflight requests back at the head of the waiting queue,
                    // preserving their order, so they are sent again on the new connection.
                    while (!this.inflightRequestQueue.isEmpty()) {
                        this.waitingRequestQueue.addFirst(this.inflightRequestQueue.pollLast());
                    }
                }
                // Claim everything that is waiting: track it as inflight and queue it for sending.
                while (!this.waitingRequestQueue.isEmpty()) {
                    AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
                    this.inflightRequestQueue.addLast(requestWrapper);
                    localQueue.addLast(requestWrapper);
                }
            }
            catch (InterruptedException e) {
                // The interruption is only logged; the loop keeps running and the flag is not restored.
                log.warning("Interrupted while waiting for message. Stream: " + this.streamName + " Error: " + e.toString());
            }
            finally {
                this.lock.unlock();
            }
            if (localQueue.isEmpty()) continue;
            if (streamNeedsConnecting) {
                // Mark as connected before creating the connection; doneCallback clears the flag again.
                this.lock.lock();
                try {
                    this.streamConnectionIsConnected = true;
                }
                finally {
                    this.lock.unlock();
                }
                this.resetConnection();
                isFirstRequestInConnection = true;
            }
            while (!localQueue.isEmpty()) {
                // Only the first request on a connection carries stream name, schema and trace id.
                AppendRowsRequest preparedRequest = this.prepareRequestBasedOnPosition(((AppendRequestAndResponse)localQueue.pollFirst()).message, isFirstRequestInConnection);
                this.streamConnection.send(preparedRequest);
                isFirstRequestInConnection = false;
            }
        }
        log.fine("Cleanup starts. Stream: " + this.streamName);
        // streamConnection is null when the loop ended before any request was sent.
        if (this.streamConnection != null) {
            this.streamConnection.close();
            this.waitForDoneCallback(5L, TimeUnit.MINUTES);
        }
        log.fine("Stream connection is fully closed. Cleaning up inflight requests. Stream: " + this.streamName);
        this.cleanupInflightRequests();
        log.fine("Append thread is done. Stream: " + this.streamName);
    }

    /**
     * Tells the append loop whether it may stop.
     *
     * @return {@code true} when the waiting queue is empty and either the user has closed the
     *     writer or the connection has a final status
     */
    private boolean waitingQueueDrained() {
        this.lock.lock();
        try {
            boolean stopping = this.userClosed || this.connectionFinalStatus != null;
            if (!stopping) {
                return false;
            }
            return this.waitingRequestQueue.isEmpty();
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Polls every 100 ms until {@code connectionFinalStatus} is set or the timeout elapses. On
     * timeout, a {@code CANCELLED} status is recorded as the final status if none was set meanwhile.
     *
     * @param duration maximum time to wait
     * @param timeUnit unit of {@code duration}
     */
    private void waitForDoneCallback(long duration, TimeUnit timeUnit) {
        log.fine("Waiting for done callback from stream connection. Stream: " + this.streamName);
        final long deadlineNanos = System.nanoTime() + timeUnit.toNanos(duration);
        for (;;) {
            if (System.nanoTime() > deadlineNanos) {
                break;
            }
            boolean finished;
            this.lock.lock();
            try {
                finished = this.connectionFinalStatus != null;
            }
            finally {
                this.lock.unlock();
            }
            if (finished) {
                return;
            }
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        }
        // Timed out: record a final status unless the done callback arrived in the meantime.
        this.lock.lock();
        try {
            if (this.connectionFinalStatus == null) {
                Status timedOut = Status.fromCode(Status.Code.CANCELLED)
                    .withDescription("Timeout waiting for DoneCallback.");
                this.connectionFinalStatus = new StatusRuntimeException(timedOut);
            }
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Returns a copy of the request adjusted for its position in the connection.
     *
     * <p>The first request on a connection gets the writer schema (when non-null), the stream name
     * and the trace id (when non-null). Every later request has its write stream and writer schema
     * cleared.
     *
     * @param original the request as built by the caller
     * @param isFirstRequest whether this is the first request sent on the current connection
     * @return the adjusted request
     */
    private AppendRowsRequest prepareRequestBasedOnPosition(AppendRowsRequest original, boolean isFirstRequest) {
        AppendRowsRequest.Builder prepared = original.toBuilder();
        if (!isFirstRequest) {
            prepared.clearWriteStream();
            prepared.getProtoRowsBuilder().clearWriterSchema();
            return prepared.build();
        }
        if (this.writerSchema != null) {
            prepared.getProtoRowsBuilder().setWriterSchema(this.writerSchema);
        }
        prepared.setWriteStream(this.streamName);
        if (this.traceId != null) {
            prepared.setTraceId(this.traceId);
        }
        return prepared.build();
    }

    /**
     * Drains the inflight queue and fails every drained request's future.
     *
     * <p>The failure is {@code connectionFinalStatus} when one is set, otherwise a
     * {@code WriterClosedException} for this stream. Futures are completed after the lock has been
     * released. Also sets {@code inflightCleanuped}.
     */
    private void cleanupInflightRequests() {
        Throwable failure = new Exceptions.WriterClosedException(this.streamName);
        final LinkedList<AppendRequestAndResponse> drained = new LinkedList<AppendRequestAndResponse>();
        this.lock.lock();
        try {
            if (this.connectionFinalStatus != null) {
                failure = this.connectionFinalStatus;
            }
            // pollInflightRequestQueue also updates the inflight counters and signals waiters.
            while (!this.inflightRequestQueue.isEmpty()) {
                drained.addLast(this.pollInflightRequestQueue());
            }
            this.inflightCleanuped = true;
        }
        finally {
            this.lock.unlock();
        }
        log.fine("Cleaning " + drained.size() + " inflight requests with error: " + failure);
        for (AppendRequestAndResponse abandoned : drained) {
            abandoned.appendResult.setException(failure);
        }
        drained.clear();
    }

    /**
     * Handles one response from the stream connection by completing the oldest inflight request.
     *
     * <p>Under the lock: stores an updated schema if the response carries one, resets the
     * retry-without-callback counter, and removes the head of the inflight queue. If the queue is
     * empty the response is dropped; unless the inflight queue was already cleaned up, that case is
     * logged as unexpected and recorded as the connection's final status.
     *
     * <p>The request's future is completed after the lock is released: with a storage exception or a
     * {@link StatusRuntimeException} when the response has an error, otherwise with the response.
     *
     * @param response the response delivered by the connection
     */
    private void requestCallback(AppendRowsResponse response) {
        AppendRequestAndResponse requestWrapper;
        this.lock.lock();
        try {
            // Everything touching the response stays inside the try so the lock is released even if
            // it throws; a leaked lock here would stall every append and the append thread.
            if (response.hasUpdatedSchema()) {
                this.updatedSchema = response.getUpdatedSchema();
            }
            // A response arrived, so the connection made progress since the last retry.
            this.conectionRetryCountWithoutCallback = 0L;
            if (this.inflightRequestQueue.isEmpty()) {
                if (!this.inflightCleanuped) {
                    log.log(Level.WARNING, "Unexpected: request callback called on an empty inflight queue.");
                    this.connectionFinalStatus = new StatusRuntimeException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Request callback called on an empty inflight queue."));
                }
                return;
            }
            requestWrapper = this.pollInflightRequestQueue();
        }
        finally {
            this.lock.unlock();
        }
        if (response.hasError()) {
            Exceptions.StorageException storageException = Exceptions.toStorageException(response.getError(), null);
            if (storageException != null) {
                requestWrapper.appendResult.setException(storageException);
            } else {
                StatusRuntimeException exception = new StatusRuntimeException(Status.fromCodeValue(response.getError().getCode()).withDescription(response.getError().getMessage()));
                requestWrapper.appendResult.setException(exception);
            }
        } else {
            requestWrapper.appendResult.set(response);
        }
    }

    /**
     * Decides whether a connection failure should lead to a reconnect.
     *
     * @param t the failure reported for the connection
     * @return {@code true} if {@code Errors.isRetryableInternalStatus} accepts the status derived
     *     from {@code t}, or if that status code is {@code ABORTED} or {@code UNAVAILABLE}
     */
    private boolean isRetriableError(Throwable t) {
        final Status derived = Status.fromThrowable(t);
        if (Errors.isRetryableInternalStatus(derived)) {
            return true;
        }
        switch (derived.getCode()) {
            case ABORTED:
            case UNAVAILABLE:
                return true;
            default:
                return false;
        }
    }

    /**
     * Handles termination of the stream connection.
     *
     * <p>Always marks the connection as disconnected. If no final status is recorded yet, a
     * retriable error on a writer the user has not closed only bumps the retry counter; any other
     * outcome is stored as the final status, converted to a storage exception when possible.
     *
     * @param finalStatus the status the connection finished with
     */
    private void doneCallback(Throwable finalStatus) {
        final String statusText = finalStatus.toString();
        log.fine("Received done callback. Stream: " + this.streamName + " Final status: " + statusText);
        this.lock.lock();
        try {
            this.streamConnectionIsConnected = false;
            if (this.connectionFinalStatus != null) {
                // A final status was already recorded; keep the first one.
                return;
            }
            boolean shouldRetry = this.isRetriableError(finalStatus) && !this.userClosed;
            if (shouldRetry) {
                this.conectionRetryCountWithoutCallback += 1L;
                log.fine("Retriable error " + statusText + " received, retry count " + this.conectionRetryCountWithoutCallback + " for stream " + this.streamName);
                return;
            }
            Exceptions.StorageException converted = Exceptions.toStorageException(finalStatus);
            if (converted != null) {
                this.connectionFinalStatus = converted;
            } else {
                this.connectionFinalStatus = finalStatus;
            }
            log.info("Connection finished with error " + statusText + " for stream " + this.streamName);
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Removes the head of the inflight queue, subtracts it from the inflight counters and signals
     * one thread waiting for quota. Must be called with {@code lock} held and a non-empty queue.
     *
     * @return the removed request
     */
    @GuardedBy(value="lock")
    private AppendRequestAndResponse pollInflightRequestQueue() {
        final AppendRequestAndResponse head = this.inflightRequestQueue.pollFirst();
        this.inflightRequests -= 1L;
        this.inflightBytes = this.inflightBytes - head.messageSize;
        this.inflightReduced.signal();
        return head;
    }

    /**
     * Creates a builder for a writer that uses a caller-supplied client.
     *
     * @param streamName name of the write stream; must not be null
     * @param client client to use for the connection; must not be null
     * @return a new builder
     */
    public static Builder newBuilder(String streamName, BigQueryWriteClient client) {
        final Builder builder = new Builder(streamName, client);
        return builder;
    }

    /**
     * Creates a builder for a writer that will create its own client when built.
     *
     * @param streamName name of the write stream; must not be null
     * @return a new builder
     */
    public static Builder newBuilder(String streamName) {
        final Builder builder = new Builder(streamName);
        return builder;
    }

    /**
     * Returns the most recent updated table schema received in an append response.
     *
     * <p>The field is written while holding {@code lock}, so it is read under the same lock;
     * synchronizing on {@code this} would use a different monitor and give no visibility guarantee
     * for those writes.
     *
     * @return the latest updated schema, or {@code null} if no response has carried one
     */
    public TableSchema getUpdatedSchema() {
        this.lock.lock();
        try {
            return this.updatedSchema;
        }
        finally {
            this.lock.unlock();
        }
    }

    /**
     * Pairs an append request with the future that reports its outcome, and caches the serialized
     * size of the request's proto rows for flow-control accounting.
     */
    private static final class AppendRequestAndResponse {
        /** Future completed with the response, or failed with the append error. */
        final SettableApiFuture<AppendRowsResponse> appendResult;
        /** The request as passed to the constructor. */
        final AppendRowsRequest message;
        /** Serialized size of {@code message.getProtoRows()}, in bytes. */
        final long messageSize;

        AppendRequestAndResponse(AppendRowsRequest message) {
            this.appendResult = SettableApiFuture.create();
            this.message = message;
            final int protoRowsBytes = message.getProtoRows().getSerializedSize();
            this.messageSize = protoRowsBytes;
        }
    }

    /**
     * Builder for {@link StreamWriter}.
     *
     * <p>Defaults: at most 1000 inflight requests and 100 MiB of inflight bytes, blocking when a
     * limit is reached, the default endpoint, channel provider and credentials provider of
     * {@link BigQueryWriteSettings}, and no trace id. A writer schema must be set before
     * {@link #build()}.
     */
    public static final class Builder {
        private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
        private static final long DEFAULT_MAX_INFLIGHT_BYTES = 100L * 1024L * 1024L;
        /** Separator required inside a trace id ("A:B"). */
        private static final char TRACE_ID_SEPARATOR = ':';

        private final String streamName;
        private final BigQueryWriteClient client;
        private ProtoSchema writerSchema = null;
        private long maxInflightRequest = DEFAULT_MAX_INFLIGHT_REQUESTS;
        private long maxInflightBytes = DEFAULT_MAX_INFLIGHT_BYTES;
        private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
        private TransportChannelProvider channelProvider = BigQueryWriteSettings.defaultGrpcTransportProviderBuilder().setChannelsPerCpu(1.0).build();
        private CredentialsProvider credentialsProvider = BigQueryWriteSettings.defaultCredentialsProviderBuilder().build();
        private FlowController.LimitExceededBehavior limitExceededBehavior = FlowController.LimitExceededBehavior.Block;
        private String traceId = null;

        private Builder(String streamName) {
            this.streamName = Preconditions.checkNotNull(streamName);
            this.client = null;
        }

        private Builder(String streamName, BigQueryWriteClient client) {
            this.streamName = Preconditions.checkNotNull(streamName);
            this.client = Preconditions.checkNotNull(client);
        }

        /** Sets the proto schema of the rows that will be written. Required. */
        public Builder setWriterSchema(ProtoSchema writerSchema) {
            this.writerSchema = writerSchema;
            return this;
        }

        /** Sets the maximum number of requests that may be inflight at once. */
        public Builder setMaxInflightRequests(long value) {
            this.maxInflightRequest = value;
            return this;
        }

        /** Sets the maximum total size, in bytes, of inflight requests. */
        public Builder setMaxInflightBytes(long value) {
            this.maxInflightBytes = value;
            return this;
        }

        /** Sets the service endpoint; only used when the writer creates its own client. */
        public Builder setEndpoint(String endpoint) {
            this.endpoint = Preconditions.checkNotNull(endpoint, "Endpoint is null.");
            return this;
        }

        /** Sets the channel provider; only used when the writer creates its own client. */
        public Builder setChannelProvider(TransportChannelProvider channelProvider) {
            this.channelProvider = Preconditions.checkNotNull(channelProvider, "ChannelProvider is null.");
            return this;
        }

        /** Sets the credentials provider; only used when the writer creates its own client. */
        public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = Preconditions.checkNotNull(credentialsProvider, "CredentialsProvider is null.");
            return this;
        }

        /**
         * Sets the trace id sent with the first request of each connection.
         *
         * @param traceId an id of the form {@code A:B} with non-empty text before the first colon
         *     and the first colon not being the last character
         * @throws NullPointerException if {@code traceId} is null
         * @throws IllegalArgumentException if {@code traceId} does not follow the format
         */
        public Builder setTraceId(String traceId) {
            // Explicit null check with a message, matching the other setters.
            Preconditions.checkNotNull(traceId, "TraceId is null.");
            int colonIndex = traceId.indexOf(TRACE_ID_SEPARATOR);
            if (colonIndex == -1 || colonIndex == 0 || colonIndex == traceId.length() - 1) {
                throw new IllegalArgumentException("TraceId must follow the format of A:B. Actual:" + traceId);
            }
            this.traceId = traceId;
            return this;
        }

        /**
         * Sets what happens when an append would exceed the inflight limits.
         *
         * @throws StatusRuntimeException with {@code INVALID_ARGUMENT} for
         *     {@code LimitExceededBehavior.Ignore}, which is not supported
         */
        public Builder setLimitExceededBehavior(FlowController.LimitExceededBehavior limitExceededBehavior) throws StatusRuntimeException {
            if (limitExceededBehavior == FlowController.LimitExceededBehavior.Ignore) {
                throw new StatusRuntimeException(Status.fromCode(Status.Code.INVALID_ARGUMENT).withDescription("LimitExceededBehavior.Ignore is not supported on StreamWriter."));
            }
            this.limitExceededBehavior = limitExceededBehavior;
            return this;
        }

        /**
         * Builds the writer, which starts its background append thread.
         *
         * @throws IOException if the writer has to create its own client and that fails
         */
        public StreamWriter build() throws IOException {
            return new StreamWriter(this);
        }
    }
}

