/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.runners.flink.translation.wrappers;

import java.io.IOException;
import java.util.List;
import org.apache.beam.runners.core.construction.SerializablePipelineOptions;
import org.apache.beam.runners.flink.metrics.FlinkMetricContainer;
import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil;
import org.apache.beam.runners.flink.translation.wrappers.SourceInputSplit;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.transforms.windowing.PaneInfo;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
import org.apache.flink.api.common.io.RichInputFormat;
import org.apache.flink.api.common.io.statistics.BaseStatistics;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitAssigner;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class SourceInputFormat<T>
extends RichInputFormat<WindowedValue<T>, SourceInputSplit<T>> {
    private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class);
    private final String stepName;
    private final BoundedSource<T> initialSource;
    private transient PipelineOptions options;
    private final SerializablePipelineOptions serializedOptions;
    private transient BoundedSource.BoundedReader<T> reader;
    private boolean inputAvailable = false;
    private transient ReaderInvocationUtil<T, BoundedSource.BoundedReader<T>> readerInvoker;
    private transient FlinkMetricContainer metricContainer;

    public SourceInputFormat(String stepName, BoundedSource<T> initialSource, PipelineOptions options) {
        this.stepName = stepName;
        this.initialSource = initialSource;
        this.serializedOptions = new SerializablePipelineOptions(options);
    }

    public void configure(Configuration configuration) {
        this.options = this.serializedOptions.get();
    }

    public void open(SourceInputSplit<T> sourceInputSplit) throws IOException {
        this.metricContainer = new FlinkMetricContainer(this.getRuntimeContext());
        this.readerInvoker = new ReaderInvocationUtil(this.stepName, this.serializedOptions.get(), this.metricContainer);
        this.reader = ((BoundedSource)sourceInputSplit.getSource()).createReader(this.options);
        this.inputAvailable = this.readerInvoker.invokeStart(this.reader);
    }

    public BaseStatistics getStatistics(BaseStatistics baseStatistics) throws IOException {
        try {
            final long estimatedSize = this.initialSource.getEstimatedSizeBytes(this.options);
            return new BaseStatistics(){

                public long getTotalInputSize() {
                    return estimatedSize;
                }

                public long getNumberOfRecords() {
                    return -1L;
                }

                public float getAverageRecordWidth() {
                    return -1.0f;
                }
            };
        }
        catch (Exception e) {
            LOG.warn("Could not read Source statistics: {}", (Throwable)e);
            return null;
        }
    }

    public SourceInputSplit<T>[] createInputSplits(int numSplits) throws IOException {
        try {
            long desiredSizeBytes = this.initialSource.getEstimatedSizeBytes(this.options) / (long)numSplits;
            List shards = this.initialSource.split(desiredSizeBytes, this.options);
            int numShards = shards.size();
            SourceInputSplit[] sourceInputSplits = new SourceInputSplit[numShards];
            for (int i = 0; i < numShards; ++i) {
                sourceInputSplits[i] = new SourceInputSplit((Source)shards.get(i), i);
            }
            return sourceInputSplits;
        }
        catch (Exception e) {
            throw new IOException("Could not create input splits from Source.", e);
        }
    }

    public InputSplitAssigner getInputSplitAssigner(SourceInputSplit[] sourceInputSplits) {
        return new DefaultInputSplitAssigner((InputSplit[])sourceInputSplits);
    }

    public boolean reachedEnd() throws IOException {
        return !this.inputAvailable;
    }

    public WindowedValue<T> nextRecord(WindowedValue<T> t) throws IOException {
        if (this.inputAvailable) {
            Object current = this.reader.getCurrent();
            Instant timestamp = this.reader.getCurrentTimestamp();
            this.inputAvailable = this.readerInvoker.invokeAdvance(this.reader);
            return WindowedValue.of((Object)current, (Instant)timestamp, (BoundedWindow)GlobalWindow.INSTANCE, (PaneInfo)PaneInfo.NO_FIRING);
        }
        return null;
    }

    public void close() throws IOException {
        this.metricContainer.registerMetricsForPipelineResult();
        if (this.reader != null) {
            this.reader.close();
        }
    }
}

