package org.apache.flink.streaming.runtime.tasks;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.util.FatalExitExceptionHandler;
import org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTask.class */
public class SourceStreamTask<OUT, SRC extends SourceFunction<OUT>, OP extends StreamSource<OUT, SRC>> extends StreamTask<OUT, OP> {
    private final SourceStreamTask<OUT, SRC, OP>.LegacySourceFunctionThread sourceThread;
    private final Object lock;
    private volatile boolean externallyInducedCheckpoints;
    private volatile boolean isFinished;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/tasks/SourceStreamTask$LegacySourceFunctionThread.class */
    public class LegacySourceFunctionThread extends Thread {
        private final CompletableFuture<Void> completionFuture = new CompletableFuture<>();

        LegacySourceFunctionThread() {
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                ((StreamSource) SourceStreamTask.this.headOperator).run(SourceStreamTask.this.lock, SourceStreamTask.this.getStreamStatusMaintainer(), SourceStreamTask.this.operatorChain);
                this.completionFuture.complete(null);
            } catch (Throwable th) {
                this.completionFuture.completeExceptionally(th);
            }
        }

        public void setTaskDescription(String str) {
            setName("Legacy Source Thread - " + str);
        }

        CompletableFuture<Void> getCompletionFuture() {
            return (!SourceStreamTask.this.isFailing() || isAlive()) ? this.completionFuture : CompletableFuture.completedFuture(null);
        }
    }

    public SourceStreamTask(Environment environment) throws Exception {
        this(environment, new Object());
    }

    private SourceStreamTask(Environment environment, Object obj) throws Exception {
        super(environment, null, FatalExitExceptionHandler.INSTANCE, StreamTaskActionExecutor.synchronizedExecutor(obj));
        this.isFinished = false;
        this.lock = Preconditions.checkNotNull(obj);
        this.sourceThread = new LegacySourceFunctionThread();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void init() {
        SourceFunction sourceFunction = (SourceFunction) ((StreamSource) this.headOperator).getUserFunction();
        if (sourceFunction instanceof ExternallyInducedSource) {
            this.externallyInducedCheckpoints = true;
            ((ExternallyInducedSource) sourceFunction).setCheckpointTrigger(new ExternallyInducedSource.CheckpointTrigger() { // from class: org.apache.flink.streaming.runtime.tasks.SourceStreamTask.1
                @Override // org.apache.flink.streaming.api.checkpoint.ExternallyInducedSource.CheckpointTrigger
                public void triggerCheckpoint(long j) throws FlinkException {
                    CheckpointOptions forCheckpointWithDefaultLocation = CheckpointOptions.forCheckpointWithDefaultLocation(SourceStreamTask.this.configuration.isExactlyOnceCheckpointMode(), SourceStreamTask.this.configuration.isUnalignedCheckpointsEnabled());
                    try {
                        SourceStreamTask.super.triggerCheckpointAsync(new CheckpointMetaData(j, System.currentTimeMillis()), forCheckpointWithDefaultLocation, false).get();
                    } catch (RuntimeException e) {
                        throw e;
                    } catch (Exception e2) {
                        throw new FlinkException(e2.getMessage(), e2);
                    }
                }
            });
        }
        getEnvironment().getMetricGroup().getIOMetricGroup().gauge("checkpointStartDelayNanos", this::getAsyncCheckpointStartDelayNanos);
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void advanceToEndOfEventTime() throws Exception {
        ((StreamSource) this.headOperator).advanceToEndOfEventTime();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cleanup() {
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void processInput(MailboxDefaultAction.Controller controller) throws Exception {
        controller.suspendDefaultAction();
        this.sourceThread.setTaskDescription(getName());
        this.sourceThread.start();
        this.sourceThread.getCompletionFuture().whenComplete((r6, th) -> {
            if (isCanceled() && ExceptionUtils.findThrowable(th, InterruptedException.class).isPresent()) {
                this.mailboxProcessor.reportThrowable(new CancelTaskException(th));
            } else if (this.isFinished || th == null) {
                this.mailboxProcessor.allActionsCompleted();
            } else {
                this.mailboxProcessor.reportThrowable(th);
            }
        });
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void cancelTask() {
        boolean isAlive;
        try {
            if (this.headOperator != 0) {
                ((StreamSource) this.headOperator).cancel();
            }
            if (isAlive) {
                return;
            }
        } finally {
            if (this.sourceThread.isAlive()) {
                this.sourceThread.interrupt();
            } else if (!this.sourceThread.getCompletionFuture().isDone()) {
                this.sourceThread.getCompletionFuture().complete(null);
            }
        }
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    protected void finishTask() throws Exception {
        this.isFinished = true;
        cancelTask();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public CompletableFuture<Void> getCompletionFuture() {
        return this.sourceThread.getCompletionFuture();
    }

    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public Future<Boolean> triggerCheckpointAsync(CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions, boolean z) {
        CompletableFuture completedFuture;
        if (!this.externallyInducedCheckpoints) {
            return super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, z);
        }
        synchronized (this.lock) {
            completedFuture = CompletableFuture.completedFuture(Boolean.valueOf(isRunning()));
        }
        return completedFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.streaming.runtime.tasks.StreamTask
    public void declineCheckpoint(long j) {
        if (this.externallyInducedCheckpoints) {
            return;
        }
        super.declineCheckpoint(j);
    }
}
