package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.functors.NotNullPredicate;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator;
import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier;
import org.apache.flink.runtime.state.FileStateHandle;
import org.apache.flink.runtime.state.LocalStateHandle;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.StateHandleProvider;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.StatefulStreamOperator;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointCommittingOperator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    protected StreamConfig configuration;
    protected OutputHandler<OUT> outputHandler;
    protected boolean hasChainedOperators;
    protected ClassLoader userClassLoader;
    private StateHandleProvider<Serializable> stateHandleProvider;
    protected final Object checkpointLock = new Object();
    protected volatile boolean isRunning = false;
    protected O streamOperator = null;
    private EventListener<TaskEvent> superstepListener = new SuperstepEventListener();
    protected List<StreamingRuntimeContext> contexts = new ArrayList();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$StateBackend.class */
    public enum StateBackend {
        JOBMANAGER,
        FILESYSTEM
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$SuperstepEventListener.class */
    private class SuperstepEventListener implements EventListener<TaskEvent> {
        private SuperstepEventListener() {
        }

        @Override // org.apache.flink.runtime.util.event.EventListener
        public void onEvent(TaskEvent taskEvent) {
            try {
                StreamingSuperstep streamingSuperstep = (StreamingSuperstep) taskEvent;
                StreamTask.this.triggerCheckpoint(streamingSuperstep.getId(), streamingSuperstep.getTimestamp());
            } catch (Exception e) {
                throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
            }
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void registerInputOutput() {
        this.userClassLoader = getUserCodeClassLoader();
        this.configuration = new StreamConfig(getTaskConfiguration());
        this.stateHandleProvider = getStateHandleProvider();
        this.outputHandler = new OutputHandler<>(this);
        this.streamOperator = (O) this.configuration.getStreamOperator(this.userClassLoader);
        if (this.streamOperator != null) {
            StreamingRuntimeContext createRuntimeContext = createRuntimeContext(this.configuration);
            this.contexts.add(createRuntimeContext);
            this.streamOperator.setup(this.outputHandler.getOutput(), createRuntimeContext);
        }
        this.hasChainedOperators = !this.outputHandler.getChainedOperators().isEmpty();
    }

    public String getName() {
        return getEnvironment().getTaskName();
    }

    public StreamingRuntimeContext createRuntimeContext(StreamConfig streamConfig) {
        return new StreamingRuntimeContext(streamConfig.getStreamOperator(this.userClassLoader).getClass().getSimpleName(), getEnvironment(), getUserCodeClassLoader(), getExecutionConfig());
    }

    private StateHandleProvider<Serializable> getStateHandleProvider() {
        StateHandleProvider<Serializable> stateHandleProvider = this.configuration.getStateHandleProvider(this.userClassLoader);
        if (stateHandleProvider != null) {
            LOG.info("Using user defined state backend for streaming checkpoitns.");
            return stateHandleProvider;
        }
        String upperCase = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND, ConfigConstants.DEFAULT_STATE_BACKEND).toUpperCase();
        try {
            StateBackend valueOf = StateBackend.valueOf(upperCase);
            switch (valueOf) {
                case JOBMANAGER:
                    LOG.info("State backend for state checkpoints is set to jobmanager.");
                    return LocalStateHandle.createProvider();
                case FILESYSTEM:
                    String string = GlobalConfiguration.getString(ConfigConstants.STATE_BACKEND_FS_DIR, null);
                    if (string == null) {
                        throw new RuntimeException("For filesystem checkpointing, a checkpoint directory needs to be specified.\nFor example: \"state.backend.dir: hdfs://checkpoints\"");
                    }
                    LOG.info("State backend for state checkpoints is set to filesystem with directory: " + string);
                    return FileStateHandle.createProvider(string);
                default:
                    throw new RuntimeException("Backend " + valueOf + " is not supported yet.");
            }
        } catch (Exception e) {
            throw new RuntimeException(upperCase + " is not a valid state backend.\nSupported backends: jobmanager, filesystem.");
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void openOperator() throws Exception {
        this.streamOperator.open(getTaskConfiguration());
        Iterator<OneInputStreamOperator<?, ?>> it = this.outputHandler.chainedOperators.iterator();
        while (it.hasNext()) {
            it.next().open(getTaskConfiguration());
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void closeOperator() throws Exception {
        this.streamOperator.close();
        for (int size = this.outputHandler.chainedOperators.size() - 1; size >= 0; size--) {
            this.outputHandler.chainedOperators.get(size).close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void clearBuffers() throws IOException {
        if (this.outputHandler != null) {
            this.outputHandler.clearWriters();
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable
    public void cancel() {
        this.isRunning = false;
    }

    public EventListener<TaskEvent> getSuperstepListener() {
        return this.superstepListener;
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier
    public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {
        Serializable state = stateHandle.getState();
        if (!this.hasChainedOperators) {
            if (this.streamOperator instanceof StatefulStreamOperator) {
                ((StatefulStreamOperator) this.streamOperator).restoreInitialState(state);
                return;
            }
            return;
        }
        List list = (List) state;
        Serializable serializable = (Serializable) list.get(0);
        if (serializable != null && (this.streamOperator instanceof StatefulStreamOperator)) {
            ((StatefulStreamOperator) this.streamOperator).restoreInitialState(serializable);
        }
        for (int i = 1; i < list.size(); i++) {
            Serializable serializable2 = (Serializable) list.get(i);
            if (serializable2 != null) {
                OneInputStreamOperator<?, ?> oneInputStreamOperator = this.outputHandler.getChainedOperators().get(i - 1);
                if (oneInputStreamOperator instanceof StatefulStreamOperator) {
                    ((StatefulStreamOperator) oneInputStreamOperator).restoreInitialState(serializable2);
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator
    public void triggerCheckpoint(long j, long j2) throws Exception {
        synchronized (this.checkpointLock) {
            if (this.isRunning) {
                try {
                    LOG.debug("Starting checkpoint {} on task {}", Long.valueOf(j), getName());
                    try {
                        Serializable stateSnapshotFromFunction = this.streamOperator instanceof StatefulStreamOperator ? ((StatefulStreamOperator) this.streamOperator).getStateSnapshotFromFunction(j, j2) : null;
                        if (this.hasChainedOperators) {
                            ArrayList arrayList = new ArrayList();
                            arrayList.add(stateSnapshotFromFunction);
                            for (OneInputStreamOperator<?, ?> oneInputStreamOperator : this.outputHandler.getChainedOperators()) {
                                if (oneInputStreamOperator instanceof StatefulStreamOperator) {
                                    arrayList.add(((StatefulStreamOperator) oneInputStreamOperator).getStateSnapshotFromFunction(j, j2));
                                }
                            }
                            stateSnapshotFromFunction = CollectionUtils.exists(arrayList, NotNullPredicate.INSTANCE) ? arrayList : null;
                        }
                        StateHandle<Serializable> createStateHandle = stateSnapshotFromFunction == null ? null : this.stateHandleProvider.createStateHandle(stateSnapshotFromFunction);
                        this.outputHandler.broadcastBarrier(j, j2);
                        if (createStateHandle == null) {
                            getEnvironment().acknowledgeCheckpoint(j);
                        } else {
                            getEnvironment().acknowledgeCheckpoint(j, createStateHandle);
                        }
                    } catch (Exception e) {
                        throw new Exception("Error while drawing snapshot of the user state.", e);
                    }
                } catch (Exception e2) {
                    if (this.isRunning) {
                        throw e2;
                    }
                }
            }
        }
    }

    @Override // org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator
    public void confirmCheckpoint(long j, long j2) throws Exception {
        synchronized (this.checkpointLock) {
            if (this.streamOperator instanceof StatefulStreamOperator) {
                ((StatefulStreamOperator) this.streamOperator).confirmCheckpointCompleted(j, j2);
            }
            if (this.hasChainedOperators) {
                for (OneInputStreamOperator<?, ?> oneInputStreamOperator : this.outputHandler.getChainedOperators()) {
                    if (oneInputStreamOperator instanceof StatefulStreamOperator) {
                        ((StatefulStreamOperator) oneInputStreamOperator).confirmCheckpointCompleted(j, j2);
                    }
                }
            }
        }
    }

    public String toString() {
        return getEnvironment().getTaskNameWithSubtasks();
    }
}
