/*
 * Decompiled with CFR 0.152.
 */
package de.quantummaid.eventmaid.channel;

import de.quantummaid.eventmaid.channel.Channel;
import de.quantummaid.eventmaid.channel.ChannelProcessingFrame;
import de.quantummaid.eventmaid.channel.ChannelStatusInformation;
import de.quantummaid.eventmaid.channel.action.Action;
import de.quantummaid.eventmaid.channel.action.ActionHandler;
import de.quantummaid.eventmaid.channel.action.ActionHandlerSet;
import de.quantummaid.eventmaid.channel.exception.ChannelExceptionHandler;
import de.quantummaid.eventmaid.channel.internal.events.ChannelEventListener;
import de.quantummaid.eventmaid.channel.internal.filtering.FilterApplierImpl;
import de.quantummaid.eventmaid.channel.internal.filtering.PostFilterActions;
import de.quantummaid.eventmaid.channel.internal.statistics.ChannelStatisticsCollector;
import de.quantummaid.eventmaid.channel.statistics.ChannelStatistics;
import de.quantummaid.eventmaid.filtering.Filter;
import de.quantummaid.eventmaid.identification.CorrelationId;
import de.quantummaid.eventmaid.identification.MessageId;
import de.quantummaid.eventmaid.internal.exceptions.BubbleUpWrappedException;
import de.quantummaid.eventmaid.internal.pipe.Pipe;
import de.quantummaid.eventmaid.processingContext.EventType;
import de.quantummaid.eventmaid.processingContext.ProcessingContext;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ChannelImpl<T>
implements Channel<T> {
    private final Pipe<ProcessingContext<T>> acceptingPipe;
    private final List<Filter<ProcessingContext<T>>> preFilter;
    private final List<Filter<ProcessingContext<T>>> processFilter;
    private final List<Filter<ProcessingContext<T>>> postFilter;
    private final Action<T> defaultAction;
    private final ActionHandlerSet<T> actionHandlerSet;
    private final ChannelStatisticsCollector statisticsCollector;
    private final ChannelExceptionHandler<T> exceptionHandler;

    private ChannelImpl(Pipe<ProcessingContext<T>> acceptingPipe, Pipe<ProcessingContext<T>> preToProcessPipe, Pipe<ProcessingContext<T>> processToPostPipe, Pipe<ProcessingContext<T>> afterPostPipe, Action<T> defaultAction, ChannelEventListener<ProcessingContext<T>> eventListener, ChannelStatisticsCollector statisticsCollector, ActionHandlerSet<T> actionHandlerSet, ChannelExceptionHandler<T> exceptionHandler) {
        this.acceptingPipe = acceptingPipe;
        this.defaultAction = defaultAction;
        this.actionHandlerSet = actionHandlerSet;
        this.statisticsCollector = statisticsCollector;
        this.exceptionHandler = exceptionHandler;
        this.preFilter = new CopyOnWriteArrayList<Filter<ProcessingContext<T>>>();
        this.processFilter = new CopyOnWriteArrayList<Filter<ProcessingContext<T>>>();
        this.postFilter = new CopyOnWriteArrayList<Filter<ProcessingContext<T>>>();
        acceptingPipe.subscribe(new AdvanceMessageUsingFilter(this.preFilter, preToProcessPipe, eventListener, exceptionHandler));
        preToProcessPipe.subscribe(new AdvanceMessageUsingFilter(this.processFilter, processToPostPipe, eventListener, exceptionHandler));
        processToPostPipe.subscribe(new AdvanceMessageUsingFilter(this.postFilter, afterPostPipe, eventListener, exceptionHandler));
        afterPostPipe.subscribe(new ConsumerExecutingActionSetByFilterOrDefaultAction());
    }

    static <T> Channel<T> channel(Action<T> defaultAction, Pipe<ProcessingContext<T>> acceptingPipe, Pipe<ProcessingContext<T>> prePipe, Pipe<ProcessingContext<T>> processPipe, Pipe<ProcessingContext<T>> postPipe, ChannelEventListener<ProcessingContext<T>> eventListener, ChannelStatisticsCollector statisticsCollector, ActionHandlerSet<T> actionHandlerSet, ChannelExceptionHandler<T> exceptionHandler) {
        return new ChannelImpl<T>(acceptingPipe, prePipe, processPipe, postPipe, defaultAction, eventListener, statisticsCollector, actionHandlerSet, exceptionHandler);
    }

    @Override
    public MessageId send(T message) {
        EventType eventType = EventType.eventTypeFromObjectClass(message);
        ProcessingContext<T> processingContext = ProcessingContext.processingContext(eventType, message);
        return this.send(processingContext);
    }

    @Override
    public MessageId send(T message, CorrelationId correlationId) {
        EventType eventType = EventType.eventTypeFromObjectClass(message);
        ProcessingContext<T> processingContext = ProcessingContext.processingContext(eventType, message, correlationId);
        return this.send(processingContext);
    }

    @Override
    public MessageId send(ProcessingContext<T> processingContext) {
        MessageId messageId = processingContext.getMessageId();
        try {
            this.advanceChannelProcessingFrameHistory(processingContext);
            this.acceptingPipe.send(processingContext);
            return messageId;
        }
        catch (BubbleUpWrappedException e) {
            this.exceptionHandler.handleBubbledUpException(e);
            return messageId;
        }
    }

    private void advanceChannelProcessingFrameHistory(ProcessingContext<T> processingContext) {
        ChannelProcessingFrame previousProcessingFrame = processingContext.getCurrentProcessingFrame();
        ChannelProcessingFrame currentProcessingFrame = ChannelProcessingFrame.processingFrame(this);
        if (this.noPreviousChannelTraversed(previousProcessingFrame)) {
            processingContext.setInitialProcessingFrame(currentProcessingFrame);
        } else {
            previousProcessingFrame.setNextFrame(currentProcessingFrame);
            currentProcessingFrame.setPreviousFrame(previousProcessingFrame);
        }
        processingContext.setCurrentProcessingFrame(currentProcessingFrame);
    }

    private boolean noPreviousChannelTraversed(ChannelProcessingFrame<T> previousProcessingFrame) {
        return previousProcessingFrame == null;
    }

    @Override
    public void addPreFilter(Filter<ProcessingContext<T>> filter) {
        this.preFilter.add(filter);
    }

    @Override
    public void addPreFilter(Filter<ProcessingContext<T>> filter, int position) {
        this.preFilter.add(position, filter);
    }

    @Override
    public List<Filter<ProcessingContext<T>>> getPreFilter() {
        return this.preFilter;
    }

    @Override
    public void removePreFilter(Filter<ProcessingContext<T>> filter) {
        this.preFilter.remove(filter);
    }

    @Override
    public void addProcessFilter(Filter<ProcessingContext<T>> filter) {
        this.processFilter.add(filter);
    }

    @Override
    public void addProcessFilter(Filter<ProcessingContext<T>> filter, int position) {
        this.processFilter.add(position, filter);
    }

    @Override
    public List<Filter<ProcessingContext<T>>> getProcessFilter() {
        return this.processFilter;
    }

    @Override
    public void removeProcessFilter(Filter<ProcessingContext<T>> filter) {
        this.processFilter.remove(filter);
    }

    @Override
    public void addPostFilter(Filter<ProcessingContext<T>> filter) {
        this.postFilter.add(filter);
    }

    @Override
    public void addPostFilter(Filter<ProcessingContext<T>> filter, int position) {
        this.postFilter.add(position, filter);
    }

    @Override
    public List<Filter<ProcessingContext<T>>> getPostFilter() {
        return this.postFilter;
    }

    @Override
    public void removePostFilter(Filter<ProcessingContext<T>> filter) {
        this.postFilter.remove(filter);
    }

    @Override
    public Action<T> getDefaultAction() {
        return this.defaultAction;
    }

    @Override
    public ChannelStatusInformation getStatusInformation() {
        ChannelStatistics statistics = this.statisticsCollector.getStatistics();
        return ChannelStatusInformation.channelStatusInformation(statistics);
    }

    @Override
    public void close(boolean finishRemainingTasks) {
        this.acceptingPipe.close(finishRemainingTasks);
    }

    @Override
    public void close() {
        this.close(false);
    }

    @Override
    public boolean isClosed() {
        return this.acceptingPipe.isClosed();
    }

    @Override
    public boolean awaitTermination(int timeout, TimeUnit timeUnit) throws InterruptedException {
        return this.acceptingPipe.awaitTermination(timeout, timeUnit);
    }

    private final class AdvanceMessageUsingFilter
    implements Consumer<ProcessingContext<T>> {
        private final List<Filter<ProcessingContext<T>>> filter;
        private final Pipe<ProcessingContext<T>> nextPipe;
        private final ChannelEventListener<ProcessingContext<T>> eventListener;
        private final ChannelExceptionHandler<T> exceptionHandler;

        @Override
        public void accept(ProcessingContext<T> preFilterprocessingContext) {
            FilterApplierImpl filterApplier = new FilterApplierImpl();
            try {
                filterApplier.applyAll(preFilterprocessingContext, this.filter, new PostFilterActions<ProcessingContext<T>>(){

                    @Override
                    public void onAllPassed(ProcessingContext<T> processingContext) {
                        AdvanceMessageUsingFilter.this.nextPipe.send(processingContext);
                    }

                    @Override
                    public void onBlock(ProcessingContext<T> processingContext) {
                        AdvanceMessageUsingFilter.this.eventListener.messageBlocked(processingContext);
                    }

                    @Override
                    public void onForgotten(ProcessingContext<T> processingContext) {
                        AdvanceMessageUsingFilter.this.eventListener.messageForgotten(processingContext);
                    }
                });
            }
            catch (Exception e) {
                if (e instanceof BubbleUpWrappedException) {
                    throw e;
                }
                try {
                    this.eventListener.exceptionInFilter(preFilterprocessingContext, e);
                    this.exceptionHandler.handleFilterException(preFilterprocessingContext, e);
                }
                catch (BubbleUpWrappedException bubbledException) {
                    throw bubbledException;
                }
                catch (Exception rethrownException) {
                    throw new BubbleUpWrappedException(e);
                }
            }
        }

        private AdvanceMessageUsingFilter(List<Filter<ProcessingContext<T>>> filter, Pipe<ProcessingContext<T>> nextPipe, ChannelEventListener<ProcessingContext<T>> eventListener, ChannelExceptionHandler<T> exceptionHandler) {
            this.filter = filter;
            this.nextPipe = nextPipe;
            this.eventListener = eventListener;
            this.exceptionHandler = exceptionHandler;
        }
    }

    private final class ConsumerExecutingActionSetByFilterOrDefaultAction
    implements Consumer<ProcessingContext<T>> {
        private ConsumerExecutingActionSetByFilterOrDefaultAction() {
        }

        @Override
        public void accept(ProcessingContext<T> processingContext) {
            Action action;
            Action actionSetByFilter = processingContext.getAction();
            if (actionSetByFilter != null) {
                action = actionSetByFilter;
            } else {
                action = ChannelImpl.this.defaultAction;
                processingContext.changeAction(ChannelImpl.this.defaultAction);
            }
            ActionHandler actionHandler = ChannelImpl.this.actionHandlerSet.getActionHandlerFor(action);
            actionHandler.handle(action, processingContext);
        }
    }
}

