package org.apache.flink.statefun.flink.core.functions;

import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.di.Inject;
import org.apache.flink.statefun.flink.core.di.Lazy;
import org.apache.flink.statefun.flink.core.di.ObjectContainer;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
import org.apache.flink.statefun.flink.core.metrics.FlinkFuncionTypeMetricsFactory;
import org.apache.flink.statefun.flink.core.metrics.FlinkFunctionDispatcherMetrics;
import org.apache.flink.statefun.flink.core.metrics.FuncionTypeMetricsFactory;
import org.apache.flink.statefun.flink.core.metrics.FunctionDispatcherMetrics;
import org.apache.flink.statefun.flink.core.metrics.FunctionTypeMetricsRepository;
import org.apache.flink.statefun.flink.core.state.FlinkState;
import org.apache.flink.statefun.flink.core.state.State;
import org.apache.flink.statefun.flink.core.types.DynamicallyRegisteredTypes;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.OutputTag;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/functions/Reductions.class */
/**
 * Entry point that feeds {@link Message}s into a {@link LocalFunctionGroup} and drains it.
 *
 * <p>An instance is normally obtained through {@link #create}, which registers all collaborators
 * in an {@link ObjectContainer} and lets the container build the {@code Reductions} object via the
 * {@link Inject}-annotated constructor.
 */
final class Reductions {
    private final LocalFunctionGroup localFunctionGroup;
    private final PendingAsyncOperations pendingAsyncOperations;

    /**
     * Creates a new instance from its two collaborators.
     *
     * @param pendingAsyncOperations the pending async operations tracker; must not be {@code null}
     * @param localFunctionGroup the function group that messages are enqueued into; must not be
     *     {@code null}
     * @throws NullPointerException if either argument is {@code null}
     */
    @Inject
    Reductions(PendingAsyncOperations pendingAsyncOperations, LocalFunctionGroup localFunctionGroup) {
        this.localFunctionGroup = Objects.requireNonNull(localFunctionGroup);
        this.pendingAsyncOperations = Objects.requireNonNull(pendingAsyncOperations);
    }

    /**
     * Registers every collaborator in a fresh {@link ObjectContainer} and returns the
     * {@code Reductions} instance resolved from that container.
     *
     * <p>Values are registered with their real static types. No casts are applied to the
     * registered singletons: casting them to {@code Class} or {@code ObjectContainer} would be
     * rejected by the compiler (or fail with a {@link ClassCastException}), since none of them is
     * an instance of those types.
     *
     * @param backPressureValve registered under the label {@code "backpressure-valve"}
     * @param statefulFunctionsUniverse source of the function providers and the registered types
     * @param runtimeContext registered under {@code "runtime-context"}; also supplies the subtask
     *     numbers used to build the {@code Partition}
     * @param keyedStateBackend registered under {@code "keyed-state-backend"}
     * @param timerServiceFactory registered under {@code "delayed-messages-timer-service-factory"}
     * @param internalListState registered under {@code "delayed-messages-buffer-state"}
     * @param map side-output tags per egress identifier, handed to the {@code SideOutputSink}
     * @param output the operator output handed to the {@code RemoteSink} and {@code SideOutputSink}
     * @param messageFactory registered as an unlabeled singleton
     * @param executor registered under {@code "mailbox-executor"}
     * @param metricGroup used to build the function-type and dispatcher metrics objects
     * @param mapState registered under {@code "async-operations"}
     * @return the {@code Reductions} instance provided by the container
     */
    public static Reductions create(BackPressureValve backPressureValve, StatefulFunctionsUniverse statefulFunctionsUniverse, RuntimeContext runtimeContext, KeyedStateBackend<Object> keyedStateBackend, TimerServiceFactory timerServiceFactory, InternalListState<String, Long, Message> internalListState, Map<EgressIdentifier<?>, OutputTag<Object>> map, Output<StreamRecord<Message>> output, MessageFactory messageFactory, Executor executor, MetricGroup metricGroup, MapState<Long, Message> mapState) {
        ObjectContainer container = new ObjectContainer();

        // Function lookup and per-function-type metrics (the metrics repository is an alias of
        // the function repository entry).
        container.add("function-providers", Map.class, statefulFunctionsUniverse.functions());
        container.add("function-repository", FunctionRepository.class, StatefulFunctionRepository.class);
        container.addAlias("function-metrics-repository", FunctionTypeMetricsRepository.class, "function-repository", FunctionRepository.class);

        // State access.
        container.add("runtime-context", RuntimeContext.class, runtimeContext);
        container.add("keyed-state-backend", KeyedStateBackend.class, keyedStateBackend);
        container.add(new DynamicallyRegisteredTypes(statefulFunctionsUniverse.types()));
        container.add("state", State.class, FlinkState.class);

        // Message creation, partitioning and sinks.
        container.add(messageFactory);
        container.add(new Partition(runtimeContext.getMaxNumberOfParallelSubtasks(), runtimeContext.getNumberOfParallelSubtasks(), runtimeContext.getIndexOfThisSubtask()));
        container.add(new RemoteSink(output));
        container.add(new SideOutputSink(map, output));
        container.add("applying-context", ApplyingContext.class, ReusableContext.class);
        container.add(LocalSink.class);
        container.add("function-loader", FunctionLoader.class, PredefinedFunctionLoader.class);
        container.add(Reductions.class);
        container.add(LocalFunctionGroup.class);
        container.add("function-metrics-factory", FuncionTypeMetricsFactory.class, new FlinkFuncionTypeMetricsFactory(metricGroup));
        container.add("function-dispatcher-metrics", FunctionDispatcherMetrics.class, new FlinkFunctionDispatcherMetrics(metricGroup));

        // Delayed messages.
        container.add("delayed-messages-buffer-state", InternalListState.class, internalListState);
        container.add("delayed-messages-buffer", DelayedMessagesBuffer.class, FlinkStateDelayedMessagesBuffer.class);
        container.add("delayed-messages-timer-service-factory", TimerServiceFactory.class, timerServiceFactory);
        container.add(DelaySink.class);

        // Lazily resolved handles to the function group and to this class.
        container.add("function-group", new Lazy<>(LocalFunctionGroup.class));
        container.add("reductions", new Lazy<>(Reductions.class));

        container.add("mailbox-executor", Executor.class, executor);

        // Async operations.
        container.add("async-operations", MapState.class, mapState);
        container.add(AsyncSink.class);
        container.add(PendingAsyncOperations.class);

        container.add("backpressure-valve", BackPressureValve.class, backPressureValve);

        return container.get(Reductions.class);
    }

    /**
     * Enqueues {@code message} and then processes envelopes until the function group reports that
     * there is nothing further to process.
     *
     * @param message the message to enqueue
     */
    public void apply(Message message) {
        enqueue(message);
        processEnvelopes();
    }

    /**
     * Hands {@code message} to the local function group without processing it.
     *
     * @param message the message to enqueue
     */
    public void enqueue(Message message) {
        this.localFunctionGroup.enqueue(message);
    }

    /**
     * Wraps {@code message} in an {@code AsyncMessageDecorator} bound to the pending async
     * operations and the given id, and enqueues the wrapper.
     *
     * @param l the identifier passed to the decorator together with the message
     * @param message the message to wrap
     */
    public void enqueueAsyncOperationAfterRestore(Long l, Message message) {
        enqueue(new AsyncMessageDecorator(this.pendingAsyncOperations, l, message));
    }

    /**
     * Repeatedly asks the local function group to process its next envelope, stopping as soon as
     * {@code processNextEnvelope()} returns {@code false}.
     */
    public void processEnvelopes() {
        while (this.localFunctionGroup.processNextEnvelope()) {
            // All work happens inside processNextEnvelope(); keep going while it returns true.
        }
    }

    /** Flushes the pending async operations by delegating to {@code PendingAsyncOperations#flush()}. */
    public void snapshotAsyncOperations() {
        this.pendingAsyncOperations.flush();
    }
}
