package org.apache.flink.streaming.runtime.operators.sink.committables;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.util.Preconditions;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/committables/CheckpointCommittableManagerImpl.class */
public class CheckpointCommittableManagerImpl<CommT> implements CheckpointCommittableManager<CommT> {
    private final Map<Integer, SubtaskCommittableManager<CommT>> subtasksCommittableManagers;

    @Nullable
    private final Long checkpointId;
    private final int subtaskId;
    private final int numberOfSubtasks;

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointCommittableManagerImpl(int i, int i2, @Nullable Long l) {
        this(new HashMap(), i, i2, l);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointCommittableManagerImpl(Map<Integer, SubtaskCommittableManager<CommT>> map, int i, int i2, @Nullable Long l) {
        this.subtasksCommittableManagers = (Map) Preconditions.checkNotNull(map);
        this.subtaskId = i;
        this.numberOfSubtasks = i2;
        this.checkpointId = l;
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager
    public long getCheckpointId() {
        Preconditions.checkNotNull(this.checkpointId);
        return this.checkpointId.longValue();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Collection<SubtaskCommittableManager<CommT>> getSubtaskCommittableManagers() {
        return this.subtasksCommittableManagers.values();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void upsertSummary(CommittableSummary<CommT> committableSummary) {
        if (this.subtasksCommittableManagers.putIfAbsent(Integer.valueOf(committableSummary.getSubtaskId()), new SubtaskCommittableManager<>(committableSummary.getNumberOfCommittables(), this.subtaskId, committableSummary.getCheckpointId().isPresent() ? Long.valueOf(committableSummary.getCheckpointId().getAsLong()) : null)) != null) {
            throw new UnsupportedOperationException("Currently it is not supported to update the CommittableSummary for a checkpoint coming from the same subtask. Please check the status of FLINK-25920");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addCommittable(CommittableWithLineage<CommT> committableWithLineage) {
        getSubtaskCommittableManager(committableWithLineage.getSubtaskId()).add((CommittableWithLineage) committableWithLineage);
    }

    SubtaskCommittableManager<CommT> getSubtaskCommittableManager(int i) {
        return (SubtaskCommittableManager) Preconditions.checkNotNull(this.subtasksCommittableManagers.get(Integer.valueOf(i)), "Unknown subtask for %s", Integer.valueOf(i));
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager
    public CommittableSummary<CommT> getSummary() {
        return new CommittableSummary<>(this.subtaskId, this.numberOfSubtasks, this.checkpointId, this.subtasksCommittableManagers.values().stream().mapToInt((v0) -> {
            return v0.getNumCommittables();
        }).sum(), this.subtasksCommittableManagers.values().stream().mapToInt((v0) -> {
            return v0.getNumPending();
        }).sum(), this.subtasksCommittableManagers.values().stream().mapToInt((v0) -> {
            return v0.getNumFailed();
        }).sum());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public boolean isFinished() {
        return this.subtasksCommittableManagers.values().stream().allMatch((v0) -> {
            return v0.isFinished();
        });
    }

    @Override // org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager
    public Collection<CommittableWithLineage<CommT>> commit(boolean z, Committer<CommT> committer) throws IOException, InterruptedException {
        Collection<CommitRequestImpl<CommT>> pendingRequests = getPendingRequests(z);
        pendingRequests.forEach((v0) -> {
            v0.setSelected();
        });
        committer.commit(new ArrayList(pendingRequests));
        pendingRequests.forEach((v0) -> {
            v0.setCommittedIfNoError();
        });
        return drainFinished();
    }

    Collection<CommitRequestImpl<CommT>> getPendingRequests(boolean z) {
        return (Collection) this.subtasksCommittableManagers.values().stream().filter(subtaskCommittableManager -> {
            return !z || subtaskCommittableManager.hasReceivedAll();
        }).flatMap((v0) -> {
            return v0.getPendingRequests();
        }).collect(Collectors.toList());
    }

    Collection<CommittableWithLineage<CommT>> drainFinished() {
        return (Collection) this.subtasksCommittableManagers.values().stream().flatMap(subtaskCommittableManager -> {
            return subtaskCommittableManager.drainCommitted().stream();
        }).collect(Collectors.toList());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointCommittableManagerImpl<CommT> merge(CheckpointCommittableManagerImpl<CommT> checkpointCommittableManagerImpl) {
        Preconditions.checkArgument(Objects.equals(checkpointCommittableManagerImpl.checkpointId, this.checkpointId));
        for (Map.Entry<Integer, SubtaskCommittableManager<CommT>> entry : checkpointCommittableManagerImpl.subtasksCommittableManagers.entrySet()) {
            this.subtasksCommittableManagers.merge(entry.getKey(), entry.getValue(), (v0, v1) -> {
                return v0.merge(v1);
            });
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public CheckpointCommittableManagerImpl<CommT> copy() {
        return new CheckpointCommittableManagerImpl<>((Map) this.subtasksCommittableManagers.entrySet().stream().collect(Collectors.toMap((v0) -> {
            return v0.getKey();
        }, entry -> {
            return ((SubtaskCommittableManager) entry.getValue()).copy();
        })), this.subtaskId, this.numberOfSubtasks, this.checkpointId);
    }
}
