package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.api.client.util.Preconditions;
import com.google.api.services.dataflow.model.ApproximateProgress;
import com.google.api.services.dataflow.model.Position;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.IterableCoder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.BatchModeExecutionContext;
import com.google.cloud.dataflow.sdk.util.CoderUtils;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.Reiterable;
import com.google.cloud.dataflow.sdk.util.common.Reiterator;
import com.google.cloud.dataflow.sdk.util.common.worker.BatchingShuffleEntryReader;
import com.google.cloud.dataflow.sdk.util.common.worker.GroupingShuffleEntryIterator;
import com.google.cloud.dataflow.sdk.util.common.worker.KeyGroupedShuffleEntries;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntry;
import com.google.cloud.dataflow.sdk.util.common.worker.ShuffleEntryReader;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.util.Iterator;
import java.util.NoSuchElementException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/GroupingShuffleReader.class */
/**
 * A {@link Reader} that reads key-grouped entries from a shuffle and yields one
 * {@code KV<K, Reiterable<V>>} per key group, wrapped in a {@link WindowedValue}.
 *
 * <p>Keys and values are stored in the shuffle as byte arrays and are decoded with the key
 * coder and the element coder extracted from the {@code WindowedValue<KV<K, Iterable<V>>>}
 * coder given to the constructor.
 *
 * @param <K> type of the decoded keys
 * @param <V> type of the decoded values
 */
public class GroupingShuffleReader<K, V> extends Reader<WindowedValue<KV<K, Reiterable<V>>>> {
    private static final Logger LOG = LoggerFactory.getLogger(GroupingShuffleReader.class);
    final byte[] shuffleReaderConfig;
    final String startShufflePosition;
    final String stopShufflePosition;
    final BatchModeExecutionContext executionContext;
    Coder<K> keyCoder;
    Coder<V> valueCoder;

    /**
     * Creates a reader over the given shuffle range.
     *
     * @param pipelineOptions pipeline options; not used by this constructor
     * @param bArr shuffle reader configuration; must be non-null by the time
     *     {@link #iterator()} is called
     * @param str base64-encoded start shuffle position, passed to
     *     {@code ByteArrayShufflePosition.fromBase64} when an iterator is created
     * @param str2 base64-encoded stop shuffle position, passed to
     *     {@code ByteArrayShufflePosition.fromBase64} when an iterator is created
     * @param coder coder for the grouped elements; must be a {@code WindowedValueCoder} over a
     *     {@link KvCoder} whose value coder is an {@link IterableCoder}
     * @param batchModeExecutionContext execution context that is told the current key on every
     *     {@code next()}; may be {@code null}, in which case no key is reported
     * @throws Exception if {@code coder} does not have the expected structure
     */
    public GroupingShuffleReader(PipelineOptions pipelineOptions, byte[] bArr, String str, String str2, Coder<WindowedValue<KV<K, Iterable<V>>>> coder, BatchModeExecutionContext batchModeExecutionContext) throws Exception {
        this.shuffleReaderConfig = bArr;
        this.startShufflePosition = str;
        this.stopShufflePosition = str2;
        this.executionContext = batchModeExecutionContext;
        initCoder(coder);
    }

    /**
     * Unwraps {@code coder} layer by layer (windowed value, then KV, then iterable) and stores
     * the key coder and the per-value element coder in {@link #keyCoder} and
     * {@link #valueCoder}.
     *
     * @throws Exception if any layer is not of the expected coder kind
     */
    private void initCoder(Coder<WindowedValue<KV<K, Iterable<V>>>> coder) throws Exception {
        if (!(coder instanceof WindowedValue.WindowedValueCoder)) {
            throw new Exception("unexpected kind of coder for WindowedValue: " + coder);
        }
        Coder<KV<K, Iterable<V>>> elementCoder =
                ((WindowedValue.WindowedValueCoder<KV<K, Iterable<V>>>) coder).getValueCoder();
        if (!(elementCoder instanceof KvCoder)) {
            throw new Exception("unexpected kind of coder for elements read from a key-grouping shuffle: " + elementCoder);
        }
        KvCoder<K, Iterable<V>> kvCoder = (KvCoder<K, Iterable<V>>) elementCoder;
        // The key coder is recorded before the value coder is validated, so it stays assigned
        // even when the check below fails.
        this.keyCoder = kvCoder.getKeyCoder();
        Coder<Iterable<V>> groupedValuesCoder = kvCoder.getValueCoder();
        if (!(groupedValuesCoder instanceof IterableCoder)) {
            throw new Exception("unexpected kind of coder for values of KVs read from a key-grouping shuffle");
        }
        this.valueCoder = ((IterableCoder<V>) groupedValuesCoder).getElemCoder();
    }

    /**
     * Opens an iterator backed by an {@code ApplianceShuffleReader} configured with
     * {@link #shuffleReaderConfig}, wrapped in a chunking batch reader and a batching entry
     * reader.
     *
     * @throws IllegalArgumentException if the shuffle reader configuration is {@code null}
     * @throws IOException declared by the overridden method
     */
    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public Reader.ReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> iterator() throws IOException {
        Preconditions.checkArgument(this.shuffleReaderConfig != null);
        ApplianceShuffleReader applianceReader = new ApplianceShuffleReader(this.shuffleReaderConfig);
        ChunkingShuffleBatchReader batchReader = new ChunkingShuffleBatchReader(applianceReader);
        return iterator(new BatchingShuffleEntryReader(batchReader));
    }

    /**
     * Creates an iterator that pulls entries from the supplied {@link ShuffleEntryReader}.
     *
     * @param shuffleEntryReader source of shuffle entries
     */
    final Reader.ReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> iterator(ShuffleEntryReader shuffleEntryReader) throws IOException {
        return new GroupingShuffleReaderIterator(shuffleEntryReader);
    }

    /**
     * Iterates over key groups between the reader's start and stop shuffle positions, and
     * supports moving the stop position earlier through {@link #requestFork}.
     *
     * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
     */
    public final class GroupingShuffleReaderIterator extends Reader.AbstractReaderIterator<WindowedValue<KV<K, Reiterable<V>>>> {
        private final Iterator<KeyGroupedShuffleEntries> groups;
        /** Exclusive end of the range; {@code null} means unbounded. */
        private ByteArrayShufflePosition stopPosition;
        /** Start position initially, then the position of the most recently fetched group. */
        private ByteArrayShufflePosition promisedPosition;
        /** Group fetched by {@link #advanceIfNecessary()} and not yet returned by {@link #next()}. */
        private KeyGroupedShuffleEntries nextGroup;

        /**
         * Decodes the reader's start and stop positions and starts reading that range from
         * {@code shuffleEntryReader}. When the start position decodes to {@code null}, an
         * empty byte-array position is used instead.
         */
        public GroupingShuffleReaderIterator(ShuffleEntryReader shuffleEntryReader) {
            ByteArrayShufflePosition start =
                    ByteArrayShufflePosition.fromBase64(GroupingShuffleReader.this.startShufflePosition);
            this.promisedPosition = (start != null) ? start : new ByteArrayShufflePosition(new byte[0]);
            this.stopPosition = ByteArrayShufflePosition.fromBase64(GroupingShuffleReader.this.stopShufflePosition);
            this.groups = new GroupingShuffleEntryIterator(shuffleEntryReader.read(this.promisedPosition, this.stopPosition)) { // from class: com.google.cloud.dataflow.sdk.runners.worker.GroupingShuffleReader.GroupingShuffleReaderIterator.1
                @Override // com.google.cloud.dataflow.sdk.util.common.worker.GroupingShuffleEntryIterator
                protected void notifyElementRead(long j) {
                    // Forward to the enclosing reader; the qualifier is required because this
                    // anonymous class declares a method with the same name.
                    GroupingShuffleReader.this.notifyElementRead(j);
                }
            };
        }

        /**
         * Fetches the next group, if none is pending and one is available, and moves
         * {@link #promisedPosition} to that group's position.
         */
        private void advanceIfNecessary() {
            if (this.nextGroup != null || !this.groups.hasNext()) {
                return;
            }
            this.nextGroup = this.groups.next();
            this.promisedPosition = ByteArrayShufflePosition.of(this.nextGroup.position);
        }

        /**
         * Returns {@code true} when a group is pending and its position is strictly before the
         * current stop position (or no stop position is set).
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public boolean hasNext() throws IOException {
            advanceIfNecessary();
            return this.nextGroup != null
                    && (this.stopPosition == null || this.promisedPosition.compareTo(this.stopPosition) < 0);
        }

        /**
         * Returns the pending group as a decoded key paired with a lazily decoding view of its
         * values, and reports the key to the execution context when one is configured.
         *
         * @throws NoSuchElementException if {@link #hasNext()} is {@code false}
         * @throws IOException if the key cannot be decoded
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public WindowedValue<KV<K, Reiterable<V>>> next() throws IOException {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            KeyGroupedShuffleEntries group = this.nextGroup;
            this.nextGroup = null;
            K key = CoderUtils.decodeFromByteArray(GroupingShuffleReader.this.keyCoder, group.key);
            if (GroupingShuffleReader.this.executionContext != null) {
                GroupingShuffleReader.this.executionContext.setKey(key);
            }
            return WindowedValue.valueInEmptyWindows(
                    KV.<K, Reiterable<V>>of(key, new ValuesIterable(group.values)));
        }

        /**
         * Reports progress as the base64 encoding of {@link #promisedPosition}, wrapped in an
         * {@link ApproximateProgress}.
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.AbstractReaderIterator, com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.Progress getProgress() {
            Position current = new Position();
            current.setShufflePosition(this.promisedPosition.encodeBase64());
            ApproximateProgress progress = new ApproximateProgress();
            progress.setPosition(current);
            return SourceTranslationUtils.cloudProgressToReaderProgress(progress);
        }

        /**
         * Tries to move the stop position to the shuffle position named in {@code forkRequest}.
         *
         * @return the accepted fork position, or {@code null} when the request carries no
         *     position, carries no shuffle position, or names a position at or before the
         *     current promised position
         * @throws NullPointerException if {@code forkRequest} is {@code null}
         * @throws IllegalArgumentException if the requested position is at or after the current
         *     stop position
         */
        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.AbstractReaderIterator, com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public Reader.ForkResult requestFork(Reader.ForkRequest forkRequest) {
            Preconditions.checkNotNull(forkRequest);
            Position requestedPosition = SourceTranslationUtils.forkRequestToApproximateProgress(forkRequest).getPosition();
            if (requestedPosition == null) {
                GroupingShuffleReader.LOG.warn("GroupingShuffleReader only supports fork at a Position. Requested: {}", forkRequest);
                return null;
            }
            String encodedForkPosition = requestedPosition.getShufflePosition();
            if (encodedForkPosition == null) {
                GroupingShuffleReader.LOG.warn("GroupingShuffleReader only supports fork at a shuffle position. Requested: {}", requestedPosition);
                return null;
            }
            ByteArrayShufflePosition forkPosition = ByteArrayShufflePosition.fromBase64(encodedForkPosition);
            if (forkPosition.compareTo(this.promisedPosition) <= 0) {
                GroupingShuffleReader.LOG.info("Already progressed to promised shuffle position {} which is after the requested fork shuffle position {}", this.promisedPosition.encodeBase64(), encodedForkPosition);
                return null;
            }
            if (this.stopPosition != null && forkPosition.compareTo(this.stopPosition) >= 0) {
                throw new IllegalArgumentException(
                        "Fork requested at a shuffle position beyond the end of the current range: "
                                + encodedForkPosition
                                + " >= current stop position: "
                                + this.stopPosition.encodeBase64());
            }
            this.stopPosition = forkPosition;
            GroupingShuffleReader.LOG.info("Forked GroupingShuffleReader at {}", encodedForkPosition);
            return new Reader.ForkResultWithPosition(SourceTranslationUtils.cloudPositionToReaderPosition(requestedPosition));
        }

        /**
         * View of one key group's shuffle entries as decoded values.
         *
         * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
         */
        public final class ValuesIterable implements Reiterable<V> {
            private final Reiterable<ShuffleEntry> base;

            /**
             * @param reiterable the group's shuffle entries
             * @throws NullPointerException if {@code reiterable} is {@code null}
             */
            public ValuesIterable(Reiterable<ShuffleEntry> reiterable) {
                this.base = Preconditions.checkNotNull(reiterable);
            }

            /** Returns a new decoding iterator over a fresh iterator of the underlying entries. */
            @Override // java.lang.Iterable
            public GroupingShuffleReader<K, V>.GroupingShuffleReaderIterator.ValuesIterator iterator() {
                return new ValuesIterator(this.base.iterator());
            }
        }

        /**
         * Iterator that decodes each shuffle entry's value with the reader's value coder.
         *
         * <p>Decompiler note: the access modifier of this class was changed from {@code private}.
         */
        public final class ValuesIterator implements Reiterator<V> {
            private final Reiterator<ShuffleEntry> base;

            /**
             * @param reiterator iterator over the raw shuffle entries
             * @throws NullPointerException if {@code reiterator} is {@code null}
             */
            public ValuesIterator(Reiterator<ShuffleEntry> reiterator) {
                this.base = Preconditions.checkNotNull(reiterator);
            }

            @Override // java.util.Iterator
            public boolean hasNext() {
                return this.base.hasNext();
            }

            /**
             * Decodes and returns the next value.
             *
             * @throws RuntimeException wrapping the {@link IOException} if decoding fails
             */
            @Override // java.util.Iterator
            public V next() {
                try {
                    ShuffleEntry entry = this.base.next();
                    return CoderUtils.decodeFromByteArray(GroupingShuffleReader.this.valueCoder, entry.getValue());
                } catch (IOException e) {
                    // Iterator.next() cannot declare checked exceptions, so rethrow unchecked.
                    throw new RuntimeException(e);
                }
            }

            /** Delegates removal to the underlying entry iterator. */
            @Override // java.util.Iterator
            public void remove() {
                this.base.remove();
            }

            /** Returns a new decoding iterator wrapping a copy of the underlying entry iterator. */
            @Override // com.google.cloud.dataflow.sdk.util.common.Reiterator
            /* renamed from: copy */
            public GroupingShuffleReader<K, V>.GroupingShuffleReaderIterator.ValuesIterator copy2() {
                return new ValuesIterator(this.base.copy2());
            }
        }
    }
}
