package org.apache.ignite.internal.sql.engine.exec;

import it.unimi.dsi.fastutil.longs.Long2ObjectMaps;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.rel.Inbox;
import org.apache.ignite.internal.sql.engine.exec.rel.Outbox;
import org.apache.ignite.internal.sql.engine.message.InboxCloseMessage;
import org.apache.ignite.internal.sql.engine.message.MessageService;
import org.apache.ignite.internal.sql.engine.message.QueryBatchAcknowledgeMessage;
import org.apache.ignite.internal.sql.engine.message.QueryBatchMessage;
import org.apache.ignite.internal.sql.engine.message.SqlQueryMessagesFactory;
import org.apache.ignite.internal.sql.engine.metadata.FragmentDescription;
import org.apache.ignite.internal.sql.engine.util.BaseQueryContext;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.lang.ErrorGroups;
import org.apache.ignite.lang.IgniteInternalCheckedException;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.network.ClusterNode;

/* loaded from: input_file:org/apache/ignite/internal/sql/engine/exec/ExchangeServiceImpl.class */
public class ExchangeServiceImpl implements ExchangeService {
    private static final IgniteLogger LOG = Loggers.forClass(ExchangeServiceImpl.class);
    private static final SqlQueryMessagesFactory FACTORY = new SqlQueryMessagesFactory();
    private final ClusterNode localNode;
    private final QueryTaskExecutor taskExecutor;
    private final MailboxRegistry mailboxRegistry;
    private final MessageService msgSrvc;

    public ExchangeServiceImpl(ClusterNode clusterNode, QueryTaskExecutor queryTaskExecutor, MailboxRegistry mailboxRegistry, MessageService messageService) {
        this.localNode = clusterNode;
        this.taskExecutor = queryTaskExecutor;
        this.mailboxRegistry = mailboxRegistry;
        this.msgSrvc = messageService;
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.LifecycleAware
    public void start() {
        this.msgSrvc.register((str, networkMessage) -> {
            onMessage(str, (InboxCloseMessage) networkMessage);
        }, (short) 5);
        this.msgSrvc.register((str2, networkMessage2) -> {
            onMessage(str2, (QueryBatchAcknowledgeMessage) networkMessage2);
        }, (short) 4);
        this.msgSrvc.register((str3, networkMessage3) -> {
            onMessage(str3, (QueryBatchMessage) networkMessage3);
        }, (short) 3);
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public <RowT> void sendBatch(String str, UUID uuid, long j, long j2, int i, boolean z, List<RowT> list) throws IgniteInternalCheckedException {
        this.msgSrvc.send(str, FACTORY.queryBatchMessage().queryId(uuid).fragmentId(j).exchangeId(j2).batchId(i).last(z).rows(Commons.cast(list)).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public void acknowledge(String str, UUID uuid, long j, long j2, int i) throws IgniteInternalCheckedException {
        this.msgSrvc.send(str, FACTORY.queryBatchAcknowledgeMessage().queryId(uuid).fragmentId(j).exchangeId(j2).batchId(i).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public void closeQuery(String str, UUID uuid) throws IgniteInternalCheckedException {
        this.msgSrvc.send(str, FACTORY.queryCloseMessage().queryId(uuid).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public void closeInbox(String str, UUID uuid, long j, long j2) throws IgniteInternalCheckedException {
        this.msgSrvc.send(str, FACTORY.inboxCloseMessage().queryId(uuid).fragmentId(j).exchangeId(j2).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public void sendError(String str, UUID uuid, long j, Throwable th) throws IgniteInternalCheckedException {
        this.msgSrvc.send(str, FACTORY.errorMessage().queryId(uuid).fragmentId(j).error(th).build());
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.ExchangeService
    public boolean alive(String str) {
        return this.msgSrvc.alive(str);
    }

    protected void onMessage(String str, InboxCloseMessage inboxCloseMessage) {
        Collection<Inbox<?>> inboxes = this.mailboxRegistry.inboxes(inboxCloseMessage.queryId(), inboxCloseMessage.fragmentId(), inboxCloseMessage.exchangeId());
        if (CollectionUtils.nullOrEmpty(inboxes)) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Stale inbox cancel message received [nodeId={}, queryId={}, fragmentId={}, exchangeId={}]", new Object[]{str, inboxCloseMessage.queryId(), Long.valueOf(inboxCloseMessage.fragmentId()), Long.valueOf(inboxCloseMessage.exchangeId())});
                return;
            }
            return;
        }
        for (Inbox<?> inbox : inboxes) {
            ExecutionContext<?> context = inbox.context();
            Objects.requireNonNull(inbox);
            ExecutionContext.RunnableX runnableX = inbox::close;
            Objects.requireNonNull(inbox);
            context.execute(runnableX, inbox::onError);
        }
    }

    protected void onMessage(String str, QueryBatchAcknowledgeMessage queryBatchAcknowledgeMessage) {
        Outbox<?> outbox = this.mailboxRegistry.outbox(queryBatchAcknowledgeMessage.queryId(), queryBatchAcknowledgeMessage.exchangeId());
        if (outbox == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Stale acknowledge message received: [nodeId={}, queryId={}, fragmentId={}, exchangeId={}, batchId={}]", new Object[]{str, queryBatchAcknowledgeMessage.queryId(), Long.valueOf(queryBatchAcknowledgeMessage.fragmentId()), Long.valueOf(queryBatchAcknowledgeMessage.exchangeId()), Integer.valueOf(queryBatchAcknowledgeMessage.batchId())});
            }
        } else {
            try {
                outbox.onAcknowledge(str, queryBatchAcknowledgeMessage.batchId());
            } catch (Throwable th) {
                outbox.onError(th);
                throw new IgniteInternalException(ErrorGroups.Common.UNEXPECTED_ERR, "Unexpected exception", th);
            }
        }
    }

    protected void onMessage(String str, QueryBatchMessage queryBatchMessage) {
        Inbox<?> inbox = this.mailboxRegistry.inbox(queryBatchMessage.queryId(), queryBatchMessage.exchangeId());
        if (inbox == null && queryBatchMessage.batchId() == 0) {
            inbox = this.mailboxRegistry.register(new Inbox(baseInboxContext(str, queryBatchMessage.queryId(), queryBatchMessage.fragmentId()), this, this.mailboxRegistry, queryBatchMessage.exchangeId(), queryBatchMessage.exchangeId()));
        }
        if (inbox == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Stale batch message received: [nodeId={}, queryId={}, fragmentId={}, exchangeId={}, batchId={}]", new Object[]{str, queryBatchMessage.queryId(), Long.valueOf(queryBatchMessage.fragmentId()), Long.valueOf(queryBatchMessage.exchangeId()), Integer.valueOf(queryBatchMessage.batchId())});
            }
        } else {
            try {
                inbox.onBatchReceived(str, queryBatchMessage.batchId(), queryBatchMessage.last(), Commons.cast(queryBatchMessage.rows()));
            } catch (Throwable th) {
                inbox.onError(th);
                throw new IgniteInternalException(ErrorGroups.Common.UNEXPECTED_ERR, "Unexpected exception", th);
            }
        }
    }

    private ExecutionContext<?> baseInboxContext(String str, UUID uuid, long j) {
        return new ExecutionContext<>(BaseQueryContext.builder().logger(LOG).build(), this.taskExecutor, uuid, this.localNode, str, new FragmentDescription(j, null, null, Long2ObjectMaps.emptyMap()), null, Map.of(), null);
    }

    @Override // org.apache.ignite.internal.sql.engine.exec.LifecycleAware
    public void stop() {
    }
}
