package com.datatorrent.stram.engine;

import com.datatorrent.api.Sink;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.stram.tuple.EndStreamTuple;
import com.datatorrent.stram.tuple.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/engine/WindowIdActivatedReservoir.class */
public class WindowIdActivatedReservoir implements SweepableReservoir {
    private Sink<Object> sink;
    private final String identifier;
    private final SweepableReservoir reservoir;
    private final long windowId;
    EndStreamTuple est;
    private static final Logger logger = LoggerFactory.getLogger(WindowIdActivatedReservoir.class);

    public WindowIdActivatedReservoir(String str, SweepableReservoir sweepableReservoir, long j) {
        this.identifier = str;
        this.reservoir = sweepableReservoir;
        this.windowId = j;
        sweepableReservoir.setSink(Sink.BLACKHOLE);
    }

    @Override // com.datatorrent.stram.engine.Reservoir
    public int size() {
        return this.reservoir.size();
    }

    @Override // com.datatorrent.stram.engine.Reservoir
    public Object remove() {
        if (this.est == null) {
            return this.reservoir.remove();
        }
        try {
            EndStreamTuple endStreamTuple = this.est;
            this.est = null;
            return endStreamTuple;
        } catch (Throwable th) {
            this.est = null;
            throw th;
        }
    }

    @Override // com.datatorrent.stram.engine.SweepableReservoir
    public Sink<Object> setSink(Sink<Object> sink) {
        try {
            Sink<Object> sink2 = this.sink;
            this.sink = sink;
            return sink2;
        } catch (Throwable th) {
            this.sink = sink;
            throw th;
        }
    }

    @Override // com.datatorrent.stram.engine.SweepableReservoir
    public Tuple sweep() {
        while (true) {
            Tuple sweep = this.reservoir.sweep();
            if (sweep == null) {
                return null;
            }
            if (sweep.getType() == MessageType.BEGIN_WINDOW && sweep.getWindowId() > this.windowId) {
                this.reservoir.setSink(this.sink);
                EndStreamTuple endStreamTuple = new EndStreamTuple(this.windowId);
                this.est = endStreamTuple;
                return endStreamTuple;
            }
            this.reservoir.remove();
        }
    }

    @Override // com.datatorrent.stram.engine.SweepableReservoir
    public int getCount(boolean z) {
        return 0;
    }

    public String toString() {
        return "WindowIdActivatedReservoir{identifier=" + this.identifier + ", windowId=" + this.windowId + '}';
    }
}
