package cn.ideabuffer.process.core.nodes.transmitter;

import cn.ideabuffer.process.core.Processor;
import cn.ideabuffer.process.core.context.Context;
import cn.ideabuffer.process.core.context.ContextWrapper;
import cn.ideabuffer.process.core.context.Contexts;
import cn.ideabuffer.process.core.executor.NodeExecutors;
import cn.ideabuffer.process.core.nodes.AbstractExecutableNode;
import cn.ideabuffer.process.core.nodes.TransmittableNode;
import cn.ideabuffer.process.core.status.ProcessStatus;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:cn/ideabuffer/process/core/nodes/transmitter/AbstractTransmittableNode.class */
/**
 * Base class for executable nodes whose processor result can be handed on to a
 * follow-up stage.
 *
 * <p>A follow-up stage is registered through {@link #thenApply}, {@link #thenApplyAsync},
 * {@link #thenAccept} or {@link #thenAcceptAsync}. Only one stage is kept: each call
 * overwrites whatever was registered before. When the node runs, the value returned
 * by its processor is passed to that stage (if any) and then reported to the node's
 * listeners.
 *
 * @param <R> result type produced by the node's processor
 * @param <P> processor type executed by this node
 */
public abstract class AbstractTransmittableNode<R, P extends Processor<R>> extends AbstractExecutableNode<R, P> implements TransmittableNode<R, P> {
    /** Follow-up stage fed with the processor result; {@code null} until a {@code then*} method is called. */
    private TransmittableProcessor transmittableProcessor;

    /**
     * Registers a follow-up stage built from the given result processor.
     *
     * @param resultProcessor processor that receives this node's result
     * @param <V>             result type of the follow-up stage
     * @return the newly registered stage
     */
    @Override
    public <V> ResultStream<V> thenApply(@NotNull ResultProcessor<V, R> resultProcessor) {
        return register(new TransmittableProcessor(resultProcessor));
    }

    /**
     * Registers a follow-up stage built from the given result processor, constructed
     * with the flag {@code true} and this node's executor (presumably the flag marks
     * the stage as asynchronous; confirm against {@code TransmittableProcessor}).
     *
     * @param resultProcessor processor that receives this node's result
     * @param <V>             result type of the follow-up stage
     * @return the newly registered stage
     */
    @Override
    public <V> ResultStream<V> thenApplyAsync(@NotNull ResultProcessor<V, R> resultProcessor) {
        return register(new TransmittableProcessor((ResultProcessor) resultProcessor, true, getExecutor()));
    }

    /**
     * Registers a follow-up stage built from the given result consumer.
     *
     * @param resultConsumer consumer that receives this node's result
     * @return the newly registered stage
     */
    @Override
    public ResultStream<Void> thenAccept(@NotNull ResultConsumer<R> resultConsumer) {
        return register(new TransmittableProcessor(resultConsumer));
    }

    /**
     * Registers a follow-up stage built from the given result consumer, constructed
     * with the flag {@code true} and this node's executor (presumably the flag marks
     * the stage as asynchronous; confirm against {@code TransmittableProcessor}).
     *
     * @param resultConsumer consumer that receives this node's result
     * @return the newly registered stage
     */
    @Override
    public ResultStream<Void> thenAcceptAsync(@NotNull ResultConsumer<R> resultConsumer) {
        return register(new TransmittableProcessor((ResultConsumer) resultConsumer, true, getExecutor()));
    }

    /**
     * Runs this node against the given context.
     *
     * <p>The context is first wrapped with this node's key mapper and readable/writable
     * keys. Nothing is executed when the node has no processor or the rule check fails.
     * A parallel node submits its work to an executor and returns immediately. Otherwise
     * the processor runs on the calling thread, its result is stored under the result key
     * (when one is configured), passed to the follow-up stage and reported to listeners.
     *
     * @param context context the node is executed with
     * @return {@link ProcessStatus#proceed()} on every non-exceptional path
     * @throws Exception any exception raised on the synchronous path, rethrown after it
     *                   has been logged and reported to listeners
     */
    @Override
    @NotNull
    public ProcessStatus execute(Context context) throws Exception {
        ContextWrapper scoped = Contexts.wrap(context, context.getBlock(), getKeyMapper(), getReadableKeys(), getWritableKeys());
        if (getProcessor() == null) {
            return ProcessStatus.proceed();
        }
        if (!ruleCheck(scoped)) {
            return ProcessStatus.proceed();
        }
        if (isParallel()) {
            doParallelProcess(scoped);
            return ProcessStatus.proceed();
        }
        try {
            R result = getProcessor().process(scoped);
            if (getResultKey() != null) {
                scoped.put(getResultKey(), result);
            }
            forward(scoped, result);
            notifyListeners(scoped, result, null, true);
            return ProcessStatus.proceed();
        } catch (Exception failure) {
            // The log line carries the caller's context; listeners receive the wrapped one.
            logger.error("process error, node:{}, context:{}", this, context, failure);
            notifyListeners(scoped, null, failure, false);
            throw failure;
        }
    }

    /**
     * Submits the processor run to this node's executor, or to
     * {@link NodeExecutors#DEFAULT_POOL} when the node has none.
     *
     * <p>Failures are logged and reported to listeners but never propagate to the
     * submitting thread. Unlike the synchronous path, the result is not stored under
     * the result key here. NOTE(review): confirm that omission is intentional.
     *
     * @param context context handed to the processor, the follow-up stage and listeners
     */
    private void doParallelProcess(Context context) {
        Runnable job = () -> {
            try {
                R result = getProcessor().process(context);
                forward(context, result);
                notifyListeners(context, result, null, true);
            } catch (Exception failure) {
                logger.error("process error, node:{}, context:{}", this, context, failure);
                notifyListeners(context, null, failure, false);
            }
        };
        if (getExecutor() == null) {
            NodeExecutors.DEFAULT_POOL.execute(job);
        } else {
            getExecutor().execute(job);
        }
    }

    /** Stores the given stage as this node's follow-up stage and returns it. */
    private TransmittableProcessor register(TransmittableProcessor stage) {
        this.transmittableProcessor = stage;
        return stage;
    }

    /** Passes the processor result to the follow-up stage, if one has been registered. */
    private void forward(Context ctx, R result) {
        if (this.transmittableProcessor != null) {
            this.transmittableProcessor.fire(ctx, result);
        }
    }
}
