package org.apache.asterix.external.input.stream;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.IError;
import org.apache.hyracks.api.util.ExceptionUtils;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/input/stream/LocalFSInputStream.class */
public class LocalFSInputStream extends AbstractMultipleInputStream {
    private static final Logger LOGGER = LogManager.getLogger();
    private final FileSystemWatcher watcher;
    private File currentFile;
    private String lastFileName = "";

    public LocalFSInputStream(FileSystemWatcher fileSystemWatcher) {
        this.watcher = fileSystemWatcher;
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public void setController(AbstractFeedDataFlowController abstractFeedDataFlowController) {
        super.setController(abstractFeedDataFlowController);
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
        super.setFeedLogManager(feedLogManager);
        this.watcher.setFeedLogManager(feedLogManager);
    }

    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException iOException = null;
        if (this.in != null) {
            try {
                closeFile();
            } catch (Exception e) {
                iOException = new IOException(e);
            }
        }
        try {
            this.watcher.close();
        } catch (Exception e2) {
            if (iOException == null) {
                throw e2;
            }
            iOException.addSuppressed(e2);
            throw iOException;
        }
    }

    private void closeFile() throws IOException {
        if (this.in != null) {
            if (this.logManager != null) {
                this.logManager.endPartition(this.currentFile.getAbsolutePath());
            }
            try {
                this.in.close();
            } finally {
                this.in = null;
                this.currentFile = null;
            }
        }
    }

    @Override // org.apache.asterix.external.input.stream.AbstractMultipleInputStream
    protected boolean advance() throws IOException {
        String path = this.currentFile != null ? this.currentFile.getPath() : "";
        closeFile();
        this.currentFile = this.watcher.poll();
        if (this.currentFile == null) {
            if (this.controller != null) {
                this.controller.flush();
            }
            this.currentFile = this.watcher.take();
        }
        if (this.currentFile == null) {
            return false;
        }
        this.in = new FileInputStream(this.currentFile);
        this.lastFileName = path;
        if (this.notificationHandler == null) {
            return true;
        }
        this.notificationHandler.notifyNewSource();
        return true;
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public boolean stop() throws Exception {
        closeFile();
        this.watcher.close();
        return true;
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public boolean handleException(Throwable th) {
        if (this.in == null) {
            return false;
        }
        HyracksDataException rootCause = ExceptionUtils.getRootCause(th);
        if (rootCause instanceof HyracksDataException) {
            HyracksDataException hyracksDataException = rootCause;
            boolean z = false;
            if (hyracksDataException.matchesAny(ErrorCode.RECORD_READER_MALFORMED_INPUT_STREAM, new IError[]{org.apache.hyracks.api.exceptions.ErrorCode.PARSING_ERROR})) {
                logCorruptedInput();
                z = true;
            } else if (hyracksDataException.matches(ErrorCode.INPUT_RECORD_READER_CHAR_ARRAY_RECORD_TOO_LARGE)) {
                z = true;
            }
            if (z) {
                try {
                    advance();
                    return true;
                } catch (Exception e) {
                    LOGGER.log(Level.WARN, "An exception was thrown while trying to skip a file", e);
                }
            }
        }
        LOGGER.log(Level.WARN, "Failed to recover from failure", th);
        return false;
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public String getStreamName() {
        return this.currentFile == null ? "" : this.currentFile.getPath();
    }

    @Override // org.apache.asterix.external.api.AsterixInputStream
    public String getPreviousStreamName() {
        return this.lastFileName;
    }

    private void logCorruptedInput() {
        if (this.currentFile != null) {
            try {
                this.logManager.logRecord(this.currentFile.getAbsolutePath(), "Corrupted input file");
            } catch (IOException e) {
                LOGGER.log(Level.WARN, "Filed to write to feed log file", e);
            }
            LOGGER.log(Level.WARN, "Corrupted input file: " + this.currentFile.getAbsolutePath());
        }
    }
}
