package org.apache.crunch.util;

import com.google.common.collect.Lists;
import java.util.Iterator;
import java.util.LinkedList;
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;

/* loaded from: input_file:lib/crunch-core-0.12.0-hadoop2.jar:org/apache/crunch/util/DoFnIterator.class */
public class DoFnIterator<S, T> implements Iterator<T> {
    private final Iterator<S> iter;
    private final DoFn<S, T> fn;
    private CacheEmitter<T> cache = new CacheEmitter<>();
    private boolean cleanup = false;

    /* loaded from: input_file:lib/crunch-core-0.12.0-hadoop2.jar:org/apache/crunch/util/DoFnIterator$CacheEmitter.class */
    private static class CacheEmitter<T> implements Emitter<T> {
        private final LinkedList<T> cache;

        private CacheEmitter() {
            this.cache = Lists.newLinkedList();
        }

        public synchronized boolean isEmpty() {
            return this.cache.isEmpty();
        }

        public synchronized T poll() {
            return this.cache.poll();
        }

        @Override // org.apache.crunch.Emitter
        public synchronized void emit(T t) {
            this.cache.add(t);
        }

        @Override // org.apache.crunch.Emitter
        public void flush() {
        }
    }

    public DoFnIterator(Iterator<S> it, DoFn<S, T> doFn) {
        this.iter = it;
        this.fn = doFn;
    }

    @Override // java.util.Iterator
    public boolean hasNext() {
        while (this.cache.isEmpty() && this.iter.hasNext()) {
            this.fn.process(this.iter.next(), this.cache);
        }
        if (this.cache.isEmpty() && !this.cleanup) {
            this.fn.cleanup(this.cache);
            this.cleanup = true;
        }
        return !this.cache.isEmpty();
    }

    @Override // java.util.Iterator
    public T next() {
        return this.cache.poll();
    }

    @Override // java.util.Iterator
    public void remove() {
        throw new UnsupportedOperationException();
    }
}
