/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class ContinuousFileReaderOperator<OUT, S extends Serializable>
extends AbstractStreamOperator<OUT>
implements OneInputStreamOperator<FileInputSplit, OUT>,
OutputTypeConfigurable<OUT> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
    // Sentinel split: close() enqueues it via the reader's addSplit(), and the reader's run loop
    // stops as soon as the split it is about to process equals this instance.
    private static final FileInputSplit EOS = new FileInputSplit(-1, null, -1L, -1L, null);
    // Background thread that reads the splits; created and started in open(), torn down in dispose().
    private transient SplitReader<S, OUT> reader;
    // Wraps this operator's output; created in open() and closed in close().
    private transient TimestampedCollector<OUT> collector;
    // Input format supplied to the constructor; configured in open() and handed to the reader.
    private FileInputFormat<OUT> format;
    // Set by setOutputType(); open() fails with an IllegalStateException while it is still null.
    private TypeSerializer<OUT> serializer;
    // Lock obtained from the containing task in open(); the reader synchronizes on it around
    // queue access, split open/close and record emission.
    private transient Object checkpointLock;
    // (pending splits, split in progress, format state) filled in by restoreState(); passed to the
    // reader in open() and cleared right afterwards.
    private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;

    public ContinuousFileReaderOperator(FileInputFormat<OUT> format) {
        this.format = (FileInputFormat)Preconditions.checkNotNull(format);
    }

    /**
     * Builds the record serializer from the declared output type and keeps it for the reader.
     * {@code open()} throws an {@link IllegalStateException} if this was never called.
     *
     * @param outTypeInfo     type information of the emitted records
     * @param executionConfig execution config passed on to {@code createSerializer}
     */
    @Override
    public void setOutputType(TypeInformation<OUT> outTypeInfo, ExecutionConfig executionConfig) {
        final TypeSerializer<OUT> created = outTypeInfo.createSerializer(executionConfig);
        serializer = created;
    }

    /**
     * Prepares the input format, creates the collector and starts the split-reading thread.
     *
     * <p>Any state stored by {@code restoreState()} is handed to the new reader and then cleared
     * on the operator.
     *
     * @throws IllegalStateException if the serializer was never set, or (via
     *                               {@code Preconditions.checkState}) if a reader already exists
     */
    @Override
    public void open() throws Exception {
        super.open();

        if (serializer == null) {
            throw new IllegalStateException("The serializer has not been set. Probably the setOutputType() was not called and this should not have happened. Please report it.");
        }

        format.setRuntimeContext(getRuntimeContext());
        format.configure(new Configuration());

        collector = new TimestampedCollector<OUT>(output);
        checkpointLock = getContainingTask().getCheckpointLock();

        Preconditions.checkState(reader == null, "The reader is already initialized.");

        final SplitReader<S, OUT> splitReader =
                new SplitReader<S, OUT>(format, serializer, collector, checkpointLock, readerState);
        reader = splitReader;

        // The restored state now lives in the reader; drop the operator's reference to it.
        readerState = null;
        splitReader.start();
    }

    /**
     * Queues the received split on the reader thread.
     *
     * @param element record carrying the split to read
     */
    @Override
    public void processElement(StreamRecord<FileInputSplit> element) throws Exception {
        final FileInputSplit split = element.getValue();
        reader.addSplit(split);
    }

    /**
     * Forwards the incoming watermark to this operator's output without modifying it.
     *
     * @param mark the watermark to forward
     */
    @Override
    public void processWatermark(Watermark mark) throws Exception {
        output.emitWatermark(mark);
    }

    /**
     * Stops the reader thread and releases the operator's references.
     *
     * <p>The reader is first asked to stop via {@code cancel()} and given 200 ms to finish. While
     * it is still alive, its stack trace is logged and it is interrupted, re-checking every 50 ms.
     *
     * <p>Safe to call when {@code open()} never created a reader: the shutdown sequence is then
     * skipped and only the references are cleared.
     */
    @Override
    public void dispose() {
        super.dispose();

        final SplitReader<S, OUT> activeReader = this.reader;
        if (activeReader != null) {
            // An interrupt must not cut the shutdown short, but it must not be lost either:
            // remember it and restore the flag once the reader is gone.
            boolean interrupted = false;

            activeReader.cancel();
            try {
                activeReader.join(200L);
            } catch (InterruptedException e) {
                interrupted = true;
            }

            while (activeReader.isAlive()) {
                StringBuilder bld = new StringBuilder();
                for (StackTraceElement frame : activeReader.getStackTrace()) {
                    bld.append(frame).append('\n');
                }
                LOG.warn("The reader is stuck in method:\n {}", bld.toString());

                activeReader.interrupt();
                try {
                    activeReader.join(50L);
                } catch (InterruptedException e) {
                    // The exception cleared the flag, so the next join() waits normally.
                    interrupted = true;
                }
            }

            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }

        this.reader = null;
        this.collector = null;
        this.format = null;
        this.serializer = null;
    }

    /**
     * Signals the reader that no more splits will arrive, waits for it to finish, and closes
     * the collector.
     *
     * <p>The end-of-stream sentinel is queued behind the pending splits, so the reader works
     * through them before it stops. The reader calls {@code notifyAll()} on the checkpoint lock
     * when it exits.
     *
     * <p>NOTE(review): presumably the containing task already holds the checkpoint lock when it
     * calls this method — confirm. The {@code synchronized} block is re-entrant, so it is harmless
     * in that case and makes the {@code wait()} legal in every other case.
     */
    @Override
    public void close() throws Exception {
        super.close();

        final SplitReader<S, OUT> activeReader = this.reader;
        if (activeReader != null) {
            synchronized (checkpointLock) {
                if (activeReader.isAlive() && activeReader.isRunning()) {
                    activeReader.addSplit(ContinuousFileReaderOperator.EOS);

                    // Re-check the condition after every wake-up: Object.wait() may return
                    // spuriously. The wait is bounded so that a reader thread that died without
                    // notifying cannot block this method forever.
                    while (activeReader.isAlive() && activeReader.isRunning()) {
                        checkpointLock.wait(50L);
                    }
                }
            }
        }

        // The collector only exists once open() has run.
        if (collector != null) {
            collector.close();
        }
    }

    /**
     * Writes the reader's state into a checkpoint stream and attaches the resulting handle to
     * the task state.
     *
     * <p>Stream layout, in order: the split in progress (may be {@code null}), the number of
     * pending splits, each pending split, and finally the format state (may be {@code null}).
     * {@code restoreState()} reads the same layout back.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param timestamp    timestamp of the checkpoint
     * @return the task state produced by the superclass, with the reader state set on it
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        final StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);

        final AbstractStateBackend.CheckpointStateOutputStream os =
                getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp);

        boolean handedOver = false;
        try {
            // NOTE(review): objects go through the ObjectOutputStream while the split count goes
            // through a separate view on the same underlying stream. This depends on each
            // completed writeObject() call having reached the underlying stream before the next
            // direct write — confirm before changing the write order.
            final ObjectOutputStream oos = new ObjectOutputStream(os);
            final AbstractStateBackend.CheckpointStateOutputView ov =
                    new AbstractStateBackend.CheckpointStateOutputView(os);

            final Tuple3<List<FileInputSplit>, FileInputSplit, S> snapshot = reader.getReaderState();
            final List<FileInputSplit> pendingSplits = snapshot.f0;
            final FileInputSplit currSplit = snapshot.f1;
            final S formatState = snapshot.f2;

            oos.writeObject(currSplit);

            ov.writeInt(pendingSplits.size());
            for (FileInputSplit split : pendingSplits) {
                oos.writeObject(split);
            }

            oos.writeObject(formatState);

            final StateHandle<?> handle = (StateHandle<?>) os.closeAndGetHandle();
            handedOver = true;
            taskState.setOperatorState(handle);
            return taskState;
        } finally {
            if (!handedOver) {
                // A write failed part-way: release the stream instead of leaking it.
                try {
                    os.close();
                } catch (Exception ignored) {
                    // Best effort only; the original failure is the one the caller must see.
                }
            }
        }
    }

    /**
     * Reads back the reader state written by {@code snapshotOperatorState()} and stores it so
     * that {@code open()} can pass it to the reader.
     *
     * <p>Expected layout: the split in progress, the number of pending splits, each pending
     * split, then the format state. The stream is closed whether or not reading succeeds.
     *
     * <p>NOTE(review): the {@link ObjectInputStream} is created without the user-code class
     * loader, so a format state of a user-defined class may not resolve — confirm.
     *
     * @param state task state holding the operator state handle
     * @throws IllegalStateException via {@code Preconditions.checkState} if reader state was
     *                               already restored
     */
    @Override
    public void restoreState(StreamTaskState state) throws Exception {
        super.restoreState(state);

        final StreamStateHandle stream = (StreamStateHandle) state.getOperatorState();
        final InputStream is = stream.getState(getUserCodeClassloader());
        try {
            final ObjectInputStream ois = new ObjectInputStream(is);
            final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is);

            final FileInputSplit currSplit = (FileInputSplit) ois.readObject();

            final List<FileInputSplit> pendingSplits = new LinkedList<FileInputSplit>();
            final int noOfSplits = div.readInt();
            for (int i = 0; i < noOfSplits; i++) {
                pendingSplits.add((FileInputSplit) ois.readObject());
            }

            // The stream carries no type token for S; the cast mirrors what the snapshot wrote.
            @SuppressWarnings("unchecked")
            final S formatState = (S) ois.readObject();

            Preconditions.checkState(readerState == null, "The reader state has already been initialized.");
            readerState = new Tuple3<List<FileInputSplit>, FileInputSplit, S>(pendingSplits, currSplit, formatState);
        } finally {
            // Previously the stream was closed only on the success path.
            is.close();
        }
    }

    private class SplitReader<S extends Serializable, OT>
    extends Thread {
        // Loop flag for run(): set in the constructor, cleared by cancel(), on the end-of-stream
        // sentinel, and when the thread exits.
        private volatile boolean isRunning;
        // Format used to open, read and close each split.
        private final FileInputFormat<OT> format;
        // Used in run() to create the record instance passed to nextRecord().
        private final TypeSerializer<OT> serializer;
        // Monitor guarding the queue, split open/close and record emission; also used for
        // wait()/notifyAll() between run() and the operator.
        private final Object checkpointLock;
        // Receives every record read from a split.
        private final TimestampedCollector<OT> collector;
        // Splits waiting to be read; filled by addSplit() and the restored state, drained by run().
        private final Queue<FileInputSplit> pendingSplits;
        // Split being read, or a restored split that still has to be (re)opened; null when idle.
        private FileInputSplit currentSplit = null;
        // Format state taken from a restored snapshot; consumed (set to null) when run() opens
        // the restored split.
        private S restoredFormatState = null;
        // True between a successful open/reopen of a split and the close of that split.
        private volatile boolean isSplitOpen = false;

        /**
         * Creates the reader thread (not yet started) and, if a restored state is given, seeds
         * the queue, the current split and the format state from it.
         *
         * @param format         format used to read the splits; must not be {@code null}
         * @param serializer     serializer used to create record instances; must not be {@code null}
         * @param collector      collector receiving the records
         * @param checkpointLock monitor shared with the operator
         * @param restoredState  (pending splits, split in progress, format state), or {@code null}
         *                       when starting without restored state
         */
        private SplitReader(FileInputFormat<OT> format, TypeSerializer<OT> serializer, TimestampedCollector<OT> collector, Object checkpointLock, Tuple3<List<FileInputSplit>, FileInputSplit, S> restoredState) {
            this.format = Preconditions.checkNotNull(format, "Unspecified FileInputFormat.");
            this.serializer = Preconditions.checkNotNull(serializer, "Unspecified Serializer.");
            this.pendingSplits = new LinkedList<FileInputSplit>();
            this.collector = collector;
            this.checkpointLock = checkpointLock;
            this.isRunning = true;

            if (restoredState != null) {
                // Same duplicate check as addSplit(): a split may be queued only once.
                for (FileInputSplit split : restoredState.f0) {
                    Preconditions.checkArgument(!this.pendingSplits.contains(split), "Duplicate split entry to read: " + split + ".");
                    this.pendingSplits.add(split);
                }
                this.currentSplit = restoredState.f1;
                this.restoredFormatState = restoredState.f2;
            }
        }

        /**
         * Appends a split to the queue of splits to read, holding the checkpoint lock while the
         * queue is inspected and modified.
         *
         * @param split the split to queue; must not be {@code null} and must not already be queued
         *              (both enforced through {@code Preconditions})
         */
        private void addSplit(FileInputSplit split) {
            Preconditions.checkNotNull(split);
            synchronized (checkpointLock) {
                final boolean alreadyQueued = pendingSplits.contains(split);
                Preconditions.checkArgument(!alreadyQueued, "Duplicate split entry to read: " + split + ".");
                pendingSplits.add(split);
            }
        }

        /**
         * @return the current value of the run-loop flag; {@code false} once the reader was
         *         cancelled, saw the end-of-stream sentinel, or exited
         */
        public boolean isRunning() {
            return isRunning;
        }

        /**
         * Reader loop: opens the next split, emits its records, closes it, and repeats until the
         * run flag is cleared or the end-of-stream sentinel is reached.
         *
         * <p>Queue access, split open/close and each record emission happen under the checkpoint
         * lock. When no split is available the thread waits on the lock for 50 ms and retries.
         *
         * <p>Any failure is reported to the containing task via {@code failExternally}. On exit
         * the reader state is reset and all threads waiting on the checkpoint lock are notified,
         * even if closing the input format fails.
         */
        @Override
        public void run() {
            try {
                final org.apache.flink.metrics.Counter completedSplitsCounter =
                        ContinuousFileReaderOperator.this.getMetricGroup().counter("numSplitsProcessed");
                this.format.openInputFormat();

                while (this.isRunning) {
                    synchronized (this.checkpointLock) {
                        if (this.currentSplit != null) {
                            // A split was restored from a snapshot and still has to be opened.
                            if (this.currentSplit.equals(EOS)) {
                                this.isRunning = false;
                                break;
                            }
                            if (this.format instanceof CheckpointableInputFormat && this.restoredFormatState != null) {
                                @SuppressWarnings("unchecked")
                                final CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
                                        (CheckpointableInputFormat<FileInputSplit, S>) this.format;
                                checkpointableFormat.reopen(this.currentSplit, this.restoredFormatState);
                            } else {
                                // Only claim "not checkpointable" when that is actually the case;
                                // a checkpointable format without restored state also lands here.
                                if (!(this.format instanceof CheckpointableInputFormat)) {
                                    LOG.info("Format " + this.format.getClass().getName() + " used is not checkpointable.");
                                }
                                this.format.open(this.currentSplit);
                            }
                            this.restoredFormatState = null;
                        } else {
                            this.currentSplit = this.pendingSplits.poll();
                            if (this.currentSplit == null) {
                                // Nothing queued: release the lock briefly, then look again.
                                this.checkpointLock.wait(50L);
                                continue;
                            }
                            if (this.currentSplit.equals(EOS)) {
                                this.isRunning = false;
                                break;
                            }
                            this.format.open(this.currentSplit);
                        }
                        this.isSplitOpen = true;
                    }

                    LOG.info("Reading split: " + this.currentSplit);

                    try {
                        OT nextElement = this.serializer.createInstance();
                        while (!this.format.reachedEnd()) {
                            // Read and emit one record per lock acquisition.
                            synchronized (this.checkpointLock) {
                                nextElement = this.format.nextRecord(nextElement);
                                if (nextElement == null) {
                                    break;
                                }
                                this.collector.collect(nextElement);
                            }
                        }
                        completedSplitsCounter.inc();
                    } finally {
                        synchronized (this.checkpointLock) {
                            this.format.close();
                            this.isSplitOpen = false;
                            this.currentSplit = null;
                        }
                    }
                }
            } catch (Throwable e) {
                if (this.isRunning) {
                    // The placeholder prints the split; the trailing Throwable gives the stack trace.
                    LOG.error("Caught exception processing split: {}", this.currentSplit, e);
                }
                ContinuousFileReaderOperator.this.getContainingTask().failExternally(e);
            } finally {
                synchronized (this.checkpointLock) {
                    LOG.info("Reader terminated, and exiting...");
                    try {
                        this.format.closeInputFormat();
                    } finally {
                        // Must run even if closeInputFormat() throws: the operator's close()
                        // waits on this lock for the notification.
                        this.isSplitOpen = false;
                        this.currentSplit = null;
                        this.isRunning = false;
                        this.checkpointLock.notifyAll();
                    }
                }
            }
        }

        /**
         * Captures the reader's state as (pending splits, split in progress, format state).
         *
         * <p>If the head of the queue equals the split in progress, that head entry is removed
         * from the live queue. The removal now happens before the queue is copied, so the
         * returned list matches the queue the reader continues with. Previously the copy was
         * taken first and still contained the entry the reader had just discarded.
         *
         * <p>NOTE(review): this method does not take the checkpoint lock itself; presumably the
         * caller holds it while snapshotting — confirm.
         *
         * @return the pending splits, the current split (or {@code null} when idle), and the
         *         format state (or {@code null} when idle or when the format is not checkpointable)
         * @throws IOException declared for the format's {@code getCurrentState()} call
         */
        private Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
            if (this.currentSplit != null && this.currentSplit.equals(this.pendingSplits.peek())) {
                this.pendingSplits.remove();
            }

            final List<FileInputSplit> snapshot = new ArrayList<FileInputSplit>(this.pendingSplits);

            if (this.currentSplit == null) {
                return new Tuple3<List<FileInputSplit>, FileInputSplit, S>(snapshot, null, null);
            }

            if (this.format instanceof CheckpointableInputFormat) {
                @SuppressWarnings("unchecked")
                final CheckpointableInputFormat<FileInputSplit, S> checkpointableFormat =
                        (CheckpointableInputFormat<FileInputSplit, S>) this.format;

                // An open split reports its live position; a restored split that has not been
                // reopened yet still carries the state it was restored with.
                final S formatState = this.isSplitOpen
                        ? checkpointableFormat.getCurrentState()
                        : this.restoredFormatState;
                return new Tuple3<List<FileInputSplit>, FileInputSplit, S>(snapshot, this.currentSplit, formatState);
            }

            LOG.info("The format used is not checkpointable. The current input split will be restarted upon recovery.");
            return new Tuple3<List<FileInputSplit>, FileInputSplit, S>(snapshot, this.currentSplit, null);
        }

        /**
         * Clears the run-loop flag. {@code run()} checks the flag at the top of each loop
         * iteration, so a split that is currently being read is not cut short by this call alone.
         */
        public void cancel() {
            isRunning = false;
        }
    }
}

