package org.apache.beam.fn.harness.state;

import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterator;
import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;

/* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators.class */
public class StateFetchingIterators {

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators$FirstPageAndRemainder.class */
    public static class FirstPageAndRemainder<T> implements PrefetchableIterable<T> {
        private final BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.StateRequest stateRequestForFirstChunk;
        private final Coder<T> valueCoder;
        private LazyCachingIteratorToIterable<T> firstPage;
        private CompletableFuture<BeamFnApi.StateResponse> firstPageResponseFuture;
        private ByteString continuationToken;

        FirstPageAndRemainder(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest, Coder<T> coder) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequest;
            this.valueCoder = coder;
        }

        @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterable, java.lang.Iterable
        public PrefetchableIterator<T> iterator() {
            return new PrefetchableIterator<T>() { // from class: org.apache.beam.fn.harness.state.StateFetchingIterators.FirstPageAndRemainder.1
                PrefetchableIterator<T> delegate;

                private void ensureDelegateExists() {
                    if (this.delegate == null) {
                        FirstPageAndRemainder.this.prefetchFirstPage();
                        if (FirstPageAndRemainder.this.firstPage == null) {
                            try {
                                BeamFnApi.StateResponse stateResponse = (BeamFnApi.StateResponse) FirstPageAndRemainder.this.firstPageResponseFuture.get();
                                FirstPageAndRemainder.this.continuationToken = stateResponse.getGet().getContinuationToken();
                                FirstPageAndRemainder.this.firstPage = new LazyCachingIteratorToIterable(new DataStreams.DataStreamDecoder(FirstPageAndRemainder.this.valueCoder, PrefetchableIterators.fromArray(stateResponse.getGet().getData())));
                            } catch (InterruptedException e) {
                                Thread.currentThread().interrupt();
                                throw new IllegalStateException(e);
                            } catch (ExecutionException e2) {
                                if (e2.getCause() == null) {
                                    throw new IllegalStateException(e2);
                                }
                                Throwables.throwIfUnchecked(e2.getCause());
                                throw new IllegalStateException(e2.getCause());
                            }
                        }
                        if (ByteString.EMPTY.equals(FirstPageAndRemainder.this.continuationToken)) {
                            this.delegate = FirstPageAndRemainder.this.firstPage.iterator();
                        } else {
                            this.delegate = PrefetchableIterators.concat(FirstPageAndRemainder.this.firstPage.iterator(), new DataStreams.DataStreamDecoder(FirstPageAndRemainder.this.valueCoder, new LazyBlockingStateFetchingIterator(FirstPageAndRemainder.this.beamFnStateClient, FirstPageAndRemainder.this.stateRequestForFirstChunk.toBuilder().setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(FirstPageAndRemainder.this.continuationToken)).build())));
                        }
                    }
                }

                @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterator
                public boolean isReady() {
                    if (this.delegate != null) {
                        return this.delegate.isReady();
                    }
                    if (FirstPageAndRemainder.this.firstPageResponseFuture != null) {
                        return FirstPageAndRemainder.this.firstPageResponseFuture.isDone();
                    }
                    return false;
                }

                @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterator
                public void prefetch() {
                    if (FirstPageAndRemainder.this.firstPageResponseFuture == null) {
                        FirstPageAndRemainder.this.prefetchFirstPage();
                    } else {
                        if (this.delegate == null || this.delegate.isReady()) {
                            return;
                        }
                        this.delegate.prefetch();
                    }
                }

                @Override // java.util.Iterator
                public boolean hasNext() {
                    if (this.delegate != null) {
                        return this.delegate.hasNext();
                    }
                    ensureDelegateExists();
                    boolean hasNext = this.delegate.hasNext();
                    this.delegate.prefetch();
                    return hasNext;
                }

                @Override // java.util.Iterator
                public T next() {
                    if (this.delegate != null) {
                        return this.delegate.next();
                    }
                    ensureDelegateExists();
                    T next = this.delegate.next();
                    this.delegate.prefetch();
                    return next;
                }
            };
        }

        /* JADX INFO: Access modifiers changed from: private */
        public void prefetchFirstPage() {
            if (this.firstPageResponseFuture == null) {
                this.firstPageResponseFuture = this.beamFnStateClient.handle(this.stateRequestForFirstChunk.toBuilder().setGet(this.stateRequestForFirstChunk.getGet()));
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators$LazyBlockingStateFetchingIterator.class */
    public static class LazyBlockingStateFetchingIterator implements PrefetchableIterator<ByteString> {
        private final BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.StateRequest stateRequestForFirstChunk;
        private State currentState = State.READ_REQUIRED;
        private ByteString continuationToken;
        private ByteString next;
        private CompletableFuture<BeamFnApi.StateResponse> prefetchedResponse;

        /* JADX INFO: Access modifiers changed from: private */
        /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators$LazyBlockingStateFetchingIterator$State.class */
        public enum State {
            READ_REQUIRED,
            HAS_NEXT,
            EOF
        }

        LazyBlockingStateFetchingIterator(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequest;
            this.continuationToken = stateRequest.getGet().getContinuationToken();
        }

        @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterator
        public boolean isReady() {
            return this.prefetchedResponse == null ? this.currentState != State.READ_REQUIRED : this.prefetchedResponse.isDone();
        }

        @Override // org.apache.beam.sdk.fn.stream.PrefetchableIterator
        public void prefetch() {
            if (this.currentState == State.READ_REQUIRED && this.prefetchedResponse == null) {
                this.prefetchedResponse = this.beamFnStateClient.handle(this.stateRequestForFirstChunk.toBuilder().setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(this.continuationToken)));
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            switch (this.currentState) {
                case EOF:
                    return false;
                case READ_REQUIRED:
                    prefetch();
                    try {
                        BeamFnApi.StateResponse stateResponse = this.prefetchedResponse.get();
                        this.prefetchedResponse = null;
                        this.continuationToken = stateResponse.getGet().getContinuationToken();
                        this.next = stateResponse.getGet().getData();
                        this.currentState = State.HAS_NEXT;
                        return true;
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new IllegalStateException(e);
                    } catch (ExecutionException e2) {
                        if (e2.getCause() == null) {
                            throw new IllegalStateException(e2);
                        }
                        Throwables.throwIfUnchecked(e2.getCause());
                        throw new IllegalStateException(e2.getCause());
                    }
                case HAS_NEXT:
                    return true;
                default:
                    throw new IllegalStateException(String.format("Unknown state %s", this.currentState));
            }
        }

        @Override // java.util.Iterator
        public ByteString next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            if (ByteString.EMPTY.equals(this.continuationToken)) {
                this.currentState = State.EOF;
            } else {
                this.currentState = State.READ_REQUIRED;
                prefetch();
            }
            return this.next;
        }
    }

    private StateFetchingIterators() {
    }

    public static PrefetchableIterator<ByteString> readAllStartingFrom(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest) {
        return new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequest);
    }

    public static <T> PrefetchableIterable<T> readAllAndDecodeStartingFrom(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest, Coder<T> coder) {
        return new FirstPageAndRemainder(beamFnStateClient, stateRequest, coder);
    }
}
