/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.fetcher;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.geaflow.cluster.fetcher.IInputMessageBuffer;
import org.apache.geaflow.cluster.protocol.InputMessage;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.shuffle.message.PipelineBarrier;
import org.apache.geaflow.shuffle.message.PipelineMessage;
import org.apache.geaflow.shuffle.message.SliceId;
import org.apache.geaflow.shuffle.pipeline.buffer.OutBuffer;
import org.apache.geaflow.shuffle.pipeline.buffer.PipeBuffer;
import org.apache.geaflow.shuffle.pipeline.slice.IPipelineSlice;
import org.apache.geaflow.shuffle.pipeline.slice.SliceManager;
import org.apache.geaflow.shuffle.pipeline.slice.SpillablePipelineSlice;
import org.apache.geaflow.shuffle.serialize.AbstractMessageIterator;
import org.apache.geaflow.shuffle.service.ShuffleManager;

/**
 * Input message buffer that redirects the data received for a single edge into a
 * {@link SpillablePipelineSlice} instead of handing it to a consumer.
 *
 * <p>The slice is created in the constructor and registered with the {@link SliceManager}
 * obtained from {@link ShuffleManager}. Incoming messages and barriers whose edge id does not
 * match the edge id of the slice are ignored. The queue-style accessors
 * ({@link #offer(InputMessage)} and {@link #poll(long, TimeUnit)}) are not supported.
 *
 * @param <T> element type of the messages carried by this buffer
 */
public class PrefetchMessageBuffer<T>
implements IInputMessageBuffer<T> {
    /** Released once a barrier for this buffer's edge has been added and the slice flushed. */
    private final CountDownLatch latch = new CountDownLatch(1);
    /** Slice that receives every accepted message buffer and barrier. */
    private final SpillablePipelineSlice slice;
    /** Edge id taken from the slice id; used to filter incoming messages and barriers. */
    private final int edgeId;

    /**
     * Creates the backing slice and registers it with the shuffle slice manager.
     *
     * @param logTag tag passed through to the {@link SpillablePipelineSlice} constructor
     * @param sliceId identifier of the slice; its edge id becomes the filter for this buffer
     */
    public PrefetchMessageBuffer(String logTag, SliceId sliceId) {
        this.slice = new SpillablePipelineSlice(logTag, sliceId);
        this.edgeId = sliceId.getEdgeId();
        SliceManager sliceManager = ShuffleManager.getInstance().getSliceManager();
        sliceManager.register(sliceId, (IPipelineSlice)this.slice);
    }

    /**
     * Not supported by this buffer.
     *
     * @param message ignored
     * @throws UnsupportedOperationException always
     */
    public void offer(InputMessage<T> message) {
        throw new UnsupportedOperationException();
    }

    /**
     * Not supported by this buffer.
     *
     * @param timeout ignored
     * @param unit ignored
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    public InputMessage<T> poll(long timeout, TimeUnit unit) {
        throw new UnsupportedOperationException();
    }

    /**
     * Appends the serialized payload of {@code message} to the slice, tagged with the window id
     * from the message's record args. Messages for a different edge are dropped.
     *
     * @param message pipeline message; its iterator is cast to {@link AbstractMessageIterator}
     *     to reach the underlying {@link OutBuffer}
     */
    @Override
    public void onMessage(PipelineMessage<T> message) {
        if (message.getEdgeId() != this.edgeId) {
            return;
        }
        // The cast fails with ClassCastException if the iterator is of another type.
        AbstractMessageIterator iterator = (AbstractMessageIterator)message.getMessageIterator();
        OutBuffer outBuffer = iterator.getOutBuffer();
        long windowId = message.getRecordArgs().getWindowId();
        this.slice.add(new PipeBuffer(outBuffer, windowId));
    }

    /**
     * Appends a barrier buffer to the slice, flushes the slice and releases
     * {@link #waitUtilFinish()}. Barriers for a different edge are dropped.
     *
     * @param barrier pipeline barrier supplying the window id and count
     */
    @Override
    public void onBarrier(PipelineBarrier barrier) {
        if (barrier.getEdgeId() != this.edgeId) {
            return;
        }
        // NOTE(review): the trailing 'true' presumably marks the buffer as a finish/barrier
        // marker, and the count is narrowed to int by the cast - confirm against PipeBuffer.
        this.slice.add(new PipeBuffer(barrier.getWindowId(), (int)barrier.getCount(), true));
        this.slice.flush();
        // Counted down only after add and flush return; if either throws, waiters stay blocked.
        this.latch.countDown();
    }

    /**
     * Blocks the calling thread until {@link #onBarrier(PipelineBarrier)} has accepted a barrier
     * for this buffer's edge and flushed the slice.
     *
     * @throws GeaflowRuntimeException wrapping the {@link InterruptedException} if the calling
     *     thread is interrupted while waiting; the thread's interrupt status is restored first
     */
    public void waitUtilFinish() {
        try {
            this.latch.await();
        }
        catch (InterruptedException e) {
            // await() clears the interrupt status when it throws; restore it so code further
            // up the stack can still observe that this thread was interrupted.
            Thread.currentThread().interrupt();
            throw new GeaflowRuntimeException((Throwable)e);
        }
    }
}

