package io.engineblock.activityimpl.motor;

import com.codahale.metrics.Timer;
import io.engineblock.activityapi.core.Action;
import io.engineblock.activityapi.core.Activity;
import io.engineblock.activityapi.core.ActivityDefObserver;
import io.engineblock.activityapi.core.AsyncAction;
import io.engineblock.activityapi.core.Motor;
import io.engineblock.activityapi.core.MultiPhaseAction;
import io.engineblock.activityapi.core.OpContext;
import io.engineblock.activityapi.core.OpResultBuffer;
import io.engineblock.activityapi.core.RunState;
import io.engineblock.activityapi.core.Startable;
import io.engineblock.activityapi.core.Stoppable;
import io.engineblock.activityapi.core.SyncAction;
import io.engineblock.activityapi.cyclelog.buffers.cycles.CycleSegment;
import io.engineblock.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer;
import io.engineblock.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.engineblock.activityapi.input.Input;
import io.engineblock.activityapi.output.Output;
import io.engineblock.activityapi.rates.RateLimiter;
import io.engineblock.activityimpl.ActivityDef;
import io.engineblock.activityimpl.SlotStateTracker;
import io.engineblock.metrics.ActivityMetrics;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/engineblock/activityimpl/motor/CoreMotor.class */
public class CoreMotor implements ActivityDefObserver, Motor, Stoppable, OpResultBuffer.Sink<OpContext> {
    private static final Logger logger = LoggerFactory.getLogger(CoreMotor.class);
    Timer cyclesTimer;
    Timer phasesTimer;
    Timer stridesTimer;
    Timer inputTimer;
    private long slotId;
    private Input input;
    private Action action;
    private Activity activity;
    private SlotStateTracker slotStateTracker;
    private AtomicReference<RunState> slotState;
    private int stride;
    private Output output;
    private RateLimiter strideRateLimiter;
    private RateLimiter cycleRateLimiter;
    private RateLimiter phaseRateLimiter;

    /* loaded from: input_file:io/engineblock/activityimpl/motor/CoreMotor$StrideResultBuffer.class */
    public static class StrideResultBuffer extends OpResultBuffer<OpContext> {
        private final Timer cycleTimer;

        public StrideResultBuffer(Timer timer, long j, long j2, OpResultBuffer.Sink<OpContext> sink, int i) {
            super(new OpContext(null, j2, j), sink, OpContext[].class, i);
            this.cycleTimer = timer;
        }

        @Override // io.engineblock.activityapi.core.OpResultBuffer, io.engineblock.activityapi.core.OpContext.Sink
        public void handle(OpContext opContext) {
            this.cycleTimer.update(opContext.getTotalLatency(), TimeUnit.NANOSECONDS);
            super.handle(opContext);
        }
    }

    public CoreMotor(Activity activity, long j, Input input) {
        this.stride = 1;
        this.activity = activity;
        this.slotId = j;
        setInput(input);
        this.slotStateTracker = new SlotStateTracker(j);
        this.slotState = this.slotStateTracker.getAtomicSlotState();
        onActivityDefUpdate(activity.getActivityDef());
    }

    public CoreMotor(Activity activity, long j, Input input, Action action) {
        this(activity, j, input);
        setAction(action);
    }

    public CoreMotor(Activity activity, long j, Input input, Action action, Output output) {
        this(activity, j, input);
        setAction(action);
        setResultOutput(output);
    }

    @Override // io.engineblock.activityapi.core.Motor
    public Motor setInput(Input input) {
        this.input = input;
        return this;
    }

    @Override // io.engineblock.activityapi.core.Motor
    public Input getInput() {
        return this.input;
    }

    @Override // io.engineblock.activityapi.core.Motor
    public Motor setAction(Action action) {
        this.action = action;
        return this;
    }

    @Override // io.engineblock.activityapi.core.Motor
    public Action getAction() {
        return this.action;
    }

    @Override // io.engineblock.activityapi.core.Motor
    public long getSlotId() {
        return this.slotId;
    }

    @Override // io.engineblock.activityapi.core.Motor
    public SlotStateTracker getSlotStateTracker() {
        return this.slotStateTracker;
    }

    @Override // java.lang.Runnable
    public void run() {
        Timer.Context time;
        try {
            this.cyclesTimer = ActivityMetrics.timer(this.activity.getActivityDef(), "cycles");
            this.phasesTimer = ActivityMetrics.timer(this.activity.getActivityDef(), "phases");
            this.stridesTimer = ActivityMetrics.timer(this.activity.getActivityDef(), "strides");
            this.inputTimer = ActivityMetrics.timer(this.activity.getActivityDef(), "read_input");
            this.strideRateLimiter = this.activity.getStrideLimiter();
            this.cycleRateLimiter = this.activity.getCycleLimiter();
            this.phaseRateLimiter = this.activity.getPhaseLimiter();
            if (this.slotState.get() == RunState.Finished) {
                logger.warn("Input was already exhausted for slot " + this.slotId + ", remaining in finished state.");
            }
            this.slotStateTracker.enterState(RunState.Running);
            MultiPhaseAction multiPhaseAction = this.action instanceof MultiPhaseAction ? (MultiPhaseAction) this.action : null;
            this.action.init();
            if (this.input instanceof Startable) {
                ((Startable) this.input).start();
            }
            if (this.strideRateLimiter != null) {
                this.strideRateLimiter.start();
            }
            long j = 0;
            long j2 = 0;
            long j3 = 0;
            if (this.action instanceof AsyncAction) {
                AsyncAction asyncAction = (AsyncAction) this.action;
                while (this.slotState.get() == RunState.Running) {
                    time = this.inputTimer.time();
                    Throwable th = null;
                    try {
                        try {
                            CycleSegment inputSegment = this.input.getInputSegment(this.stride);
                            if (time != null) {
                                if (0 != 0) {
                                    try {
                                        time.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    time.close();
                                }
                            }
                            if (inputSegment == null) {
                                logger.debug("input exhausted (input " + this.input + ") via null segment, stopping motor thread " + this.slotId);
                                this.slotStateTracker.enterState(RunState.Finished);
                            } else {
                                if (this.strideRateLimiter != null) {
                                    j = this.strideRateLimiter.acquire();
                                }
                                StrideResultBuffer strideResultBuffer = new StrideResultBuffer(this.cyclesTimer, j, inputSegment.peekNextCycle(), this, this.stride);
                                System.nanoTime();
                                while (!inputSegment.isExhausted()) {
                                    long nextCycle = inputSegment.nextCycle();
                                    if (nextCycle < 0 && inputSegment.isExhausted()) {
                                        logger.trace("input exhausted (input " + this.input + ") via negative read, stopping motor thread " + this.slotId);
                                        this.slotStateTracker.enterState(RunState.Finished);
                                    } else if (this.slotState.get() != RunState.Running) {
                                        logger.trace("motor stopped after input (input " + nextCycle + "), stopping motor thread " + this.slotId);
                                    } else {
                                        if (this.cycleRateLimiter != null) {
                                            j2 = this.cycleRateLimiter.acquire();
                                        }
                                        try {
                                            if (!asyncAction.enqueue(new OpContext(strideResultBuffer, nextCycle, j2))) {
                                                logger.trace("Action queue full at cycle=" + nextCycle);
                                                this.cyclesTimer.update(asyncAction.dequeue().getTotalLatency(), TimeUnit.NANOSECONDS);
                                            }
                                        } catch (Exception e) {
                                            logger.error("Error while processing async cycle " + nextCycle + ", error:" + e);
                                            throw e;
                                        }
                                    }
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                }
                for (OpContext dequeue = asyncAction.dequeue(); dequeue != null; dequeue = asyncAction.dequeue()) {
                    if (this.output != null) {
                        try {
                            this.output.onCycleResult(dequeue);
                        } catch (Exception e2) {
                            logger.error("Error while feeding cycle result  " + dequeue + " to output '" + this.output + "', error:" + e2);
                            throw e2;
                        }
                    }
                }
                if (this.slotState.get() == RunState.Stopping) {
                    this.slotStateTracker.enterState(RunState.Stopped);
                }
            } else {
                if (!(this.action instanceof SyncAction)) {
                    throw new RuntimeException("Valid Action implementations must implement either the SyncAction or the AsyncAction sub-interface");
                }
                SyncAction syncAction = (SyncAction) this.action;
                while (this.slotState.get() == RunState.Running) {
                    CycleResultSegmentBuffer cycleResultSegmentBuffer = new CycleResultSegmentBuffer(this.stride);
                    time = this.inputTimer.time();
                    Throwable th3 = null;
                    try {
                        try {
                            CycleSegment inputSegment2 = this.input.getInputSegment(this.stride);
                            if (time != null) {
                                if (0 != 0) {
                                    try {
                                        time.close();
                                    } catch (Throwable th4) {
                                        th3.addSuppressed(th4);
                                    }
                                } else {
                                    time.close();
                                }
                            }
                            if (inputSegment2 == null) {
                                logger.debug("input exhausted (input " + this.input + ") via null segment, stopping motor thread " + this.slotId);
                                this.slotStateTracker.enterState(RunState.Finished);
                            } else {
                                if (this.strideRateLimiter != null) {
                                    j = this.strideRateLimiter.acquire();
                                }
                                long nanoTime = System.nanoTime();
                                while (!inputSegment2.isExhausted()) {
                                    try {
                                        long nextCycle2 = inputSegment2.nextCycle();
                                        if (nextCycle2 < 0 && inputSegment2.isExhausted()) {
                                            logger.trace("input exhausted (input " + this.input + ") via negative read, stopping motor thread " + this.slotId);
                                            this.slotStateTracker.enterState(RunState.Finished);
                                        } else if (this.slotState.get() != RunState.Running) {
                                            logger.trace("motor stopped after input (input " + nextCycle2 + "), stopping motor thread " + this.slotId);
                                        } else {
                                            if (this.cycleRateLimiter != null) {
                                                j2 = this.cycleRateLimiter.acquire();
                                            }
                                            long nanoTime2 = System.nanoTime();
                                            try {
                                                logger.trace("cycle " + nextCycle2);
                                                long nanoTime3 = System.nanoTime();
                                                if (this.phaseRateLimiter != null) {
                                                    j3 = this.phaseRateLimiter.acquire();
                                                }
                                                int runCycle = syncAction.runCycle(nextCycle2);
                                                this.phasesTimer.update((System.nanoTime() - nanoTime3) + j3, TimeUnit.NANOSECONDS);
                                                if (multiPhaseAction != null) {
                                                    while (multiPhaseAction.incomplete()) {
                                                        long nanoTime4 = System.nanoTime();
                                                        if (this.phaseRateLimiter != null) {
                                                            j3 = this.phaseRateLimiter.acquire();
                                                        }
                                                        runCycle = multiPhaseAction.runPhase(nextCycle2);
                                                        this.phasesTimer.update((System.nanoTime() - nanoTime4) + j3, TimeUnit.NANOSECONDS);
                                                    }
                                                }
                                                this.cyclesTimer.update((System.nanoTime() - nanoTime2) + j2, TimeUnit.NANOSECONDS);
                                                cycleResultSegmentBuffer.append(nextCycle2, runCycle);
                                            } finally {
                                            }
                                        }
                                    } catch (Throwable th5) {
                                        this.stridesTimer.update((System.nanoTime() - nanoTime) + j, TimeUnit.NANOSECONDS);
                                        throw th5;
                                    }
                                }
                                this.stridesTimer.update((System.nanoTime() - nanoTime) + j, TimeUnit.NANOSECONDS);
                                if (this.output != null) {
                                    CycleResultsSegment reader = cycleResultSegmentBuffer.toReader();
                                    try {
                                        this.output.onCycleResultSegment(reader);
                                    } catch (Exception e3) {
                                        logger.error("Error while feeding result segment " + reader + " to output '" + this.output + "', error:" + e3);
                                        throw e3;
                                    }
                                }
                            }
                        } finally {
                        }
                    } finally {
                    }
                }
                if (this.slotState.get() == RunState.Stopping) {
                    this.slotStateTracker.enterState(RunState.Stopped);
                }
            }
        } catch (Throwable th6) {
            logger.error("Error in core motor loop:" + th6, th6);
            throw th6;
        }
    }

    public String toString() {
        return "slot:" + this.slotId + "; state:" + this.slotState.get();
    }

    @Override // io.engineblock.activityapi.core.ActivityDefObserver
    public void onActivityDefUpdate(ActivityDef activityDef) {
        for (Object obj : new Object[]{this.input, this.action, this.output}) {
            if (obj != null && (obj instanceof ActivityDefObserver)) {
                ((ActivityDefObserver) obj).onActivityDefUpdate(activityDef);
            }
        }
        this.stride = activityDef.getParams().getOptionalInteger("stride").orElse(1).intValue();
    }

    @Override // io.engineblock.activityapi.core.Stoppable
    public synchronized void requestStop() {
        if (this.slotState.get() != RunState.Running) {
            if (this.slotState.get() == RunState.Stopped || this.slotState.get() == RunState.Stopping) {
                return;
            }
            logger.warn("attempted to stop motor " + getSlotId() + ": from non Running state:" + this.slotState.get());
            return;
        }
        if (this.input instanceof Stoppable) {
            ((Stoppable) this.input).requestStop();
        }
        if (this.action instanceof Stoppable) {
            ((Stoppable) this.action).requestStop();
        }
        this.slotStateTracker.enterState(RunState.Stopping);
    }

    public void setResultOutput(Output output) {
        this.output = output;
    }

    @Override // io.engineblock.activityapi.core.OpResultBuffer.Sink
    public void handle(OpResultBuffer<OpContext> opResultBuffer) {
        OpContext opContext = opResultBuffer.get();
        opContext.stop();
        this.stridesTimer.update(opContext.getTotalLatency(), TimeUnit.NANOSECONDS);
        logger.trace("completed stride with first result cycle (" + opContext.getCycle() + ")");
        if (this.output != null) {
            int remaining = opResultBuffer.remaining();
            for (int i = 0; i < remaining; i++) {
                OpContext opContext2 = opResultBuffer.get();
                this.output.onCycleResult(opContext2.getCycle(), opContext2.getResult());
            }
        }
    }
}
