/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AsyncExceptionChecker;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Operator that receives {@link FileInputSplit}s as input elements, reads each of them with the
 * configured {@link FileInputFormat} on a dedicated {@link SplitReader} thread, and emits the
 * records it reads through a {@link SourceFunction.SourceContext}.
 *
 * <p>The pending splits, the split currently being read and (for formats implementing
 * {@link CheckpointableInputFormat}) the format's own state are written out by
 * {@link #snapshotOperatorState(long, long)} and read back by
 * {@link #restoreState(StreamTaskState)}.
 *
 * @param <OUT> type of the records emitted by this operator
 * @param <S> type of the serializable state exposed by a checkpointable input format
 */
@Internal
public class ContinuousFileReaderOperator<OUT, S extends Serializable>
        extends AbstractStreamOperator<OUT>
        implements OneInputStreamOperator<FileInputSplit, OUT>,
                OutputTypeConfigurable<OUT>,
                AsyncExceptionChecker {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);

    /** Sentinel split: {@link #close()} enqueues it and the reader thread stops when it sees it. */
    private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1L, -1L, null);

    /** Format used to read the splits; handed to the reader thread in {@link #open()}. */
    private FileInputFormat<OUT> format;

    /** Serializer of the output type; set by {@link #setOutputType}. */
    private TypeSerializer<OUT> serializer;

    /** Lock obtained from the containing task in {@link #open()}; guards the reader's shared state. */
    private transient Object checkpointLock;

    /** Thread that performs the actual reading; created and started in {@link #open()}. */
    private transient SplitReader<S, OUT> reader;

    /** Context through which the reader thread emits records and watermarks. */
    private transient SourceFunction.SourceContext<OUT> readerContext;

    /**
     * State read by {@link #restoreState(StreamTaskState)} as (pending splits, current split,
     * format state). It is handed to the reader in {@link #open()} and then cleared.
     */
    private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;

    /**
     * Creates the operator.
     *
     * @param format the format used to read the incoming splits; must not be {@code null}
     */
    public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
        this.format = Preconditions.checkNotNull(format);
    }

    /**
     * Creates the serializer for the output type. {@link #open()} fails if this was never called.
     *
     * @param outTypeInfo type information of the emitted records
     * @param executionConfig execution config passed to the serializer factory
     */
    @Override
    public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
        this.serializer = outTypeInfo.createSerializer(executionConfig);
    }

    /**
     * Configures the format, picks the source context matching the configured time
     * characteristic, and starts the reader thread (seeded with any restored state).
     *
     * @throws Exception if the superclass fails to open, a precondition is violated, or the time
     *     characteristic is not one of the handled values
     */
    @Override
    public void open() throws Exception {
        super.open();

        Preconditions.checkState(this.reader == null, "The reader is already initialized.");
        Preconditions.checkState(this.serializer != null, "The serializer has not been set. Probably the setOutputType() was not called. Please report it.");

        this.format.setRuntimeContext(this.getRuntimeContext());
        this.format.configure(new Configuration());
        this.checkpointLock = this.getContainingTask().getCheckpointLock();

        final TimeCharacteristic timeCharacteristic = this.getOperatorConfig().getTimeCharacteristic();
        switch (timeCharacteristic) {
            case EventTime: {
                this.readerContext = new StreamSource.ManualWatermarkContext<OUT>(this, this.checkpointLock, this.output);
                break;
            }
            case IngestionTime: {
                final long watermarkInterval = this.getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval();
                this.readerContext = new StreamSource.AutomaticWatermarkContext<OUT>(this, this.checkpointLock, this.output, watermarkInterval);
                break;
            }
            case ProcessingTime: {
                this.readerContext = new StreamSource.NonTimestampContext<OUT>(this, this.checkpointLock, this.output);
                break;
            }
            default: {
                throw new Exception(String.valueOf(timeCharacteristic));
            }
        }

        this.reader = new SplitReader<S, OUT>(this.format, this.serializer, this.readerContext, this.checkpointLock, this.readerState);
        // The restored state now lives in the reader; drop the operator's reference to it.
        this.readerState = null;
        this.reader.start();
    }

    /**
     * Queues the received split for the reader thread.
     *
     * @param element record wrapping the split to read
     */
    @Override
    public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
        this.reader.addSplit(element.getValue());
    }

    /** No-op: this method does nothing with incoming watermarks. */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
    }

    /**
     * Stops the reader thread and releases the operator's references.
     *
     * <p>The reader is first asked to stop via {@code cancel()} and given 200 ms to exit. If it
     * is still alive it is interrupted repeatedly, logging its stack trace each time, until it
     * terminates. A reader that was never created is tolerated.
     */
    @Override
    public void dispose() {
        super.dispose();

        // The reader is null when open() never got as far as creating it.
        final SplitReader<S, OUT> activeReader = this.reader;
        if (activeReader != null) {
            activeReader.cancel();
            try {
                activeReader.join(200L);
            }
            catch (InterruptedException ignored) {
                // Keep going: the loop below waits for the reader to terminate.
            }
            while (activeReader.isAlive()) {
                final StringBuilder bld = new StringBuilder();
                for (StackTraceElement e : activeReader.getStackTrace()) {
                    bld.append(e).append('\n');
                }
                LOG.warn("The reader is stuck in method:\n {}", bld.toString());
                activeReader.interrupt();
                try {
                    activeReader.join(50L);
                }
                catch (InterruptedException ignored) {
                    // Keep looping until the reader thread has terminated.
                }
            }
        }

        this.reader = null;
        this.readerContext = null;
        this.readerState = null;
        this.format = null;
        this.serializer = null;
    }

    /**
     * Signals the reader that no more splits will arrive, waits for its termination
     * notification, then emits {@link Watermark#MAX_WATERMARK} and closes the context and output.
     *
     * <p>{@link Object#wait()} requires the calling thread to own the monitor. The lock is taken
     * here, which is a re-entrant acquisition if the caller already holds it, and the reader's
     * state is re-checked under the lock so that its termination notification cannot be missed.
     *
     * @throws Exception if closing the superclass, the context or the output fails, or the wait
     *     is interrupted
     */
    @Override
    public void close() throws Exception {
        super.close();

        final SplitReader<S, OUT> activeReader = this.reader;
        if (activeReader != null) {
            synchronized (this.checkpointLock) {
                if (activeReader.isAlive() && activeReader.isRunning()) {
                    // The reader stops when it polls the EOS marker and then calls notifyAll()
                    // on the checkpoint lock as its last action.
                    activeReader.addSplit(EOS);
                    this.checkpointLock.wait();
                }
            }
        }

        if (this.readerContext != null) {
            this.readerContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.readerContext.close();
        }
        this.output.close();
    }

    /** No-op implementation of {@link AsyncExceptionChecker}. */
    @Override
    public void checkAsyncException() {
    }

    /**
     * Writes the reader state to a checkpoint stream and attaches the resulting handle to the
     * task state returned by the superclass.
     *
     * <p>Layout, mirrored by {@link #restoreState(StreamTaskState)}: current split (object),
     * number of pending splits (int), each pending split (object), format state (object).
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return the task state with the operator state handle set
     * @throws Exception if the stream cannot be created, written, or closed; the task state is
     *     discarded (best effort) before the exception is thrown
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        final StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);

        final AbstractStateBackend.CheckpointStateOutputStream os;
        try {
            os = this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);
        }
        catch (Exception e) {
            this.discardStateQuietly(taskState, "Could not discard stream task state of {}.");
            throw new Exception("Could not create the checkpoint state output stream for " + this.getOperatorName() + '.', e);
        }

        try {
            // Both writers target the same stream; the write order below defines the format.
            final ObjectOutputStream oos = new ObjectOutputStream(os);
            final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os);

            final Tuple3<List<FileInputSplit>, FileInputSplit, S> snapshot = this.reader.getReaderState();
            final List<FileInputSplit> pendingSplits = snapshot.f0;
            final FileInputSplit currSplit = snapshot.f1;
            final S formatState = snapshot.f2;

            oos.writeObject(currSplit);
            ov.writeInt(pendingSplits.size());
            for (FileInputSplit split : pendingSplits) {
                oos.writeObject(split);
            }
            oos.writeObject(formatState);
            oos.flush();
        }
        catch (Exception exception) {
            this.discardStateQuietly(taskState, "Could not discard the stream task state of {}.");
            try {
                os.close();
            }
            catch (Exception closingException) {
                LOG.warn("Could not close the checkpoint state output stream belonging to {}. The written data might not be deleted.", this.getOperatorName(), closingException);
            }
            throw new Exception("Could not write the stream task state of " + this.getOperatorName() + " into the checkpoint state output view.", exception);
        }

        try {
            taskState.setOperatorState(os.closeAndGetHandle());
        }
        catch (Exception e) {
            this.discardStateQuietly(taskState, "Could not discard stream task state of {}.");
            throw new Exception("Could not close and get state handle from checkpoint state output stream belonging to " + this.getOperatorName() + '.', e);
        }
        return taskState;
    }

    /** Discards the given task state, logging (not propagating) any failure with the given message. */
    private void discardStateQuietly(StreamTaskState taskState, String warning) {
        try {
            taskState.discardState();
        }
        catch (Exception discardException) {
            LOG.warn(warning, this.getOperatorName(), discardException);
        }
    }

    /**
     * Reads back the state written by {@link #snapshotOperatorState(long, long)} and stores it
     * for the reader created in {@link #open()}. The state stream is closed on every exit path.
     *
     * @param state task state whose operator state holds the serialized reader state
     * @throws Exception if the state cannot be read or reader state was already restored
     */
    @Override
    public void restoreState(StreamTaskState state) throws Exception {
        super.restoreState(state);

        final StreamStateHandle stream = (StreamStateHandle) state.getOperatorState();
        try (InputStream is = stream.getState(this.getUserCodeClassloader())) {
            // Both readers consume the same stream; read order must match the write order.
            final ObjectInputStream ois = new ObjectInputStream(is);
            final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);

            final FileInputSplit currSplit = (FileInputSplit) ois.readObject();

            final List<FileInputSplit> pendingSplits = new LinkedList<FileInputSplit>();
            final int noOfSplits = div.readInt();
            for (int i = 0; i < noOfSplits; ++i) {
                pendingSplits.add((FileInputSplit) ois.readObject());
            }

            // The stream carries no type information for S, so this cast cannot be checked.
            @SuppressWarnings("unchecked")
            final S formatState = (S) ois.readObject();

            Preconditions.checkState(this.readerState == null, "The reader state has already been initialized.");
            this.readerState = new Tuple3<List<FileInputSplit>, FileInputSplit, S>(pendingSplits, currSplit, formatState);
        }
    }

    /**
     * Thread that takes splits from a queue, opens each with the format, and emits its records
     * through the reader context. Access to the queue, the current split and the format's
     * open/close/nextRecord calls in {@link #run()} happens while holding the checkpoint lock.
     *
     * @param <S> type of the format state (note: shadows the enclosing class's {@code S})
     * @param <OT> type of the records produced by the format
     */
    private class SplitReader<S extends Serializable, OT>
            extends Thread {
        /** Cleared to make {@link #run()} leave its main loop. */
        private volatile boolean isRunning;
        private final FileInputFormat<OT> format;
        private final TypeSerializer<OT> serializer;
        private final Object checkpointLock;
        private final SourceFunction.SourceContext<OT> readerContext;

        /** Splits waiting to be read. */
        private final Queue<FileInputSplit> pendingSplits;

        /** Split being read, or a restored split that still has to be (re)opened. */
        private FileInputSplit currentSplit = null;

        /** Format state from a restore; consumed when the restored split is reopened. */
        private S restoredFormatState = null;

        /** Whether the format currently has {@link #currentSplit} open. */
        private volatile boolean isSplitOpen = false;

        /**
         * @param format format used to read the splits
         * @param serializer serializer used to create the reusable record instance
         * @param readerContext context the records are emitted to
         * @param checkpointLock lock guarding the shared reader state
         * @param restoredState (pending splits, current split, format state) from a restore, or
         *     {@code null} when starting fresh
         */
        private SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, SourceFunction.SourceContext<OT> readerContext, Object checkpointLock, Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
            this.format = Preconditions.checkNotNull(format, "Unspecified FileInputFormat.");
            this.serializer = Preconditions.checkNotNull(serializer, "Unspecified Serializer.");
            this.readerContext = Preconditions.checkNotNull(readerContext, "Unspecified Reader Context.");
            this.checkpointLock = Preconditions.checkNotNull(checkpointLock, "Unspecified checkpoint lock.");
            this.pendingSplits = new ArrayDeque<FileInputSplit>();
            this.isRunning = true;

            if (restoredState != null) {
                this.pendingSplits.addAll(restoredState.f0);
                this.currentSplit = restoredState.f1;
                this.restoredFormatState = restoredState.f2;
            }
        }

        /**
         * Appends a split to the pending queue while holding the checkpoint lock.
         *
         * @param split the split to enqueue; must not be {@code null}
         */
        private void addSplit(FileInputSplit split) {
            Preconditions.checkNotNull(split, "Cannot insert a null value in the pending splits queue.");
            synchronized (this.checkpointLock) {
                this.pendingSplits.add(split);
            }
        }

        /** @return {@code true} until the reader was cancelled, saw {@code EOS}, or terminated */
        public boolean isRunning() {
            return this.isRunning;
        }

        /**
         * Main loop: repeatedly picks the next split (or the restored one), opens it, emits all
         * of its records and closes it, until cancelled or the {@code EOS} marker is reached.
         *
         * <p>Any {@link Throwable} is reported to the containing task via
         * {@code failExternally}. On exit the format is closed, the reader is marked as
         * stopped and all threads waiting on the checkpoint lock are notified, even if
         * closing the format throws.
         */
        @Override
        public void run() {
            try {
                final Counter completedSplitsCounter = ContinuousFileReaderOperator.this.getMetricGroup().counter("numSplitsProcessed");
                this.format.openInputFormat();

                while (this.isRunning) {
                    synchronized (this.checkpointLock) {
                        if (this.currentSplit != null) {
                            // A current split at the top of the loop comes from restored state.
                            if (this.currentSplit.equals(EOS)) {
                                this.isRunning = false;
                                break;
                            }
                            if (this.format instanceof CheckpointableInputFormat && this.restoredFormatState != null) {
                                @SuppressWarnings("unchecked")
                                final CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
                                        (CheckpointableInputFormat<FileInputSplit, S>) this.format;
                                checkpointableFormat.reopen(this.currentSplit, this.restoredFormatState);
                            } else {
                                LOG.info("Format " + this.format.getClass().getName() + " does not support checkpointing.");
                                this.format.open(this.currentSplit);
                            }
                            this.restoredFormatState = null;
                        } else {
                            this.currentSplit = this.pendingSplits.poll();
                            if (this.currentSplit == null) {
                                // Nothing queued: release the lock for up to 50 ms, then re-check.
                                this.checkpointLock.wait(50L);
                                continue;
                            }
                            if (this.currentSplit.equals(EOS)) {
                                this.isRunning = false;
                                break;
                            }
                            this.format.open(this.currentSplit);
                        }
                        this.isSplitOpen = true;
                    }

                    LOG.info("Reading split: " + this.currentSplit);

                    try {
                        OT nextElement = this.serializer.createInstance();
                        while (!this.format.reachedEnd()) {
                            // Read and emit each record under the lock.
                            synchronized (this.checkpointLock) {
                                nextElement = this.format.nextRecord(nextElement);
                                if (nextElement == null) {
                                    break;
                                }
                                this.readerContext.collect(nextElement);
                            }
                        }
                        completedSplitsCounter.inc();
                    }
                    finally {
                        synchronized (this.checkpointLock) {
                            this.format.close();
                            this.isSplitOpen = false;
                            this.currentSplit = null;
                        }
                    }
                }
            }
            catch (Throwable e) {
                if (this.isRunning) {
                    // Placeholder for the split; the trailing Throwable is logged with its stack trace.
                    LOG.error("Caught exception processing split: {}", this.currentSplit, e);
                }
                ContinuousFileReaderOperator.this.getContainingTask().failExternally(e);
            }
            finally {
                synchronized (this.checkpointLock) {
                    LOG.info("Reader terminated, and exiting...");
                    try {
                        this.format.closeInputFormat();
                    }
                    finally {
                        // Must run even if closing the format fails, otherwise a thread blocked
                        // in close() on the checkpoint lock would never be woken up.
                        this.isSplitOpen = false;
                        this.currentSplit = null;
                        this.isRunning = false;
                        this.checkpointLock.notifyAll();
                    }
                }
            }
        }

        /**
         * Captures the reader state as (copy of the pending splits, current split, format state).
         *
         * <p>The format state is only non-null when there is a current split and the format is a
         * {@link CheckpointableInputFormat}: it is the format's current state if the split is
         * open, otherwise the not-yet-consumed restored state.
         *
         * <p>This method takes no lock itself; presumably the caller holds the checkpoint lock
         * while snapshotting. TODO confirm against the task's checkpointing code.
         *
         * @return the state tuple; {@code f1} and {@code f2} may be {@code null}
         * @throws IOException if the format fails to report its current state
         */
        private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
            final List<FileInputSplit> snapshot = new ArrayList<FileInputSplit>(this.pendingSplits);

            // NOTE(review): this removes the head of the live queue (not of the snapshot copy
            // taken above) when it equals the current split. Kept as-is; confirm the intent.
            if (this.currentSplit != null && this.currentSplit.equals(this.pendingSplits.peek())) {
                this.pendingSplits.remove();
            }

            if (this.currentSplit == null) {
                return new Tuple3<List<FileInputSplit>, FileInputSplit, S>(snapshot, null, null);
            }

            if (this.format instanceof CheckpointableInputFormat) {
                @SuppressWarnings("unchecked")
                final CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
                        (CheckpointableInputFormat<FileInputSplit, S>) this.format;
                final S formatState = this.isSplitOpen ? checkpointableFormat.getCurrentState() : this.restoredFormatState;
                return new Tuple3<List<FileInputSplit>, FileInputSplit, S>(snapshot, this.currentSplit, formatState);
            }

            LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
            return new Tuple3<List<FileInputSplit>, FileInputSplit, S>(snapshot, this.currentSplit, null);
        }

        /** Asks the reader to stop; {@link #run()} leaves its loop at the next check of the flag. */
        public void cancel() {
            this.isRunning = false;
        }
    }
}

