package io.engineblock.activityimpl.motor;

import com.codahale.metrics.Timer;
import io.engineblock.activityapi.core.Action;
import io.engineblock.activityapi.core.Activity;
import io.engineblock.activityapi.core.ActivityDefObserver;
import io.engineblock.activityapi.core.AsyncAction;
import io.engineblock.activityapi.core.Motor;
import io.engineblock.activityapi.core.MultiPhaseAction;
import io.engineblock.activityapi.core.RunState;
import io.engineblock.activityapi.core.Startable;
import io.engineblock.activityapi.core.Stoppable;
import io.engineblock.activityapi.core.SyncAction;
import io.engineblock.activityapi.core.ops.fluent.OpTracker;
import io.engineblock.activityapi.core.ops.fluent.OpTrackerImpl;
import io.engineblock.activityapi.core.ops.fluent.opfacets.TrackedOp;
import io.engineblock.activityapi.cyclelog.buffers.op_output.StrideOutputConsumer;
import io.engineblock.activityapi.cyclelog.buffers.results.CycleResultSegmentBuffer;
import io.engineblock.activityapi.cyclelog.buffers.results.CycleResultsSegment;
import io.engineblock.activityapi.cyclelog.buffers.results.CycleSegment;
import io.engineblock.activityapi.input.Input;
import io.engineblock.activityapi.output.Output;
import io.engineblock.activityapi.ratelimits.RateLimiter;
import io.engineblock.activityimpl.ActivityDef;
import io.engineblock.activityimpl.SlotStateTracker;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/engineblock/activityimpl/motor/CoreMotor.class */
/**
 * A motor drives one slot of an {@link Activity}: it pulls cycle segments from an
 * {@link Input}, hands each cycle to the configured {@link Action}, and reports
 * results to an optional {@link Output}.
 *
 * <p>Two execution paths exist, selected by the runtime type of the action:
 * <ul>
 *   <li>{@link AsyncAction}: cycles are wrapped in tracked ops and enqueued on the
 *       action; the motor blocks while the op tracker reports that it is full.</li>
 *   <li>{@link SyncAction}: cycles (and any additional phases of a
 *       {@link MultiPhaseAction}) are executed inline on the motor thread.</li>
 * </ul>
 *
 * <p>The slot's run state is held in a {@link SlotStateTracker}; the loops in
 * {@link #run()} continue only while that state is {@code RunState.Running}.
 *
 * @param <D> the payload type carried by tracked ops on the async path
 */
public class CoreMotor<D> implements ActivityDefObserver, Motor<D>, Stoppable {
    private static final Logger logger = LoggerFactory.getLogger(CoreMotor.class);

    /** Longest single {@code Object.wait} (milliseconds) while the op tracker is full. */
    private static final long ENQUEUE_WAIT_MILLIS = 10000L;

    /** Timeout handed to {@code OpTracker.awaitCompletion} once input is exhausted (presumably milliseconds). */
    private static final long COMPLETION_WAIT = 60000L;

    private final long slotId;
    private Timer inputTimer;
    private RateLimiter strideRateLimiter;
    private Timer stridesServiceTimer;
    private Timer stridesResponseTimer;
    private RateLimiter cycleRateLimiter;
    private Timer cyclesTimer;
    private Timer cycleResponseTimer;
    private RateLimiter phaseRateLimiter;
    private Timer phasesTimer;
    private Input input;
    private Action action;
    private final Activity activity;
    private Output output;
    private final SlotStateTracker slotStateTracker;
    private final AtomicReference<RunState> slotState;
    private int stride = 1;
    private OpTracker<D> opTracker;

    /**
     * Creates a motor with an input but no action or output yet.
     *
     * @param activity the activity this motor belongs to; its definition is applied immediately
     * @param j        the slot id of this motor
     * @param input    the source of cycle segments
     */
    public CoreMotor(Activity activity, long j, Input input) {
        this.activity = activity;
        this.slotId = j;
        setInput(input);
        this.slotStateTracker = new SlotStateTracker(j);
        this.slotState = this.slotStateTracker.getAtomicSlotState();
        onActivityDefUpdate(activity.getActivityDef());
    }

    /**
     * Creates a motor with an input and an action.
     *
     * @param activity the activity this motor belongs to
     * @param j        the slot id of this motor
     * @param input    the source of cycle segments
     * @param action   the action applied to each cycle
     */
    public CoreMotor(Activity activity, long j, Input input, Action action) {
        this(activity, j, input);
        setAction(action);
    }

    /**
     * Creates a motor with an input, an action and a result output.
     *
     * @param activity the activity this motor belongs to
     * @param j        the slot id of this motor
     * @param input    the source of cycle segments
     * @param action   the action applied to each cycle
     * @param output   the receiver of cycle results; may be {@code null}
     */
    public CoreMotor(Activity activity, long j, Input input, Action action, Output output) {
        this(activity, j, input);
        setAction(action);
        setResultOutput(output);
    }

    /** Sets the input that cycle segments are read from and returns this motor. */
    @Override // io.engineblock.activityapi.core.Motor
    public Motor<D> setInput(Input input) {
        this.input = input;
        return this;
    }

    /** Returns the input that cycle segments are read from. */
    @Override // io.engineblock.activityapi.core.Motor
    public Input getInput() {
        return this.input;
    }

    /** Sets the action applied to each cycle and returns this motor. */
    @Override // io.engineblock.activityapi.core.Motor
    public Motor<D> setAction(Action action) {
        this.action = action;
        return this;
    }

    /** Returns the action applied to each cycle. */
    @Override // io.engineblock.activityapi.core.Motor
    public Action getAction() {
        return this.action;
    }

    /** Returns the slot id given at construction. */
    @Override // io.engineblock.activityapi.core.Motor
    public long getSlotId() {
        return this.slotId;
    }

    /** Returns the tracker that holds this slot's run state. */
    @Override // io.engineblock.activityapi.core.Motor
    public SlotStateTracker getSlotStateTracker() {
        return this.slotStateTracker;
    }

    /**
     * Runs the motor loop on the calling thread until the slot leaves the
     * {@code Running} state.
     *
     * <p>Rate limiters and timers are fetched from the activity, the slot enters
     * {@code Running}, the action is initialised, and control is handed to the async
     * or sync loop depending on the action's type. Any throwable escaping the loop is
     * logged and rethrown.
     *
     * @throws RuntimeException if the action is neither an {@link AsyncAction} nor a
     *                          {@link SyncAction}, or if the {@code async} parameter is
     *                          set for a sync action
     */
    @Override // java.lang.Runnable
    public void run() {
        try {
            this.strideRateLimiter = this.activity.getStrideLimiter();
            this.cycleRateLimiter = this.activity.getCycleLimiter();
            this.phaseRateLimiter = this.activity.getPhaseLimiter();
            this.stridesServiceTimer = this.activity.getInstrumentation().getOrCreateStridesServiceTimer();
            this.stridesResponseTimer = this.activity.getInstrumentation().getStridesResponseTimerOrNull();
            this.inputTimer = this.activity.getInstrumentation().getOrCreateInputTimer();

            if (this.slotState.get() == RunState.Finished) {
                logger.warn("Input was already exhausted for slot {}, remaining in finished state.", this.slotId);
            }
            this.slotStateTracker.enterState(RunState.Running);

            MultiPhaseAction multiPhaseAction = null;
            if (this.action instanceof MultiPhaseAction) {
                multiPhaseAction = (MultiPhaseAction) this.action;
            }

            this.action.init();
            if (this.input instanceof Startable) {
                ((Startable) this.input).start();
            }
            if (this.strideRateLimiter != null) {
                this.strideRateLimiter.start();
            }

            if (this.action instanceof AsyncAction) {
                runAsync((AsyncAction) AsyncAction.class.cast(this.action));
            } else if (this.action instanceof SyncAction) {
                runSync((SyncAction) this.action, multiPhaseAction);
            } else {
                throw new RuntimeException("Valid Action implementations must implement either the SyncAction or the AsyncAction sub-interface");
            }
        } catch (Throwable t) {
            logger.error("Error in core motor loop:" + t, t);
            throw t;
        }
    }

    /**
     * Async loop: wraps every cycle in a tracked op and enqueues it on the action.
     *
     * <p>Raw types are kept here because the generic signatures of the collaborating
     * project types are not visible from this file.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private void runAsync(AsyncAction asyncAction) {
        this.opTracker = new OpTrackerImpl(this.activity, this.slotId);
        this.opTracker.setCycleOpFunction(asyncAction.getOpInitFunction());

        StrideOutputConsumer strideOutputConsumer = null;
        if (this.action instanceof StrideOutputConsumer) {
            strideOutputConsumer = (StrideOutputConsumer) asyncAction;
        }

        // Delays keep their last value when the matching limiter is absent.
        long strideDelay = 0;
        long cycleDelay = 0;

        while (this.slotState.get() == RunState.Running) {
            CycleSegment segment = null;
            // try-with-resources so the input timing is closed even if the input throws.
            try (Timer.Context inputTime = this.inputTimer.time()) {
                segment = this.input.getInputSegment(this.stride);
            }

            if (segment == null) {
                logger.debug("input exhausted (input {}) via null segment, stopping motor thread {}", this.input, this.slotId);
                this.slotStateTracker.enterState(RunState.Finished);
                continue;
            }

            if (this.strideRateLimiter != null) {
                strideDelay = this.strideRateLimiter.maybeWaitForOp();
            }

            StrideTracker strideTracker = new StrideTracker(
                this.stridesServiceTimer,
                this.stridesResponseTimer,
                strideDelay,
                segment.peekNextCycle(),
                this.stride,
                this.output,
                strideOutputConsumer);
            strideTracker.start();

            while (!segment.isExhausted() && this.slotState.get() == RunState.Running) {
                long cycle = segment.nextCycle();
                if (cycle < 0 && segment.isExhausted()) {
                    logger.trace("input exhausted (input {}) via negative read, stopping motor thread {}", this.input, this.slotId);
                    this.slotStateTracker.enterState(RunState.Finished);
                    continue;
                }
                if (this.slotState.get() != RunState.Running) {
                    logger.trace("motor stopped in cycle {}, stopping motor thread {}", cycle, this.slotId);
                    continue;
                }

                if (this.cycleRateLimiter != null) {
                    cycleDelay = this.cycleRateLimiter.maybeWaitForOp();
                }

                try {
                    TrackedOp<D> op = this.opTracker.newOp(cycle, strideTracker);
                    op.setWaitTime(cycleDelay);
                    awaitEnqueueCapacity();
                    asyncAction.enqueue(op);
                } catch (Exception e) {
                    logger.error("Error while processing async cycle " + cycle + ", error:" + e);
                    throw e;
                }
            }
        }

        if (this.slotState.get() == RunState.Finished) {
            if (this.opTracker.awaitCompletion(COMPLETION_WAIT)) {
                logger.debug("slot {} completed successfully", this.slotId);
            } else {
                logger.warn("slot {} was stopped before completing successfully", this.slotId);
            }
        }
        if (this.slotState.get() == RunState.Stopping) {
            this.slotStateTracker.enterState(RunState.Stopped);
        }
    }

    /** Blocks on the op tracker's monitor until it no longer reports being full. */
    private void awaitEnqueueCapacity() {
        synchronized (this.opTracker) {
            while (this.opTracker.isFull()) {
                try {
                    logger.trace("Blocking for enqueue with ({}/{}) queued ops",
                        this.opTracker.getPendingOps(), this.opTracker.getMaxPendingOps());
                    this.opTracker.wait(ENQUEUE_WAIT_MILLIS);
                } catch (InterruptedException ignored) {
                    // Deliberately not propagated: the loop simply re-checks capacity.
                    // The interrupt flag is not re-asserted here because doing so inside
                    // this loop would make the next wait() throw immediately and spin.
                }
            }
        }
    }

    /** Sync loop: runs every cycle (and any extra phases) inline and buffers the results per stride. */
    private void runSync(SyncAction syncAction, MultiPhaseAction multiPhaseAction) {
        this.cyclesTimer = this.activity.getInstrumentation().getOrCreateCyclesServiceTimer();
        this.stridesServiceTimer = this.activity.getInstrumentation().getOrCreateStridesServiceTimer();
        this.phasesTimer = this.activity.getInstrumentation().getOrCreatePhasesServiceTimer();

        if (this.activity.getActivityDef().getParams().containsKey("async")) {
            throw new RuntimeException("The async parameter was given for this activity, but it does not seem to know how to do async.");
        }

        // Delays keep their last value when the matching limiter is absent.
        long strideDelay = 0;
        long cycleDelay = 0;
        long phaseDelay = 0;

        while (this.slotState.get() == RunState.Running) {
            CycleResultSegmentBuffer resultBuffer = new CycleResultSegmentBuffer(this.stride);

            CycleSegment segment = null;
            // try-with-resources so the input timing is closed even if the input throws.
            try (Timer.Context inputTime = this.inputTimer.time()) {
                segment = this.input.getInputSegment(this.stride);
            }

            if (segment == null) {
                logger.debug("input exhausted (input {}) via null segment, stopping motor thread {}", this.input, this.slotId);
                this.slotStateTracker.enterState(RunState.Finished);
                continue;
            }

            if (this.strideRateLimiter != null) {
                strideDelay = this.strideRateLimiter.maybeWaitForOp();
            }

            long strideStart = System.nanoTime();
            try {
                // Unlike the async loop, this loop keeps reading until the segment is
                // exhausted even after the slot stops running; such cycles are skipped.
                while (!segment.isExhausted()) {
                    long cycle = segment.nextCycle();
                    if (cycle < 0 && segment.isExhausted()) {
                        logger.trace("input exhausted (input {}) via negative read, stopping motor thread {}", this.input, this.slotId);
                        this.slotStateTracker.enterState(RunState.Finished);
                        continue;
                    }
                    if (this.slotState.get() != RunState.Running) {
                        logger.trace("motor stopped after input (input {}), stopping motor thread {}", cycle, this.slotId);
                        continue;
                    }

                    if (this.cycleRateLimiter != null) {
                        cycleDelay = this.cycleRateLimiter.maybeWaitForOp();
                    }
                    long cycleStart = System.nanoTime();
                    logger.trace("cycle {}", cycle);

                    long phaseStart = System.nanoTime();
                    if (this.phaseRateLimiter != null) {
                        phaseDelay = this.phaseRateLimiter.maybeWaitForOp();
                    }
                    int result = syncAction.runCycle(cycle);
                    this.phasesTimer.update((System.nanoTime() - phaseStart) + phaseDelay, TimeUnit.NANOSECONDS);

                    if (multiPhaseAction != null) {
                        while (multiPhaseAction.incomplete()) {
                            phaseStart = System.nanoTime();
                            if (this.phaseRateLimiter != null) {
                                phaseDelay = this.phaseRateLimiter.maybeWaitForOp();
                            }
                            result = multiPhaseAction.runPhase(cycle);
                            this.phasesTimer.update((System.nanoTime() - phaseStart) + phaseDelay, TimeUnit.NANOSECONDS);
                        }
                    }

                    this.cyclesTimer.update((System.nanoTime() - cycleStart) + cycleDelay, TimeUnit.NANOSECONDS);
                    resultBuffer.append(cycle, result);
                }
            } finally {
                // Recorded for both completed and failed strides.
                this.stridesServiceTimer.update((System.nanoTime() - strideStart) + strideDelay, TimeUnit.NANOSECONDS);
            }

            if (this.output != null) {
                CycleResultsSegment reader = resultBuffer.toReader();
                try {
                    this.output.onCycleResultSegment(reader);
                } catch (Exception e) {
                    logger.error("Error while feeding result segment " + reader + " to output '" + this.output + "', error:" + e);
                    throw e;
                }
            }
        }

        if (this.slotState.get() == RunState.Stopping) {
            this.slotStateTracker.enterState(RunState.Stopped);
        }
    }

    /** Returns the slot id and the current run state, e.g. for log output. */
    @Override
    public String toString() {
        return "slot:" + this.slotId + "; state:" + this.slotState.get();
    }

    /**
     * Forwards the updated definition to the input, op tracker, action and output
     * (whichever of them are {@link ActivityDefObserver}s) and re-reads the
     * {@code stride} parameter, defaulting to 1.
     *
     * @param activityDef the new activity definition
     */
    @Override // io.engineblock.activityapi.core.ActivityDefObserver
    public void onActivityDefUpdate(ActivityDef activityDef) {
        // Collaborators may still be null (this is also called from the constructor);
        // instanceof is false for null, so they are skipped.
        for (Object obj : new Object[]{this.input, this.opTracker, this.action, this.output}) {
            if (obj instanceof ActivityDefObserver) {
                ((ActivityDefObserver) obj).onActivityDefUpdate(activityDef);
            }
        }
        this.stride = activityDef.getParams().getOptionalInteger("stride").orElse(1).intValue();
    }

    /**
     * Requests that this motor stop. When the slot is {@code Running}, the request is
     * forwarded to the input and action (if they are {@link Stoppable}) and the slot
     * enters {@code Stopping}. When it is already {@code Stopping} or {@code Stopped},
     * nothing happens. Any other state is logged as a warning.
     */
    @Override // io.engineblock.activityapi.core.Stoppable
    public synchronized void requestStop() {
        RunState current = this.slotState.get();
        if (current == RunState.Running) {
            if (this.input instanceof Stoppable) {
                ((Stoppable) this.input).requestStop();
            }
            if (this.action instanceof Stoppable) {
                ((Stoppable) this.action).requestStop();
            }
            this.slotStateTracker.enterState(RunState.Stopping);
            return;
        }
        // Re-read the state for each check, as the motor thread may change it concurrently.
        if (this.slotState.get() == RunState.Stopped || this.slotState.get() == RunState.Stopping) {
            return;
        }
        logger.warn("attempted to stop motor {}: from non Running state:{}", getSlotId(), this.slotState.get());
    }

    /**
     * Sets the receiver of cycle results.
     *
     * @param output the output to feed result segments to; {@code null} disables result output
     */
    public void setResultOutput(Output output) {
        this.output = output;
    }
}
