package org.apache.flink.statefun.flink.core.functions;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.statefun.flink.core.StatefulFunctionsJobConstants;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverse;
import org.apache.flink.statefun.flink.core.StatefulFunctionsUniverses;
import org.apache.flink.statefun.flink.core.backpressure.BackPressureValve;
import org.apache.flink.statefun.flink.core.backpressure.ThresholdBackPressureValve;
import org.apache.flink.statefun.flink.core.common.MailboxExecutorFacade;
import org.apache.flink.statefun.flink.core.common.ManagingResources;
import org.apache.flink.statefun.flink.core.message.Message;
import org.apache.flink.statefun.flink.core.message.MessageFactory;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.util.OutputTag;

/**
 * Flink stream operator that feeds every incoming {@link Message} into a {@link Reductions}
 * instance.
 *
 * <p>The {@link Reductions} instance, the back pressure valve and the list of resource-managing
 * function providers are created in {@link #open()}; they are {@code transient} and therefore not
 * part of the serialized form of this operator. Providers that implement {@link ManagingResources}
 * are shut down when the operator is closed.
 */
public class FunctionGroupOperator extends AbstractStreamOperator<Message> implements OneInputStreamOperator<Message, Message> {
    private static final long serialVersionUID = 1L;

    // -- configuration (part of the serialized form)
    private final Map<EgressIdentifier<?>, OutputTag<Object>> sideOutputs;
    private final StatefulFunctionsConfig configuration;

    // -- runtime (transient; everything except mailboxExecutor is assigned in open())
    private transient Reductions reductions;
    private transient MailboxExecutor mailboxExecutor;
    private transient BackPressureValve backPressureValve;
    private transient List<ManagingResources> managingResources;

    /**
     * Creates the operator.
     *
     * @param map side output tags keyed by egress identifier; must not be {@code null}
     * @param statefulFunctionsConfig Stateful Functions configuration; must not be {@code null}
     * @param mailboxExecutor executor this operator yields to while back pressure is applied; must
     *     not be {@code null}
     * @param chainingStrategy stored in the inherited {@code chainingStrategy} field
     * @param processingTimeService stored in the inherited {@code processingTimeService} field
     * @throws NullPointerException if {@code map}, {@code statefulFunctionsConfig} or {@code
     *     mailboxExecutor} is {@code null}
     */
    public FunctionGroupOperator(Map<EgressIdentifier<?>, OutputTag<Object>> map, StatefulFunctionsConfig statefulFunctionsConfig, MailboxExecutor mailboxExecutor, ChainingStrategy chainingStrategy, ProcessingTimeService processingTimeService) {
        this.sideOutputs = Objects.requireNonNull(map);
        this.configuration = Objects.requireNonNull(statefulFunctionsConfig);
        this.mailboxExecutor = Objects.requireNonNull(mailboxExecutor);
        this.chainingStrategy = chainingStrategy;
        this.processingTimeService = processingTimeService;
    }

    /**
     * Applies the message carried by {@code streamRecord} through {@link Reductions}.
     *
     * <p>Before doing so, repeatedly yields to the mailbox executor for as long as the back
     * pressure valve reports that back pressure should be applied.
     *
     * @param streamRecord the incoming record
     * @throws InterruptedException if yielding to the mailbox executor is interrupted
     */
    @Override
    public void processElement(StreamRecord<Message> streamRecord) throws InterruptedException {
        while (this.backPressureValve.shouldBackPressure()) {
            this.mailboxExecutor.yield();
        }
        this.reductions.apply(streamRecord.getValue());
    }

    /**
     * Initializes the runtime state of this operator: registers the Flink state used for async
     * operations and delayed messages, creates the back pressure valve, records which function
     * providers manage resources, builds the {@link Reductions} instance and finally fires the
     * expired async operations.
     *
     * @throws Exception if the superclass {@code open()} or any of the initialization steps fails
     */
    @Override
    public void open() throws Exception {
        super.open();
        final StatefulFunctionsUniverse universe = statefulFunctionsUniverse(this.configuration);

        // Serializer of the operator's first input; each descriptor gets its own duplicate().
        final TypeSerializer<Message> messageSerializer = getOperatorConfig().getTypeSerializerIn(0, getContainingTask().getUserCodeClassLoader());
        final MapStateDescriptor<Long, Message> asyncOperationsDescriptor = new MapStateDescriptor<>("asyncOperations", LongSerializer.INSTANCE, messageSerializer.duplicate());
        final ListStateDescriptor<Message> delayedMessagesDescriptor = new ListStateDescriptor<>("delayed-messages-buffer", messageSerializer.duplicate());
        final MapStateDescriptor<String, Long> delayedMessageIndexDescriptor = new MapStateDescriptor<>("delayed-message-index", String.class, Long.class);
        final MapState<String, Long> delayedMessageIndex = getRuntimeContext().getMapState(delayedMessageIndexDescriptor);
        final MapState<Long, Message> asyncOperations = getRuntimeContext().getMapState(asyncOperationsDescriptor);

        // The field is transient, so it is only non-null when set through the constructor.
        Objects.requireNonNull(this.mailboxExecutor, "MailboxExecutor is unexpectedly NULL");

        this.backPressureValve = new ThresholdBackPressureValve(this.configuration.getMaxAsyncOperationsPerTask());

        // Remember the providers that manage resources so they can be shut down on close().
        this.managingResources = resourceManagingFunctionProviders(universe.functions());

        // NOTE(review): the raw InternalTimeServiceManager cast is kept as-is because the
        // parameter type expected by FlinkTimerServiceFactory is not visible here - confirm and
        // parameterize if possible.
        this.reductions = Reductions.create(
                this.backPressureValve,
                universe,
                getRuntimeContext(),
                getKeyedStateBackend(),
                new FlinkTimerServiceFactory((InternalTimeServiceManager) super.getTimeServiceManager().orElseThrow(IllegalStateException::new)),
                delayedMessagesBufferState(delayedMessagesDescriptor),
                delayedMessageIndex,
                this.sideOutputs,
                this.output,
                MessageFactory.forKey(universe.messageFactoryKey()),
                new MailboxExecutorFacade(this.mailboxExecutor, "Stateful Functions Mailbox"),
                getRuntimeContext().getMetricGroup().addGroup(StatefulFunctionsJobConstants.FUNCTION_OPERATOR_NAME),
                asyncOperations);

        AsyncOperationFailureNotifier.fireExpiredAsyncOperations(asyncOperationsDescriptor, this.reductions, getKeyedStateBackend());
    }

    /**
     * Runs the superclass snapshot logic and then asks {@link Reductions} to snapshot its async
     * operations.
     *
     * @param stateSnapshotContext the snapshot context handed to the superclass
     * @throws Exception if either step fails
     */
    @Override
    public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
        super.snapshotState(stateSnapshotContext);
        this.reductions.snapshotAsyncOperations();
    }

    /**
     * Shuts down the resource-managing function providers and then closes the superclass; the
     * superclass is closed even if the shutdown step throws.
     *
     * @throws Exception if the superclass {@code close()} fails
     */
    @Override
    public void close() throws Exception {
        try {
            closeOrDispose();
        } finally {
            super.close();
        }
    }

    /**
     * Shuts down every recorded {@link ManagingResources} instance. Does nothing when the list was
     * never assigned (i.e. {@link #open()} did not get that far). A failure of one instance is
     * logged and does not prevent the remaining instances from being shut down.
     */
    private void closeOrDispose() {
        final List<ManagingResources> resources = this.managingResources;
        if (resources == null) {
            return;
        }
        for (ManagingResources resource : resources) {
            try {
                resource.shutdown();
            } catch (Throwable th) {
                // Deliberately best-effort: log and continue with the next instance.
                LOG.warn("Exception caught during close. It would be silently ignored.", th);
            }
        }
    }

    /**
     * Obtains (or creates) the keyed list state described by {@code listStateDescriptor}, using
     * {@link LongSerializer} as the namespace serializer.
     *
     * @param listStateDescriptor descriptor of the delayed messages buffer state
     * @return the state, narrowed to {@link InternalListState}
     * @throws RuntimeException wrapping any exception raised while obtaining the state
     */
    private InternalListState<String, Long, Message> delayedMessagesBufferState(ListStateDescriptor<Message> listStateDescriptor) {
        try {
            // The backend call is typed by the descriptor only; the key and namespace type
            // arguments of InternalListState cannot be checked by the compiler.
            @SuppressWarnings("unchecked")
            final InternalListState<String, Long, Message> bufferState =
                    (InternalListState<String, Long, Message>) getKeyedStateBackend().getOrCreateKeyedState(LongSerializer.INSTANCE, listStateDescriptor);
            return bufferState;
        } catch (Exception e) {
            throw new RuntimeException("Error registered Flink state for delayed messages buffer.", e);
        }
    }

    /**
     * Looks up the {@link StatefulFunctionsUniverse} for the given configuration, using the
     * current thread's context class loader.
     */
    private StatefulFunctionsUniverse statefulFunctionsUniverse(StatefulFunctionsConfig statefulFunctionsConfig) {
        return StatefulFunctionsUniverses.get(Thread.currentThread().getContextClassLoader(), statefulFunctionsConfig);
    }

    /**
     * Returns the function providers in {@code map} that also implement {@link ManagingResources}.
     *
     * @param map function providers keyed by function type
     * @return the matching providers, viewed as {@link ManagingResources}
     */
    private static List<ManagingResources> resourceManagingFunctionProviders(Map<FunctionType, StatefulFunctionProvider> map) {
        return map.values().stream()
                .filter(ManagingResources.class::isInstance)
                .map(ManagingResources.class::cast)
                .collect(Collectors.toList());
    }
}
