package com.google.cloud.spanner;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.ExecutorProvider;
import com.google.cloud.spanner.AsyncResultSet;
import com.google.common.base.Function;
import com.google.common.collect.Range;
import com.google.common.truth.Truth;
import com.google.protobuf.ByteString;
import com.google.protobuf.Value;
import com.google.spanner.v1.PartialResultSet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

@RunWith(JUnit4.class)
/* loaded from: input_file:com/google/cloud/spanner/AsyncResultSetImplTest.class */
public class AsyncResultSetImplTest {
    private ExecutorProvider mockedProvider;
    private ExecutorProvider simpleProvider;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.google.cloud.spanner.AsyncResultSetImplTest$2, reason: invalid class name */
    /* loaded from: input_file:com/google/cloud/spanner/AsyncResultSetImplTest$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$com$google$cloud$spanner$AsyncResultSet$CursorState = new int[AsyncResultSet.CursorState.values().length];

        static {
            try {
                $SwitchMap$com$google$cloud$spanner$AsyncResultSet$CursorState[AsyncResultSet.CursorState.OK.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$AsyncResultSet$CursorState[AsyncResultSet.CursorState.DONE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$google$cloud$spanner$AsyncResultSet$CursorState[AsyncResultSet.CursorState.NOT_READY.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    @Before
    public void setup() {
        this.mockedProvider = (ExecutorProvider) Mockito.mock(ExecutorProvider.class);
        Mockito.when(this.mockedProvider.getExecutor()).thenReturn((ScheduledExecutorService) Mockito.mock(ScheduledExecutorService.class));
        this.simpleProvider = SpannerOptions.createAsyncExecutorProvider(1, 1L, TimeUnit.SECONDS);
    }

    @Test
    public void close() {
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.mockedProvider, (ResultSet) Mockito.mock(ResultSet.class), 10);
        asyncResultSetImpl.close();
        asyncResultSetImpl.close();
        Assert.assertThrows(IllegalStateException.class, () -> {
            asyncResultSetImpl.setCallback((Executor) Mockito.mock(Executor.class), (AsyncResultSet.ReadyCallback) Mockito.mock(AsyncResultSet.ReadyCallback.class));
        });
        Assert.assertThrows(IllegalStateException.class, () -> {
            asyncResultSetImpl.toList((Function) Mockito.mock(Function.class));
        });
        Assert.assertThrows(IllegalStateException.class, () -> {
            asyncResultSetImpl.toListAsync((Function) Mockito.mock(Function.class), (Executor) Mockito.mock(Executor.class));
        });
        AsyncResultSetImpl asyncResultSetImpl2 = new AsyncResultSetImpl(this.mockedProvider, (ResultSet) Mockito.mock(ResultSet.class), 10);
        asyncResultSetImpl2.setCallback((Executor) Mockito.mock(Executor.class), (AsyncResultSet.ReadyCallback) Mockito.mock(AsyncResultSet.ReadyCallback.class));
        asyncResultSetImpl2.close();
        asyncResultSetImpl2.cancel();
        asyncResultSetImpl2.resume();
    }

    @Test
    public void tryNextNotAllowed() {
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.mockedProvider, (ResultSet) Mockito.mock(ResultSet.class), 10);
        try {
            asyncResultSetImpl.setCallback((Executor) Mockito.mock(Executor.class), (AsyncResultSet.ReadyCallback) Mockito.mock(AsyncResultSet.ReadyCallback.class));
            Truth.assertThat(((IllegalStateException) Assert.assertThrows(IllegalStateException.class, () -> {
                asyncResultSetImpl.tryNext();
            })).getMessage()).contains("tryNext may only be called from a DataReady callback.");
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void toList() {
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            Truth.assertThat(asyncResultSetImpl.toList(structReader -> {
                return new Object();
            })).hasSize(3);
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void toListPropagatesError() {
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "invalid query")});
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            SpannerException assertThrows = Assert.assertThrows(SpannerException.class, () -> {
                asyncResultSetImpl.toList(structReader -> {
                    return new Object();
                });
            });
            Truth.assertThat(assertThrows.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
            Truth.assertThat(assertThrows.getMessage()).contains("invalid query");
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void toListAsync() throws InterruptedException, ExecutionException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            Truth.assertThat((Iterable) asyncResultSetImpl.toListAsync(structReader -> {
                return new Object();
            }, newFixedThreadPool).get()).hasSize(3);
            asyncResultSetImpl.close();
            newFixedThreadPool.shutdown();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void toListAsyncPropagatesError() throws InterruptedException {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(1);
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "invalid query")});
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
                asyncResultSetImpl.toListAsync(structReader -> {
                    return new Object();
                }, newFixedThreadPool).get();
            });
            Truth.assertThat(executionException.getCause()).isInstanceOf(SpannerException.class);
            SpannerException cause = executionException.getCause();
            Truth.assertThat(cause.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
            Truth.assertThat(cause.getMessage()).contains("invalid query");
            asyncResultSetImpl.close();
            newFixedThreadPool.shutdown();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void withCallback() throws InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicInteger atomicInteger2 = new AtomicInteger();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                AsyncResultSet.CursorState tryNext;
                atomicInteger.incrementAndGet();
                while (true) {
                    tryNext = asyncResultSet.tryNext();
                    if (tryNext != AsyncResultSet.CursorState.OK) {
                        break;
                    }
                    atomicInteger2.incrementAndGet();
                }
                if (tryNext == AsyncResultSet.CursorState.DONE) {
                    countDownLatch.countDown();
                }
                return AsyncResultSet.CallbackResponse.CONTINUE;
            });
            asyncResultSetImpl.close();
            countDownLatch.await();
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isIn(Range.closed(1, 5));
            Truth.assertThat(Integer.valueOf(atomicInteger2.get())).isEqualTo(3);
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void callbackReceivesError() throws InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "invalid query")});
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(1);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                try {
                    asyncResultSet.tryNext();
                    linkedBlockingDeque.push(new Exception("missing expected exception"));
                } catch (SpannerException e) {
                    linkedBlockingDeque.push(e);
                }
                return AsyncResultSet.CallbackResponse.DONE;
            });
            asyncResultSetImpl.close();
            SpannerException spannerException = (Exception) linkedBlockingDeque.take();
            Truth.assertThat(spannerException).isInstanceOf(SpannerException.class);
            SpannerException spannerException2 = spannerException;
            Truth.assertThat(spannerException2.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
            Truth.assertThat(spannerException2.getMessage()).contains("invalid query");
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void callbackReceivesErrorHalfwayThrough() throws InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true).thenThrow(new Throwable[]{SpannerExceptionFactory.newSpannerException(ErrorCode.INVALID_ARGUMENT, "invalid query")});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AtomicInteger atomicInteger = new AtomicInteger();
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(1);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                try {
                    if (asyncResultSet.tryNext() != AsyncResultSet.CursorState.DONE) {
                        atomicInteger.incrementAndGet();
                        return AsyncResultSet.CallbackResponse.CONTINUE;
                    }
                } catch (SpannerException e) {
                    linkedBlockingDeque.push(e);
                }
                return AsyncResultSet.CallbackResponse.DONE;
            });
            asyncResultSetImpl.close();
            SpannerException spannerException = (Exception) linkedBlockingDeque.take();
            Truth.assertThat(spannerException).isInstanceOf(SpannerException.class);
            SpannerException spannerException2 = spannerException;
            Truth.assertThat(spannerException2.getErrorCode()).isEqualTo(ErrorCode.INVALID_ARGUMENT);
            Truth.assertThat(spannerException2.getMessage()).contains("invalid query");
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(1);
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void pauseResume() throws InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AtomicInteger atomicInteger = new AtomicInteger();
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                atomicInteger.incrementAndGet();
                if (asyncResultSet.tryNext() != AsyncResultSet.CursorState.OK) {
                    atomicBoolean.set(true);
                    return AsyncResultSet.CallbackResponse.DONE;
                }
                try {
                    linkedBlockingDeque.put(new Object());
                    return AsyncResultSet.CallbackResponse.PAUSE;
                } catch (InterruptedException e) {
                    return AsyncResultSet.CallbackResponse.DONE;
                }
            });
            int i = 0;
            while (!atomicBoolean.get()) {
                if (linkedBlockingDeque.poll(1L, TimeUnit.MILLISECONDS) != null) {
                    i++;
                }
                asyncResultSetImpl.resume();
            }
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(4);
            Truth.assertThat(Integer.valueOf(i)).isEqualTo(3);
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCallbackIsNotCalledWhilePaused() throws InterruptedException, ExecutionException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenAnswer(new Answer<Boolean>() { // from class: com.google.cloud.spanner.AsyncResultSetImplTest.1
            int row = 0;

            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public Boolean m6answer(InvocationOnMock invocationOnMock) throws Throwable {
                this.row++;
                return this.row <= 100;
            }
        });
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AtomicInteger atomicInteger = new AtomicInteger();
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            ApiFuture callback = asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                Assert.assertFalse(atomicBoolean.get());
                atomicInteger.incrementAndGet();
                while (true) {
                    try {
                        switch (AnonymousClass2.$SwitchMap$com$google$cloud$spanner$AsyncResultSet$CursorState[asyncResultSet.tryNext().ordinal()]) {
                            case 1:
                                atomicBoolean.set(true);
                                linkedBlockingDeque.put(new Object());
                                return AsyncResultSet.CallbackResponse.PAUSE;
                            case 2:
                                return AsyncResultSet.CallbackResponse.DONE;
                            case 3:
                                return AsyncResultSet.CallbackResponse.CONTINUE;
                        }
                    } catch (InterruptedException e) {
                        throw SpannerExceptionFactory.propagateInterrupt(e);
                    }
                }
            });
            int i = 0;
            while (!callback.isDone()) {
                if (linkedBlockingDeque.poll(1L, TimeUnit.MILLISECONDS) != null) {
                    i++;
                }
                Thread.yield();
                atomicBoolean.set(false);
                asyncResultSetImpl.resume();
            }
            while (linkedBlockingDeque.poll() != null) {
                i++;
            }
            Assert.assertNull(callback.get());
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(101);
            Truth.assertThat(Integer.valueOf(i)).isEqualTo(100);
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testCallbackIsNotCalledWhilePausedAndCanceled() throws InterruptedException, ExecutionException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        StreamingResultSet streamingResultSet = (StreamingResultSet) Mockito.mock(StreamingResultSet.class);
        AtomicInteger atomicInteger = new AtomicInteger();
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, streamingResultSet, 10);
        try {
            Mockito.when(Boolean.valueOf(streamingResultSet.initiateStreaming((AsyncResultSet.StreamMessageListener) ArgumentMatchers.any(AsyncResultSet.StreamMessageListener.class)))).thenAnswer(invocationOnMock -> {
                asyncResultSetImpl.onStreamMessage(PartialResultSet.newBuilder().build(), false);
                return null;
            });
            ApiFuture callback = asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                atomicInteger.getAndIncrement();
                return AsyncResultSet.CallbackResponse.PAUSE;
            });
            asyncResultSetImpl.cancel();
            Assert.assertEquals(ErrorCode.CANCELLED, Assert.assertThrows(SpannerException.class, () -> {
                SpannerApiFutures.get(callback);
            }).getErrorCode());
            Assert.assertEquals(1L, atomicInteger.get());
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void cancel() throws InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AtomicInteger atomicInteger = new AtomicInteger();
        LinkedBlockingDeque linkedBlockingDeque = new LinkedBlockingDeque(1);
        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                atomicInteger.incrementAndGet();
                try {
                    if (asyncResultSet.tryNext() == AsyncResultSet.CursorState.OK) {
                        try {
                            linkedBlockingDeque.put(new Object());
                        } catch (InterruptedException e) {
                            return AsyncResultSet.CallbackResponse.DONE;
                        }
                    }
                    return atomicInteger.get() == 2 ? AsyncResultSet.CallbackResponse.PAUSE : AsyncResultSet.CallbackResponse.CONTINUE;
                } catch (SpannerException e2) {
                    if (e2.getErrorCode() == ErrorCode.CANCELLED) {
                        atomicBoolean.set(true);
                    }
                    return AsyncResultSet.CallbackResponse.DONE;
                }
            });
            int i = 0;
            while (!atomicBoolean.get()) {
                if (linkedBlockingDeque.poll(1L, TimeUnit.MILLISECONDS) != null) {
                    i++;
                }
                if (i == 2) {
                    asyncResultSetImpl.cancel();
                    asyncResultSetImpl.resume();
                }
            }
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isIn(Range.closed(2, 4));
            Truth.assertThat(Integer.valueOf(i)).isIn(Range.closed(2, 3));
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void callbackReturnsError() throws InterruptedException {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AtomicInteger atomicInteger = new AtomicInteger();
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                atomicInteger.incrementAndGet();
                throw new RuntimeException("async test");
            });
            ExecutionException executionException = (ExecutionException) Assert.assertThrows(ExecutionException.class, () -> {
                asyncResultSetImpl.getResult().get();
            });
            Truth.assertThat(executionException.getCause()).isInstanceOf(SpannerException.class);
            SpannerException cause = executionException.getCause();
            Truth.assertThat(cause.getErrorCode()).isEqualTo(ErrorCode.UNKNOWN);
            Truth.assertThat(cause.getMessage()).contains("async test");
            Truth.assertThat(Integer.valueOf(atomicInteger.get())).isEqualTo(1);
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void callbackReturnsDoneBeforeEnd_shouldStopIteration() throws Exception {
        ExecutorService newSingleThreadExecutor = Executors.newSingleThreadExecutor();
        ResultSet resultSet = (ResultSet) Mockito.mock(ResultSet.class);
        Mockito.when(Boolean.valueOf(resultSet.next())).thenReturn(true, new Boolean[]{true, true, false});
        Mockito.when(resultSet.getCurrentRowAsStruct()).thenReturn((Struct) Mockito.mock(Struct.class));
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.simpleProvider, resultSet, 10);
        try {
            asyncResultSetImpl.setCallback(newSingleThreadExecutor, asyncResultSet -> {
                return AsyncResultSet.CallbackResponse.DONE;
            });
            asyncResultSetImpl.getResult().get(10L, TimeUnit.SECONDS);
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOnStreamMessageWhenResumeTokenIsPresent() {
        StreamingResultSet streamingResultSet = (StreamingResultSet) Mockito.mock(StreamingResultSet.class);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.mockedProvider, streamingResultSet, 10);
        try {
            Mockito.when(Boolean.valueOf(streamingResultSet.initiateStreaming((AsyncResultSet.StreamMessageListener) Mockito.any(AsyncResultSet.StreamMessageListener.class)))).thenReturn(true);
            asyncResultSetImpl.setCallback(Executors.newSingleThreadExecutor(), asyncResultSet -> {
                return AsyncResultSet.CallbackResponse.DONE;
            });
            asyncResultSetImpl.onStreamMessage(PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false);
            asyncResultSetImpl.onStreamMessage(PartialResultSet.newBuilder().setResumeToken(ByteString.copyFromUtf8("test")).build(), false);
            ((ScheduledExecutorService) Mockito.verify(this.mockedProvider.getExecutor(), Mockito.times(2))).execute((Runnable) Mockito.any());
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOnStreamMessageWhenCurrentBufferSizeReachedPrefetchChunkSize() {
        StreamingResultSet streamingResultSet = (StreamingResultSet) Mockito.mock(StreamingResultSet.class);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.mockedProvider, streamingResultSet, 10);
        try {
            Mockito.when(Boolean.valueOf(streamingResultSet.initiateStreaming((AsyncResultSet.StreamMessageListener) Mockito.any(AsyncResultSet.StreamMessageListener.class)))).thenReturn(true);
            asyncResultSetImpl.setCallback(Executors.newSingleThreadExecutor(), asyncResultSet -> {
                return AsyncResultSet.CallbackResponse.DONE;
            });
            asyncResultSetImpl.onStreamMessage(PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), true);
            ((ScheduledExecutorService) Mockito.verify(this.mockedProvider.getExecutor(), Mockito.times(2))).execute((Runnable) Mockito.any());
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }

    @Test
    public void testOnStreamMessageWhenAsyncResultIsCancelled() {
        StreamingResultSet streamingResultSet = (StreamingResultSet) Mockito.mock(StreamingResultSet.class);
        AsyncResultSetImpl asyncResultSetImpl = new AsyncResultSetImpl(this.mockedProvider, streamingResultSet, 10);
        try {
            Mockito.when(Boolean.valueOf(streamingResultSet.initiateStreaming((AsyncResultSet.StreamMessageListener) Mockito.any(AsyncResultSet.StreamMessageListener.class)))).thenReturn(true);
            asyncResultSetImpl.setCallback(Executors.newSingleThreadExecutor(), asyncResultSet -> {
                return AsyncResultSet.CallbackResponse.DONE;
            });
            asyncResultSetImpl.cancel();
            asyncResultSetImpl.onStreamMessage(PartialResultSet.newBuilder().addValues(Value.newBuilder().build()).build(), false);
            ((ScheduledExecutorService) Mockito.verify(this.mockedProvider.getExecutor(), Mockito.times(2))).execute((Runnable) Mockito.any());
            asyncResultSetImpl.close();
        } catch (Throwable th) {
            try {
                asyncResultSetImpl.close();
            } catch (Throwable th2) {
                th.addSuppressed(th2);
            }
            throw th;
        }
    }
}
