package org.apache.reef.wake.impl;

import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.reef.wake.EventHandler;

/* loaded from: input_file:org/apache/reef/wake/impl/BlockingEventHandler.class */
/**
 * An {@link EventHandler} that buffers incoming events and forwards them to a
 * destination handler in fixed-size batches.
 *
 * <p>Each call to {@link #onNext} enqueues one event. The call that delivers the
 * {@code expectedSize}-th event since the previous batch removes
 * {@code expectedSize} events from the buffer and passes them to the destination
 * as one {@link Iterable}, on the calling thread. Calls that do not complete a
 * batch return right after enqueueing.
 *
 * <p>The buffer is a {@link LinkedBlockingQueue}, so {@code null} events are
 * rejected with a {@link NullPointerException}.
 *
 * @param <T> type of the buffered events
 */
public final class BlockingEventHandler<T> implements EventHandler<T> {
    /** Number of events that make up one batch; always positive. */
    private final int expectedSize;

    /** Receiver of each completed batch. */
    private final EventHandler<Iterable<T>> destination;

    /** Events received but not yet forwarded; also the monitor guarding batch extraction. */
    private final BlockingQueue<T> events = new LinkedBlockingQueue<T>();

    /**
     * Number of events received since the last completed batch, kept in the range
     * {@code [0, expectedSize)} so that it can never overflow.
     */
    private final AtomicInteger cursor = new AtomicInteger(0);

    /**
     * Creates a handler that forwards events in batches of {@code expectedSize}.
     *
     * @param expectedSize number of events per batch; must be positive
     * @param destination  handler that receives each completed batch
     * @throws IllegalArgumentException if {@code expectedSize} is zero or negative
     */
    public BlockingEventHandler(int expectedSize, EventHandler<Iterable<T>> destination) {
        // Fail fast: a zero size would otherwise surface as an ArithmeticException
        // (modulo by zero) on the first event, far away from the faulty call site.
        if (expectedSize <= 0) {
            throw new IllegalArgumentException("expectedSize must be positive, got " + expectedSize);
        }
        this.expectedSize = expectedSize;
        this.destination = destination;
    }

    /**
     * Buffers {@code event} and, if it completes a batch, forwards the batch to the
     * destination handler on the calling thread.
     *
     * @param event the event to buffer
     */
    @Override
    public void onNext(T event) {
        // Enqueue before counting: once the counter reports a full batch, at least
        // expectedSize events are guaranteed to be in the queue for that batch.
        this.events.add(event);
        if (!completesBatch()) {
            return;
        }

        ArrayList<T> batch = new ArrayList<T>(this.expectedSize);
        // Drain under a lock so that concurrently completed batches do not
        // interleave their polls and split consecutive events between them.
        synchronized (this.events) {
            for (int i = 0; i < this.expectedSize; i++) {
                batch.add(this.events.poll());
            }
        }
        // Deliver outside the lock so a slow destination does not stall other batches.
        this.destination.onNext(batch);
    }

    /**
     * Atomically advances the per-batch counter by one, wrapping it back to zero
     * when it reaches {@code expectedSize}.
     *
     * <p>A plain ever-growing {@code int} counter tested with {@code %} would drift
     * after wrapping past {@link Integer#MAX_VALUE}, because 2^32 is generally not
     * a multiple of the batch size.
     *
     * @return {@code true} if this increment completed a batch
     */
    private boolean completesBatch() {
        while (true) {
            int current = this.cursor.get();
            // current < expectedSize, so current + 1 cannot overflow.
            int next = (current + 1) % this.expectedSize;
            if (this.cursor.compareAndSet(current, next)) {
                return next == 0;
            }
        }
    }
}
