/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Serializable;
import java.util.HashMap;
import java.util.concurrent.ScheduledFuture;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Base class for stream operators.
 *
 * <p>It keeps references to the containing {@link StreamTask}, the operator's
 * {@link StreamConfig}, a record-counting {@link Output}, the runtime context, the metric
 * group and the state backend, all of which are initialized in
 * {@link #setup(StreamTask, StreamConfig, Output)}. Checkpoint-related calls
 * (snapshot / restore / checkpoint-complete notification) are forwarded to the state
 * backend when one is present.
 *
 * @param <OUT> type of the elements emitted by this operator
 */
@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>,
Serializable {
    private static final long serialVersionUID = 1L;

    /** Logger shared with subclasses. */
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);

    /** Chaining strategy of this operator; {@link ChainingStrategy#HEAD} unless changed via the setter. */
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;

    /** Task that contains this operator; assigned in {@code setup}. */
    private transient StreamTask<?, ?> container;

    /** Configuration of this operator; assigned in {@code setup}. */
    private transient StreamConfig config;

    /** Output used to emit records; {@code setup} wraps the given output in a {@link CountingOutput}. */
    protected transient Output<StreamRecord<OUT>> output;

    /** Runtime context created in {@code setup}. */
    private transient StreamingRuntimeContext runtimeContext;

    /** State partitioner with index 0 from the operator config; may be {@code null}. */
    private transient KeySelector<?, ?> stateKeySelector1;

    /** State partitioner with index 1 from the operator config; may be {@code null}. */
    private transient KeySelector<?, ?> stateKeySelector2;

    /** State backend created by the containing task in {@code setup}; {@code null} before that. */
    private AbstractStateBackend stateBackend = null;

    /** Metric group of this operator; assigned in {@code setup}. */
    protected MetricGroup metrics;

    /**
     * Initializes the operator: stores the task and config, registers the operator metric group,
     * wraps the output so emitted records are counted under {@code "numRecordsOut"}, creates the
     * runtime context, reads both state partitioners and creates the state backend.
     *
     * @param containingTask task that runs this operator
     * @param config configuration of this operator
     * @param output output that emitted records and watermarks are forwarded to
     * @throws RuntimeException if the state backend cannot be created
     */
    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        this.container = containingTask;
        this.config = config;

        String taskName = containingTask.getEnvironment().getTaskInfo().getTaskName();
        String operatorName = resolveOperatorName(taskName, config.getChainIndex());
        this.metrics = this.container.getEnvironment().getMetricGroup().addOperator(operatorName);
        this.output = new CountingOutput(output, this.metrics.counter("numRecordsOut"));

        this.runtimeContext = new StreamingRuntimeContext(this, this.container.getEnvironment(), this.container.getAccumulatorMap());

        this.stateKeySelector1 = config.getStatePartitioner(0, this.getUserCodeClassloader());
        this.stateKeySelector2 = config.getStatePartitioner(1, this.getUserCodeClassloader());

        try {
            TypeSerializer keySerializer = config.getStateKeySerializer(this.getUserCodeClassloader());
            // Identifier is built from the operator class, the vertex id and the subtask index.
            String operatorIdentifier = this.getClass().getSimpleName() + "_" + config.getVertexID() + "_" + this.runtimeContext.getIndexOfThisSubtask();
            this.stateBackend = this.container.createStateBackend(operatorIdentifier, keySerializer);
        } catch (Exception e) {
            throw new RuntimeException("Could not initialize state backend. ", e);
        }
    }

    /**
     * Derives the operator's metric name from the task name by taking the
     * {@code chainIndex}-th segment of the task name split on {@code "->"}.
     *
     * <p>If the task name has no segment at that index, a name built from the whole task name
     * and the chain index is used instead of failing with an
     * {@link ArrayIndexOutOfBoundsException}; a missing metrics name should not abort operator
     * setup.
     *
     * @param taskName name of the containing task
     * @param chainIndex index of this operator within the task's chain
     * @return trimmed name segment, or the fallback name described above
     */
    private static String resolveOperatorName(String taskName, int chainIndex) {
        String[] chainedNames = taskName.split("->");
        if (chainIndex >= 0 && chainIndex < chainedNames.length) {
            return chainedNames[chainIndex].trim();
        }
        // Chain index suffix keeps the fallback distinct per operator of the same task.
        String fallbackName = taskName.trim() + "_" + chainIndex;
        LOG.warn("Could not derive operator name from task name '{}' for chain index {}; using '{}' for metrics.",
                taskName, chainIndex, fallbackName);
        return fallbackName;
    }

    /**
     * @return the metric group assigned in {@code setup}
     */
    @Override
    public MetricGroup getMetricGroup() {
        return this.metrics;
    }

    /**
     * Lifecycle hook; this base implementation does nothing.
     *
     * @throws Exception declared for overriding implementations
     */
    @Override
    public void open() throws Exception {
    }

    /**
     * Lifecycle hook; this base implementation does nothing.
     *
     * @throws Exception declared for overriding implementations
     */
    @Override
    public void close() throws Exception {
    }

    /**
     * Closes and disposes the state backend, if one was created.
     *
     * <p>Both {@code close()} and {@code dispose()} are always attempted, so a failure while
     * closing does not prevent disposal. The first failure becomes the cause of the thrown
     * exception; a second failure is attached to it as a suppressed exception.
     *
     * @throws RuntimeException if closing or disposing the state backend fails
     */
    @Override
    public void dispose() {
        if (this.stateBackend == null) {
            return;
        }

        Exception failure = null;
        try {
            this.stateBackend.close();
        } catch (Exception e) {
            failure = e;
        }
        try {
            this.stateBackend.dispose();
        } catch (Exception e) {
            if (failure == null) {
                failure = e;
            } else {
                failure.addSuppressed(e);
            }
        }

        if (failure != null) {
            throw new RuntimeException("Error while closing/disposing state backend.", failure);
        }
    }

    /**
     * Creates the operator's checkpoint state. The returned state carries the partitioned
     * (key/value) snapshots of the state backend when a backend exists and returns a
     * non-{@code null} snapshot map; otherwise it is an empty {@link StreamTaskState}.
     *
     * @param checkpointId id of the checkpoint
     * @param timestamp timestamp of the checkpoint
     * @return the snapshot state, never {@code null}
     * @throws Exception if the state backend fails to snapshot
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState state = new StreamTaskState();
        if (this.stateBackend != null) {
            HashMap partitionedSnapshots = this.stateBackend.snapshotPartitionedState(checkpointId, timestamp);
            if (partitionedSnapshots != null) {
                state.setKvStates(partitionedSnapshots);
            }
        }
        return state;
    }

    /**
     * Hands the key/value state snapshots of {@code state} to the state backend, if one exists.
     *
     * @param state state to restore from
     * @throws Exception if the state backend fails to restore
     */
    @Override
    public void restoreState(StreamTaskState state) throws Exception {
        if (this.stateBackend != null) {
            this.stateBackend.injectKeyValueStateSnapshots(state.getKvStates());
        }
    }

    /**
     * Forwards the checkpoint-complete notification to the state backend, if one exists.
     *
     * @param checkpointId id of the completed checkpoint
     * @throws Exception if the state backend fails while handling the notification
     */
    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
        if (this.stateBackend != null) {
            this.stateBackend.notifyOfCompletedCheckpoint(checkpointId);
        }
    }

    /**
     * @return the execution config of the containing task
     */
    public ExecutionConfig getExecutionConfig() {
        return this.container.getExecutionConfig();
    }

    /**
     * @return the operator config passed to {@code setup}
     */
    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    /**
     * @return the task passed to {@code setup}
     */
    public StreamTask<?, ?> getContainingTask() {
        return this.container;
    }

    /**
     * @return the user code class loader of the containing task
     */
    public ClassLoader getUserCodeClassloader() {
        return this.container.getUserCodeClassLoader();
    }

    /**
     * @return the runtime context created in {@code setup}
     */
    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    /**
     * @return the state backend created in {@code setup}, or {@code null} if there is none
     */
    public AbstractStateBackend getStateBackend() {
        return this.stateBackend;
    }

    /**
     * Registers a timer with the containing task.
     *
     * @param time time passed through to the containing task's timer registration
     * @param target callback to register
     * @return the future returned by the containing task for the registered timer
     */
    protected ScheduledFuture<?> registerTimer(long time, Triggerable target) {
        return this.container.registerTimer(time, target);
    }

    /**
     * @return the current processing time as reported by the containing task
     */
    protected long getCurrentProcessingTime() {
        return this.container.getCurrentProcessingTime();
    }

    /**
     * Looks up partitioned state from the state backend using a {@code null} namespace and the
     * {@link VoidSerializer} as namespace serializer.
     *
     * @param stateDescriptor descriptor of the requested state
     * @param <S> type of the state
     * @return the state handle returned by the state backend
     * @throws Exception if the state backend fails to provide the state
     */
    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return (S)this.getStateBackend().getPartitionedState(null, (TypeSerializer)VoidSerializer.INSTANCE, stateDescriptor);
    }

    /**
     * Looks up partitioned state from the state backend for the given namespace.
     *
     * @param namespace namespace of the requested state
     * @param namespaceSerializer serializer for the namespace
     * @param stateDescriptor descriptor of the requested state
     * @param <S> type of the state
     * @param <N> type of the namespace
     * @return the state handle returned by the state backend
     * @throws Exception if the state backend fails to provide the state
     */
    protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return (S)this.getStateBackend().getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
    }

    /**
     * Extracts the key of {@code record} with the first state partitioner and sets it as the
     * current key of the state backend. Does nothing when no first partitioner is configured.
     *
     * @param record record whose value the key is extracted from
     * @throws Exception if key extraction fails
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setKeyContextElement1(StreamRecord record) throws Exception {
        if (this.stateKeySelector1 != null) {
            // Raw selector: the wildcard-typed field cannot accept an Object argument.
            KeySelector selector = this.stateKeySelector1;
            Object key = selector.getKey(record.getValue());
            this.getStateBackend().setCurrentKey(key);
        }
    }

    /**
     * Extracts the key of {@code record} with the second state partitioner and sets it as the
     * current key of the state backend. Does nothing when no second partitioner is configured.
     *
     * @param record record whose value the key is extracted from
     * @throws Exception if key extraction fails
     */
    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public void setKeyContextElement2(StreamRecord record) throws Exception {
        if (this.stateKeySelector2 != null) {
            // Raw selector: the wildcard-typed field cannot accept an Object argument.
            KeySelector selector = this.stateKeySelector2;
            Object key = selector.getKey(record.getValue());
            this.getStateBackend().setCurrentKey(key);
        }
    }

    /**
     * Sets {@code key} as the current key of the state backend. Does nothing when no first
     * state partitioner is configured.
     *
     * @param key key to make current
     */
    public void setKeyContext(Object key) {
        if (this.stateKeySelector1 != null) {
            this.stateBackend.setCurrentKey(key);
        }
    }

    /**
     * @param strategy chaining strategy to use for this operator
     */
    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        this.chainingStrategy = strategy;
    }

    /**
     * @return the chaining strategy of this operator
     */
    @Override
    public final ChainingStrategy getChainingStrategy() {
        return this.chainingStrategy;
    }

    /**
     * {@link Output} wrapper that increments a counter for every collected record before
     * forwarding it to the wrapped output. Watermarks and {@code close()} are forwarded
     * without counting.
     */
    public class CountingOutput
    implements Output<StreamRecord<OUT>> {
        /** Wrapped output that all calls are delegated to. */
        private final Output<StreamRecord<OUT>> output;

        /** Counter incremented once per collected record. */
        private final Counter numRecordsOut;

        /**
         * @param output output to delegate to
         * @param counter counter to increment for each collected record
         */
        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        /**
         * Forwards the watermark to the wrapped output.
         *
         * @param mark watermark to emit
         */
        @Override
        public void emitWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }

        /**
         * Increments the record counter, then forwards the record to the wrapped output.
         *
         * @param record record to emit
         */
        public void collect(StreamRecord<OUT> record) {
            this.numRecordsOut.inc();
            this.output.collect(record);
        }

        /** Closes the wrapped output. */
        public void close() {
            this.output.close();
        }
    }
}

