package org.apache.flink.test.checkpointing;

import java.util.Collection;
import java.util.HashMap;
import java.util.Optional;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.AbstractSnapshotStrategy;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.util.TestUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

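/**
 * Integration test checking that a job whose checkpoints fail in their asynchronous part is
 * failed with the "Exceeded checkpoint tolerable failure threshold." error.
 *
 * <p>Decompiled from: input_file:org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase.class
 */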
public class CheckpointFailureManagerITCase extends TestLogger {
    // Mini cluster the test job is submitted to. Assigned and started in setup();
    // stopped and reset to null in shutDownExistingCluster().
    private static MiniClusterWithClientResource cluster;

    /**
     * A {@link MemoryStateBackend} whose operator state backend hands out snapshot futures that
     * always fail when they are run.
     *
     * <p>The synchronous part of {@code snapshot(...)} succeeds (a future is returned); the
     * exception is only thrown once that future is executed, i.e. in the asynchronous part of the
     * checkpoint.
     *
     * <p>Decompiled from:
     * input_file:org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase$AsyncFailureStateBackend.class
     */
    private static class AsyncFailureStateBackend extends MemoryStateBackend {
        private static final long serialVersionUID = 1;

        private AsyncFailureStateBackend() {
        }

        /**
         * Builds an operator state backend whose {@code snapshot(...)} returns a task that throws
         * {@code "Expected async snapshot exception."} when executed.
         *
         * @param environment task environment; supplies the user-code class loader and the
         *     execution config
         * @param str operator identifier (unused here)
         * @param collection state handles passed through to the builder
         * @param closeableRegistry registry passed through to the builder
         * @return the failing operator state backend
         */
        public OperatorStateBackend createOperatorStateBackend(
                Environment environment,
                String str,
                @Nonnull Collection<OperatorStateHandle> collection,
                CloseableRegistry closeableRegistry) {
            return new DefaultOperatorStateBackendBuilder(
                    environment.getUserCodeClassLoader().asClassLoader(),
                    environment.getExecutionConfig(),
                    true,
                    collection,
                    closeableRegistry) {
                // Named "build" again (the decompiler had renamed it to m724build) so that it
                // replaces the builder's own build() instead of sitting next to it.
                // The snapshot strategy is a Mockito mock because snapshot(...) is replaced below.
                @SuppressWarnings("unchecked") // raw mock is passed where a parameterized strategy is expected
                public DefaultOperatorStateBackend build() {
                    return new DefaultOperatorStateBackend(
                            this.executionConfig,
                            this.cancelStreamRegistry,
                            new HashMap<>(),
                            new HashMap<>(),
                            new HashMap<>(),
                            new HashMap<>(),
                            Mockito.mock(AbstractSnapshotStrategy.class)) {
                        @Nonnull
                        public RunnableFuture<SnapshotResult<OperatorStateHandle>> snapshot(
                                long j,
                                long j2,
                                @Nonnull CheckpointStreamFactory checkpointStreamFactory,
                                @Nonnull CheckpointOptions checkpointOptions) {
                            // Returning the task succeeds; running it is what fails.
                            return new FutureTask<>(() -> {
                                throw new Exception("Expected async snapshot exception.");
                            });
                        }
                    };
                }
            }.build();
        }

        /**
         * Returns this instance unchanged, ignoring the supplied configuration.
         *
         * <p>The decompiler had renamed this method to {@code m723configure}, which meant it no
         * longer overrode the superclass {@code configure}; the original name is restored here.
         * NOTE(review): presumably the inherited {@code configure} would hand back a plain
         * {@link MemoryStateBackend} and thereby drop the failing behaviour — confirm against
         * the Flink version in use.
         *
         * @param readableConfig ignored
         * @param classLoader ignored
         * @return {@code this}
         */
        public AsyncFailureStateBackend configure(ReadableConfig readableConfig, ClassLoader classLoader) {
            return this;
        }
    }

    /**
     * Parallel source that keeps emitting strings built from 10 random bytes until cancelled.
     *
     * <p>The number of emitted records is written to operator list state on every snapshot; it is
     * not read back in {@link #initializeState}. {@link #INITIALIZE_TIMES} counts how often
     * {@link #initializeState} has been called on any instance in this JVM.
     *
     * <p>Decompiled from:
     * input_file:org/apache/flink/test/checkpointing/CheckpointFailureManagerITCase$StringGeneratingSourceFunction.class
     */
    private static class StringGeneratingSourceFunction extends RichParallelSourceFunction<String> implements CheckpointedFunction {
        private static final long serialVersionUID = 1;

        // Scratch buffer refilled with random bytes for every record.
        private final byte[] randomBytes = new byte[10];
        private ListState<Long> listState;
        // Records emitted so far; only modified while holding the checkpoint lock.
        private long emitted = 0L;
        // Cleared by cancel() to stop the emit loop.
        private volatile boolean isRunning = true;
        private static final ListStateDescriptor<Long> stateDescriptor = new ListStateDescriptor<>("emitted", Long.class);
        public static final AtomicInteger INITIALIZE_TIMES = new AtomicInteger(0);

        private StringGeneratingSourceFunction() {
        }

        /** Replaces the content of the list state with the current emit count. */
        public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
            listState.clear();
            listState.add(emitted);
        }

        /** Obtains the list state handle and records that an initialization took place. */
        public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
            listState = functionInitializationContext.getOperatorStateStore().getListState(stateDescriptor);
            INITIALIZE_TIMES.incrementAndGet();
        }

        /**
         * Emits one random string per iteration under the checkpoint lock, sleeping 10 ms between
         * iterations, until {@link #cancel()} is called.
         */
        public void run(SourceFunction.SourceContext<String> sourceContext) throws Exception {
            while (isRunning) {
                ThreadLocalRandom.current().nextBytes(randomBytes);
                final String record = new String(randomBytes);
                synchronized (sourceContext.getCheckpointLock()) {
                    sourceContext.collect(record);
                    emitted++;
                }
                Thread.sleep(10L);
            }
        }

        /** Asks the emit loop in {@link #run} to stop. */
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * Starts a new mini cluster with an empty {@link Configuration} before each test.
     *
     * <p>This runs per test while the only teardown is the class-level
     * {@code shutDownExistingCluster()}, so a cluster left over from a previous test is stopped
     * here first rather than being overwritten and leaked.
     *
     * @throws Exception if the cluster cannot be started
     */
    @Before
    public void setup() throws Exception {
        final MiniClusterWithClientResource previous = cluster;
        if (previous != null) {
            // Clear the reference first so a failing after() does not leave a stale cluster behind.
            cluster = null;
            previous.after();
        }
        cluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder().setConfiguration(new Configuration()).build());
        cluster.before();
    }

    /** Stops the mini cluster, if one is currently held, and clears the reference to it. */
    @AfterClass
    public static void shutDownExistingCluster() {
        if (cluster == null) {
            return;
        }
        cluster.after();
        cluster = null;
    }

    /**
     * Runs a checkpointed job (100 ms interval, no restarts) on {@link AsyncFailureStateBackend}
     * and expects it to fail with a {@link FlinkRuntimeException} carrying the message
     * {@code "Exceeded checkpoint tolerable failure threshold."}, with the source having been
     * initialized exactly once.
     *
     * @throws Exception on any unexpected failure while building or submitting the job
     */
    @Test(timeout = 10000)
    public void testAsyncCheckpointFailureTriggerJobFailed() throws Exception {
        // given: a checkpointed job without restarts whose async snapshots always fail
        StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();
        executionEnvironment.enableCheckpointing(100L);
        executionEnvironment.setRestartStrategy(RestartStrategies.noRestart());
        executionEnvironment.setStateBackend(new AsyncFailureStateBackend());
        executionEnvironment.addSource(new StringGeneratingSourceFunction()).addSink(new DiscardingSink<>());
        try {
            // when: the job is executed once
            TestUtils.submitJobAndWaitForResult(cluster.getClusterClient(), StreamingJobGraphGenerator.createJobGraph(executionEnvironment.getStreamGraph()), getClass().getClassLoader());
            // Without this, a job that finishes normally would skip every failure assertion.
            Assert.fail("Expected the job to fail because of the failing asynchronous checkpoints.");
        } catch (JobExecutionException e) {
            Optional<FlinkRuntimeException> failure = ExceptionUtils.findThrowable(e, FlinkRuntimeException.class);
            Assert.assertTrue("No FlinkRuntimeException found in the job failure cause chain.", failure.isPresent());
            Assert.assertEquals("Exceeded checkpoint tolerable failure threshold.", failure.get().getMessage());
        }
        // then: the source was initialized once, i.e. the job was not restarted
        Assert.assertEquals(1L, StringGeneratingSourceFunction.INITIALIZE_TIMES.get());
    }
}
