package org.apache.flink.test.checkpointing.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/* loaded from: input_file:org/apache/flink/test/checkpointing/utils/AccumulatingIntegerSink.class */
/**
 * Sink that buffers incoming integers and publishes them to a {@link ListAccumulator} registered
 * under the name {@code "output"} only once the checkpoint that covered them is reported complete.
 *
 * <p>Values received via {@link #invoke} are collected into a "current" buffer. On
 * {@link #snapshotState} the buffer is parked under the checkpoint id and a fresh buffer is
 * started. On {@link #notifyCheckpointComplete} the buffer parked under that id is moved into the
 * accumulator.
 */
public class AccumulatingIntegerSink extends RichSinkFunction<Integer> implements CheckpointListener, CheckpointedFunction {
    /** Name under which the accumulator is registered and looked up by {@link #getOutput}. */
    private static final String ACCUMULATOR_NAME = "output";

    /** Values received since the last call to {@link #snapshotState}. */
    private List<Integer> current = new ArrayList<>();

    /** Buffers parked by {@link #snapshotState}, keyed by checkpoint id, awaiting completion. */
    private final Map<Long, List<Integer>> pendingForAccumulator = new HashMap<>();

    /** Accumulator that receives the values of completed checkpoints. */
    private final ListAccumulator<Integer> accumulator = new ListAccumulator<>();

    /** Time to sleep after each received value, in milliseconds; no sleep if not positive. */
    private final int delayMillis;

    /**
     * Creates the sink.
     *
     * @param i delay in milliseconds applied after every received value; values {@code <= 0}
     *     disable the delay
     */
    public AccumulatingIntegerSink(int i) {
        this.delayMillis = i;
    }

    /**
     * Delegates to the superclass and registers the accumulator with the runtime context.
     *
     * @param configuration configuration forwarded to the superclass
     * @throws Exception if the superclass or the accumulator registration fails
     */
    public void open(Configuration configuration) throws Exception {
        super.open(configuration);
        getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, this.accumulator);
    }

    /**
     * Buffers the value and then optionally sleeps for the configured delay.
     *
     * @param num value to buffer
     * @param context sink context (unused)
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void invoke(Integer num, SinkFunction.Context context) throws InterruptedException {
        this.current.add(num);
        if (this.delayMillis > 0) {
            Thread.sleep(this.delayMillis);
        }
    }

    /**
     * Intentionally empty: this sink does not register or restore any state here.
     *
     * @param functionInitializationContext initialization context (unused)
     */
    public void initializeState(FunctionInitializationContext functionInitializationContext) throws Exception {
    }

    /**
     * Parks the values buffered so far under the checkpoint id and starts a new buffer.
     *
     * @param functionSnapshotContext context providing the checkpoint id
     */
    public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
        this.pendingForAccumulator.put(functionSnapshotContext.getCheckpointId(), this.current);
        this.current = new ArrayList<>();
    }

    /**
     * Moves the values parked for the given checkpoint into the accumulator.
     *
     * <p>If nothing is parked under the id, the call is a no-op instead of failing with a
     * {@link NullPointerException}.
     *
     * @param j id of the completed checkpoint
     */
    public void notifyCheckpointComplete(long j) {
        List<Integer> completed = this.pendingForAccumulator.remove(j);
        if (completed == null) {
            // No snapshot was recorded under this id (presumably possible after a restart or a
            // repeated notification - confirm against the CheckpointListener contract).
            return;
        }
        completed.forEach(this.accumulator::add);
    }

    /**
     * Intentionally empty: values parked for an aborted checkpoint are left untouched.
     *
     * @param j id of the aborted checkpoint
     */
    public void notifyCheckpointAborted(long j) {
    }

    /**
     * Extracts this sink's output from a map of accumulator results.
     *
     * @param map accumulator results keyed by accumulator name
     * @return the value stored under {@code "output"}, or {@code null} if the map has no such
     *     entry
     */
    @SuppressWarnings("unchecked") // the entry under ACCUMULATOR_NAME is expected to be this sink's ListAccumulator result
    public static List<Integer> getOutput(Map<String, Object> map) {
        return (List<Integer>) map.get(ACCUMULATOR_NAME);
    }
}
