package org.apache.beam.fn.harness.state;

import com.google.auto.service.AutoService;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectStreamException;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;
import org.apache.beam.fn.harness.Cache;
import org.apache.beam.fn.harness.Caches;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.runners.core.construction.CoderTranslation;
import org.apache.beam.runners.core.construction.CoderTranslator;
import org.apache.beam.runners.core.construction.CoderTranslatorRegistrar;
import org.apache.beam.runners.core.construction.ModelCoders;
import org.apache.beam.sdk.coders.IterableLikeCoder;
import org.apache.beam.sdk.fn.stream.PrefetchableIterable;
import org.apache.beam.sdk.fn.stream.PrefetchableIterators;
import org.apache.beam.sdk.util.BufferedElementCountingOutputStream;
import org.apache.beam.sdk.util.VarInt;
import org.apache.beam.vendor.grpc.v1p48p1.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.io.ByteStreams;

/* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterable.class */
public class StateBackedIterable<T> implements Iterable<T>, Serializable {

    @VisibleForTesting
    final BeamFnApi.StateRequest request;

    @VisibleForTesting
    final List<T> prefix;
    private final transient PrefetchableIterable<T> suffix;

    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterable$Coder.class */
    public static class Coder<T> extends IterableLikeCoder<T, Iterable<T>> {
        private final Supplier<Cache<?, ?>> cache;
        private final BeamFnStateClient beamFnStateClient;
        private final Supplier<String> instructionId;

        public Coder(Supplier<Cache<?, ?>> supplier, BeamFnStateClient beamFnStateClient, Supplier<String> supplier2, org.apache.beam.sdk.coders.Coder<T> coder) {
            super(coder, "StateBackedIterable");
            this.cache = supplier;
            this.beamFnStateClient = beamFnStateClient;
            this.instructionId = supplier2;
        }

        @Override // org.apache.beam.sdk.coders.IterableLikeCoder
        protected Iterable<T> decodeToIterable(List<T> list) {
            return list;
        }

        @Override // org.apache.beam.sdk.coders.IterableLikeCoder
        protected Iterable<T> decodeToIterable(List<T> list, long j, InputStream inputStream) throws IOException {
            if (j == -1) {
                return new StateBackedIterable(this.cache.get(), this.beamFnStateClient, this.instructionId.get(), BeamFnApi.StateKey.newBuilder().setRunner(BeamFnApi.StateKey.Runner.newBuilder().setKey(ByteString.readFrom(ByteStreams.limit(inputStream, VarInt.decodeLong(inputStream))))).build(), getElemCoder(), list);
            }
            throw new IllegalStateException(String.format("StateBackedIterable expected terminator of 0 or -1 but received %s.", Long.valueOf(j)));
        }

        @Override // org.apache.beam.sdk.coders.IterableLikeCoder, org.apache.beam.sdk.coders.Coder
        public void encode(Iterable<T> iterable, OutputStream outputStream) throws IOException {
            if (!(iterable instanceof StateBackedIterable)) {
                super.encode((Coder<T>) iterable, outputStream);
                return;
            }
            StateBackedIterable stateBackedIterable = (StateBackedIterable) iterable;
            DataOutputStream dataOutputStream = new DataOutputStream(outputStream);
            dataOutputStream.writeInt(-1);
            BufferedElementCountingOutputStream bufferedElementCountingOutputStream = new BufferedElementCountingOutputStream(dataOutputStream, -1L);
            for (T t : stateBackedIterable.prefix) {
                bufferedElementCountingOutputStream.markElementStart();
                getElemCoder().encode(t, bufferedElementCountingOutputStream);
            }
            bufferedElementCountingOutputStream.finish();
            dataOutputStream.flush();
            VarInt.encode(stateBackedIterable.request.getStateKey().getRunner().getKey().size(), outputStream);
            stateBackedIterable.request.getStateKey().getRunner().getKey().writeTo(outputStream);
        }
    }

    @AutoService({CoderTranslatorRegistrar.class})
    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterable$Registrar.class */
    public static class Registrar implements CoderTranslatorRegistrar {
        @Override // org.apache.beam.runners.core.construction.CoderTranslatorRegistrar
        public Map<Class<? extends org.apache.beam.sdk.coders.Coder>, String> getCoderURNs() {
            return Collections.singletonMap(Coder.class, ModelCoders.STATE_BACKED_ITERABLE_CODER_URN);
        }

        @Override // org.apache.beam.runners.core.construction.CoderTranslatorRegistrar
        public Map<Class<? extends org.apache.beam.sdk.coders.Coder>, CoderTranslator<? extends org.apache.beam.sdk.coders.Coder>> getCoderTranslators() {
            return ImmutableMap.of(Coder.class, new Translator());
        }
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterable$StateBackedIterableTranslationContext.class */
    public interface StateBackedIterableTranslationContext extends CoderTranslation.TranslationContext {
        Supplier<Cache<?, ?>> getCache();

        BeamFnStateClient getStateClient();

        Supplier<String> getCurrentInstructionId();
    }

    /* loaded from: input_file:org/apache/beam/fn/harness/state/StateBackedIterable$Translator.class */
    private static class Translator implements CoderTranslator<Coder<?>> {
        private Translator() {
        }

        @Override // org.apache.beam.runners.core.construction.CoderTranslator
        public List<? extends org.apache.beam.sdk.coders.Coder<?>> getComponents(Coder<?> coder) {
            return Collections.singletonList(coder.getElemCoder());
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.beam.runners.core.construction.CoderTranslator
        public Coder<?> fromComponents(List<org.apache.beam.sdk.coders.Coder<?>> list, byte[] bArr, CoderTranslation.TranslationContext translationContext) {
            if (translationContext instanceof StateBackedIterableTranslationContext) {
                return new Coder<>(((StateBackedIterableTranslationContext) translationContext).getCache(), ((StateBackedIterableTranslationContext) translationContext).getStateClient(), ((StateBackedIterableTranslationContext) translationContext).getCurrentInstructionId(), (org.apache.beam.sdk.coders.Coder) Iterables.getOnlyElement(list));
            }
            throw new IllegalStateException(String.format("Unable to construct coder %s. Expected translation context %s but received %s.", ModelCoders.STATE_BACKED_ITERABLE_CODER_URN, StateBackedIterableTranslationContext.class.getName(), translationContext.getClass().getName()));
        }

        @Override // org.apache.beam.runners.core.construction.CoderTranslator
        public /* bridge */ /* synthetic */ Coder<?> fromComponents(List list, byte[] bArr, CoderTranslation.TranslationContext translationContext) {
            return fromComponents((List<org.apache.beam.sdk.coders.Coder<?>>) list, bArr, translationContext);
        }
    }

    public StateBackedIterable(Cache<?, ?> cache, BeamFnStateClient beamFnStateClient, String str, BeamFnApi.StateKey stateKey, org.apache.beam.sdk.coders.Coder<T> coder, List<T> list) {
        this.request = BeamFnApi.StateRequest.newBuilder().setInstructionId(str).setStateKey(stateKey).build();
        this.prefix = list;
        this.suffix = StateFetchingIterators.readAllAndDecodeStartingFrom(Caches.subCache(cache, stateKey, new Object[0]), beamFnStateClient, this.request, coder);
    }

    @Override // java.lang.Iterable
    public Iterator<T> iterator() {
        return PrefetchableIterators.concat(this.prefix.iterator(), this.suffix.iterator());
    }

    protected Object writeReplace() throws ObjectStreamException {
        return ImmutableList.copyOf(this);
    }
}
