package org.apache.beam.runners.jet.processors;

import com.hazelcast.cluster.Address;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.impl.util.ExceptionUtil;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.jet.Utils;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.util.WindowedValue;

/* loaded from: input_file:org/apache/beam/runners/jet/processors/BoundedSourceP.class */
public class BoundedSourceP<T> extends AbstractProcessor implements Traverser {
    private final Traverser<BoundedSource<T>> shardsTraverser;
    private final PipelineOptions options;
    private final Coder outputCoder;
    private final String ownerId;
    private BoundedSource.BoundedReader currentReader;

    /* loaded from: input_file:org/apache/beam/runners/jet/processors/BoundedSourceP$BoundedSourceMetaProcessorSupplier.class */
    private static class BoundedSourceMetaProcessorSupplier<T> implements ProcessorMetaSupplier {
        private final BoundedSource<T> boundedSource;
        private final SerializablePipelineOptions options;
        private final Coder outputCoder;
        private final String ownerId;
        private transient List<? extends BoundedSource<T>> shards;

        private BoundedSourceMetaProcessorSupplier(BoundedSource<T> boundedSource, SerializablePipelineOptions serializablePipelineOptions, Coder coder, String str) {
            this.boundedSource = boundedSource;
            this.options = serializablePipelineOptions;
            this.outputCoder = coder;
            this.ownerId = str;
        }

        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            this.shards = this.boundedSource.split(Math.max(1L, this.boundedSource.getEstimatedSizeBytes(this.options.get()) / context.totalParallelism()), this.options.get());
        }

        @Nonnull
        public Function<? super Address, ? extends ProcessorSupplier> get(@Nonnull List<Address> list) {
            return address -> {
                return new BoundedSourceProcessorSupplier(Utils.roundRobinSubList(this.shards, list.indexOf(address), list.size()), this.options, this.outputCoder, this.ownerId);
            };
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/jet/processors/BoundedSourceP$BoundedSourceProcessorSupplier.class */
    private static class BoundedSourceProcessorSupplier<T> implements ProcessorSupplier {
        private final List<BoundedSource<T>> shards;
        private final SerializablePipelineOptions options;
        private final Coder outputCoder;
        private final String ownerId;

        private BoundedSourceProcessorSupplier(List<BoundedSource<T>> list, SerializablePipelineOptions serializablePipelineOptions, Coder coder, String str) {
            this.shards = list;
            this.options = serializablePipelineOptions;
            this.outputCoder = coder;
            this.ownerId = str;
        }

        public void init(@Nonnull ProcessorSupplier.Context context) {
        }

        @Nonnull
        public Collection<? extends Processor> get(int i) {
            ArrayList arrayList = new ArrayList(i);
            for (int i2 = 0; i2 < i; i2++) {
                arrayList.add(new BoundedSourceP(Utils.roundRobinSubList(this.shards, i2, i), this.options.get(), this.outputCoder, this.ownerId));
            }
            return arrayList;
        }
    }

    BoundedSourceP(List<BoundedSource<T>> list, PipelineOptions pipelineOptions, Coder coder, String str) {
        this.shardsTraverser = Traversers.traverseIterable(list);
        this.options = pipelineOptions;
        this.outputCoder = coder;
        this.ownerId = str;
    }

    protected void init(@Nonnull Processor.Context context) throws Exception {
        nextShard();
    }

    public Object next() {
        if (this.currentReader == null) {
            return null;
        }
        try {
            WindowedValue timestampedValueInGlobalWindow = WindowedValue.timestampedValueInGlobalWindow(this.currentReader.getCurrent(), this.currentReader.getCurrentTimestamp());
            if (!this.currentReader.advance()) {
                nextShard();
            }
            return this.outputCoder == null ? timestampedValueInGlobalWindow : Utils.encode(timestampedValueInGlobalWindow, this.outputCoder);
        } catch (IOException e) {
            throw ExceptionUtil.rethrow(e);
        }
    }

    private void nextShard() throws IOException {
        do {
            if (this.currentReader != null) {
                this.currentReader.close();
                this.currentReader = null;
            }
            BoundedSource boundedSource = (BoundedSource) this.shardsTraverser.next();
            if (boundedSource == null) {
                return;
            } else {
                this.currentReader = boundedSource.createReader(this.options);
            }
        } while (!this.currentReader.start());
    }

    public boolean complete() {
        return emitFromTraverser(this);
    }

    public boolean isCooperative() {
        return false;
    }

    public void close() throws Exception {
        if (this.currentReader != null) {
            this.currentReader.close();
        }
    }

    public static <T> ProcessorMetaSupplier supplier(BoundedSource<T> boundedSource, SerializablePipelineOptions serializablePipelineOptions, Coder coder, String str) {
        return new BoundedSourceMetaProcessorSupplier(boundedSource, serializablePipelineOptions, coder, str);
    }
}
