package org.apache.flink.statefun.flink.core.functions;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.flink.core.backpressure.InternalContext;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Label;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetrics;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.sdk.Address;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.metrics.Metrics;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/ReusableContext.class */
final class ReusableContext implements ApplyingContext, InternalContext {
    private final Partition thisPartition;
    private final LocalSink localSink;
    private final RemoteSink remoteSink;
    private final DelaySink delaySink;
    private final AsyncSink asyncSink;
    private final SideOutputSink sideOutputSink;
    private final State state;
    private final MessageFactory messageFactory;
    private Message in;
    private LiveFunction function;

    @Inject
    ReusableContext(Partition partition, LocalSink localSink, RemoteSink remoteSink, DelaySink delaySink, AsyncSink asyncSink, SideOutputSink sideOutputSink, @Label("state") State state, MessageFactory messageFactory) {
        this.thisPartition = (Partition) Objects.requireNonNull(partition);
        this.localSink = (LocalSink) Objects.requireNonNull(localSink);
        this.remoteSink = (RemoteSink) Objects.requireNonNull(remoteSink);
        this.delaySink = (DelaySink) Objects.requireNonNull(delaySink);
        this.sideOutputSink = (SideOutputSink) Objects.requireNonNull(sideOutputSink);
        this.state = (State) Objects.requireNonNull(state);
        this.messageFactory = (MessageFactory) Objects.requireNonNull(messageFactory);
        this.asyncSink = (AsyncSink) Objects.requireNonNull(asyncSink);
    }

    @Override // org.apache.flink.statefun.flink.core.functions.ApplyingContext
    public void apply(LiveFunction liveFunction, Message message) {
        this.in = message;
        this.function = liveFunction;
        this.state.setCurrentKey(message.target());
        liveFunction.metrics().incomingMessage();
        liveFunction.receive(this, this.in);
        this.in.postApply();
        this.in = null;
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public void send(Address address, Object obj) {
        Objects.requireNonNull(address);
        Objects.requireNonNull(obj);
        Message from = this.messageFactory.from(self(), address, obj);
        if (this.thisPartition.contains(address)) {
            this.localSink.accept(from);
            this.function.metrics().outgoingLocalMessage();
        } else {
            this.remoteSink.accept(from);
            this.function.metrics().outgoingRemoteMessage();
        }
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public <T> void send(EgressIdentifier<T> egressIdentifier, T t) {
        Objects.requireNonNull(egressIdentifier);
        Objects.requireNonNull(t);
        this.function.metrics().outgoingEgressMessage();
        this.sideOutputSink.accept(egressIdentifier, t);
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public void sendAfter(Duration duration, Address address, Object obj) {
        Objects.requireNonNull(duration);
        Objects.requireNonNull(address);
        Objects.requireNonNull(obj);
        this.delaySink.accept(this.messageFactory.from(self(), address, obj), duration.toMillis());
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public void sendAfter(Duration duration, Address address, Object obj, String str) {
        Objects.requireNonNull(duration);
        Objects.requireNonNull(address);
        Objects.requireNonNull(obj);
        Objects.requireNonNull(str);
        this.delaySink.accept(this.messageFactory.from(self(), address, obj, str), duration.toMillis());
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public void cancelDelayedMessage(String str) {
        Objects.requireNonNull(str);
        this.delaySink.removeMessageByCancellationToken(str);
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public <M, T> void registerAsyncOperation(M m, CompletableFuture<T> completableFuture) {
        Objects.requireNonNull(m);
        Objects.requireNonNull(completableFuture);
        this.asyncSink.accept(self(), this.messageFactory.from(self(), self(), m), completableFuture);
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public Metrics metrics() {
        return this.function.metrics().functionTypeScopedMetrics();
    }

    @Override // org.apache.flink.statefun.flink.core.backpressure.InternalContext
    public void awaitAsyncOperationComplete() {
        this.asyncSink.blockAddress(self());
    }

    @Override // org.apache.flink.statefun.flink.core.backpressure.InternalContext
    public FunctionTypeMetrics functionTypeMetrics() {
        return this.function.metrics();
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public Address caller() {
        return this.in.source();
    }

    @Override // org.apache.flink.statefun.sdk.Context
    public Address self() {
        return this.in.target();
    }
}
