package org.apache.asterix.external.input.stream;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import org.apache.asterix.external.api.AsterixInputStream;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FileSystemWatcher;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/asterix/external/input/stream/LocalFSInputStream.class */
/**
 * An {@link AsterixInputStream} that presents the files handed out by a
 * {@link FileSystemWatcher} as one continuous byte stream.
 *
 * <p>Files are opened one at a time. When the current file is exhausted the stream moves on to
 * the next file supplied by the watcher, and a line feed is injected between files whenever the
 * last byte delivered to the caller was not already a line feed.
 *
 * <p>NOTE(review): no synchronization is performed in this class; it appears to be intended for
 * use by a single reader thread — confirm against callers.
 */
public class LocalFSInputStream extends AsterixInputStream {
    private static final Logger LOGGER = Logger.getLogger(LocalFSInputStream.class.getName());

    /** ASCII line feed; injected at file boundaries so consecutive files do not run together. */
    private static final byte LF = (byte) 10;

    /** Marker looked for in the cause's message to recognize a corrupted input file. */
    private static final String MALFORMED_INPUT_MARKER = "Malformed input stream";

    private final FileSystemWatcher watcher;
    /** Stream over {@link #currentFile}; {@code null} while no file is open. */
    private FileInputStream in;
    /** Last byte handed to the caller by {@link #read(byte[], int, int)}. */
    private byte lastByte;
    /** File currently being read; reset to {@code null} when the file is closed. */
    private File currentFile;

    /**
     * Creates a stream that reads the files supplied by the given watcher.
     *
     * @param fileSystemWatcher source of files to read; it is closed by {@link #close()} and
     *        {@link #stop()}
     */
    public LocalFSInputStream(FileSystemWatcher fileSystemWatcher) {
        this.watcher = fileSystemWatcher;
    }

    /**
     * Sets the data flow controller; it is flushed by this stream before waiting for a new file.
     *
     * @param abstractFeedDataFlowController the controller to associate with this stream
     */
    @Override // org.apache.asterix.external.api.AsterixInputStream
    public void setController(AbstractFeedDataFlowController abstractFeedDataFlowController) {
        super.setController(abstractFeedDataFlowController);
    }

    /**
     * Sets the feed log manager on both this stream and the underlying watcher.
     *
     * @param feedLogManager the log manager to use
     * @throws HyracksDataException if the superclass or the watcher rejects the log manager
     */
    @Override // org.apache.asterix.external.api.AsterixInputStream
    public void setFeedLogManager(FeedLogManager feedLogManager) throws HyracksDataException {
        super.setFeedLogManager(feedLogManager);
        this.watcher.setFeedLogManager(feedLogManager);
    }

    /**
     * Closes the currently open file (if any) and then the watcher.
     *
     * <p>The watcher is always closed, even when closing the file fails. A failure while closing
     * the file is reported to the caller; if the watcher fails as well, the watcher's exception
     * is attached as a suppressed exception.
     *
     * @throws IOException if closing the file or the watcher fails
     */
    @Override // java.io.InputStream, java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        IOException fileFailure = null;
        if (this.in != null) {
            try {
                closeFile();
            } catch (Exception e) {
                fileFailure = new IOException(e);
            }
        }
        try {
            this.watcher.close();
        } catch (Exception e2) {
            if (fileFailure == null) {
                throw e2;
            }
            fileFailure.addSuppressed(e2);
            throw fileFailure;
        }
        // Previously a file-close failure was silently dropped when the watcher closed cleanly.
        if (fileFailure != null) {
            throw fileFailure;
        }
    }

    /**
     * Ends the log partition for the current file (when a log manager is set) and closes the
     * file stream. Does nothing when no file is open.
     *
     * <p>The stream is closed and the file state is reset even if ending the log partition
     * fails, so a logging failure cannot leak the open file.
     *
     * @throws IOException if ending the partition or closing the stream fails
     */
    private void closeFile() throws IOException {
        if (this.in == null) {
            return;
        }
        try {
            if (this.logManager != null) {
                this.logManager.endPartition(this.currentFile.getAbsolutePath());
            }
        } finally {
            try {
                this.in.close();
            } finally {
                this.in = null;
                this.currentFile = null;
            }
        }
    }

    /**
     * Closes the current file and opens the next one supplied by the watcher.
     *
     * <p>The watcher is first asked via {@code poll()}; if that yields nothing, the controller
     * (when set) is flushed and {@code take()} is used instead.
     * NOTE(review): presumably {@code poll()} is non-blocking and {@code take()} blocks until a
     * file arrives or the watcher is closed — confirm against {@code FileSystemWatcher}.
     *
     * @return {@code true} if a new file was opened, {@code false} if the watcher supplied none
     * @throws IOException if closing the previous file or opening the next one fails
     */
    private boolean advance() throws IOException {
        closeFile();
        this.currentFile = this.watcher.poll();
        if (this.currentFile == null) {
            if (this.controller != null) {
                this.controller.flush();
            }
            this.currentFile = this.watcher.take();
        }
        if (this.currentFile == null) {
            return false;
        }
        this.in = new FileInputStream(this.currentFile);
        if (this.notificationHandler != null) {
            this.notificationHandler.notifyNewSource();
        }
        return true;
    }

    /**
     * Unsupported; callers must use {@link #read(byte[], int, int)}.
     *
     * @throws IOException always
     */
    @Override // java.io.InputStream
    public int read() throws IOException {
        throw new HyracksDataException("read() is not supported with this stream. use read(byte[] b, int off, int len)");
    }

    /**
     * Reads bytes from the current file, moving on to the next file when the current one is
     * exhausted.
     *
     * <p>When a file boundary is crossed and the last byte returned was not a line feed, a
     * single line feed is written to {@code buffer[offset]} and {@code 1} is returned, so that
     * the last record of one file and the first record of the next stay separated.
     *
     * @param buffer destination buffer
     * @param offset index in {@code buffer} at which to start writing
     * @param length maximum number of bytes to read
     * @return the number of bytes placed in {@code buffer}, or {@code -1} when no further file
     *         is available
     * @throws IOException if reading or switching files fails
     */
    @Override // java.io.InputStream
    public int read(byte[] buffer, int offset, int length) throws IOException {
        if (this.in == null && !advance()) {
            return -1;
        }
        int count = this.in.read(buffer, offset, length);
        while (count < 0 && advance()) {
            // NOTE(review): the original compared lastByte against LF twice; the second
            // comparison may have been meant for CR. Behavior is kept as-is (LF only).
            if (this.lastByte != LF) {
                this.lastByte = LF;
                buffer[offset] = LF;
                return 1;
            }
            count = this.in.read(buffer, offset, length);
        }
        if (count > 0) {
            this.lastByte = buffer[(offset + count) - 1];
        }
        return count;
    }

    /**
     * Stops the stream by closing the current file and the watcher.
     *
     * <p>The watcher is closed even when closing the file fails; if both fail, the watcher's
     * exception is attached to the file's exception as suppressed.
     *
     * @return always {@code true}
     * @throws Exception if closing the file or the watcher fails
     */
    @Override // org.apache.asterix.external.api.AsterixInputStream
    public boolean stop() throws Exception {
        Exception failure = null;
        try {
            closeFile();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.watcher.close();
        } catch (Exception e2) {
            if (failure == null) {
                failure = e2;
            } else {
                failure.addSuppressed(e2);
            }
        }
        if (failure != null) {
            throw failure;
        }
        return true;
    }

    /**
     * Attempts to recover from a failure by skipping the current file.
     *
     * <p>Recovery is attempted only when a file is open and the failure is an
     * {@link IOException} whose cause's message contains {@code "Malformed input stream"}. In
     * that case the corrupted file is reported (to the feed log when a log manager is set, and
     * to the class logger) and the stream advances to the next file.
     *
     * @param th the failure to handle
     * @return {@code true} if the corrupted file was skipped; {@code false} otherwise
     */
    @Override // org.apache.asterix.external.api.AsterixInputStream
    public boolean handleException(Throwable th) {
        if (this.in == null) {
            return false;
        }
        if ((th instanceof IOException) && isMalformedInput(th)) {
            if (this.currentFile != null) {
                String path = this.currentFile.getAbsolutePath();
                // logManager is optional elsewhere in this class, so guard it here as well.
                if (this.logManager != null) {
                    try {
                        this.logManager.logRecord(path, "Corrupted input file");
                    } catch (IOException e) {
                        LOGGER.warn("Filed to write to feed log file", e);
                    }
                }
                LOGGER.warn("Corrupted input file: " + path);
            }
            try {
                advance();
                return true;
            } catch (Exception e2) {
                LOGGER.warn("An exception was thrown while trying to skip a file", e2);
            }
        }
        LOGGER.warn("Failed to recover from failure", th);
        return false;
    }

    /**
     * Tells whether the cause of the given failure carries the malformed-input marker.
     * A missing cause or a cause without a message is treated as "not malformed" rather than
     * raising a {@link NullPointerException}.
     */
    private static boolean isMalformedInput(Throwable th) {
        Throwable cause = th.getCause();
        if (cause == null) {
            return false;
        }
        String message = cause.getMessage();
        return message != null && message.contains(MALFORMED_INPUT_MARKER);
    }
}
