/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process.frame;

import io.activej.bytebuf.ByteBuf;
import io.activej.bytebuf.ByteBufs;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.common.exception.MalformedDataException;
import io.activej.common.exception.TruncatedDataException;
import io.activej.csp.ChannelOutput;
import io.activej.csp.binary.BinaryChannelInput;
import io.activej.csp.binary.BinaryChannelSupplier;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.dsl.WithBinaryChannelInput;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.csp.process.frame.BlockDecoder;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.csp.process.frame.MissingEndOfStreamBlockException;
import io.activej.csp.process.frame.TruncatedBlockException;
import io.activej.promise.Promise;
import io.activej.reactor.Reactive;

public final class ChannelFrameDecoder
extends AbstractCommunicatingProcess
implements WithChannelTransformer<ChannelFrameDecoder, ByteBuf, ByteBuf>,
WithBinaryChannelInput<ChannelFrameDecoder> {
    private static final boolean CHECKS = Checks.isEnabled(ChannelFrameDecoder.class);
    private final BlockDecoder decoder;
    private boolean decoderResets;
    private ByteBufs bufs;
    private BinaryChannelSupplier input;
    private ChannelConsumer<ByteBuf> output;

    private ChannelFrameDecoder(BlockDecoder decoder) {
        this.decoder = decoder;
    }

    public static ChannelFrameDecoder create(FrameFormat format) {
        return (ChannelFrameDecoder)ChannelFrameDecoder.builder(format).build();
    }

    public static ChannelFrameDecoder create(BlockDecoder decoder) {
        return (ChannelFrameDecoder)ChannelFrameDecoder.builder(decoder).build();
    }

    public static Builder builder(FrameFormat format) {
        return ChannelFrameDecoder.builder(format.createDecoder());
    }

    public static Builder builder(BlockDecoder decoder) {
        return new ChannelFrameDecoder(decoder).new Builder();
    }

    @Override
    public BinaryChannelInput getInput() {
        return input -> {
            if (CHECKS) {
                Reactive.checkInReactorThread((Reactive)this);
            }
            this.input = input;
            this.bufs = input.getBufs();
            if (this.input != null && this.output != null) {
                this.startProcess();
            }
            return this.getProcessCompletion();
        };
    }

    @Override
    public ChannelOutput<ByteBuf> getOutput() {
        return output -> {
            if (CHECKS) {
                Reactive.checkInReactorThread((Reactive)this);
            }
            this.output = this.sanitize(output);
            if (this.input != null && this.output != null) {
                this.startProcess();
            }
        };
    }

    @Override
    protected void doProcess() {
        this.decode().subscribe((result, e) -> {
            if (e instanceof TruncatedDataException) {
                if (this.bufs.isEmpty()) {
                    if (this.decoder.ignoreMissingEndOfStreamBlock()) {
                        this.output.acceptEndOfStream().whenResult(() -> this.completeProcess());
                    } else {
                        this.closeEx((Exception)((Object)new MissingEndOfStreamBlockException(e)));
                    }
                } else {
                    this.closeEx((Exception)((Object)new TruncatedBlockException(e)));
                }
            } else {
                this.doSanitize(result, e).whenResult(buf -> {
                    if (buf != BlockDecoder.END_OF_STREAM) {
                        this.output.accept((ByteBuf)buf).whenResult(this::doProcess);
                    } else {
                        this.input.endOfStream().then((x$0, x$1) -> this.doSanitize(x$0, (Exception)x$1)).then(() -> this.output.acceptEndOfStream()).whenResult(() -> this.completeProcess());
                    }
                });
            }
        });
    }

    private Promise<ByteBuf> decode() {
        Promise<Void> moreDataPromise;
        do {
            if (this.bufs.isEmpty()) continue;
            try {
                ByteBuf result = this.decoder.decode(this.bufs);
                if (result != null) {
                    if (this.decoderResets) {
                        this.decoder.reset();
                    }
                    return Promise.of((Object)result);
                }
            }
            catch (MalformedDataException e) {
                this.closeEx((Exception)((Object)e));
                return Promise.ofException((Exception)((Object)e));
            }
        } while ((moreDataPromise = this.input.needMoreData()).isResult());
        return moreDataPromise.then(this::decode);
    }

    @Override
    protected void doClose(Exception e) {
        this.input.closeEx(e);
        this.output.closeEx(e);
    }

    public final class Builder
    extends AbstractBuilder<Builder, ChannelFrameDecoder> {
        private Builder() {
        }

        public Builder withDecoderResets() {
            Builder.checkNotBuilt((AbstractBuilder)this);
            return this.withDecoderResets(true);
        }

        public Builder withDecoderResets(boolean decoderResets) {
            Builder.checkNotBuilt((AbstractBuilder)this);
            ChannelFrameDecoder.this.decoderResets = decoderResets;
            return this;
        }

        protected ChannelFrameDecoder doBuild() {
            return ChannelFrameDecoder.this;
        }
    }
}

