package com.google.cloud.dataflow.sdk.runners.worker;

import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.KvCoder;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.worker.windmill.Windmill;
import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
import com.google.cloud.dataflow.sdk.util.CloudObject;
import com.google.cloud.dataflow.sdk.util.ExecutionContext;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
import com.google.cloud.dataflow.sdk.values.KV;
import java.io.IOException;
import java.io.InputStream;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/UngroupedWindmillReader.class */
public class UngroupedWindmillReader<T> extends Reader<WindowedValue<T>> {
    private final Coder<T> valueCoder;
    private final Coder<Collection<? extends BoundedWindow>> windowsCoder;
    private StreamingModeExecutionContext context;

    /* loaded from: input_file:com/google/cloud/dataflow/sdk/runners/worker/UngroupedWindmillReader$UngroupedWindmillReaderIterator.class */
    class UngroupedWindmillReaderIterator extends Reader.AbstractReaderIterator<WindowedValue<T>> {
        private int bundleIndex = 0;
        private int messageIndex = 0;

        UngroupedWindmillReaderIterator() {
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public boolean hasNext() throws IOException {
            Windmill.WorkItem work = UngroupedWindmillReader.this.context.getWork();
            return this.bundleIndex < work.getMessageBundlesCount() && this.messageIndex < work.getMessageBundles(this.bundleIndex).getMessagesCount();
        }

        @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader.ReaderIterator
        public WindowedValue<T> next() throws IOException {
            Windmill.Message messages = UngroupedWindmillReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessages(this.messageIndex);
            if (this.messageIndex >= UngroupedWindmillReader.this.context.getWork().getMessageBundles(this.bundleIndex).getMessagesCount() - 1) {
                this.messageIndex = 0;
                this.bundleIndex++;
            } else {
                this.messageIndex++;
            }
            Instant instant = new Instant(TimeUnit.MICROSECONDS.toMillis(messages.getTimestamp()));
            InputStream newInput = messages.getData().newInput();
            InputStream newInput2 = messages.getMetadata().newInput();
            Collection<? extends BoundedWindow> decodeMetadataWindows = WindmillSink.decodeMetadataWindows(UngroupedWindmillReader.this.windowsCoder, messages.getMetadata());
            PaneInfo decodeMetadataPane = WindmillSink.decodeMetadataPane(messages.getMetadata());
            if (!(UngroupedWindmillReader.this.valueCoder instanceof KvCoder)) {
                UngroupedWindmillReader.this.notifyElementRead(newInput.available() + newInput2.available());
                return WindowedValue.of(decode(UngroupedWindmillReader.this.valueCoder, newInput), instant, decodeMetadataWindows, decodeMetadataPane);
            }
            KvCoder kvCoder = (KvCoder) UngroupedWindmillReader.this.valueCoder;
            InputStream newInput3 = UngroupedWindmillReader.this.context.getSerializedKey().newInput();
            UngroupedWindmillReader.this.notifyElementRead(newInput3.available() + newInput.available() + newInput2.available());
            return WindowedValue.of(KV.of(decode(kvCoder.getKeyCoder(), newInput3), decode(kvCoder.getValueCoder(), newInput)), instant, decodeMetadataWindows, decodeMetadataPane);
        }

        private <T> T decode(Coder<T> coder, InputStream inputStream) throws IOException {
            return coder.decode(inputStream, Coder.Context.OUTER);
        }
    }

    UngroupedWindmillReader(Coder<WindowedValue<T>> coder, StreamingModeExecutionContext streamingModeExecutionContext) {
        WindowedValue.FullWindowedValueCoder fullWindowedValueCoder = (WindowedValue.FullWindowedValueCoder) coder;
        this.valueCoder = fullWindowedValueCoder.getValueCoder();
        this.windowsCoder = fullWindowedValueCoder.getWindowsCoder();
        this.context = streamingModeExecutionContext;
    }

    public static <T> UngroupedWindmillReader<T> create(PipelineOptions pipelineOptions, CloudObject cloudObject, Coder coder, ExecutionContext executionContext) {
        return new UngroupedWindmillReader<>(coder, (StreamingModeExecutionContext) executionContext);
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public Reader.ReaderIterator<WindowedValue<T>> iterator() throws IOException {
        return new UngroupedWindmillReaderIterator();
    }

    @Override // com.google.cloud.dataflow.sdk.util.common.worker.Reader
    public boolean supportsRestart() {
        return true;
    }
}
