/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.checkpointing.utils;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.functions.OpenContext;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.legacy.SinkFunction;

/**
 * Sink that buffers the integers it receives and publishes them through a
 * {@link ListAccumulator} registered under the name {@code "output"}, but only once a
 * checkpoint that covers them has been reported as complete.
 *
 * <p>Values received between two snapshots are collected in one buffer. Each call to
 * {@link #snapshotState} files the current buffer under the checkpoint id and starts a new
 * one. {@link #notifyCheckpointComplete} then moves the filed buffers into the accumulator.
 *
 * <p>No state is written to or restored from the checkpoint itself; buffers live only in
 * this instance's fields.
 */
public class AccumulatingIntegerSink
        extends RichSinkFunction<Integer>
        implements CheckpointListener, CheckpointedFunction {
    /** Name under which the accumulator is registered and looked up. */
    private static final String ACCUMULATOR_NAME = "output";

    /** Values received since the most recent snapshot (or since creation). */
    private List<Integer> current = new ArrayList<>();

    /** Buffers that were snapshotted but not yet published, keyed by checkpoint id. */
    private final Map<Long, List<Integer>> pendingForAccumulator = new HashMap<>();

    /** Accumulator holding every value that belongs to a completed checkpoint. */
    private final ListAccumulator<Integer> accumulator = new ListAccumulator<>();

    /** Pause applied after every received value, in milliseconds; no pause when {@code <= 0}. */
    private final int delayMillis;

    /**
     * Creates the sink.
     *
     * @param delayMillis number of milliseconds to sleep after each value; values
     *     {@code <= 0} disable the sleep
     */
    public AccumulatingIntegerSink(int delayMillis) {
        this.delayMillis = delayMillis;
    }

    /**
     * Delegates to the superclass and registers the accumulator with the runtime context.
     *
     * @param openContext context passed through to the superclass
     * @throws Exception if the superclass or the accumulator registration fails
     */
    public void open(OpenContext openContext) throws Exception {
        super.open(openContext);
        this.getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, this.accumulator);
    }

    /**
     * Appends the value to the current buffer and then sleeps for the configured delay.
     *
     * @param value the value to buffer
     * @param context sink context; not used
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    public void invoke(Integer value, SinkFunction.Context context) throws InterruptedException {
        this.current.add(value);
        if (this.delayMillis > 0) {
            Thread.sleep(this.delayMillis);
        }
    }

    /**
     * Intentionally empty: nothing is restored on initialization.
     *
     * @param context initialization context; not used
     */
    public void initializeState(FunctionInitializationContext context) throws Exception {
    }

    /**
     * Files the current buffer under the id of the checkpoint being taken and starts a fresh
     * buffer for subsequent values.
     *
     * @param context snapshot context providing the checkpoint id
     */
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        this.pendingForAccumulator.put(context.getCheckpointId(), this.current);
        this.current = new ArrayList<>();
    }

    /**
     * Publishes every pending buffer whose checkpoint id is less than or equal to
     * {@code checkpointId} to the accumulator, in ascending checkpoint-id order, and removes
     * those buffers from the pending map.
     *
     * <p>Earlier ids are included because a completion notice for a checkpoint also covers
     * the checkpoints before it; a buffer whose own notification never arrived (skipped or
     * aborted checkpoint) would otherwise stay pending forever. A notification for which no
     * buffer is pending (for example a repeated one) is a no-op.
     *
     * @param checkpointId id of the checkpoint that completed
     */
    public void notifyCheckpointComplete(long checkpointId) {
        final List<Long> completedIds = new ArrayList<>();
        for (Long pendingId : this.pendingForAccumulator.keySet()) {
            if (pendingId <= checkpointId) {
                completedIds.add(pendingId);
            }
        }
        // HashMap iteration order is unspecified; a null comparator sorts by natural
        // ordering so that values are published in checkpoint order.
        completedIds.sort(null);
        for (Long completedId : completedIds) {
            this.pendingForAccumulator.remove(completedId).forEach(this.accumulator::add);
        }
    }

    /**
     * Intentionally empty: the buffer of an aborted checkpoint stays pending and is published
     * by the next completion notice with an equal or higher checkpoint id.
     *
     * @param checkpointId id of the checkpoint that was aborted
     */
    public void notifyCheckpointAborted(long checkpointId) {
    }

    /**
     * Looks up this sink's output in a map of accumulator results.
     *
     * @param accumulators accumulator results keyed by accumulator name
     * @return the entry stored under {@code "output"}, or {@code null} if there is none
     */
    public static List<Integer> getOutput(Map<String, Object> accumulators) {
        // NOTE(review): assumes the entry was produced by this sink's ListAccumulator<Integer>;
        // the element type cannot be checked at runtime.
        @SuppressWarnings("unchecked")
        final List<Integer> output = (List<Integer>) accumulators.get(ACCUMULATOR_NAME);
        return output;
    }
}

