package gobblin.writer;

import com.google.common.base.Optional;
import gobblin.source.extractor.CheckpointableWatermark;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;

/* loaded from: input_file:WEB-INF/lib/gobblin-core-base-0.11.0.jar:gobblin/writer/MultiWriterWatermarkTracker.class */
public class MultiWriterWatermarkTracker implements WatermarkTracker {
    private final ConcurrentHashMap<String, Set<CheckpointableWatermark>> candidateCommittables = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, Set<CheckpointableWatermark>> unacknowledgedWatermarks = new ConcurrentHashMap<>();

    @Override // gobblin.writer.WatermarkTracker
    public synchronized void reset() {
        this.candidateCommittables.clear();
        this.unacknowledgedWatermarks.clear();
    }

    private synchronized Set<CheckpointableWatermark> getOrCreate(Map<String, Set<CheckpointableWatermark>> map, String str) {
        if (map.containsKey(str)) {
            return map.get(str);
        }
        TreeSet treeSet = new TreeSet();
        map.put(str, treeSet);
        return treeSet;
    }

    @Override // gobblin.writer.WatermarkTracker
    public void committedWatermarks(Map<String, CheckpointableWatermark> map) {
        committedWatermarks(map.values());
    }

    public void committedWatermarks(Iterable<CheckpointableWatermark> iterable) {
        Iterator<CheckpointableWatermark> it = iterable.iterator();
        while (it.hasNext()) {
            committedWatermark(it.next());
        }
    }

    @Override // gobblin.writer.WatermarkTracker
    public void committedWatermark(CheckpointableWatermark checkpointableWatermark) {
        getOrCreate(this.candidateCommittables, checkpointableWatermark.getSource()).add(checkpointableWatermark);
    }

    @Override // gobblin.writer.WatermarkTracker
    public void unacknowledgedWatermark(CheckpointableWatermark checkpointableWatermark) {
        getOrCreate(this.unacknowledgedWatermarks, checkpointableWatermark.getSource()).add(checkpointableWatermark);
    }

    @Override // gobblin.writer.WatermarkTracker
    public void unacknowledgedWatermarks(Map<String, CheckpointableWatermark> map) {
        Iterator<CheckpointableWatermark> it = map.values().iterator();
        while (it.hasNext()) {
            unacknowledgedWatermark(it.next());
        }
    }

    @Override // gobblin.writer.WatermarkTracker
    public Map<String, CheckpointableWatermark> getAllCommitableWatermarks() {
        HashMap hashMap = new HashMap(this.candidateCommittables.size());
        Iterator it = this.candidateCommittables.keySet().iterator();
        while (it.hasNext()) {
            Optional<CheckpointableWatermark> committableWatermark = getCommittableWatermark((String) it.next());
            if (committableWatermark.isPresent()) {
                hashMap.put(committableWatermark.get().getSource(), committableWatermark.get());
            }
        }
        return hashMap;
    }

    @Override // gobblin.writer.WatermarkTracker
    public Map<String, CheckpointableWatermark> getAllUnacknowledgedWatermarks() {
        HashMap hashMap = new HashMap(this.unacknowledgedWatermarks.size());
        Iterator it = this.unacknowledgedWatermarks.keySet().iterator();
        while (it.hasNext()) {
            Optional<CheckpointableWatermark> unacknowledgedWatermark = getUnacknowledgedWatermark((String) it.next());
            if (unacknowledgedWatermark.isPresent()) {
                hashMap.put(unacknowledgedWatermark.get().getSource(), unacknowledgedWatermark.get());
            }
        }
        return hashMap;
    }

    public Optional<CheckpointableWatermark> getCommittableWatermark(String str) {
        Set<CheckpointableWatermark> set = this.unacknowledgedWatermarks.get(str);
        CheckpointableWatermark next = (set == null || set.isEmpty()) ? null : set.iterator().next();
        CheckpointableWatermark checkpointableWatermark = null;
        for (CheckpointableWatermark checkpointableWatermark2 : this.candidateCommittables.get(str)) {
            if (next == null || checkpointableWatermark2.compareTo(next) < 0) {
                checkpointableWatermark = checkpointableWatermark2;
            }
        }
        return checkpointableWatermark == null ? Optional.absent() : Optional.of(checkpointableWatermark);
    }

    public Optional<CheckpointableWatermark> getUnacknowledgedWatermark(String str) {
        Set<CheckpointableWatermark> set = this.unacknowledgedWatermarks.get(str);
        return set.isEmpty() ? Optional.absent() : Optional.of(set.iterator().next());
    }
}
