/*
 * Decompiled with CFR 0.152.
 */
package net.lecousin.framework.network.mime.transfer;

import java.io.IOException;
import java.nio.ByteBuffer;
import net.lecousin.framework.concurrent.CancelException;
import net.lecousin.framework.concurrent.Task;
import net.lecousin.framework.concurrent.synch.AsyncWork;
import net.lecousin.framework.concurrent.synch.ISynchronizationPoint;
import net.lecousin.framework.concurrent.synch.SynchronizationPoint;
import net.lecousin.framework.concurrent.util.production.simple.Consumer;
import net.lecousin.framework.concurrent.util.production.simple.Producer;
import net.lecousin.framework.concurrent.util.production.simple.Production;
import net.lecousin.framework.exception.NoException;
import net.lecousin.framework.io.IO;
import net.lecousin.framework.io.util.IOReaderAsProducer;
import net.lecousin.framework.network.TCPRemote;
import net.lecousin.framework.network.mime.MIME;
import net.lecousin.framework.network.mime.transfer.TransferReceiver;
import net.lecousin.framework.network.mime.transfer.encoding.ContentDecoder;

public class IdentityTransfer
extends TransferReceiver {
    private long pos = 0L;
    private long size;
    private boolean eot;

    public IdentityTransfer(MIME mime, ContentDecoder decoder) throws IOException {
        super(mime, decoder);
        Long s = mime.getContentLength();
        if (s == null) {
            throw new IOException("No content length for identity transfer: impossible to transfer data");
        }
        this.size = s;
        this.eot = this.size == 0L;
    }

    @Override
    public boolean isExpectingData() {
        return this.size > 0L;
    }

    @Override
    public AsyncWork<Boolean, IOException> consume(final ByteBuffer buf) {
        if (this.eot) {
            return new AsyncWork((Object)Boolean.TRUE, null);
        }
        final AsyncWork result = new AsyncWork();
        int l = buf.remaining();
        if (this.pos + (long)l > this.size) {
            int l2 = (int)(this.size - this.pos);
            this.pos += (long)l2;
            this.eot = this.pos == this.size;
            final int limit = buf.limit();
            buf.limit(limit - (l - l2));
            final ISynchronizationPoint<IOException> decode = this.decoder.decode(buf);
            decode.listenInline(new Runnable(){

                @Override
                public void run() {
                    buf.limit(limit);
                    if (decode.isSuccessful()) {
                        if (!IdentityTransfer.this.eot) {
                            result.unblockSuccess((Object)Boolean.FALSE);
                        } else {
                            IdentityTransfer.this.decoder.endOfData().listenInline(() -> result.unblockSuccess((Object)Boolean.TRUE), (ISynchronizationPoint)result);
                        }
                    } else {
                        result.unblockError((Exception)IO.error((Throwable)decode.getError()));
                    }
                }
            });
        } else {
            this.pos += (long)l;
            this.eot = this.pos == this.size;
            final ISynchronizationPoint<IOException> decode = this.decoder.decode(buf);
            decode.listenInline(new Runnable(){

                @Override
                public void run() {
                    if (decode.isSuccessful()) {
                        if (!IdentityTransfer.this.eot) {
                            result.unblockSuccess((Object)Boolean.FALSE);
                        } else {
                            IdentityTransfer.this.decoder.endOfData().listenInline(() -> result.unblockSuccess((Object)Boolean.TRUE), (ISynchronizationPoint)result);
                        }
                    } else {
                        result.unblockError((Exception)IO.error((Throwable)decode.getError()));
                    }
                }
            });
        }
        return result;
    }

    public static SynchronizationPoint<IOException> send(final TCPRemote client, final IO.Readable data, final int bufferSize, final int maxBuffers) {
        final SynchronizationPoint result = new SynchronizationPoint();
        Task.Cpu<Void, NoException> task = new Task.Cpu<Void, NoException>("Sending MIME body", 4){

            public Void run() {
                Production production = new Production((Producer)new IOReaderAsProducer(data, bufferSize), maxBuffers, (Consumer)new Consumer<ByteBuffer>(){

                    public AsyncWork<Void, IOException> consume(ByteBuffer product) {
                        return client.send(product).toAsyncWorkVoid();
                    }

                    public AsyncWork<Void, IOException> endOfProduction() {
                        return new AsyncWork(null, null);
                    }

                    public void error(Exception error) {
                        result.error((Exception)IO.error((Throwable)error));
                    }

                    public void cancel(CancelException event) {
                        result.cancel(event);
                    }
                });
                production.start();
                production.getSyncOnFinished().listenInline((AsyncWork.AsyncWorkListener)new AsyncWork.AsyncWorkListener<Void, Exception>(){

                    public void ready(Void r) {
                        result.unblock();
                    }

                    public void error(Exception error) {
                        result.error((Exception)IO.error((Throwable)error));
                    }

                    public void cancelled(CancelException event) {
                        result.cancel(event);
                    }
                });
                return null;
            }
        };
        task.start();
        return result;
    }

    public static SynchronizationPoint<IOException> send(TCPRemote client, IO.Readable.Buffered data) {
        SynchronizationPoint result = new SynchronizationPoint();
        IdentityTransfer.sendNextBuffer(client, data, (SynchronizationPoint<IOException>)result);
        return result;
    }

    private static void sendNextBuffer(TCPRemote client, IO.Readable.Buffered data, SynchronizationPoint<IOException> result) {
        data.readNextBufferAsync().listenInline(buffer -> {
            if (buffer == null) {
                result.unblock();
                return;
            }
            client.send(buffer).listenInline(() -> IdentityTransfer.sendNextBuffer(client, data, result), (ISynchronizationPoint)result);
        }, result);
    }
}

