/*
 * Decompiled with CFR 0.152.
 */
package io.activej.csp.process.frame;

import io.activej.bytebuf.ByteBuf;
import io.activej.common.Checks;
import io.activej.common.builder.AbstractBuilder;
import io.activej.csp.ChannelInput;
import io.activej.csp.ChannelOutput;
import io.activej.csp.consumer.ChannelConsumer;
import io.activej.csp.dsl.WithChannelTransformer;
import io.activej.csp.process.AbstractCommunicatingProcess;
import io.activej.csp.process.frame.BlockEncoder;
import io.activej.csp.process.frame.FrameFormat;
import io.activej.csp.supplier.ChannelSupplier;
import io.activej.reactor.Reactive;

/**
 * Channel transformer that runs every readable {@link ByteBuf} taken from its input
 * through a {@link BlockEncoder} and forwards the encoded buffer to its output.
 *
 * <p>When the input yields {@code null}, the encoder's end-of-stream block is sent to
 * the output followed by a {@code null} item, and the process is completed.
 *
 * <p>Instances are obtained through {@link #create} or {@link #builder}; the input and
 * output channels are bound later via {@link #getInput()} and {@link #getOutput()}.
 */
public final class ChannelFrameEncoder
        extends AbstractCommunicatingProcess
        implements WithChannelTransformer<ChannelFrameEncoder, ByteBuf, ByteBuf> {
    private static final boolean CHECKS = Checks.isEnabled(ChannelFrameEncoder.class);

    private final BlockEncoder encoder;
    /** When {@code true}, {@link BlockEncoder#reset()} is called before every block is encoded. */
    private boolean encoderResets;
    /** Bound by the {@link #getInput()} callback; {@code null} until then. */
    private ChannelSupplier<ByteBuf> input;
    /** Bound by the {@link #getOutput()} callback; {@code null} until then. */
    private ChannelConsumer<ByteBuf> output;

    private ChannelFrameEncoder(BlockEncoder encoder) {
        this.encoder = encoder;
    }

    /**
     * Creates an encoder process that uses a fresh encoder obtained from {@code format}.
     *
     * @param format frame format whose {@code createEncoder()} supplies the block encoder
     * @return a built {@code ChannelFrameEncoder} with default settings
     */
    public static ChannelFrameEncoder create(FrameFormat format) {
        return builder(format.createEncoder()).build();
    }

    /**
     * Creates an encoder process around the given block encoder.
     *
     * @param encoder block encoder applied to every buffer
     * @return a built {@code ChannelFrameEncoder} with default settings
     */
    public static ChannelFrameEncoder create(BlockEncoder encoder) {
        return builder(encoder).build();
    }

    /**
     * Starts building an encoder process that uses a fresh encoder obtained from {@code format}.
     *
     * @param format frame format whose {@code createEncoder()} supplies the block encoder
     * @return a builder for further configuration
     */
    public static Builder builder(FrameFormat format) {
        return builder(format.createEncoder());
    }

    /**
     * Starts building an encoder process around the given block encoder.
     *
     * @param encoder block encoder applied to every buffer
     * @return a builder for further configuration
     */
    public static Builder builder(BlockEncoder encoder) {
        return new ChannelFrameEncoder(encoder).new Builder();
    }

    /**
     * Returns the binding point for the supplier of raw buffers.
     *
     * <p>Binding stores the sanitized supplier, starts the process if the output is already
     * bound, and returns the process completion promise.
     */
    @Override
    public ChannelInput<ByteBuf> getInput() {
        return supplier -> {
            if (CHECKS) {
                Reactive.checkInReactorThread(this);
            }
            this.input = sanitize(supplier);
            startIfBound();
            return getProcessCompletion();
        };
    }

    /**
     * Returns the binding point for the consumer of encoded buffers.
     *
     * <p>Binding stores the sanitized consumer and starts the process if the input is
     * already bound.
     */
    @Override
    public ChannelOutput<ByteBuf> getOutput() {
        return consumer -> {
            if (CHECKS) {
                Reactive.checkInReactorThread(this);
            }
            this.output = sanitize(consumer);
            startIfBound();
        };
    }

    /** Starts the process once both the input and the output have been bound. */
    private void startIfBound() {
        if (this.input != null && this.output != null) {
            startProcess();
        }
    }

    @Override
    protected void doProcess() {
        encodeBufs();
    }

    /**
     * Pulls the next buffer that has readable bytes from the input, encodes it and pushes
     * the result to the output, re-invoking itself after the output accepts the buffer.
     * A {@code null} from the input is treated as end of stream.
     */
    private void encodeBufs() {
        this.input.filter(ByteBuf::canRead).get().whenResult(buf -> {
            if (this.encoderResets) {
                this.encoder.reset();
            }
            if (buf != null) {
                ByteBuf outputBuf = this.encoder.encode(buf);
                // The source buffer is no longer needed once its encoded form exists.
                buf.recycle();
                this.output.accept(outputBuf).whenResult(this::encodeBufs);
            } else {
                // End of stream: emit the terminating block, then the null end marker.
                this.output.acceptAll(this.encoder.encodeEndOfStreamBlock(), null)
                        .whenResult(() -> completeProcess());
            }
        });
    }

    /**
     * Propagates the closing exception to whichever channels have been bound.
     *
     * <p>Either field is still {@code null} if its binding callback has not run yet, so each
     * one is guarded; otherwise closing a half-wired process would fail with a
     * {@link NullPointerException} and leave the other channel unclosed.
     *
     * @param e the exception the process is being closed with
     */
    @Override
    protected void doClose(Exception e) {
        if (this.input != null) {
            this.input.closeEx(e);
        }
        if (this.output != null) {
            this.output.closeEx(e);
        }
    }

    /**
     * Builder that configures the enclosing {@link ChannelFrameEncoder} instance in place
     * and returns that same instance from {@code build()}.
     */
    public final class Builder
            extends AbstractBuilder<Builder, ChannelFrameEncoder> {
        private Builder() {
        }

        /**
         * Enables resetting the encoder before each block; shorthand for
         * {@code withEncoderResets(true)}.
         *
         * @return this builder
         */
        public Builder withEncoderResets() {
            // The not-built check is performed by the delegate.
            return withEncoderResets(true);
        }

        /**
         * Sets whether the encoder is reset before each block is encoded.
         *
         * @param encoderResets {@code true} to call {@code reset()} on the encoder before every block
         * @return this builder
         */
        public Builder withEncoderResets(boolean encoderResets) {
            checkNotBuilt(this);
            ChannelFrameEncoder.this.encoderResets = encoderResets;
            return this;
        }

        @Override
        protected ChannelFrameEncoder doBuild() {
            return ChannelFrameEncoder.this;
        }
    }
}

