package org.apache.beam.fn.harness.state;

import java.util.Collections;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.fn.stream.DataStreams;
import org.apache.beam.vendor.grpc.v1p36p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;

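/**
 * Static helpers that build iterators over state fetched page by page through a
 * {@link BeamFnStateClient}, following the continuation token carried in each response.
 */
/* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators.class */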
public class StateFetchingIterators {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators$FirstPageAndRemainder.class */
    /**
     * Splits a state read into its first page and everything after it.
     *
     * <p>The first page is requested on the first call to {@link #firstPage()} and the response's
     * data and continuation token are stored on this instance. {@link #remainder()} uses the stored
     * continuation token to decide whether any further pages have to be requested.
     */
    public static class FirstPageAndRemainder {
        private final BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.StateRequest stateRequestForFirstChunk;
        private ByteString firstPage;
        private ByteString continuationToken;

        private FirstPageAndRemainder(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequest;
            this.firstPage = null;
        }

        /**
         * Returns the data of the first page, blocking on the state client's response if it has not
         * been stored yet.
         *
         * @return the data of the first response
         * @throws IllegalStateException if the waiting thread is interrupted (the interrupt flag is
         *     restored first), or if the request failed with a checked cause or with no cause;
         *     unchecked causes are rethrown as they are
         */
        public ByteString firstPage() {
            if (firstPage != null) {
                // Already fetched by an earlier call.
                return firstPage;
            }

            CompletableFuture<BeamFnApi.StateResponse> pending = new CompletableFuture<>();
            beamFnStateClient.handle(
                    stateRequestForFirstChunk.toBuilder().setGet(stateRequestForFirstChunk.getGet()),
                    pending);

            final BeamFnApi.StateResponse response;
            try {
                response = pending.get();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(interrupted);
            } catch (ExecutionException failure) {
                Throwable cause = failure.getCause();
                if (cause == null) {
                    throw new IllegalStateException(failure);
                }
                Throwables.throwIfUnchecked(cause);
                throw new IllegalStateException(cause);
            }

            continuationToken = response.getGet().getContinuationToken();
            firstPage = response.getGet().getData();
            return firstPage;
        }

        /**
         * Returns an iterator over the pages that follow the first one.
         *
         * <p>Calls {@link #firstPage()} first so that the continuation token is available.
         *
         * @return an empty iterator when the stored continuation token is empty, otherwise a
         *     {@link LazyBlockingStateFetchingIterator} that starts from that token
         */
        public Iterator<ByteString> remainder() {
            firstPage();
            if (ByteString.EMPTY.equals(continuationToken)) {
                return Collections.emptyIterator();
            }
            BeamFnApi.StateRequest continuationRequest =
                    stateRequestForFirstChunk.toBuilder()
                            .setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(continuationToken))
                            .build();
            return new LazyBlockingStateFetchingIterator(beamFnStateClient, continuationRequest);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators$LazyBlockingStateFetchingIterator.class */
    /**
     * Iterator over the pages of a state read. Each page is requested from the
     * {@link BeamFnStateClient} using the continuation token of the previous response, and the
     * iterator blocks in {@link #hasNext()} until the outstanding response is available.
     *
     * <p>After a page is handed out by {@link #next()}, the request for the following page is
     * issued immediately unless the continuation token is empty.
     */
    public static class LazyBlockingStateFetchingIterator implements Iterator<ByteString> {
        private final BeamFnStateClient beamFnStateClient;
        private final BeamFnApi.StateRequest stateRequestForFirstChunk;
        private State currentState = State.READ_REQUIRED;
        private ByteString continuationToken;
        private ByteString next;
        private CompletableFuture<BeamFnApi.StateResponse> prefetchedResponse;

        /** Position of the iterator relative to the page stream. */
        public enum State {
            /** A response has to be obtained before an element can be returned. */
            READ_REQUIRED,
            /** A page is stored and ready to be returned by {@code next()}. */
            HAS_NEXT,
            /** The last page has been returned. */
            EOF
        }

        LazyBlockingStateFetchingIterator(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest) {
            this.beamFnStateClient = beamFnStateClient;
            this.stateRequestForFirstChunk = stateRequest;
            this.continuationToken = stateRequest.getGet().getContinuationToken();
        }

        /** Issues the request for the next page unless one is already outstanding or none is needed. */
        private void prefetch() {
            if (prefetchedResponse != null || currentState != State.READ_REQUIRED) {
                return;
            }
            prefetchedResponse = new CompletableFuture<>();
            beamFnStateClient.handle(
                    stateRequestForFirstChunk.toBuilder()
                            .setGet(BeamFnApi.StateGetRequest.newBuilder().setContinuationToken(continuationToken)),
                    prefetchedResponse);
        }

        /**
         * Waits for the outstanding response (issuing the request first if necessary), then stores
         * its data and continuation token and moves to {@link State#HAS_NEXT}.
         */
        private void awaitNextPage() {
            prefetch();

            final BeamFnApi.StateResponse response;
            try {
                response = prefetchedResponse.get();
            } catch (InterruptedException interrupted) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(interrupted);
            } catch (ExecutionException failure) {
                Throwable cause = failure.getCause();
                if (cause == null) {
                    throw new IllegalStateException(failure);
                }
                Throwables.throwIfUnchecked(cause);
                throw new IllegalStateException(cause);
            }

            // Only a successfully obtained response clears the outstanding future.
            prefetchedResponse = null;
            continuationToken = response.getGet().getContinuationToken();
            next = response.getGet().getData();
            currentState = State.HAS_NEXT;
        }

        /**
         * Reports whether another page is available, blocking on the state client when a read is
         * required.
         *
         * @throws IllegalStateException if the waiting thread is interrupted (the interrupt flag is
         *     restored first), or if the request failed with a checked cause or with no cause;
         *     unchecked causes are rethrown as they are
         */
        @Override
        public boolean hasNext() {
            switch (currentState) {
                case HAS_NEXT:
                    return true;
                case EOF:
                    return false;
                case READ_REQUIRED:
                    awaitNextPage();
                    return true;
                default:
                    throw new IllegalStateException(String.format("Unknown state %s", currentState));
            }
        }

        /**
         * Returns the stored page and advances the state: to {@link State#EOF} when the
         * continuation token is empty, otherwise to {@link State#READ_REQUIRED} with the next
         * request issued right away.
         *
         * @throws NoSuchElementException if there are no more pages
         */
        @Override
        public ByteString next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            if (!ByteString.EMPTY.equals(continuationToken)) {
                currentState = State.READ_REQUIRED;
                prefetch();
            } else {
                currentState = State.EOF;
            }
            return next;
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateFetchingIterators$LazySingletonIterator.class */
    /**
     * Iterator that yields exactly one element, obtained from a {@link Supplier} only when
     * {@link #next()} is called.
     *
     * @param <T> type of the single element
     */
    static class LazySingletonIterator<T> implements Iterator<T> {
        private final Supplier<T> supplier;
        private boolean hasNext;

        private LazySingletonIterator(Supplier<T> supplier) {
            this.supplier = supplier;
            this.hasNext = true;
        }

        /** Returns {@code true} until {@link #next()} has been called once. */
        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.hasNext;
        }

        /**
         * Returns the supplier's value on the first call.
         *
         * @return the value produced by the supplier
         * @throws NoSuchElementException if the single element has already been consumed
         */
        @Override // java.util.Iterator
        public T next() {
            // Honor the Iterator contract: without this guard a second call would invoke the
            // supplier again and hand out the "single" element a second time.
            if (!this.hasNext) {
                throw new NoSuchElementException();
            }
            // Mark as consumed before invoking the supplier, so a failing supplier still
            // leaves the iterator exhausted (same ordering as before).
            this.hasNext = false;
            return this.supplier.get();
        }
    }

    /** Not instantiable: this class only exposes static members. */
    private StateFetchingIterators() {}

    /**
     * Creates an iterator over all pages of the given state request, starting from the
     * continuation token contained in the request's get operation.
     *
     * @param beamFnStateClient client used to issue the page requests
     * @param stateRequest request template; its get continuation token marks the starting point
     * @return a {@link LazyBlockingStateFetchingIterator} over the pages
     */
    public static Iterator<ByteString> readAllStartingFrom(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest) {
        Iterator<ByteString> pages = new LazyBlockingStateFetchingIterator(beamFnStateClient, stateRequest);
        return pages;
    }

    /**
     * Builds an iterable of decoded values for the given state request: the values decoded from
     * the first page, followed by the values decoded from the remaining pages.
     *
     * <p>The first page is supplied through a {@link LazySingletonIterator}, so it is requested
     * only when that iterator's element is pulled. The iterable for the remaining pages is a
     * lambda, so {@link FirstPageAndRemainder#remainder()} is invoked each time that part is
     * iterated.
     *
     * @param beamFnStateClient client used to issue the state requests
     * @param stateRequest request describing the state to read
     * @param coder coder handed to the stream decoder for the elements
     * @param <T> element type
     * @return the concatenation of the first-page values and the remainder values
     */
    public static <T> Iterable<T> readAllAndDecodeStartingFrom(BeamFnStateClient beamFnStateClient, BeamFnApi.StateRequest stateRequest, Coder<T> coder) {
        FirstPageAndRemainder firstPageAndRemainder = new FirstPageAndRemainder(beamFnStateClient, stateRequest);

        // Parameterized instead of raw types, so the result is a checked Iterable<T> rather than
        // an unchecked conversion from an erased Iterable.
        Iterator<ByteString> firstPageBytes = new LazySingletonIterator<ByteString>(firstPageAndRemainder::firstPage);
        Iterable<T> firstPageValues =
                new LazyCachingIteratorToIterable<T>(
                        new DataStreams.DataStreamDecoder<T>(coder, DataStreams.inbound(firstPageBytes)));
        Iterable<T> remainderValues =
                () -> new DataStreams.DataStreamDecoder<T>(coder, DataStreams.inbound(firstPageAndRemainder.remainder()));

        return Iterables.concat(firstPageValues, remainderValues);
    }
}
