/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.accumulators.Accumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackendFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorChain;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.streaming.runtime.tasks.StreamTaskStateList;
import org.apache.flink.streaming.runtime.tasks.TimerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StreamTask<OUT, Operator extends StreamOperator<OUT>>
extends AbstractInvokable
implements StatefulTask<StreamTaskStateList> {
    public static final ThreadGroup TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
    protected static final Logger LOG = LoggerFactory.getLogger(StreamTask.class);
    private final Object lock = new Object();
    protected Operator headOperator;
    private OperatorChain<OUT> operatorChain;
    private StreamConfig configuration;
    private ClassLoader userClassLoader;
    private StateBackend<?> stateBackend;
    private ScheduledExecutorService timerService;
    private Map<String, Accumulator<?, ?>> accumulatorMap;
    private StreamTaskStateList lazyRestoreState;
    private volatile TimerException timerException;
    private volatile boolean isRunning;

    protected abstract void init() throws Exception;

    protected abstract void run() throws Exception;

    protected abstract void cleanup() throws Exception;

    protected abstract void cancelTask() throws Exception;

    public final void registerInputOutput() throws Exception {
        LOG.debug("registerInputOutput for {}", (Object)this.getName());
        boolean initializationCompleted = false;
        try {
            AccumulatorRegistry accumulatorRegistry = this.getEnvironment().getAccumulatorRegistry();
            this.userClassLoader = this.getUserCodeClassLoader();
            this.configuration = new StreamConfig(this.getTaskConfiguration());
            this.accumulatorMap = accumulatorRegistry.getUserMap();
            this.stateBackend = this.createStateBackend();
            this.stateBackend.initializeForJob(this.getEnvironment().getJobID());
            this.headOperator = (StreamOperator)this.configuration.getStreamOperator(this.userClassLoader);
            this.operatorChain = new OperatorChain(this, this.headOperator, accumulatorRegistry.getReadWriteReporter());
            if (this.headOperator != null) {
                this.headOperator.setup(this, this.configuration, this.operatorChain.getChainEntryPoint());
            }
            this.timerService = Executors.newSingleThreadScheduledExecutor((ThreadFactory)new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + this.getName()));
            this.init();
            initializationCompleted = true;
        }
        finally {
            if (!initializationCompleted) {
                if (this.timerService != null) {
                    this.timerService.shutdownNow();
                }
                if (this.operatorChain != null) {
                    this.operatorChain.releaseOutputs();
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public final void invoke() throws Exception {
        LOG.debug("Invoking {}", (Object)this.getName());
        boolean disposed = false;
        try {
            this.restoreStateLazy();
            Object object = this.lock;
            synchronized (object) {
                this.openAllOperators();
            }
            this.isRunning = true;
            this.run();
            this.isRunning = false;
            if (LOG.isDebugEnabled()) {
                LOG.debug("Finished task {}", (Object)this.getName());
            }
            object = this.lock;
            synchronized (object) {
                this.closeAllOperators();
            }
            this.operatorChain.flushOutputs();
            this.tryDisposeAllOperators();
            disposed = true;
        }
        finally {
            this.isRunning = false;
            this.timerService.shutdownNow();
            if (this.operatorChain != null) {
                this.operatorChain.releaseOutputs();
            }
            try {
                this.cleanup();
            }
            catch (Throwable t) {
                LOG.error("Error during cleanup of stream task.");
            }
            if (!disposed) {
                this.disposeAllOperators();
            }
            try {
                if (this.stateBackend != null) {
                    this.stateBackend.close();
                }
            }
            catch (Throwable t) {
                LOG.error("Error while closing the state backend", t);
            }
        }
    }

    public final void cancel() throws Exception {
        this.isRunning = false;
        this.cancelTask();
    }

    private void openAllOperators() throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.open();
        }
    }

    private void closeAllOperators() throws Exception {
        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
        for (int i = allOperators.length - 1; i >= 0; --i) {
            StreamOperator<?> operator = allOperators[i];
            if (operator == null) continue;
            operator.close();
        }
    }

    private void tryDisposeAllOperators() throws Exception {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            if (operator == null) continue;
            operator.dispose();
        }
    }

    private void disposeAllOperators() {
        for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
            try {
                if (operator == null) continue;
                operator.dispose();
            }
            catch (Throwable t) {
                LOG.error("Error during disposal of stream operator.", t);
            }
        }
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (this.timerService != null) {
            if (!this.timerService.isTerminated()) {
                LOG.warn("Timer service was not shut down. Shutting down in finalize().");
            }
            this.timerService.shutdown();
        }
    }

    public String getName() {
        return this.getEnvironment().getTaskNameWithSubtasks();
    }

    public Object getCheckpointLock() {
        return this.lock;
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public Map<String, Accumulator<?, ?>> getAccumulatorMap() {
        return this.accumulatorMap;
    }

    public Output<StreamRecord<OUT>> getHeadOutput() {
        return this.operatorChain.getChainEntryPoint();
    }

    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public void setInitialState(StreamTaskStateList initialState) {
        this.lazyRestoreState = initialState;
    }

    public void restoreStateLazy() throws Exception {
        if (this.lazyRestoreState != null) {
            LOG.info("Restoring checkpointed state to task {}", (Object)this.getName());
            try {
                StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
                StreamTaskState[] states = this.lazyRestoreState.getState(this.userClassLoader);
                this.lazyRestoreState = null;
                for (int i = 0; i < states.length; ++i) {
                    StreamTaskState state = states[i];
                    StreamOperator<?> operator = allOperators[i];
                    if (state != null && operator != null) {
                        LOG.debug("Task {} in chain ({}) has checkpointed state", (Object)i, (Object)this.getName());
                        operator.restoreState(state);
                        continue;
                    }
                    if (operator == null) continue;
                    LOG.debug("Task {} in chain ({}) does not have checkpointed state", (Object)i, (Object)this.getName());
                }
            }
            catch (Exception e) {
                throw new Exception("Could not restore checkpointed state to operators and functions", e);
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception {
        LOG.debug("Starting checkpoint {} on task {}", (Object)checkpointId, (Object)this.getName());
        Object object = this.lock;
        synchronized (object) {
            block9: {
                if (this.isRunning) {
                    this.operatorChain.broadcastCheckpointBarrier(checkpointId, timestamp);
                    try {
                        StreamOperator<?>[] allOperators = this.operatorChain.getAllOperators();
                        StreamTaskState[] states = new StreamTaskState[allOperators.length];
                        for (int i = 0; i < states.length; ++i) {
                            StreamOperator<?> operator = allOperators[i];
                            if (operator == null) continue;
                            StreamTaskState state = operator.snapshotOperatorState(checkpointId, timestamp);
                            states[i] = state.isEmpty() ? null : state;
                        }
                        StreamTaskStateList allStates = new StreamTaskStateList(states);
                        if (allStates.isEmpty()) {
                            this.getEnvironment().acknowledgeCheckpoint(checkpointId);
                        } else {
                            this.getEnvironment().acknowledgeCheckpoint(checkpointId, (StateHandle)allStates);
                        }
                    }
                    catch (Exception e) {
                        if (!this.isRunning) break block9;
                        throw e;
                    }
                }
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        Object object = this.lock;
        synchronized (object) {
            if (this.isRunning) {
                LOG.debug("Notification of complete checkpoint for task {}", (Object)this.getName());
                for (StreamOperator<?> operator : this.operatorChain.getAllOperators()) {
                    if (operator == null) continue;
                    operator.notifyOfCompletedCheckpoint(checkpointId);
                }
            } else {
                LOG.debug("Ignoring notification of complete checkpoint for not-running task {}", (Object)this.getName());
            }
        }
    }

    public StateBackend<?> getStateBackend() {
        return this.stateBackend;
    }

    private StateBackend<?> createStateBackend() throws Exception {
        StateBackend<?> configuredBackend = this.configuration.getStateBackend(this.userClassLoader);
        if (configuredBackend != null) {
            LOG.info("Using user-defined state backend: " + configuredBackend);
            return configuredBackend;
        }
        Configuration flinkConfig = this.getEnvironment().getTaskManagerInfo().getConfiguration();
        String backendName = flinkConfig.getString("state.backend", null);
        if (backendName == null) {
            LOG.warn("No state backend has been specified, using default state backend (Memory / JobManager)");
            backendName = "jobmanager";
        }
        switch (backendName = backendName.toLowerCase()) {
            case "jobmanager": {
                LOG.info("State backend is set to heap memory (checkpoint to jobmanager)");
                return MemoryStateBackend.defaultInstance();
            }
            case "filesystem": {
                FsStateBackend backend = new FsStateBackendFactory().createFromConfig(flinkConfig);
                LOG.info("State backend is set to heap memory (checkpoints to filesystem \"" + backend.getBasePath() + "\")");
                return backend;
            }
        }
        try {
            Class<StateBackendFactory> clazz = Class.forName(backendName, false, this.userClassLoader).asSubclass(StateBackendFactory.class);
            return (StateBackend)clazz.newInstance();
        }
        catch (ClassNotFoundException e) {
            throw new IllegalConfigurationException("Cannot find configured state backend: " + backendName);
        }
        catch (ClassCastException e) {
            throw new IllegalConfigurationException("The class configured under 'state.backend' is not a valid state backend factory (" + backendName + ')');
        }
        catch (Throwable t) {
            throw new IllegalConfigurationException("Cannot create configured state backend", t);
        }
    }

    public void registerTimer(long timestamp, Triggerable target) {
        long delay = Math.max(timestamp - System.currentTimeMillis(), 0L);
        this.timerService.schedule(new TriggerTask(this, this.lock, target, timestamp), delay, TimeUnit.MILLISECONDS);
    }

    public void checkTimerException() throws TimerException {
        if (this.timerException != null) {
            throw this.timerException;
        }
    }

    public String toString() {
        return this.getName();
    }

    protected final EventListener<CheckpointBarrier> getCheckpointBarrierListener() {
        return new EventListener<CheckpointBarrier>(){

            public void onEvent(CheckpointBarrier barrier) {
                try {
                    StreamTask.this.triggerCheckpoint(barrier.getId(), barrier.getTimestamp());
                }
                catch (Exception e) {
                    throw new RuntimeException("Error triggering a checkpoint as the result of receiving checkpoint barrier", e);
                }
            }
        };
    }

    private static final class TriggerTask
    implements Runnable {
        private final Object lock;
        private final Triggerable target;
        private final long timestamp;
        private final StreamTask<?, ?> task;

        TriggerTask(StreamTask<?, ?> task, Object lock, Triggerable target, long timestamp) {
            this.task = task;
            this.lock = lock;
            this.target = target;
            this.timestamp = timestamp;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        @Override
        public void run() {
            Object object = this.lock;
            synchronized (object) {
                block5: {
                    try {
                        this.target.trigger(this.timestamp);
                    }
                    catch (Throwable t) {
                        LOG.error("Caught exception while processing timer.", t);
                        if (((StreamTask)this.task).timerException != null) break block5;
                        ((StreamTask)this.task).timerException = new TimerException(t);
                    }
                }
            }
        }
    }
}

