package com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2;

import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.ApiFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.api.core.SettableApiFuture;
import com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2.StreamConnection;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.base.Preconditions;
import com.google.cloud.spark.bigquery.repackaged.com.google.common.util.concurrent.Uninterruptibles;
import com.google.cloud.spark.bigquery.repackaged.io.grpc.Status;
import com.google.cloud.spark.bigquery.repackaged.io.grpc.StatusRuntimeException;
import java.util.Deque;
import java.util.LinkedList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

/* loaded from: input_file:com/google/cloud/spark/bigquery/repackaged/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.class */
/**
 * Writer that queues {@link AppendRowsRequest}s and sends them over a single
 * {@link StreamConnection} from a dedicated background thread.
 *
 * <p>Requests passed to {@link #append(AppendRowsRequest)} are placed on a waiting queue. The
 * append thread moves them to an in-flight queue, sends them on the connection, and each response
 * delivered through the connection's request callback completes the oldest in-flight request.
 * When the writer is closed by the user, or the connection reports its final status, the append
 * thread closes the connection, waits for the done callback, and fails every request that is
 * still in flight with the connection's final status.
 *
 * <p>All mutable state is protected by {@link #lock}. The lock is never held while blocking on
 * another thread or while completing a future.
 */
public class StreamWriterV2 implements AutoCloseable {
    private static final Logger log = Logger.getLogger(StreamWriterV2.class.getName());

    /** Guards every {@code @GuardedBy("lock")} field below. */
    private final Lock lock;

    /** Signalled by {@link #append} whenever a request is added to the waiting queue. */
    private final Condition hasMessageInWaitingQueue;

    /** Stream name taken from the builder; used here only in log messages. */
    private final String streamName;

    /** Set once {@link #close()} has been called; later appends are rejected. */
    @GuardedBy("lock")
    private boolean userClosed;

    /** Non-null once the connection's done callback has fired; holds the reported status. */
    @GuardedBy("lock")
    private Throwable connectionFinalStatus;

    /** Requests accepted by {@link #append} that have not been sent yet. */
    @GuardedBy("lock")
    private final Deque<AppendRequestAndResponse> waitingRequestQueue;

    /** Requests handed to the connection whose response has not arrived yet, oldest first. */
    @GuardedBy("lock")
    private final Deque<AppendRequestAndResponse> inflightRequestQueue;

    private final StreamConnection streamConnection;
    private final Thread appendThread;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/google/cloud/spark/bigquery/repackaged/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2$AppendRequestAndResponse.class */
    /** Pairs an append request with the future that will carry its response or failure. */
    public static final class AppendRequestAndResponse {
        final SettableApiFuture<AppendRowsResponse> appendResult = SettableApiFuture.create();
        final AppendRowsRequest message;

        AppendRequestAndResponse(AppendRowsRequest appendRowsRequest) {
            this.message = appendRowsRequest;
        }
    }

    /* loaded from: input_file:com/google/cloud/spark/bigquery/repackaged/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2$Builder.class */
    /** Builder for {@link StreamWriterV2}; obtain one via {@link StreamWriterV2#newBuilder}. */
    public static final class Builder {
        private final String streamName;
        private final BigQueryWriteClient client;

        private Builder(String str, BigQueryWriteClient bigQueryWriteClient) {
            this.streamName = (String) Preconditions.checkNotNull(str);
            this.client = (BigQueryWriteClient) Preconditions.checkNotNull(bigQueryWriteClient);
        }

        /**
         * Creates the writer. Construction opens the stream connection and starts the append
         * thread.
         *
         * @return a new, running writer
         */
        public StreamWriterV2 build() {
            return new StreamWriterV2(this);
        }
    }

    /**
     * Wires the connection callbacks to this writer and starts the append thread.
     *
     * @param builder source of the stream name and client
     */
    private StreamWriterV2(Builder builder) {
        this.userClosed = false;
        this.connectionFinalStatus = null;
        this.lock = new ReentrantLock();
        this.hasMessageInWaitingQueue = this.lock.newCondition();
        this.streamName = builder.streamName;
        this.waitingRequestQueue = new LinkedList<>();
        this.inflightRequestQueue = new LinkedList<>();
        this.streamConnection = new StreamConnection(builder.client, new StreamConnection.RequestCallback() { // from class: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2.1
            @Override // com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2.StreamConnection.RequestCallback
            public void run(AppendRowsResponse appendRowsResponse) {
                StreamWriterV2.this.requestCallback(appendRowsResponse);
            }
        }, new StreamConnection.DoneCallback() { // from class: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2.2
            @Override // com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2.StreamConnection.DoneCallback
            public void run(Throwable th) {
                StreamWriterV2.this.doneCallback(th);
            }
        });
        // Every field the append loop reads is assigned above, before the thread starts.
        this.appendThread = new Thread(new Runnable() { // from class: com.google.cloud.spark.bigquery.repackaged.com.google.cloud.bigquery.storage.v1beta2.StreamWriterV2.3
            @Override // java.lang.Runnable
            public void run() {
                StreamWriterV2.this.appendLoop();
            }
        });
        this.appendThread.start();
    }

    /**
     * Queues a request to be sent by the append thread.
     *
     * <p>If the writer has been closed by the user, or the connection has already reported its
     * final status, the returned future is failed immediately with a {@code FAILED_PRECONDITION}
     * {@link StatusRuntimeException} and the request is not queued.
     *
     * @param appendRowsRequest the request to send
     * @return a future completed with the response, or failed if the request cannot be served
     */
    public ApiFuture<AppendRowsResponse> append(AppendRowsRequest appendRowsRequest) {
        AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(appendRowsRequest);
        this.lock.lock();
        try {
            if (this.userClosed) {
                requestWrapper.appendResult.setException(new StatusRuntimeException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Stream is already closed")));
                return requestWrapper.appendResult;
            }
            if (this.connectionFinalStatus != null) {
                requestWrapper.appendResult.setException(new StatusRuntimeException(Status.fromCode(Status.Code.FAILED_PRECONDITION).withDescription("Stream is closed due to " + this.connectionFinalStatus.toString())));
                return requestWrapper.appendResult;
            }
            this.waitingRequestQueue.addLast(requestWrapper);
            // Wake the append thread so it does not have to wait out its polling timeout.
            this.hasMessageInWaitingQueue.signal();
            return requestWrapper.appendResult;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Marks the writer as closed and blocks until the append thread has finished.
     *
     * <p>If the calling thread is interrupted while waiting, a warning is logged, the thread's
     * interrupt flag is restored and the method returns without waiting further.
     */
    @Override // java.lang.AutoCloseable
    public void close() {
        log.info("User closing stream: " + this.streamName);
        this.lock.lock();
        try {
            this.userClosed = true;
        } finally {
            this.lock.unlock();
        }
        // The lock must be released before joining: the append thread needs it to observe
        // userClosed and to run its cleanup, so joining while holding it would deadlock.
        log.info("Waiting for append thread to finish. Stream: " + this.streamName);
        try {
            this.appendThread.join();
            log.info("User close complete. Stream: " + this.streamName);
        } catch (InterruptedException e) {
            log.warning("Append handler join is interrupted. Stream: " + this.streamName + " Error: " + e.toString());
            // Preserve the interrupt for the caller instead of silently clearing it.
            Thread.currentThread().interrupt();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Body of the append thread.
     *
     * <p>Repeatedly moves waiting requests to the in-flight queue and sends them on the
     * connection, until {@link #waitingQueueDrained()} reports that no more work will arrive.
     * It then closes the connection, waits for the done callback and fails the remaining
     * in-flight requests.
     */
    public void appendLoop() {
        // Thread-confined staging queue so that send() is never called while holding the lock.
        Deque<AppendRequestAndResponse> localQueue = new LinkedList<>();
        while (!waitingQueueDrained()) {
            this.lock.lock();
            try {
                // Timed wait: userClosed and connectionFinalStatus are set without signalling,
                // so the loop must wake up periodically to notice them.
                this.hasMessageInWaitingQueue.await(100L, TimeUnit.MILLISECONDS);
                while (!this.waitingRequestQueue.isEmpty()) {
                    AppendRequestAndResponse requestWrapper = this.waitingRequestQueue.pollFirst();
                    this.inflightRequestQueue.addLast(requestWrapper);
                    localQueue.addLast(requestWrapper);
                }
            } catch (InterruptedException e) {
                // The interrupt flag is intentionally not re-asserted here: doing so would make
                // every subsequent await() throw immediately and turn this loop into a busy spin.
                log.warning("Interrupted while waiting for message. Stream: " + this.streamName + " Error: " + e.toString());
            } finally {
                this.lock.unlock();
            }
            while (!localQueue.isEmpty()) {
                this.streamConnection.send(localQueue.pollFirst().message);
            }
        }
        log.info("Cleanup starts. Stream: " + this.streamName);
        this.streamConnection.close();
        waitForDoneCallback();
        log.info("Stream connection is fully closed. Cleaning up inflight requests. Stream: " + this.streamName);
        cleanupInflightRequests();
        log.info("Append thread is done. Stream: " + this.streamName);
    }

    /**
     * Tells the append loop whether it can stop.
     *
     * @return {@code true} when the writer is closed by the user or the connection has reported
     *     its final status, and the waiting queue is empty
     */
    private boolean waitingQueueDrained() {
        this.lock.lock();
        try {
            boolean noMoreAppends = this.userClosed || this.connectionFinalStatus != null;
            return noMoreAppends && this.waitingRequestQueue.isEmpty();
        } finally {
            this.lock.unlock();
        }
    }

    /** Polls every 100 ms until the done callback has recorded the connection's final status. */
    private void waitForDoneCallback() {
        log.info("Waiting for done callback from stream connection. Stream: " + this.streamName);
        while (true) {
            this.lock.lock();
            try {
                if (this.connectionFinalStatus != null) {
                    return;
                }
            } finally {
                // Single unlock per lock(): unlocking twice makes ReentrantLock throw
                // IllegalMonitorStateException.
                this.lock.unlock();
            }
            // Sleep without the lock so doneCallback() can acquire it and record the status.
            Uninterruptibles.sleepUninterruptibly(100L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * Fails every request still in the in-flight queue with the connection's final status.
     *
     * <p>Called by the append thread after {@link #waitForDoneCallback()} has returned, so the
     * final status is set by then.
     */
    private void cleanupInflightRequests() {
        Deque<AppendRequestAndResponse> localQueue = new LinkedList<>();
        Throwable finalStatus;
        this.lock.lock();
        try {
            finalStatus = this.connectionFinalStatus;
            while (!this.inflightRequestQueue.isEmpty()) {
                localQueue.addLast(this.inflightRequestQueue.pollFirst());
            }
        } finally {
            this.lock.unlock();
        }
        // Futures are completed outside the lock so that any listener attached to them does not
        // run while this writer's lock is held.
        log.info("Cleaning " + localQueue.size() + " inflight requests with error: " + finalStatus.toString());
        while (!localQueue.isEmpty()) {
            localQueue.pollFirst().appendResult.setException(finalStatus);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Connection callback for a received response: completes the oldest in-flight request.
     *
     * @param appendRowsResponse the response delivered by the connection
     */
    public void requestCallback(AppendRowsResponse appendRowsResponse) {
        AppendRequestAndResponse requestWrapper;
        this.lock.lock();
        try {
            requestWrapper = this.inflightRequestQueue.pollFirst();
        } finally {
            this.lock.unlock();
        }
        // Completed outside the lock so future listeners never run with the lock held.
        // NOTE(review): assumes a response only arrives while a request is in flight; an empty
        // queue here results in a NullPointerException, as before.
        requestWrapper.appendResult.set(appendRowsResponse);
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Connection callback for stream termination: records the final status, after which new
     * appends are rejected and the append thread proceeds to cleanup.
     *
     * @param th the final status reported by the connection
     */
    public void doneCallback(Throwable th) {
        log.info("Received done callback. Stream: " + this.streamName + " Final status: " + th.toString());
        this.lock.lock();
        try {
            this.connectionFinalStatus = th;
        } finally {
            this.lock.unlock();
        }
    }

    /**
     * Creates a builder for a writer.
     *
     * @param str the stream name; must not be null
     * @param bigQueryWriteClient the client used to open the stream connection; must not be null
     * @return a new builder
     */
    public static Builder newBuilder(String str, BigQueryWriteClient bigQueryWriteClient) {
        return new Builder(str, bigQueryWriteClient);
    }
}
