package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/BucketStreamSortOperator.class */
public class BucketStreamSortOperator<T> extends AbstractStreamOperator<T> implements OneInputStreamOperator<T, T> {
    private static final long serialVersionUID = 1;
    private long granularity;
    private transient Map<Long, List<StreamRecord<T>>> buckets;

    public BucketStreamSortOperator(long j) {
        this.granularity = j;
    }

    @Override // org.apache.flink.streaming.api.operators.AbstractStreamOperator, org.apache.flink.streaming.api.operators.StreamOperator
    public void open() throws Exception {
        super.open();
        this.buckets = new HashMap();
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processElement(StreamRecord<T> streamRecord) throws Exception {
        long timestamp = streamRecord.getTimestamp() - (streamRecord.getTimestamp() % this.granularity);
        List<StreamRecord<T>> list = this.buckets.get(Long.valueOf(timestamp));
        if (list == null) {
            list = new ArrayList();
            this.buckets.put(Long.valueOf(timestamp), list);
        }
        list.add(streamRecord);
    }

    @Override // org.apache.flink.streaming.api.operators.OneInputStreamOperator
    public void processWatermark(Watermark watermark) throws Exception {
        long timestamp = watermark.getTimestamp() - (watermark.getTimestamp() % this.granularity);
        HashSet hashSet = new HashSet();
        for (Map.Entry<Long, List<StreamRecord<T>>> entry : this.buckets.entrySet()) {
            if (entry.getKey().longValue() < timestamp) {
                Collections.sort(entry.getValue(), new Comparator<StreamRecord<T>>() { // from class: org.apache.flink.streaming.runtime.operators.BucketStreamSortOperator.1
                    @Override // java.util.Comparator
                    public int compare(StreamRecord<T> streamRecord, StreamRecord<T> streamRecord2) {
                        return (int) (streamRecord.getTimestamp() - streamRecord2.getTimestamp());
                    }
                });
                Iterator<StreamRecord<T>> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    this.output.collect(it.next());
                }
                hashSet.add(entry.getKey());
            }
        }
        Iterator it2 = hashSet.iterator();
        while (it2.hasNext()) {
            this.buckets.remove((Long) it2.next());
        }
        this.output.emitWatermark(watermark);
    }
}
