/*
 * Decompiled with CFR 0.152.
 */
package de.unkrig.commons.util.concurrent;

import de.unkrig.commons.io.IoUtil;
import de.unkrig.commons.io.OutputStreams;
import de.unkrig.commons.io.ProxyOutputStream;
import de.unkrig.commons.io.pipe.Pipe;
import de.unkrig.commons.io.pipe.PipeFactory;
import de.unkrig.commons.io.pipe.PipeUtil;
import de.unkrig.commons.lang.protocol.Consumer;
import de.unkrig.commons.lang.protocol.ConsumerUtil;
import de.unkrig.commons.lang.protocol.ConsumerWhichThrows;
import de.unkrig.commons.lang.protocol.Producer;
import de.unkrig.commons.nullanalysis.NotNullByDefault;
import de.unkrig.commons.nullanalysis.Nullable;
import de.unkrig.commons.util.concurrent.SquadExecutor;
import java.io.IOException;
import java.io.OutputStream;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

@NotNullByDefault(value=false)
public class ByteStreamSequentializer {
    private final SquadExecutor<Void> squadExecutor;
    private Producer<? extends Pipe> pipeProvider = new Producer<Pipe>(){

        @Override
        @Nullable
        public Pipe produce() {
            return PipeFactory.byteArrayRingBuffer(8192);
        }
    };
    private OutputStream nextTarget;

    public ByteStreamSequentializer(OutputStream delegate, ExecutorService squadExecutor) {
        this.squadExecutor = new SquadExecutor(squadExecutor);
        this.nextTarget = delegate;
    }

    void setPipeProvider(Producer<? extends Pipe> pipeProvider) {
        this.pipeProvider = pipeProvider;
    }

    public synchronized void submit(Consumer<? super OutputStream> task) {
        this.submit(ConsumerUtil.widen2(task));
    }

    public synchronized <EX extends Throwable> void submit(final ConsumerWhichThrows<? super OutputStream, EX> task) {
        final OutputStream previousTarget = this.nextTarget;
        final Pipe buffer = (Pipe)this.pipeProvider.produce();
        assert (buffer != null);
        final PipeUtil.InputOutputStreams ios = PipeUtil.asInputOutputStreams(buffer);
        final ProxyOutputStream pos = new ProxyOutputStream(ios.getOutputStream());
        this.nextTarget = pos;
        this.squadExecutor.submit(new Callable<Void>(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public Void call() throws Exception {
                try {
                    task.consume(OutputStreams.unclosable(previousTarget));
                }
                catch (Exception ex) {
                    throw ex;
                }
                catch (Error er) {
                    throw er;
                }
                catch (Throwable th) {
                    throw new Exception(th);
                }
                ProxyOutputStream proxyOutputStream = pos;
                synchronized (proxyOutputStream) {
                    IoUtil.copyAvailable(ios.getInputStream(), previousTarget);
                    pos.setDelegate(previousTarget);
                    buffer.close();
                }
                return null;
            }
        });
    }

    public void awaitCompletion() throws InterruptedException, ExecutionException, CancellationException, IOException {
        this.nextTarget.close();
        this.squadExecutor.awaitCompletion();
    }

    void awaitCompletion(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, IOException {
        this.nextTarget.close();
        this.squadExecutor.awaitCompletion(timeout, unit);
    }
}

