package org.apache.flink.streaming.runtime.tasks;

import java.io.IOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.io.InputStatus;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateReader;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.MultipleRecordWriters;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterBuilder;
import org.apache.flink.runtime.io.network.api.writer.RecordWriterDelegate;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.io.network.api.writer.SingleRecordWriter;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.coordination.OperatorEvent;
import org.apache.flink.runtime.plugable.SerializationDelegate;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.operators.MailboxExecutor;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializerImpl;
import org.apache.flink.streaming.runtime.io.RecordWriterOutput;
import org.apache.flink.streaming.runtime.io.StreamInputProcessor;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.streamstatus.StreamStatusMaintainer;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxExecutorFactory;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailbox;
import org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.function.RunnableWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask.class */
public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> extends AbstractInvokable implements AsyncExceptionHandler {
    public static final ThreadGroup TRIGGER_THREAD_GROUP;
    protected static final Logger LOG;
    private final StreamTaskActionExecutor actionExecutor;

    @Nullable
    protected StreamInputProcessor inputProcessor;
    protected OP headOperator;
    protected OperatorChain<OUT, OP> operatorChain;
    protected final StreamConfig configuration;
    protected final StateBackend stateBackend;
    private final SubtaskCheckpointCoordinator subtaskCheckpointCoordinator;
    protected final TimerService timerService;
    private final CloseableRegistry cancelables;
    private final StreamTaskAsyncExceptionHandler asyncExceptionHandler;
    private volatile boolean isRunning;
    private volatile boolean canceled;
    private volatile boolean failing;
    private boolean disposedOperators;
    private final ExecutorService asyncOperationsThreadPool;
    private final RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> recordWriter;
    protected final MailboxProcessor mailboxProcessor;
    final MailboxExecutor mainMailboxExecutor;
    private final ExecutorService channelIOExecutor;
    private Long syncSavepointId;
    private Long activeSyncSavepointId;
    private long latestAsyncCheckpointStartDelayNanos;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/StreamTask$StreamTaskAsyncExceptionHandler.class */
    public static class StreamTaskAsyncExceptionHandler {
        private final Environment environment;

        StreamTaskAsyncExceptionHandler(Environment environment) {
            this.environment = environment;
        }

        void handleAsyncException(String str, Throwable th) {
            this.environment.failExternally(new AsynchronousException(str, th));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamTask(Environment environment) throws Exception {
        this(environment, null);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamTask(Environment environment, @Nullable TimerService timerService) throws Exception {
        this(environment, timerService, FatalExitExceptionHandler.INSTANCE);
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler) throws Exception {
        this(environment, timerService, uncaughtExceptionHandler, StreamTaskActionExecutor.IMMEDIATE);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor streamTaskActionExecutor) throws Exception {
        this(environment, timerService, uncaughtExceptionHandler, streamTaskActionExecutor, new TaskMailboxImpl(Thread.currentThread()));
    }

    protected StreamTask(Environment environment, @Nullable TimerService timerService, Thread.UncaughtExceptionHandler uncaughtExceptionHandler, StreamTaskActionExecutor streamTaskActionExecutor, TaskMailbox taskMailbox) throws Exception {
        super(environment);
        this.cancelables = new CloseableRegistry();
        this.syncSavepointId = null;
        this.activeSyncSavepointId = null;
        this.configuration = new StreamConfig(getTaskConfiguration());
        this.recordWriter = createRecordWriterDelegate(this.configuration, environment);
        this.actionExecutor = (StreamTaskActionExecutor) Preconditions.checkNotNull(streamTaskActionExecutor);
        this.mailboxProcessor = new MailboxProcessor(this::processInput, taskMailbox, streamTaskActionExecutor);
        this.mailboxProcessor.initMetric(environment.getMetricGroup());
        this.mainMailboxExecutor = this.mailboxProcessor.getMainMailboxExecutor();
        this.asyncExceptionHandler = new StreamTaskAsyncExceptionHandler(environment);
        this.asyncOperationsThreadPool = Executors.newCachedThreadPool(new ExecutorThreadFactory("AsyncOperations", uncaughtExceptionHandler));
        this.stateBackend = createStateBackend();
        this.subtaskCheckpointCoordinator = new SubtaskCheckpointCoordinatorImpl(this.stateBackend.createCheckpointStorage(getEnvironment().getJobID()), getName(), streamTaskActionExecutor, getCancelables(), getAsyncOperationsThreadPool(), getEnvironment(), this, this.configuration.isUnalignedCheckpointsEnabled(), (v1, v2) -> {
            return prepareInputSnapshot(v1, v2);
        });
        if (timerService == null) {
            this.timerService = new SystemProcessingTimeService(this::handleTimerException, new DispatcherThreadFactory(TRIGGER_THREAD_GROUP, "Time Trigger for " + getName()));
        } else {
            this.timerService = timerService;
        }
        this.channelIOExecutor = Executors.newSingleThreadExecutor(new ExecutorThreadFactory("channel-state-unspilling"));
    }

    private CompletableFuture<Void> prepareInputSnapshot(ChannelStateWriter channelStateWriter, long j) throws IOException {
        return this.inputProcessor == null ? FutureUtils.completedVoidFuture() : this.inputProcessor.prepareSnapshot(channelStateWriter, j);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SubtaskCheckpointCoordinator getCheckpointCoordinator() {
        return this.subtaskCheckpointCoordinator;
    }

    protected abstract void init() throws Exception;

    protected void cancelTask() throws Exception {
    }

    protected void cleanup() throws Exception {
        if (this.inputProcessor != null) {
            this.inputProcessor.close();
        }
    }

    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        InputStatus processInput = this.inputProcessor.processInput();
        if (processInput == InputStatus.MORE_AVAILABLE && this.recordWriter.isAvailable()) {
            return;
        }
        if (processInput == InputStatus.END_OF_INPUT) {
            controller.allActionsCompleted();
            return;
        }
        CompletableFuture<?> inputOutputJointFuture = getInputOutputJointFuture(processInput);
        MailboxDefaultAction.Suspension suspendDefaultAction = controller.suspendDefaultAction();
        suspendDefaultAction.getClass();
        inputOutputJointFuture.thenRun(suspendDefaultAction::resume);
    }

    @VisibleForTesting
    CompletableFuture<?> getInputOutputJointFuture(InputStatus inputStatus) {
        return (inputStatus != InputStatus.NOTHING_AVAILABLE || this.recordWriter.isAvailable()) ? inputStatus == InputStatus.NOTHING_AVAILABLE ? this.inputProcessor.getAvailableFuture() : this.recordWriter.getAvailableFuture() : CompletableFuture.allOf(this.inputProcessor.getAvailableFuture(), this.recordWriter.getAvailableFuture());
    }

    private void resetSynchronousSavepointId(long j, boolean z) {
        if (!z && this.activeSyncSavepointId != null && this.activeSyncSavepointId.longValue() == j) {
            this.activeSyncSavepointId = null;
            this.operatorChain.setIgnoreEndOfInput(false);
        }
        this.syncSavepointId = null;
    }

    private void setSynchronousSavepointId(long j, boolean z) {
        Preconditions.checkState(this.syncSavepointId == null, "at most one stop-with-savepoint checkpoint at a time is allowed");
        this.syncSavepointId = Long.valueOf(j);
        this.activeSyncSavepointId = Long.valueOf(j);
        this.operatorChain.setIgnoreEndOfInput(z);
    }

    @VisibleForTesting
    OptionalLong getSynchronousSavepointId() {
        return this.syncSavepointId != null ? OptionalLong.of(this.syncSavepointId.longValue()) : OptionalLong.empty();
    }

    private boolean isSynchronousSavepointId(long j) {
        return this.syncSavepointId != null && this.syncSavepointId.longValue() == j;
    }

    private void runSynchronousSavepointMailboxLoop() throws Exception {
        if (!$assertionsDisabled && this.syncSavepointId == null) {
            throw new AssertionError();
        }
        MailboxExecutor mailboxExecutor = this.mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY);
        while (!this.canceled && this.syncSavepointId != null) {
            mailboxExecutor.yield();
        }
    }

    protected void advanceToEndOfEventTime() throws Exception {
    }

    protected void finishTask() throws Exception {
    }

    public StreamTaskStateInitializer createStreamTaskStateInitializer() {
        return new StreamTaskStateInitializerImpl(getEnvironment(), this.stateBackend);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Counter setupNumRecordsInCounter(StreamOperator streamOperator) {
        try {
            return streamOperator.getMetricGroup().getIOMetricGroup().getNumRecordsInCounter();
        } catch (Exception e) {
            LOG.warn("An exception occurred during the metrics setup.", e);
            return new SimpleCounter();
        }
    }

    protected void beforeInvoke() throws Exception {
        this.disposedOperators = false;
        LOG.debug("Initializing {}.", getName());
        this.operatorChain = new OperatorChain<>(this, this.recordWriter);
        this.headOperator = this.operatorChain.getHeadOperator();
        init();
        if (this.canceled) {
            throw new CancelTaskException();
        }
        LOG.debug("Invoking {}", getName());
        this.actionExecutor.runThrowing(() -> {
            ChannelStateReader channelStateReader = getEnvironment().getTaskStateManager().getChannelStateReader();
            if (!channelStateReader.hasChannelStates()) {
                this.operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
                requestPartitions();
                return;
            }
            ResultPartitionWriter[] allWriters = getEnvironment().getAllWriters();
            if (allWriters != null) {
                for (ResultPartitionWriter resultPartitionWriter : allWriters) {
                    resultPartitionWriter.readRecoveredState(channelStateReader);
                }
            }
            this.operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());
            InputGate[] allInputGates = getEnvironment().getAllInputGates();
            if (allInputGates == null || allInputGates.length <= 0) {
                return;
            }
            CompletableFuture[] completableFutureArr = new CompletableFuture[allInputGates.length];
            for (int i = 0; i < allInputGates.length; i++) {
                completableFutureArr[i] = allInputGates[i].readRecoveredState(this.channelIOExecutor, channelStateReader);
            }
            CompletableFuture.allOf(completableFutureArr).thenRun(() -> {
                this.mainMailboxExecutor.execute(this::requestPartitions, "Input gates request partitions");
            });
        });
        this.isRunning = true;
    }

    private void requestPartitions() throws IOException {
        InputGate[] allInputGates = getEnvironment().getAllInputGates();
        if (allInputGates != null) {
            for (InputGate inputGate : allInputGates) {
                inputGate.requestPartitions();
            }
        }
    }

    public final void invoke() throws Exception {
        try {
            beforeInvoke();
        } catch (Throwable th) {
            this.failing = !this.canceled;
            try {
                cleanUpInvoke();
            } catch (Throwable th2) {
                ExceptionUtils.rethrowException(ExceptionUtils.firstOrSuppressed(th2, th));
            }
            ExceptionUtils.rethrowException(th);
        }
        if (this.canceled) {
            throw new CancelTaskException();
        }
        runMailboxLoop();
        if (this.canceled) {
            throw new CancelTaskException();
        }
        afterInvoke();
        cleanUpInvoke();
    }

    @VisibleForTesting
    public boolean runMailboxStep() throws Exception {
        return this.mailboxProcessor.runMailboxStep();
    }

    @VisibleForTesting
    public boolean isMailboxLoopRunning() {
        return this.mailboxProcessor.isMailboxLoopRunning();
    }

    private void runMailboxLoop() throws Exception {
        this.mailboxProcessor.runMailboxLoop();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void afterInvoke() throws Exception {
        LOG.debug("Finished task {}", getName());
        getCompletionFuture().exceptionally(th -> {
            return null;
        }).join();
        CompletableFuture completableFuture = new CompletableFuture();
        this.operatorChain.closeOperators(this.actionExecutor);
        this.actionExecutor.runThrowing(() -> {
            FutureUtils.forward(this.timerService.quiesce(), completableFuture);
            this.mailboxProcessor.prepareClose();
            this.isRunning = false;
        });
        this.mailboxProcessor.drain();
        completableFuture.get();
        LOG.debug("Closed operators for task {}", getName());
        this.operatorChain.flushOutputs();
        disposeAllOperators(false);
        this.disposedOperators = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void cleanUpInvoke() throws Exception {
        getCompletionFuture().exceptionally(th -> {
            return null;
        }).join();
        this.isRunning = false;
        setShouldInterruptOnCancel(false);
        Thread.interrupted();
        tryShutdownTimerService();
        try {
            this.cancelables.close();
            shutdownAsyncThreads();
        } catch (Throwable th2) {
            LOG.error("Could not shut down async checkpoint threads", th2);
        }
        try {
            cleanup();
        } catch (Throwable th3) {
            LOG.error("Error during cleanup of stream task", th3);
        }
        disposeAllOperators(true);
        if (this.operatorChain != null) {
            this.actionExecutor.run(() -> {
                this.operatorChain.releaseOutputs();
            });
        } else {
            this.recordWriter.close();
        }
        try {
            this.channelIOExecutor.shutdown();
        } catch (Throwable th4) {
            LOG.error("Error during shutdown the channel state unspill executor", th4);
        }
        this.mailboxProcessor.close();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public CompletableFuture<Void> getCompletionFuture() {
        return FutureUtils.completedVoidFuture();
    }

    public final void cancel() throws Exception {
        this.isRunning = false;
        this.canceled = true;
        try {
            cancelTask();
        } finally {
            getCompletionFuture().whenComplete((r5, th) -> {
                this.mailboxProcessor.allActionsCompleted();
                try {
                    this.cancelables.close();
                } catch (IOException e) {
                    throw new CompletionException(e);
                }
            });
        }
    }

    public MailboxExecutorFactory getMailboxExecutorFactory() {
        MailboxProcessor mailboxProcessor = this.mailboxProcessor;
        mailboxProcessor.getClass();
        return mailboxProcessor::getMailboxExecutor;
    }

    public final boolean isRunning() {
        return this.isRunning;
    }

    public final boolean isCanceled() {
        return this.canceled;
    }

    public final boolean isFailing() {
        return this.failing;
    }

    private void shutdownAsyncThreads() throws Exception {
        if (this.asyncOperationsThreadPool.isShutdown()) {
            return;
        }
        this.asyncOperationsThreadPool.shutdownNow();
    }

    /* JADX WARN: Type inference failed for: r0v15, types: [org.apache.flink.streaming.api.operators.StreamOperator] */
    private void disposeAllOperators(boolean z) throws Exception {
        if (this.operatorChain == null || this.disposedOperators) {
            return;
        }
        Iterator<StreamOperatorWrapper<?, ?>> it = this.operatorChain.getAllOperators(true).iterator();
        while (it.hasNext()) {
            ?? streamOperator = it.next().getStreamOperator();
            if (z) {
                try {
                    streamOperator.dispose();
                } catch (Exception e) {
                    LOG.error("Error during disposal of stream operator.", e);
                }
            } else {
                streamOperator.dispose();
            }
        }
        this.disposedOperators = true;
    }

    protected void finalize() throws Throwable {
        super.finalize();
        if (!this.timerService.isTerminated()) {
            LOG.info("Timer service is shutting down.");
            this.timerService.shutdownService();
        }
        this.cancelables.close();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isSerializingTimestamps() {
        TimeCharacteristic timeCharacteristic = this.configuration.getTimeCharacteristic();
        return (timeCharacteristic == TimeCharacteristic.EventTime) | (timeCharacteristic == TimeCharacteristic.IngestionTime);
    }

    public final String getName() {
        return getEnvironment().getTaskInfo().getTaskNameWithSubtasks();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public String getTaskNameWithSubtaskAndId() {
        return getEnvironment().getTaskInfo().getTaskNameWithSubtasks() + " (" + getEnvironment().getExecutionId() + ')';
    }

    public CheckpointStorageWorkerView getCheckpointStorage() {
        return this.subtaskCheckpointCoordinator.getCheckpointStorage();
    }

    public StreamConfig getConfiguration() {
        return this.configuration;
    }

    public StreamStatusMaintainer getStreamStatusMaintainer() {
        return this.operatorChain;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public RecordWriterOutput<?>[] getStreamOutputs() {
        return this.operatorChain.getStreamOutputs();
    }

    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.mainMailboxExecutor.execute(() -> {
            this.latestAsyncCheckpointStartDelayNanos = 1000000 * Math.max(0L, System.currentTimeMillis() - checkpointMetaData.getTimestamp());
            try {
                completableFuture.complete(Boolean.valueOf(triggerCheckpoint(checkpointMetaData, checkpointOptions)));
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
                throw e;
            }
        }, "checkpoint %s with %s", checkpointMetaData, checkpointOptions);
        return completableFuture;
    }

    private boolean triggerCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions) throws Exception {
        try {
            CheckpointMetrics alignmentDurationNanos = new CheckpointMetrics().setAlignmentDurationNanos(0L);
            this.subtaskCheckpointCoordinator.initCheckpoint(checkpointMetaData.getCheckpointId(), checkpointOptions);
            boolean performCheckpoint = performCheckpoint(checkpointMetaData, checkpointOptions, alignmentDurationNanos);
            if (!performCheckpoint) {
                declineCheckpoint(checkpointMetaData.getCheckpointId());
            }
            return performCheckpoint;
        } catch (Exception e) {
            if (this.isRunning) {
                throw new Exception("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e);
            }
            LOG.debug("Could not perform checkpoint {} for operator {} while the invokable was not in state running.", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), getName(), e});
            return false;
        }
    }

    public <E extends Exception> void executeInTaskThread(ThrowingRunnable<E> throwingRunnable, String str, Object... objArr) throws Exception {
        if (this.mailboxProcessor.isMailboxThread()) {
            throwingRunnable.run();
        } else {
            this.mainMailboxExecutor.execute(throwingRunnable, str, objArr);
        }
    }

    public void triggerCheckpointOnBarrier(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws IOException {
        try {
            if (performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics) && isSynchronousSavepointId(checkpointMetaData.getCheckpointId())) {
                runSynchronousSavepointMailboxLoop();
            }
        } catch (CancelTaskException e) {
            LOG.info("Operator {} was cancelled while performing checkpoint {}.", getName(), Long.valueOf(checkpointMetaData.getCheckpointId()));
            throw e;
        } catch (Exception e2) {
            throw new IOException("Could not perform checkpoint " + checkpointMetaData.getCheckpointId() + " for operator " + getName() + '.', e2);
        }
    }

    public void abortCheckpointOnBarrier(long j, Throwable th) throws IOException {
        resetSynchronousSavepointId(j, false);
        this.subtaskCheckpointCoordinator.abortCheckpointOnBarrier(j, th, this.operatorChain);
    }

    private boolean performCheckpoint(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, CheckpointMetrics checkpointMetrics) throws Exception {
        LOG.debug("Starting checkpoint ({}) {} on task {}", new Object[]{Long.valueOf(checkpointMetaData.getCheckpointId()), checkpointOptions.getCheckpointType(), getName()});
        if (this.isRunning) {
            this.actionExecutor.runThrowing(() -> {
                if (checkpointOptions.getCheckpointType().isSynchronous()) {
                    setSynchronousSavepointId(checkpointMetaData.getCheckpointId(), checkpointOptions.getCheckpointType().shouldIgnoreEndOfInput());
                    if (checkpointOptions.getCheckpointType().shouldAdvanceToEndOfTime()) {
                        advanceToEndOfEventTime();
                    }
                } else if (this.activeSyncSavepointId != null && this.activeSyncSavepointId.longValue() < checkpointMetaData.getCheckpointId()) {
                    this.activeSyncSavepointId = null;
                    this.operatorChain.setIgnoreEndOfInput(false);
                }
                this.subtaskCheckpointCoordinator.checkpointState(checkpointMetaData, checkpointOptions, checkpointMetrics, this.operatorChain, this::isRunning);
            });
            return true;
        }
        this.actionExecutor.runThrowing(() -> {
            this.recordWriter.broadcastEvent(new CancelCheckpointMarker(checkpointMetaData.getCheckpointId()));
        });
        return false;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void declineCheckpoint(long j) {
        getEnvironment().declineCheckpoint(j, new CheckpointException("Task Name" + getName(), CheckpointFailureReason.CHECKPOINT_DECLINED_TASK_NOT_READY));
    }

    public final ExecutorService getAsyncOperationsThreadPool() {
        return this.asyncOperationsThreadPool;
    }

    public Future<Void> notifyCheckpointCompleteAsync(long j) {
        return notifyCheckpointOperation(() -> {
            notifyCheckpointComplete(j);
        }, String.format("checkpoint %d complete", Long.valueOf(j)));
    }

    public Future<Void> notifyCheckpointAbortAsync(long j) {
        return notifyCheckpointOperation(() -> {
            resetSynchronousSavepointId(j, false);
            this.subtaskCheckpointCoordinator.notifyCheckpointAborted(j, this.operatorChain, this::isRunning);
        }, String.format("checkpoint %d aborted", Long.valueOf(j)));
    }

    private Future<Void> notifyCheckpointOperation(RunnableWithException runnableWithException, String str) {
        CompletableFuture completableFuture = new CompletableFuture();
        this.mailboxProcessor.getMailboxExecutor(TaskMailbox.MAX_PRIORITY).execute(() -> {
            try {
                runnableWithException.run();
                completableFuture.complete(null);
            } catch (Exception e) {
                completableFuture.completeExceptionally(e);
                throw e;
            }
        }, str);
        return completableFuture;
    }

    private void notifyCheckpointComplete(long j) throws Exception {
        this.subtaskCheckpointCoordinator.notifyCheckpointComplete(j, this.operatorChain, this::isRunning);
        if (this.isRunning && isSynchronousSavepointId(j)) {
            finishTask();
            resetSynchronousSavepointId(j, true);
        }
    }

    private void tryShutdownTimerService() {
        if (this.timerService.isTerminated()) {
            return;
        }
        try {
            long j = getEnvironment().getTaskManagerInfo().getConfiguration().getLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT_TIMERS);
            if (!this.timerService.shutdownServiceUninterruptible(j)) {
                LOG.warn("Timer service shutdown exceeded time limit of {} ms while waiting for pending timers. Will continue with shutdown procedure.", Long.valueOf(j));
            }
        } catch (Throwable th) {
            LOG.error("Could not shut down timer service", th);
        }
    }

    public void dispatchOperatorEvent(OperatorID operatorID, SerializedValue<OperatorEvent> serializedValue) throws FlinkException {
        try {
            this.mainMailboxExecutor.execute(() -> {
                try {
                    this.operatorChain.dispatchOperatorEvent(operatorID, serializedValue);
                } catch (Throwable th) {
                    this.mailboxProcessor.reportThrowable(th);
                }
            }, "dispatch operator event");
        } catch (RejectedExecutionException e) {
        }
    }

    private StateBackend createStateBackend() throws Exception {
        return StateBackendLoader.fromApplicationOrConfigOrDefault(this.configuration.getStateBackend(getUserCodeClassLoader()), getEnvironment().getTaskManagerInfo().getConfiguration(), getUserCodeClassLoader(), LOG);
    }

    @VisibleForTesting
    TimerService getTimerService() {
        return this.timerService;
    }

    @VisibleForTesting
    OP getHeadOperator() {
        return this.headOperator;
    }

    @VisibleForTesting
    StreamTaskActionExecutor getActionExecutor() {
        return this.actionExecutor;
    }

    public ProcessingTimeServiceFactory getProcessingTimeServiceFactory() {
        return mailboxExecutor -> {
            return new ProcessingTimeServiceImpl(this.timerService, processingTimeCallback -> {
                return deferCallbackToMailbox(mailboxExecutor, processingTimeCallback);
            });
        };
    }

    @Override // org.apache.flink.streaming.runtime.tasks.AsyncExceptionHandler
    public void handleAsyncException(String str, Throwable th) {
        if (this.isRunning) {
            this.asyncExceptionHandler.handleAsyncException(str, th);
        }
    }

    public String toString() {
        return getName();
    }

    public final CloseableRegistry getCancelables() {
        return this.cancelables;
    }

    @VisibleForTesting
    public static <OUT> RecordWriterDelegate<SerializationDelegate<StreamRecord<OUT>>> createRecordWriterDelegate(StreamConfig streamConfig, Environment environment) {
        List createRecordWriters = createRecordWriters(streamConfig, environment);
        return createRecordWriters.size() == 1 ? new SingleRecordWriter((RecordWriter) createRecordWriters.get(0)) : createRecordWriters.size() == 0 ? new NonRecordWriter() : new MultipleRecordWriters(createRecordWriters);
    }

    private static <OUT> List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>> createRecordWriters(StreamConfig streamConfig, Environment environment) {
        ArrayList arrayList = new ArrayList();
        List<StreamEdge> outEdgesInOrder = streamConfig.getOutEdgesInOrder(environment.getUserClassLoader());
        for (int i = 0; i < outEdgesInOrder.size(); i++) {
            StreamEdge streamEdge = outEdgesInOrder.get(i);
            arrayList.add(createRecordWriter(streamEdge, i, environment, environment.getTaskInfo().getTaskName(), streamEdge.getBufferTimeout()));
        }
        return arrayList;
    }

    private static <OUT> RecordWriter<SerializationDelegate<StreamRecord<OUT>>> createRecordWriter(StreamEdge streamEdge, int i, Environment environment, String str, long j) {
        int numTargetKeyGroups;
        StreamPartitioner streamPartitioner = null;
        try {
            streamPartitioner = (StreamPartitioner) InstantiationUtil.clone(streamEdge.getPartitioner(), environment.getUserClassLoader());
        } catch (Exception e) {
            ExceptionUtils.rethrow(e);
        }
        LOG.debug("Using partitioner {} for output {} of task {}", new Object[]{streamPartitioner, Integer.valueOf(i), str});
        ResultPartitionWriter writer = environment.getWriter(i);
        if ((streamPartitioner instanceof ConfigurableStreamPartitioner) && 0 < (numTargetKeyGroups = writer.getNumTargetKeyGroups())) {
            ((ConfigurableStreamPartitioner) streamPartitioner).configure(numTargetKeyGroups);
        }
        RecordWriter<SerializationDelegate<StreamRecord<OUT>>> build = new RecordWriterBuilder().setChannelSelector(streamPartitioner).setTimeout(j).setTaskName(str).build(writer);
        build.setMetricGroup(environment.getMetricGroup().getIOMetricGroup());
        return build;
    }

    private void handleTimerException(Exception exc) {
        handleAsyncException("Caught exception while processing timer.", new TimerException(exc));
    }

    @VisibleForTesting
    ProcessingTimeCallback deferCallbackToMailbox(MailboxExecutor mailboxExecutor, ProcessingTimeCallback processingTimeCallback) {
        return j -> {
            mailboxExecutor.execute(() -> {
                invokeProcessingTimeCallback(processingTimeCallback, j);
            }, "Timer callback for %s @ %d", processingTimeCallback, Long.valueOf(j));
        };
    }

    private void invokeProcessingTimeCallback(ProcessingTimeCallback processingTimeCallback, long j) {
        try {
            processingTimeCallback.onProcessingTime(j);
        } catch (Throwable th) {
            handleAsyncException("Caught exception while processing timer.", new TimerException(th));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public long getAsyncCheckpointStartDelayNanos() {
        return this.latestAsyncCheckpointStartDelayNanos;
    }

    static {
        $assertionsDisabled = !StreamTask.class.desiredAssertionStatus();
        TRIGGER_THREAD_GROUP = new ThreadGroup("Triggers");
        LOG = LoggerFactory.getLogger(StreamTask.class);
    }
}
