package io.zeebe.transport.impl.actor;

import io.zeebe.dispatcher.BlockPeek;
import io.zeebe.dispatcher.Subscription;
import io.zeebe.transport.impl.ControlMessages;
import io.zeebe.transport.impl.SendFailureHandler;
import io.zeebe.transport.impl.TransportChannel;
import io.zeebe.transport.impl.TransportContext;
import io.zeebe.util.actor.Actor;
import io.zeebe.util.state.ComposedState;
import io.zeebe.util.state.SimpleStateMachineContext;
import io.zeebe.util.state.State;
import io.zeebe.util.state.StateMachine;
import io.zeebe.util.state.StateMachineAgent;
import io.zeebe.util.time.ClockUtil;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.agrona.DirectBuffer;
import org.agrona.collections.Int2ObjectHashMap;

/* loaded from: input_file:io/zeebe/transport/impl/actor/Sender.class */
public class Sender implements Actor {
    private final ActorContext actorContext;
    private final Subscription senderSubscription;
    private final int maxPeekSize;
    private final boolean isClient;
    protected final long keepAlivePeriod;
    protected final SendFailureHandler sendFailureHandler;
    private static final int DEFAULT = 0;
    private static final int DISCARD = 1;
    private static final int SEND_NEXT_KEEP_ALIVE = 2;
    private final Int2ObjectHashMap<TransportChannel> channelMap = new Int2ObjectHashMap<>();
    protected long lastKeepAlive = 0;
    private final PollState pollState = new PollState();
    private final ProcessState processState = new ProcessState();
    private final DiscardState discardState = new DiscardState();
    private final SendKeepAliveState keepAliveState = new SendKeepAliveState();
    private final StateMachine<SenderContext> stateMachine = StateMachine.builder(stateMachine -> {
        return new SenderContext(stateMachine);
    }).initialState(this.pollState).from(this.pollState).take(0).to(this.processState).from(this.pollState).take(2).to(this.keepAliveState).from(this.keepAliveState).take(2).to(this.keepAliveState).from(this.keepAliveState).take(0).to(this.pollState).from(this.processState).take(1).to(this.discardState).from(this.processState).take(0).to(this.pollState).from(this.discardState).take(0).to(this.pollState).build();
    private StateMachineAgent<SenderContext> stateMachineAgent = new StateMachineAgent<>(this.stateMachine);

    /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$DiscardState.class */
    class DiscardState implements State<SenderContext> {
        DiscardState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(SenderContext senderContext) throws Exception {
            BlockPeek blockPeek = senderContext.blockPeek;
            if (Sender.this.sendFailureHandler != null) {
                Iterator<DirectBuffer> it = blockPeek.iterator();
                while (it.hasNext()) {
                    DirectBuffer next = it.next();
                    Sender.this.sendFailureHandler.onFragment(next, 0, next.capacity(), blockPeek.getStreamId(), senderContext.failure, senderContext.failureCause);
                }
            }
            blockPeek.markFailed();
            senderContext.take(0);
            return 1;
        }
    }

    /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$PollState.class */
    class PollState implements State<SenderContext> {
        PollState() {
        }

        @Override // io.zeebe.util.state.State
        public int doWork(SenderContext senderContext) {
            senderContext.reset();
            long currentTimeInMillis = ClockUtil.getCurrentTimeInMillis();
            if (Sender.this.keepAlivePeriod > 0 && currentTimeInMillis - Sender.this.lastKeepAlive > Sender.this.keepAlivePeriod && !Sender.this.channelMap.isEmpty()) {
                senderContext.take(2);
                Sender.this.lastKeepAlive = currentTimeInMillis;
                return 1;
            }
            int peekBlock = Sender.this.senderSubscription.peekBlock(senderContext.blockPeek, Sender.this.maxPeekSize, true);
            if (peekBlock > 0) {
                senderContext.take(0);
            }
            return peekBlock;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$ProcessState.class */
    public class ProcessState extends ComposedState<SenderContext> {
        private final AwaitChannelState awaitChannelState = new AwaitChannelState();
        private final WriteState writeState = new WriteState();

        /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$ProcessState$AwaitChannelState.class */
        class AwaitChannelState implements ComposedState.Step<SenderContext> {
            AwaitChannelState() {
            }

            @Override // io.zeebe.util.state.ComposedState.Step
            public boolean doWork(SenderContext senderContext) {
                CompletableFuture<Void> completableFuture = senderContext.channelFuture;
                BlockPeek blockPeek = senderContext.blockPeek;
                TransportChannel transportChannel = (TransportChannel) Sender.this.channelMap.get(blockPeek.getStreamId());
                if (transportChannel != null && !transportChannel.isClosed()) {
                    senderContext.writeChannel = transportChannel;
                    return true;
                }
                if (!Sender.this.isClient) {
                    senderContext.failure = "Channel is not open";
                    senderContext.take(1);
                    return false;
                }
                if (completableFuture == null) {
                    senderContext.channelFuture = ((ClientActorContext) Sender.this.actorContext).requestChannel(blockPeek.getStreamId());
                    return false;
                }
                if (!completableFuture.isCancelled() && !completableFuture.isCompletedExceptionally()) {
                    return false;
                }
                try {
                    completableFuture.get();
                } catch (Exception e) {
                    senderContext.failure = "Could not open channel";
                    senderContext.failureCause = e;
                }
                senderContext.take(1);
                return false;
            }
        }

        /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$ProcessState$WriteState.class */
        class WriteState implements ComposedState.Step<SenderContext> {
            WriteState() {
            }

            @Override // io.zeebe.util.state.ComposedState.Step
            public boolean doWork(SenderContext senderContext) {
                BlockPeek blockPeek = senderContext.blockPeek;
                int write = senderContext.writeChannel.write(blockPeek.getRawBuffer());
                if (write == -1) {
                    senderContext.failure = "Could not write to channel";
                    senderContext.take(1);
                    return false;
                }
                senderContext.bytesWritten += write;
                if (senderContext.bytesWritten != blockPeek.getBlockLength()) {
                    return false;
                }
                blockPeek.markCompleted();
                senderContext.take(0);
                return true;
            }
        }

        ProcessState() {
        }

        @Override // io.zeebe.util.state.ComposedState
        protected List<ComposedState.Step<SenderContext>> steps() {
            return Arrays.asList(this.awaitChannelState, this.writeState);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$SendKeepAliveState.class */
    public class SendKeepAliveState extends ComposedState<SenderContext> {
        protected final SelectChannelStep selectStep = new SelectChannelStep();
        protected final SendKeepAliveOnChannelStep sendStep = new SendKeepAliveOnChannelStep();

        /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$SendKeepAliveState$SelectChannelStep.class */
        class SelectChannelStep implements ComposedState.Step<SenderContext> {
            SelectChannelStep() {
            }

            @Override // io.zeebe.util.state.ComposedState.Step
            public boolean doWork(SenderContext senderContext) {
                if (senderContext.channelIt == null) {
                    senderContext.channelIt = Sender.this.channelMap.values().iterator();
                }
                if (senderContext.channelIt.hasNext()) {
                    senderContext.writeChannel = senderContext.channelIt.next();
                } else {
                    senderContext.writeChannel = null;
                }
                senderContext.keepAliveBuffer.clear();
                return true;
            }
        }

        /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$SendKeepAliveState$SendKeepAliveOnChannelStep.class */
        class SendKeepAliveOnChannelStep implements ComposedState.Step<SenderContext> {
            SendKeepAliveOnChannelStep() {
            }

            @Override // io.zeebe.util.state.ComposedState.Step
            public boolean doWork(SenderContext senderContext) {
                if (senderContext.writeChannel == null) {
                    senderContext.take(0);
                    return true;
                }
                boolean z = false;
                if (senderContext.keepAliveBuffer.remaining() > 0 && senderContext.writeChannel.write(senderContext.keepAliveBuffer) < 0) {
                    z = true;
                }
                if (senderContext.keepAliveBuffer.remaining() == 0) {
                    z = true;
                }
                if (z) {
                    senderContext.take(2);
                }
                return z;
            }
        }

        SendKeepAliveState() {
        }

        @Override // io.zeebe.util.state.ComposedState
        protected List<ComposedState.Step<SenderContext>> steps() {
            return Arrays.asList(this.selectStep, this.sendStep);
        }

        @Override // io.zeebe.util.state.State
        public boolean isInterruptable() {
            return false;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:io/zeebe/transport/impl/actor/Sender$SenderContext.class */
    public static class SenderContext extends SimpleStateMachineContext {
        final BlockPeek blockPeek;
        CompletableFuture<Void> channelFuture;
        TransportChannel writeChannel;
        int bytesWritten;
        String failure;
        Exception failureCause;
        Iterator<TransportChannel> channelIt;
        final ByteBuffer keepAliveBuffer;

        SenderContext(StateMachine<?> stateMachine) {
            super(stateMachine);
            this.blockPeek = new BlockPeek();
            this.keepAliveBuffer = ByteBuffer.allocate(ControlMessages.KEEP_ALIVE.capacity());
            ControlMessages.KEEP_ALIVE.getBytes(0, this.keepAliveBuffer, ControlMessages.KEEP_ALIVE.capacity());
            this.keepAliveBuffer.flip();
        }

        @Override // io.zeebe.util.state.StateMachineContext
        public void reset() {
            this.writeChannel = null;
            this.bytesWritten = 0;
            this.channelFuture = null;
            this.channelIt = null;
            this.keepAliveBuffer.clear();
            this.failure = null;
            this.failureCause = null;
        }
    }

    public Sender(ActorContext actorContext, TransportContext transportContext) {
        this.actorContext = actorContext;
        this.senderSubscription = transportContext.getSenderSubscription();
        this.maxPeekSize = transportContext.getMessageMaxLength() * 16;
        this.isClient = actorContext instanceof ClientActorContext;
        this.sendFailureHandler = transportContext.getSendFailureHandler();
        this.keepAlivePeriod = transportContext.getChannelKeepAlivePeriod();
        actorContext.setSender(this);
    }

    @Override // io.zeebe.util.actor.Actor
    public int doWork() throws Exception {
        return this.stateMachineAgent.doWork();
    }

    public void removeChannel(TransportChannel transportChannel) {
        this.stateMachineAgent.addCommand(senderContext -> {
            this.channelMap.remove(transportChannel.getStreamId());
        });
    }

    public void registerChannel(TransportChannel transportChannel) {
        this.stateMachineAgent.addCommand(senderContext -> {
            if (this.channelMap.isEmpty()) {
                this.lastKeepAlive = ClockUtil.getCurrentTimeInMillis();
            }
            this.channelMap.put(transportChannel.getStreamId(), (int) transportChannel);
        });
    }
}
