package org.apache.flink.streaming.connectors.gcp.pubsub.common;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;

/* loaded from: input_file:org/apache/flink/streaming/connectors/gcp/pubsub/common/AcknowledgeOnCheckpoint.class */
public class AcknowledgeOnCheckpoint<ACKID extends Serializable> implements CheckpointListener, ListCheckpointed<AcknowledgeIdsForCheckpoint<ACKID>> {
    private final Acknowledger<ACKID> acknowledger;
    private List<AcknowledgeIdsForCheckpoint<ACKID>> acknowledgeIdsPerCheckpoint = new ArrayList();
    private List<ACKID> acknowledgeIdsForPendingCheckpoint = new ArrayList();
    private AtomicInteger outstandingAcknowledgements = new AtomicInteger(0);

    public AcknowledgeOnCheckpoint(Acknowledger<ACKID> acknowledger) {
        this.acknowledger = acknowledger;
    }

    public void addAcknowledgeId(ACKID ackid) {
        this.acknowledgeIdsForPendingCheckpoint.add(ackid);
        this.outstandingAcknowledgements.incrementAndGet();
    }

    public void notifyCheckpointComplete(long j) throws Exception {
        this.acknowledger.acknowledge((List) this.acknowledgeIdsPerCheckpoint.stream().filter(acknowledgeIdsForCheckpoint -> {
            return acknowledgeIdsForCheckpoint.getCheckpointId() <= j;
        }).flatMap(acknowledgeIdsForCheckpoint2 -> {
            return acknowledgeIdsForCheckpoint2.getAcknowledgeIds().stream();
        }).collect(Collectors.toList()));
        this.acknowledgeIdsPerCheckpoint = (List) this.acknowledgeIdsPerCheckpoint.stream().filter(acknowledgeIdsForCheckpoint3 -> {
            return acknowledgeIdsForCheckpoint3.getCheckpointId() > j;
        }).collect(Collectors.toList());
        this.outstandingAcknowledgements = new AtomicInteger(numberOfAcknowledgementIds(this.acknowledgeIdsPerCheckpoint));
    }

    public List<AcknowledgeIdsForCheckpoint<ACKID>> snapshotState(long j, long j2) throws Exception {
        this.acknowledgeIdsPerCheckpoint.add(new AcknowledgeIdsForCheckpoint<>(j, this.acknowledgeIdsForPendingCheckpoint));
        this.acknowledgeIdsForPendingCheckpoint = new ArrayList();
        return this.acknowledgeIdsPerCheckpoint;
    }

    public void restoreState(List<AcknowledgeIdsForCheckpoint<ACKID>> list) throws Exception {
        this.outstandingAcknowledgements = new AtomicInteger(numberOfAcknowledgementIds(list));
        this.acknowledgeIdsPerCheckpoint = list;
    }

    private int numberOfAcknowledgementIds(List<AcknowledgeIdsForCheckpoint<ACKID>> list) {
        return list.stream().map((v0) -> {
            return v0.getAcknowledgeIds();
        }).mapToInt((v0) -> {
            return v0.size();
        }).sum();
    }

    public int numberOfOutstandingAcknowledgements() {
        return this.outstandingAcknowledgements.get();
    }
}
