package org.apache.flink.streaming.api.functions.source;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.shaded.com.google.common.base.Ascii;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.AsyncExceptionChecker;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.class */
public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends AbstractStreamOperator<OUT> implements OneInputStreamOperator<FileInputSplit, OUT>, OutputTypeConfigurable<OUT>, AsyncExceptionChecker {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(ContinuousFileReaderOperator.class);
    // Sentinel split built only from -1 / null arguments; close() enqueues it on the reader
    // via addSplit(EOS) before waiting on the checkpoint lock.
    private static final FileInputSplit EOS = new FileInputSplit(-1, (Path) null, -1, -1, (String[]) null);
    // Input format supplied to the constructor; configured in open() and handed to the reader thread.
    private FileInputFormat<OUT> format;
    // Created in setOutputType(); open() fails its state check if this is still null.
    private TypeSerializer<OUT> serializer;
    // Obtained from the containing task in open(); shared with the reader thread and the source context.
    private transient Object checkpointLock;
    // Background thread created and started in open(); cleared in dispose().
    private transient ContinuousFileReaderOperator<OUT, S>.SplitReader<S, OUT> reader;
    // Source context chosen in open() according to the configured time characteristic.
    private transient SourceFunction.SourceContext<OUT> readerContext;
    // (pending splits, current split, format state) filled in by restoreState(); passed to the
    // SplitReader in open() and then reset to null.
    private Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState;

    /* renamed from: org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator$1.class */
    /**
     * Decompiled form of a compiler-generated switch lookup table: maps
     * {@code TimeCharacteristic.ordinal()} to the case labels 1 (EventTime),
     * 2 (IngestionTime) and 3 (ProcessingTime) used by the switch in {@code open()}.
     * The type is also used as the trailing marker parameter of the synthetic
     * {@code SplitReader} constructor.
     */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$streaming$api$TimeCharacteristic = new int[TimeCharacteristic.values().length];

        static {
            // Each entry is filled in its own try block; a NoSuchFieldError for one constant
            // leaves that slot at 0 and does not prevent the remaining entries from being set.
            try {
                $SwitchMap$org$apache$flink$streaming$api$TimeCharacteristic[TimeCharacteristic.EventTime.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$TimeCharacteristic[TimeCharacteristic.IngestionTime.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$streaming$api$TimeCharacteristic[TimeCharacteristic.ProcessingTime.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator$SplitReader.class */
    private class SplitReader<S extends Serializable, OT> extends Thread {
        // Set to true in the constructor and to false by cancel(); exposed through isRunning().
        private volatile boolean isRunning;
        // Format used to read splits; never null (checked in the constructor).
        private final FileInputFormat<OT> format;
        // Serializer for the record type; never null (checked in the constructor).
        private final TypeSerializer<OT> serializer;
        // Monitor guarding pendingSplits in addSplit(); never null (checked in the constructor).
        private final Object checkpointLock;
        // Context handed in by the operator; never null (checked in the constructor).
        private final SourceFunction.SourceContext<OT> readerContext;
        // Splits waiting to be read, in arrival order.
        private final Queue<FileInputSplit> pendingSplits;
        // Split recorded as "current" in snapshots; seeded from restored state if present.
        private FileInputSplit currentSplit;
        // Format state carried over from a restored checkpoint; reported by getReaderState()
        // while no split is open.
        private S restoredFormatState;
        // Consulted by getReaderState() to decide between the live format state and the restored one.
        private volatile boolean isSplitOpen;
        /**
         * Creates a reader thread over the given format.
         *
         * <p>All of {@code fileInputFormat}, {@code typeSerializer}, {@code sourceContext} and
         * {@code obj} (the checkpoint lock) must be non-null. When {@code tuple3} is non-null its
         * pending splits are queued in order and its current split and format state become the
         * reader's starting point.
         */
        private SplitReader(FileInputFormat<OT> fileInputFormat, TypeSerializer<OT> typeSerializer, SourceFunction.SourceContext<OT> sourceContext, Object obj, Tuple3<List<FileInputSplit>, FileInputSplit, S> tuple3) {
            this.currentSplit = null;
            this.restoredFormatState = null;
            this.isSplitOpen = false;

            this.format = Preconditions.checkNotNull(fileInputFormat, "Unspecified FileInputFormat.");
            this.serializer = Preconditions.checkNotNull(typeSerializer, "Unspecified Serializer.");
            this.readerContext = Preconditions.checkNotNull(sourceContext, "Unspecified Reader Context.");
            this.checkpointLock = Preconditions.checkNotNull(obj, "Unspecified checkpoint lock.");

            this.pendingSplits = new ArrayDeque<>();
            this.isRunning = true;

            // Nothing to restore: start with an empty queue and no current split.
            if (tuple3 == null) {
                return;
            }

            for (FileInputSplit restoredSplit : tuple3.f0) {
                this.pendingSplits.add(restoredSplit);
            }
            this.currentSplit = tuple3.f1;
            this.restoredFormatState = tuple3.f2;
        }

        /**
         * Appends a split to the pending queue while holding the checkpoint lock.
         *
         * @param fileInputSplit the split to enqueue; must not be null
         */
        public void addSplit(FileInputSplit fileInputSplit) {
            Preconditions.checkNotNull(fileInputSplit, "Cannot insert a null value in the pending splits queue.");
            final Object lock = this.checkpointLock;
            synchronized (lock) {
                pendingSplits.add(fileInputSplit);
            }
        }

        /** Returns the current value of the running flag (false once {@link #cancel()} was called). */
        public boolean isRunning() {
            final boolean running = isRunning;
            return running;
        }

        /* JADX WARN: Code restructure failed: missing block: B:26:0x0101, code lost:
        
            org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.LOG.info("Reading split: " + r4.currentSplit);
         */
        /* JADX WARN: Code restructure failed: missing block: B:28:0x011f, code lost:
        
            r6 = r4.serializer.createInstance();
         */
        /* JADX WARN: Code restructure failed: missing block: B:30:0x012e, code lost:
        
            if (r4.format.reachedEnd() != false) goto L158;
         */
        /* JADX WARN: Code restructure failed: missing block: B:31:0x0131, code lost:
        
            r0 = r4.checkpointLock;
         */
        /* JADX WARN: Code restructure failed: missing block: B:32:0x0137, code lost:
        
            monitor-enter(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:34:0x0138, code lost:
        
            r6 = r4.format.nextRecord(r6);
         */
        /* JADX WARN: Code restructure failed: missing block: B:35:0x0142, code lost:
        
            if (r6 == null) goto L157;
         */
        /* JADX WARN: Code restructure failed: missing block: B:36:0x0145, code lost:
        
            r4.readerContext.collect(r6);
         */
        /* JADX WARN: Code restructure failed: missing block: B:38:0x0158, code lost:
        
            monitor-exit(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:43:0x0153, code lost:
        
            monitor-exit(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:45:0x0166, code lost:
        
            r0.inc();
         */
        /* JADX WARN: Code restructure failed: missing block: B:46:0x016d, code lost:
        
            r0 = r4.checkpointLock;
         */
        /* JADX WARN: Code restructure failed: missing block: B:47:0x0172, code lost:
        
            monitor-enter(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:49:0x0173, code lost:
        
            r4.format.close();
            r4.isSplitOpen = false;
            r4.currentSplit = null;
         */
        /* JADX WARN: Code restructure failed: missing block: B:50:0x0185, code lost:
        
            monitor-exit(r0);
         */
        /* JADX WARN: Code restructure failed: missing block: B:70:0x0193, code lost:
        
            r11 = move-exception;
         */
        /* JADX WARN: Code restructure failed: missing block: B:73:0x019c, code lost:
        
            monitor-enter(r4.checkpointLock);
         */
        /* JADX WARN: Code restructure failed: missing block: B:75:0x019d, code lost:
        
            r4.format.close();
            r4.isSplitOpen = false;
            r4.currentSplit = null;
         */
        /* JADX WARN: Code restructure failed: missing block: B:79:0x01be, code lost:
        
            throw r11;
         */
        /**
         * NOTE(review): this is a decompiler stub, not the real reader loop. As written, the
         * thread body unconditionally throws {@link UnsupportedOperationException}, so a started
         * reader terminates immediately without reading any split. The "code lost" fragments in
         * the comments preceding this method suggest the original loop read records under the
         * checkpoint lock and closed the format when a split ended — that is an inference from
         * those fragments only; restore the body from the original sources before relying on it.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        /*
            Code decompiled incorrectly, please refer to instructions dump.
            To view partially-correct add '--show-bad-code' argument
        */
        public void run() {
            /*
                Method dump skipped, instructions count: 672
                To view this dump add '--comments-level debug' option
            */
            throw new UnsupportedOperationException("Method not decompiled: org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator.SplitReader.run():void");
        }

        /**
         * Builds the reader's checkpoint state as a (pending splits, current split, format state)
         * triple.
         *
         * <ul>
         *   <li>No current split: the split and format-state slots are both null.</li>
         *   <li>Current split and a {@link CheckpointableInputFormat}: the format state is the
         *       format's live state while a split is open, otherwise the state restored from the
         *       previous checkpoint.</li>
         *   <li>Current split and a non-checkpointable format: the format-state slot is null.</li>
         * </ul>
         *
         * @return the snapshot triple; the list is a copy and is never null
         * @throws IOException declared for the call that fetches the format's current state
         */
        @SuppressWarnings("unchecked")
        public Tuple3<List<FileInputSplit>, FileInputSplit, S> getReaderState() throws IOException {
            List<FileInputSplit> pendingSnapshot = new ArrayList<>(this.pendingSplits);

            // NOTE(review): the copy above is taken BEFORE this removal, so a head element equal to
            // the current split still appears in the returned pending list while being dropped
            // from the live queue. Kept as-is because the reader loop is not visible here —
            // confirm the intended ordering against the original sources.
            if (this.currentSplit != null && this.currentSplit.equals(this.pendingSplits.peek())) {
                this.pendingSplits.remove();
            }

            if (this.currentSplit == null) {
                return new Tuple3<>(pendingSnapshot, null, null);
            }

            if (!(this.format instanceof CheckpointableInputFormat)) {
                ContinuousFileReaderOperator.LOG.info("The format does not support checkpointing. The current input split will be re-read from start upon recovery.");
                return new Tuple3<>(pendingSnapshot, this.currentSplit, null);
            }

            // The cast is unchecked because S is erased; it is guarded by the instanceof test above.
            S formatState = this.isSplitOpen
                    ? ((CheckpointableInputFormat<FileInputSplit, S>) this.format).getCurrentState()
                    : this.restoredFormatState;
            return new Tuple3<>(pendingSnapshot, this.currentSplit, formatState);
        }

        /** Clears the running flag; {@link #isRunning()} returns false afterwards. */
        public void cancel() {
            isRunning = false;
        }

        /**
         * Accessor constructor used by the enclosing operator: forwards to the private
         * five-argument constructor. The operator and marker arguments are not used.
         */
        /* synthetic */ SplitReader(
                ContinuousFileReaderOperator continuousFileReaderOperator,
                FileInputFormat fileInputFormat,
                TypeSerializer typeSerializer,
                SourceFunction.SourceContext sourceContext,
                Object obj,
                Tuple3 tuple3,
                AnonymousClass1 anonymousClass1) {
            this(fileInputFormat, typeSerializer, sourceContext, obj, tuple3);
        }
    }

    /**
     * Creates the operator around the given input format.
     *
     * @param fileInputFormat format used to read the incoming splits; must not be null
     */
    public ContinuousFileReaderOperator(FileInputFormat<OUT> fileInputFormat) {
        FileInputFormat<OUT> checkedFormat = Preconditions.checkNotNull(fileInputFormat);
        this.format = checkedFormat;
    }

    /**
     * Derives the record serializer from the declared output type.
     *
     * @param typeInformation type information of the produced records
     * @param executionConfig configuration passed to the serializer factory
     */
    @Override
    public void setOutputType(TypeInformation<OUT> typeInformation, ExecutionConfig executionConfig) {
        TypeSerializer<OUT> outputSerializer = typeInformation.createSerializer(executionConfig);
        this.serializer = outputSerializer;
    }

    /**
     * Configures the input format, selects the source context matching the job's time
     * characteristic, and starts the background {@code SplitReader} thread.
     *
     * <p>Any state captured by {@code restoreState} is handed to the reader and then cleared.
     *
     * @throws IllegalStateException if the reader already exists or no serializer was set
     * @throws Exception if the time characteristic is not one of the three known values, or if
     *     the superclass {@code open()} fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        Preconditions.checkState(this.reader == null, "The reader is already initialized.");
        Preconditions.checkState(this.serializer != null, "The serializer has not been set. Probably the setOutputType() was not called. Please report it.");

        this.format.setRuntimeContext(getRuntimeContext());
        this.format.configure(new Configuration());
        this.checkpointLock = getContainingTask().getCheckpointLock();

        // Switch on the enum itself rather than on the decompiler's synthetic ordinal table.
        TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic();
        switch (timeCharacteristic) {
            case EventTime:
                this.readerContext = new StreamSource.ManualWatermarkContext<>(this, this.checkpointLock, this.output);
                break;
            case IngestionTime:
                this.readerContext = new StreamSource.AutomaticWatermarkContext<>(this, this.checkpointLock, this.output, getRuntimeContext().getExecutionConfig().getAutoWatermarkInterval());
                break;
            case ProcessingTime:
                this.readerContext = new StreamSource.NonTimestampContext<>(this, this.checkpointLock, this.output);
                break;
            default:
                throw new Exception(String.valueOf(timeCharacteristic));
        }

        this.reader = new SplitReader<>(this, this.format, this.serializer, this.readerContext, this.checkpointLock, this.readerState, null);
        // The restored state now belongs to the reader; drop the operator's reference.
        this.readerState = null;
        this.reader.start();
    }

    /**
     * Queues the split carried by the incoming record on the reader thread.
     *
     * @param streamRecord record whose value is the split to read
     */
    @Override
    public void processElement(StreamRecord<FileInputSplit> streamRecord) throws Exception {
        FileInputSplit incomingSplit = streamRecord.getValue();
        this.reader.addSplit(incomingSplit);
    }

    /**
     * Intentionally empty: watermarks arriving on the input are neither forwarded nor recorded
     * by this method.
     */
    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
    }

    /**
     * Stops the reader thread and releases the operator's references.
     *
     * <p>The reader is first cancelled cooperatively and given 200 ms to finish; if it is still
     * alive it is interrupted repeatedly (logging its stack trace each time) until it exits.
     * When no reader exists — for example because {@code open()} failed before creating it —
     * the shutdown step is skipped instead of failing with a {@link NullPointerException}.
     */
    @Override
    public void dispose() {
        super.dispose();

        if (this.reader != null) {
            this.reader.cancel();
            try {
                this.reader.join(200L);
            } catch (InterruptedException ignored) {
                // Best effort: fall through to the interrupt loop below.
            }

            while (this.reader.isAlive()) {
                StringBuilder stackTrace = new StringBuilder();
                for (StackTraceElement frame : this.reader.getStackTrace()) {
                    stackTrace.append(frame).append('\n');
                }
                LOG.warn("The reader is stuck in method:\n {}", stackTrace.toString());

                this.reader.interrupt();
                try {
                    this.reader.join(50L);
                } catch (InterruptedException ignored) {
                    // Keep trying until the reader thread has actually terminated.
                }
            }
        }

        this.reader = null;
        this.readerContext = null;
        this.readerState = null;
        this.format = null;
        this.serializer = null;
    }

    /**
     * Signals end-of-input to the reader, waits for it to finish, then emits the final
     * watermark and closes the source context and the output.
     *
     * <p>The wait re-checks the reader's state in a loop with a short timeout, so a spurious or
     * unrelated wake-up on the checkpoint lock cannot let the final watermark overtake records
     * the reader is still emitting. The timeout also covers a reader that terminates without
     * notifying the lock. The {@code synchronized} block is re-entrant, so it is harmless when
     * the caller already holds the checkpoint lock and makes the wait legal when it does not.
     *
     * @throws Exception if the superclass {@code close()} fails or the wait is interrupted
     */
    @Override
    public void close() throws Exception {
        super.close();

        if (this.reader != null && this.reader.isAlive() && this.reader.isRunning()) {
            // The EOS marker tells the reader that no further splits will arrive.
            this.reader.addSplit(EOS);
            synchronized (this.checkpointLock) {
                while (this.reader.isAlive() && this.reader.isRunning()) {
                    this.checkpointLock.wait(50L);
                }
            }
        }

        if (this.readerContext != null) {
            this.readerContext.emitWatermark(Watermark.MAX_WATERMARK);
            this.readerContext.close();
        }
        this.output.close();
    }

    /**
     * No-op implementation of the {@code AsyncExceptionChecker} callback. The operator passes
     * itself as the first constructor argument of the source contexts created in {@code open()}.
     * NOTE(review): presumably those contexts invoke this hook — nothing is checked or rethrown
     * here; confirm that is intended.
     */
    @Override // org.apache.flink.streaming.api.operators.AsyncExceptionChecker
    public void checkAsyncException() {
    }

    /**
     * Writes the reader's state into a checkpoint stream and attaches the resulting handle to
     * the operator's task state.
     *
     * <p>Layout, in order: current split (object), number of pending splits (int), each pending
     * split (object), format state (object). {@code restoreState} reads the same layout.
     *
     * <p>If anything fails before the handle has been obtained, the checkpoint stream is closed
     * so that it does not leak.
     *
     * @param j first argument forwarded to the superclass and the state backend
     * @param j2 second argument forwarded to the superclass and the state backend
     * @return the task state from the superclass with the operator state set
     * @throws Exception if the superclass snapshot, serialization, or stream handling fails
     */
    @Override
    public StreamTaskState snapshotOperatorState(long j, long j2) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(j, j2);
        AbstractStateBackend.CheckpointStateOutputStream stateStream = getStateBackend().createCheckpointStateOutputStream(j, j2);

        boolean handleObtained = false;
        try {
            ObjectOutputStream objectStream = new ObjectOutputStream(stateStream);
            AbstractStateBackend.CheckpointStateOutputView stateView = new AbstractStateBackend.CheckpointStateOutputView(stateStream);

            Tuple3<List<FileInputSplit>, FileInputSplit, S> snapshot = this.reader.getReaderState();
            List<FileInputSplit> pendingSplits = snapshot.f0;

            objectStream.writeObject(snapshot.f1);
            stateView.writeInt(pendingSplits.size());
            for (FileInputSplit pendingSplit : pendingSplits) {
                objectStream.writeObject(pendingSplit);
            }
            objectStream.writeObject(snapshot.f2);

            taskState.setOperatorState(stateStream.closeAndGetHandle());
            handleObtained = true;
            return taskState;
        } finally {
            if (!handleObtained) {
                try {
                    stateStream.close();
                } catch (IOException closeFailure) {
                    // The original failure is what the caller needs to see; only log this one.
                    LOG.warn("Could not close the checkpoint stream after a failed snapshot.", closeFailure);
                }
            }
        }
    }

    /**
     * Reads the reader state written by {@code snapshotOperatorState} and stores it for the
     * next {@code open()} call.
     *
     * <p>Layout, in order: current split (object), number of pending splits (int), each pending
     * split (object), format state (object). The state stream is closed on every path, including
     * when deserialization fails.
     *
     * @param streamTaskState task state holding the operator's state handle
     * @throws IllegalStateException if reader state has already been restored
     * @throws Exception if the superclass restore or deserialization fails
     */
    @Override
    public void restoreState(StreamTaskState streamTaskState) throws Exception {
        super.restoreState(streamTaskState);

        try (InputStream stateStream = (InputStream) streamTaskState.getOperatorState().getState(getUserCodeClassloader())) {
            ObjectInputStream objectStream = new ObjectInputStream(stateStream);
            DataInputViewStreamWrapper stateView = new DataInputViewStreamWrapper(stateStream);

            FileInputSplit currentSplit = (FileInputSplit) objectStream.readObject();

            List<FileInputSplit> pendingSplits = new LinkedList<>();
            int pendingCount = stateView.readInt();
            for (int i = 0; i < pendingCount; i++) {
                pendingSplits.add((FileInputSplit) objectStream.readObject());
            }

            // Unchecked because S is erased; the value was written as S by snapshotOperatorState.
            @SuppressWarnings("unchecked")
            S formatState = (S) objectStream.readObject();

            Preconditions.checkState(this.readerState == null, "The reader state has already been initialized.");
            this.readerState = new Tuple3<>(pendingSplits, currentSplit, formatState);
        }
    }
}
