/*
 * Decompiled with CFR 0.152.
 */
package net.sf.filePiper.model;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import net.sf.filePiper.model.ExecutionPhase;
import net.sf.filePiper.model.FileProcessor;
import net.sf.filePiper.model.FileProcessorEnvironment;
import net.sf.filePiper.model.InputFileInfo;
import net.sf.filePiper.model.PipeComponent;
import net.sf.filePiper.model.Pipeline;
import net.sf.filePiper.model.PipelineEnvironment;
import org.apache.log4j.Logger;

public class ProcessorThread
extends Thread
implements FileProcessorEnvironment,
PipeComponent {
    private static Logger log = Logger.getLogger(ProcessorThread.class);
    private FileProcessor processor;
    private InputStream is;
    private InputFileInfo inputInfo;
    private Pipeline line;
    private PipeComponent nextComponent;
    private byte[] consumeBuffer = new byte[1024];
    private PipelineEnvironment mainReporting;
    private boolean thisShouldContinue;

    public ProcessorThread(FileProcessor p, Pipeline pipeline, PipeComponent nextPipeComponent, PipelineEnvironment reporting) {
        super(p.getProcessorName());
        this.line = pipeline;
        this.processor = p;
        this.nextComponent = nextPipeComponent;
        this.mainReporting = reporting;
        this.thisShouldContinue = true;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void run() {
        try {
            this.processor.startBatch(this);
        }
        catch (IOException e) {
            log.error((Object)("Error in processor " + this.processor + " startBatch(..) method"), (Throwable)e);
            this.mainReporting.finished(e);
            this.thisShouldContinue = false;
        }
        while (this.shouldContinue() && this.thisShouldContinue) {
            ProcessorThread e = this;
            synchronized (e) {
                while (this.shouldContinue() && this.thisShouldContinue && this.is == null) {
                    try {
                        this.wait();
                    }
                    catch (Exception e2) {
                        log.warn((Object)"Wait interrupted by exception", (Throwable)e2);
                    }
                }
            }
            if (this.is == null || !this.shouldContinue()) continue;
            try {
                this.processor.process(this.is, this.inputInfo, this);
            }
            catch (Exception e2) {
                log.error((Object)("Error in processor " + this.processor + " for input: " + this.inputInfo), (Throwable)e2);
                this.mainReporting.finished(e2);
                this.thisShouldContinue = false;
            }
            finally {
                this.releaseInputStream();
            }
        }
        this.thisShouldContinue = false;
        try {
            this.processor.endBatch(this);
        }
        catch (IOException e) {
            log.error((Object)("Error in processor " + this.processor + " batch end"), (Throwable)e);
            this.mainReporting.finished(e);
        }
        try {
            this.nextComponent.finished();
        }
        catch (IOException e) {
            log.error((Object)("Error in processor " + this.nextComponent + " when finished signal forwarded from " + this), (Throwable)e);
            this.mainReporting.finished(e);
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("End of Thread " + this));
        }
    }

    private synchronized void releaseInputStream() {
        if (this.is != null) {
            try {
                int count;
                while ((count = this.is.read(this.consumeBuffer)) > 0) {
                }
                this.is.close();
            }
            catch (Exception e) {
                log.warn((Object)("Failed to close input steam in processor " + this.processor + " for input: " + this.inputInfo), (Throwable)e);
            }
            this.is = null;
            if (log.isDebugEnabled()) {
                log.debug((Object)("    <<< " + this + " release inputStream for input: " + this.inputInfo));
            }
        }
        this.notifyAll();
    }

    public String toString() {
        return this.getClass().getSimpleName() + "[" + this.processor.getProcessorName() + "@" + System.identityHashCode(this.processor) + "]";
    }

    public synchronized void processInputStream(InputStream input, InputFileInfo info) {
        while (this.shouldContinue() && this.thisShouldContinue && this.is != null) {
            try {
                this.wait();
            }
            catch (Exception e) {
                log.warn((Object)"Wait interrupted by exception", (Throwable)e);
            }
        }
        if (this.shouldContinue() && this.thisShouldContinue) {
            if (log.isDebugEnabled()) {
                log.debug((Object)("    >>> " + this + " starts for input " + info));
            }
            this.is = input;
            this.inputInfo = info;
            this.notifyAll();
        }
    }

    public OutputStream createOutputStream(InputFileInfo info) throws IOException {
        PipedInputStream pis = new PipedInputStream();
        PipedOutputStream pos = new PipedOutputStream(pis);
        this.processInputStream(pis, info);
        return pos;
    }

    public synchronized void finished() throws IOException {
        if (log.isDebugEnabled()) {
            log.debug((Object)("<<Finished>> signal in " + this));
        }
        while (this.shouldContinue() && this.thisShouldContinue && this.is != null) {
            try {
                this.wait();
            }
            catch (Exception e) {
                log.warn((Object)"Wait interrupted by exception", (Throwable)e);
            }
        }
        if (log.isDebugEnabled()) {
            log.debug((Object)("||| last input stream done in " + this));
        }
        this.thisShouldContinue = false;
        this.notifyAll();
    }

    public OutputStream getOutputStream(InputFileInfo info) throws IOException {
        OutputStream out = this.nextComponent.createOutputStream(info);
        return out;
    }

    public boolean shouldContinue() {
        return this.mainReporting.shouldContinue();
    }

    public Pipeline getPipeline() {
        return this.line;
    }

    public ExecutionPhase getCurrentPhase() {
        if (this.mainReporting.isAborted()) {
            return ExecutionPhase.ABORTED;
        }
        if (this.mainReporting.isErrored()) {
            return ExecutionPhase.ERRORED;
        }
        if (!this.thisShouldContinue) {
            return ExecutionPhase.DONE;
        }
        if (this.mainReporting.isRunning()) {
            return ExecutionPhase.RUNNING;
        }
        return ExecutionPhase.NONE;
    }
}

