package org.apache.beam.runners.twister2.translation.wrappers;

import edu.iu.dsc.tws.api.config.Config;
import edu.iu.dsc.tws.api.tset.TSetContext;
import edu.iu.dsc.tws.api.tset.fn.BaseSourceFunc;
import java.io.IOException;
import java.io.ObjectStreamException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.logging.Logger;
import org.apache.beam.runners.core.construction.Environments;
import org.apache.beam.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.core.construction.SdkComponents;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.twister2.Twister2TranslationContext;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.WindowedValue;

/* loaded from: input_file:org/apache/beam/runners/twister2/translation/wrappers/Twister2BoundedSource.class */
public class Twister2BoundedSource<T> extends BaseSourceFunc<WindowedValue<T>> {
    /** Class logger; used by {@code prepare} to warn when size estimation fails. */
    private static final Logger LOG = Logger.getLogger(Twister2BoundedSource.class.getName());
    /** The wrapped Beam source. Transient: set by the public constructor and rebuilt from {@link #sourceBytes} in {@code initTransient}. */
    private transient BoundedSource<T> source;
    /** Parallelism reported by the {@code TSetContext} passed to {@code prepare}. */
    private int numPartitions;
    /** Desired split size handed to {@code source.split}; starts at 100 and is recomputed in {@code prepare}. */
    private long splitSize;
    /** Configuration taken from the {@code TSetContext} in {@code prepare}; not read anywhere else in this file. */
    private transient Config twister2Config;
    /** Result of {@code source.split(...)} as computed in {@code prepare}. */
    private List<? extends Source<T>> partitionedSources;
    /** The single split owned by this instance; only assigned when the split count equals {@link #numPartitions}. */
    private Source<T> localPartition;
    /** Pipeline options. Transient: rebuilt from {@link #serializedOptions} in {@code initTransient}. */
    private transient PipelineOptions options;
    /** String form of the pipeline options, produced via {@code SerializablePipelineOptions.toString()} in the public constructor. */
    private String serializedOptions;
    /** Iterator over the elements of this instance's readers; created in {@code prepare}. */
    private transient Iterator<WindowedValue<T>> readerIterator;
    /** 64 MiB (67108864 bytes); referenced by {@code prepare} when the size estimate cannot be obtained. */
    private static final long DEFAULT_BUNDLE_SIZE = 67108864;
    /** Set to true once {@code initTransient} has restored the transient fields. Note this flag itself is not transient. */
    private boolean isInitialized;
    /** Payload bytes of the source's proto representation, used to rebuild {@link #source}. */
    private byte[] sourceBytes;

    /* loaded from: input_file:org/apache/beam/runners/twister2/translation/wrappers/Twister2BoundedSource$ReaderToIteratorAdapter.class */
    /**
     * Presents an ordered list of {@link Source.Reader}s as one {@link Iterator} of
     * {@link WindowedValue}s.
     *
     * <p>Readers are consumed one after another: each is started, drained with
     * {@code advance()}, closed, and then the next reader in the list takes over. Every element
     * is emitted in the global window with the timestamp reported by its reader. An empty reader
     * list produces an iterator that is exhausted from the start.
     *
     * <p>Element removal is not supported.
     */
    static class ReaderToIteratorAdapter<T> implements Iterator<WindowedValue<T>> {
        private static final boolean FAILED_TO_OBTAIN_NEXT = false;
        private static final boolean SUCCESSFULLY_OBTAINED_NEXT = true;
        /** All readers to drain, in order. */
        private final List<Source.Reader<T>> readers;
        /** The reader currently being drained; {@code null} only when the list was empty. */
        Source.Reader<T> reader;
        /** Position of {@link #reader} inside {@link #readers}. */
        private int readerIndex = 0;
        /** Whether {@code start()} has already been called on the current reader. */
        private boolean started = false;
        /** True once every reader has been drained and closed (or there were none). */
        private boolean closed = false;
        /** Look-ahead element fetched by {@code hasNext()} but not yet handed out. */
        private WindowedValue<T> next = null;

        /**
         * @param list readers to drain in list order; may be empty, in which case the iterator
         *     has no elements
         */
        public ReaderToIteratorAdapter(List<Source.Reader<T>> list) {
            this.readers = list;
            if (list.isEmpty()) {
                // Nothing to read: mark as exhausted instead of failing on list.get(0).
                this.reader = null;
                this.closed = true;
            } else {
                this.reader = list.get(this.readerIndex);
            }
        }

        /**
         * Tries to load the next element into {@link #next}.
         *
         * @return true when an element was loaded, false when all readers are exhausted
         * @throws RuntimeException wrapping any failure raised while reading
         */
        private boolean tryProduceNext() {
            try {
                if (!seekNext()) {
                    return FAILED_TO_OBTAIN_NEXT;
                }
                this.next = WindowedValue.timestampedValueInGlobalWindow(this.reader.getCurrent(), this.reader.getCurrentTimestamp());
                return SUCCESSFULLY_OBTAINED_NEXT;
            } catch (Exception e) {
                throw new RuntimeException("Failed to read data.", e);
            }
        }

        /**
         * Closes the current reader and switches to the following one, or marks the adapter as
         * closed when the current reader was the last.
         */
        private void close() {
            try {
                this.reader.close();
                this.readerIndex++;
                if (this.readerIndex == this.readers.size()) {
                    this.closed = true;
                } else {
                    this.reader = this.readers.get(this.readerIndex);
                    // The replacement reader still needs its start() call.
                    this.started = false;
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Positions the current reader on the next available record, moving across reader
         * boundaries as needed.
         *
         * <p>Written as a loop so that a long run of empty readers does not grow the call stack.
         *
         * @return true if a record is available on {@link #reader}, false if everything is drained
         */
        private boolean seekNext() throws IOException {
            while (!this.closed) {
                boolean recordAvailable;
                if (this.started) {
                    recordAvailable = this.reader.advance();
                } else {
                    this.started = true;
                    recordAvailable = this.reader.start();
                }
                if (recordAvailable) {
                    return true;
                }
                // Current reader is drained; close it and try the next one (if any).
                close();
            }
            return false;
        }

        /** Hands out the look-ahead element, or throws if there is none. */
        private WindowedValue<T> consumeCurrent() {
            if (this.next == null) {
                throw new NoSuchElementException();
            }
            WindowedValue<T> windowedValue = this.next;
            this.next = null;
            return windowedValue;
        }

        /** Fetches an element if none is buffered, then hands it out. */
        private WindowedValue<T> consumeNext() {
            if (this.next == null) {
                tryProduceNext();
            }
            return consumeCurrent();
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            return this.next != null || tryProduceNext();
        }

        /**
         * @throws NoSuchElementException when all readers are exhausted
         */
        @Override // java.util.Iterator
        public WindowedValue<T> next() {
            return consumeNext();
        }

        /** Always throws; removal is not supported. */
        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /**
     * No-argument constructor, private to this class. Leaves the instance uninitialized with
     * the initial split size of 100.
     */
    private Twister2BoundedSource() {
        this.isInitialized = false;
        this.splitSize = 100L;
    }

    /**
     * Wraps a Beam bounded source so it can be shipped to and executed by Twister2.
     *
     * <p>Besides keeping direct references to the source and options, this captures a string
     * form of the options and the proto payload bytes of the source, from which
     * {@code initTransient} later rebuilds the transient fields.
     *
     * @param boundedSource the Beam source to read from
     * @param twister2TranslationContext translation context; not used by this constructor
     * @param pipelineOptions pipeline options used when reading and for environment registration
     */
    public Twister2BoundedSource(BoundedSource<T> boundedSource, Twister2TranslationContext twister2TranslationContext, PipelineOptions pipelineOptions) {
        this.splitSize = 100L;
        this.isInitialized = false;
        this.source = boundedSource;
        this.options = pipelineOptions;
        this.serializedOptions = new SerializablePipelineOptions(pipelineOptions).toString();

        // Register the default environment before translating the source to its proto form.
        SdkComponents components = SdkComponents.create();
        PortablePipelineOptions portableOptions = pipelineOptions.as(PortablePipelineOptions.class);
        components.registerEnvironment(Environments.createOrGetDefaultEnvironment(portableOptions));

        this.sourceBytes = ReadTranslation.toProto(boundedSource, components).getPayload().toByteArray();
    }

    /**
     * Prepares this source function for execution on one parallel instance.
     *
     * <p>Restores the transient state, splits the source using a size derived from the
     * estimated source size and the parallelism, picks the splits belonging to this instance's
     * index, and builds the iterator that {@code hasNext}/{@code next} read from.
     *
     * <p>Split assignment:
     * <ul>
     *   <li>no splits returned: the unsplit source itself is read
     *       (NOTE(review): every instance does this, so each reads the full source - confirm
     *       this is intended);</li>
     *   <li>split count equals parallelism: instance {@code i} reads split {@code i};</li>
     *   <li>otherwise: splits are dealt out in contiguous ranges, the first
     *       {@code splitCount % parallelism} instances receiving one extra. Instances left with
     *       no split get an empty iterator.</li>
     * </ul>
     *
     * @param tSetContext supplies parallelism, this instance's index and the configuration
     * @throws RuntimeException if splitting the source or creating readers fails
     */
    public void prepare(TSetContext tSetContext) {
        initTransient();
        this.numPartitions = tSetContext.getParallelism();
        try {
            this.splitSize = this.source.getEstimatedSizeBytes(this.options) / this.numPartitions;
        } catch (Exception e) {
            // Actually apply the fallback that the warning below announces.
            this.splitSize = DEFAULT_BUNDLE_SIZE;
            LOG.warning(String.format("Failed to get estimated bundle size for source %s, using default bundle size of %d bytes.", this.source.toString(), Long.valueOf(DEFAULT_BUNDLE_SIZE)));
        }
        this.twister2Config = tSetContext.getConfig();
        int index = tSetContext.getIndex();
        List<Source<T>> localSources = new ArrayList<>();
        try {
            this.partitionedSources = this.source.split(this.splitSize, this.options);
            int splitCount = this.partitionedSources.size();
            if (splitCount == 0) {
                localSources.add(this.source);
            } else if (this.numPartitions == splitCount) {
                this.localPartition = this.partitionedSources.get(index);
                localSources.add(this.localPartition);
            } else {
                // Integer division already truncates; no floor() needed.
                int baseCount = splitCount / this.numPartitions;
                int remainder = splitCount % this.numPartitions;
                // The first `remainder` instances take one extra split each.
                int localCount = baseCount + (index < remainder ? 1 : 0);
                int firstSplit = (baseCount * index) + Math.min(index, remainder);
                for (int i = firstSplit; i < firstSplit + localCount; i++) {
                    localSources.add(this.partitionedSources.get(i));
                }
            }
            if (localSources.isEmpty()) {
                // More instances than splits: this instance simply has nothing to read.
                this.readerIterator = java.util.Collections.<WindowedValue<T>>emptyIterator();
            } else {
                List<Source.Reader<T>> readers = new ArrayList<>(localSources.size());
                for (Source<T> localSource : localSources) {
                    readers.add(createReader(localSource));
                }
                this.readerIterator = new ReaderToIteratorAdapter<>(readers);
            }
        } catch (Exception e2) {
            throw new RuntimeException("Failed to create partitions for source " + this.source.getClass().getSimpleName(), e2);
        }
    }

    /**
     * Reports whether the reader iterator created by {@code prepare} has another element.
     *
     * @return true if another element can be returned
     */
    public boolean hasNext() {
        final boolean more = readerIterator.hasNext();
        return more;
    }

    /**
     * Returns the next element from the reader iterator created by {@code prepare}.
     *
     * @return the next windowed value
     */
    /* renamed from: next, reason: merged with bridge method [inline-methods] */
    public WindowedValue<T> m5next() {
        final WindowedValue<T> element = readerIterator.next();
        return element;
    }

    /**
     * Opens a reader on the given source using this instance's pipeline options.
     *
     * @param source the source to read; must actually be a {@link BoundedSource}
     * @return a reader for {@code source}
     * @throws RuntimeException wrapping the {@link IOException} if the reader cannot be created
     */
    private BoundedSource.BoundedReader<T> createReader(Source<T> source) {
        BoundedSource<T> bounded = (BoundedSource<T>) source;
        try {
            return bounded.createReader(this.options);
        } catch (IOException ioFailure) {
            throw new RuntimeException("Failed to create reader from a BoundedSource.", ioFailure);
        }
    }

    /**
     * Restores the transient {@code options} and {@code source} fields from their serialized
     * forms ({@code serializedOptions} and {@code sourceBytes}).
     *
     * <p>Does nothing when the fields have already been restored. The {@code isInitialized}
     * flag is not transient, so it can survive a round of serialization while the fields it
     * guards do not; the null checks make sure the fields are rebuilt in that situation.
     */
    @SuppressWarnings("unchecked") // sourceBytes was produced from a BoundedSource<T> in the constructor
    private void initTransient() {
        if (this.isInitialized && this.options != null && this.source != null) {
            return;
        }
        this.options = new SerializablePipelineOptions(this.serializedOptions).get();
        // The second argument is only a human-readable description used in error messages.
        this.source = (BoundedSource<T>) SerializableUtils.deserializeFromByteArray(this.sourceBytes, "BoundedSource");
        this.isInitialized = true;
    }

    /**
     * Serialization hook; returns this same instance without substituting another object.
     *
     * @return {@code this}
     * @throws ObjectStreamException declared by the signature; this implementation does not throw it
     */
    protected Object readResolve() throws ObjectStreamException {
        return this;
    }
}
