/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nonnegative;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
import org.apache.flink.test.checkpointing.utils.IntType;
import org.junit.Assert;

/**
 * Test source that emits events through a pluggable generator and injects one artificial
 * failure.
 *
 * <p>The source invokes the generator {@code numberOfGeneratorInvocations} times. On the first
 * execution attempt of subtask 0 it throws an exception once half of the invocations happened,
 * but only after a checkpoint that captured a sufficiently advanced emit counter was reported as
 * complete. The emit counter is part of the checkpointed state, so a restored instance resumes
 * from the snapshotted position.
 *
 * <p>{@link #open(OpenContext)} asserts that the source runs with exactly one parallel subtask.
 */
public class FailingSource
        extends RichSourceFunction<Tuple2<Long, IntType>>
        implements ListCheckpointed<Integer>, CheckpointListener {

    /** Checkpoint status: no qualifying checkpoint has been taken yet. */
    private static final long INITIAL = Long.MIN_VALUE;

    /**
     * Checkpoint status: a qualifying checkpoint was confirmed as complete. Must differ from
     * {@link #INITIAL}, otherwise the failing task cannot tell "nothing pending" apart from
     * "checkpoint confirmed" and would fail without any completed checkpoint to recover from.
     */
    private static final long STATEFUL_CHECKPOINT_COMPLETED = Long.MIN_VALUE + 1;

    @Nonnull
    private final EventEmittingGenerator eventEmittingGenerator;

    /** Total number of generator invocations after which the emit loop ends. */
    private final int expectedEmitCalls;

    /** Emit count at which the artificial failure is triggered (half of the expected calls). */
    private final int failureAfterNumElements;

    /** If set, {@link #run} keeps idling after the last emit until the source is cancelled. */
    private final boolean usingProcessingTime;

    /**
     * Holds {@link #INITIAL}, the id of the qualifying checkpoint whose confirmation is pending,
     * or {@link #STATEFUL_CHECKPOINT_COMPLETED}. Checkpoint ids are assumed never to collide with
     * the two sentinel values.
     */
    private final AtomicLong checkpointStatus;

    /** Number of generator invocations so far; this is the checkpointed state. */
    private int emitCallCount;

    private volatile boolean running;

    /**
     * Creates a source that does not idle after emitting all events.
     *
     * @param eventEmittingGenerator generator invoked once per emit call
     * @param numberOfGeneratorInvocations total number of generator invocations
     */
    public FailingSource(@Nonnull EventEmittingGenerator eventEmittingGenerator, @Nonnegative int numberOfGeneratorInvocations) {
        this(eventEmittingGenerator, numberOfGeneratorInvocations, false);
    }

    /**
     * Creates a source.
     *
     * @param eventEmittingGenerator generator invoked once per emit call
     * @param numberOfGeneratorInvocations total number of generator invocations; the artificial
     *     failure is scheduled after half of them
     * @param usingProcessingTime whether {@link #run} should keep idling after the last emit
     *     until {@link #cancel()} is called
     * @throws NullPointerException if {@code eventEmittingGenerator} is {@code null}
     */
    public FailingSource(@Nonnull EventEmittingGenerator eventEmittingGenerator, @Nonnegative int numberOfGeneratorInvocations, boolean usingProcessingTime) {
        // Fail fast here instead of with an NPE inside the running task.
        this.eventEmittingGenerator =
                java.util.Objects.requireNonNull(eventEmittingGenerator, "eventEmittingGenerator");
        this.running = true;
        this.emitCallCount = 0;
        this.expectedEmitCalls = numberOfGeneratorInvocations;
        this.failureAfterNumElements = numberOfGeneratorInvocations / 2;
        this.checkpointStatus = new AtomicLong(INITIAL);
        this.usingProcessingTime = usingProcessingTime;
    }

    /**
     * Asserts that this source is executed with exactly one parallel subtask.
     *
     * @param openContext unused
     */
    public void open(OpenContext openContext) {
        Assert.assertEquals(1L, this.getRuntimeContext().getTaskInfo().getNumberOfParallelSubtasks());
    }

    /**
     * Emits events until the expected number of generator invocations is reached or the source
     * is cancelled, failing once on the first attempt of subtask 0.
     *
     * @param ctx source context handed to the generator; its checkpoint lock guards each emit
     * @throws Exception the artificial failure, or an {@link InterruptedException} from sleeping
     */
    public void run(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx) throws Exception {
        RuntimeContext runtimeContext = this.getRuntimeContext();
        // Only subtask 0 fails, and only if it has not been restarted before.
        boolean failThisTask = runtimeContext.getTaskInfo().getAttemptNumber() == 0
                && runtimeContext.getTaskInfo().getIndexOfThisSubtask() == 0;

        while (this.running && this.emitCallCount < this.expectedEmitCalls) {
            // Emit and advance the counter atomically with respect to the checkpoint lock.
            synchronized (ctx.getCheckpointLock()) {
                this.eventEmittingGenerator.emitEvent(ctx, this.emitCallCount++);
            }

            if (this.emitCallCount < this.failureAfterNumElements) {
                // Throttle the first half of the events.
                Thread.sleep(1L);
            } else if (failThisTask && this.emitCallCount == this.failureAfterNumElements) {
                // Block (without holding the checkpoint lock) until a checkpoint carrying
                // enough state was confirmed, so the recovery really restores from state.
                while (this.checkpointStatus.get() != STATEFUL_CHECKPOINT_COMPLETED) {
                    Thread.sleep(1L);
                }
                throw new Exception("Artificial Failure");
            }
        }

        if (this.usingProcessingTime) {
            // Stay alive until cancelled.
            while (this.running) {
                Thread.sleep(10L);
            }
        }
    }

    /** Signals {@link #run} to stop emitting and to leave its idle loop. */
    public void cancel() {
        this.running = false;
    }

    /**
     * Marks the awaited checkpoint as complete, which unblocks the pending artificial failure.
     * Notifications for any other checkpoint id leave the status untouched.
     *
     * @param checkpointId id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long checkpointId) {
        this.checkpointStatus.compareAndSet(checkpointId, STATEFUL_CHECKPOINT_COMPLETED);
    }

    /**
     * Forgets the awaited checkpoint if it was aborted, so that a later checkpoint can take its
     * place; otherwise the failing task would wait for a confirmation that never arrives.
     *
     * @param checkpointId id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long checkpointId) {
        this.checkpointStatus.compareAndSet(checkpointId, INITIAL);
    }

    /**
     * Snapshots the current emit counter.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param timestamp unused
     * @return a single-element list holding the number of generator invocations so far
     */
    public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
        // Accept this checkpoint as the one to wait for only if it carries a meaningful amount
        // of progress and no other checkpoint has been selected yet.
        if (this.emitCallCount > this.failureAfterNumElements / 2) {
            this.checkpointStatus.compareAndSet(INITIAL, checkpointId);
        }
        return Collections.singletonList(this.emitCallCount);
    }

    /**
     * Restores the emit counter from a snapshot.
     *
     * @param state the restored state; must contain exactly one element
     * @throws RuntimeException if {@code state} does not contain exactly one element
     */
    public void restoreState(List<Integer> state) throws Exception {
        if (state.size() != 1) {
            throw new RuntimeException("Test failed due to unexpected recovered state size " + state.size());
        }
        this.emitCallCount = state.get(0);
    }

    /** Function that generates and emits the test events for one emit call. */
    @FunctionalInterface
    public interface EventEmittingGenerator extends Serializable {

        /**
         * Emits the output for one generator invocation.
         *
         * @param ctx source context to emit into
         * @param eventSequenceNo zero-based number of this invocation
         */
        void emitEvent(SourceFunction.SourceContext<Tuple2<Long, IntType>> ctx, int eventSequenceNo);
    }
}

