package org.apache.flink.statefun.flink.core.feedback;

import java.util.Objects;
import java.util.function.LongFunction;
import org.apache.flink.metrics.MeterView;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.IOUtils;

/* loaded from: input_file:org/apache/flink/statefun/flink/core/feedback/FeedbackSinkOperator.class */
public final class FeedbackSinkOperator<V> extends AbstractStreamOperator<Void> implements OneInputStreamOperator<V, Void> {
    private static final long serialVersionUID = 1;
    private final FeedbackKey<V> key;
    private final LongFunction<V> barrierSentinelSupplier;
    private transient FeedbackChannel<V> channel;
    private transient SimpleCounter totalProduced;

    public FeedbackSinkOperator(FeedbackKey<V> feedbackKey, LongFunction<V> longFunction) {
        this.key = (FeedbackKey) Objects.requireNonNull(feedbackKey);
        this.barrierSentinelSupplier = (LongFunction) Objects.requireNonNull(longFunction);
    }

    public void processElement(StreamRecord<V> streamRecord) {
        this.channel.put(streamRecord.getValue());
        this.totalProduced.inc();
    }

    public void open() throws Exception {
        super.open();
        this.channel = FeedbackChannelBroker.get().getChannel(this.key.withSubTaskIndex(getRuntimeContext().getIndexOfThisSubtask()));
        MetricGroup metricGroup = getRuntimeContext().getMetricGroup();
        SimpleCounter counter = metricGroup.counter("produced", new SimpleCounter());
        metricGroup.meter("producedRate", new MeterView(counter, 60));
        this.totalProduced = counter;
    }

    public void prepareSnapshotPreBarrier(long j) throws Exception {
        super.prepareSnapshotPreBarrier(j);
        this.channel.put(this.barrierSentinelSupplier.apply(j));
    }

    public void close() throws Exception {
        IOUtils.closeQuietly(this.channel);
        super.close();
    }

    public void dispose() throws Exception {
        IOUtils.closeQuietly(this.channel);
        super.dispose();
    }
}
