/*
 * Decompiled with CFR 0.152.
 */
package org.apache.geaflow.cluster.collector;

import java.util.List;
import java.util.stream.IntStream;
import org.apache.geaflow.api.context.RuntimeContext;
import org.apache.geaflow.cluster.collector.IOutputMessageBuffer;
import org.apache.geaflow.collector.AbstractCollector;
import org.apache.geaflow.collector.ICollector;
import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.partitioner.IPartitioner;
import org.apache.geaflow.partitioner.impl.KeyPartitioner;
import org.apache.geaflow.selector.ISelector;
import org.apache.geaflow.selector.impl.ChannelSelector;
import org.apache.geaflow.shuffle.ForwardOutputDesc;

public abstract class AbstractPipelineCollector<T>
extends AbstractCollector
implements ICollector<T> {
    protected transient IOutputMessageBuffer<T, ?> outputBuffer;
    protected transient ISelector recordISelector;
    protected ForwardOutputDesc<T> outputDesc;
    protected long windowId;

    public AbstractPipelineCollector(ForwardOutputDesc<T> outputDesc) {
        super(outputDesc.getPartitioner().getOpId());
        this.outputDesc = outputDesc;
    }

    public void setUp(RuntimeContext runtimeContext) {
        super.setUp(runtimeContext);
        List targetTaskIds = this.outputDesc.getTargetTaskIndices();
        IPartitioner partitioner = this.outputDesc.getPartitioner();
        if (partitioner.getPartitionType() == IPartitioner.PartitionType.key) {
            ((KeyPartitioner)partitioner).init(this.outputDesc.getNumPartitions());
        }
        this.recordISelector = new ChannelSelector(targetTaskIds.size(), partitioner);
    }

    public void setOutputBuffer(IOutputMessageBuffer<T, ?> outputBuffer) {
        this.outputBuffer = outputBuffer;
    }

    public long getWindowId() {
        return this.windowId;
    }

    public void setWindowId(long windowId) {
        this.windowId = windowId;
    }

    public void broadcast(T value) {
        List targetTaskIds = this.outputDesc.getTargetTaskIndices();
        int[] channels = IntStream.rangeClosed(0, targetTaskIds.size() - 1).toArray();
        try {
            this.outputBuffer.emit(this.windowId, value, false, channels);
            this.outputMeter.mark();
        }
        catch (Exception e) {
            throw new GeaflowRuntimeException((Throwable)e);
        }
    }

    public void partition(T value) {
        this.shuffle(value, false);
    }

    public <KEY> void partition(KEY key, T value) {
        this.shuffle(key, value, false);
    }

    public void finish() {
        try {
            this.outputBuffer.finish(this.windowId);
        }
        catch (Exception e) {
            throw new GeaflowRuntimeException((Throwable)e);
        }
    }

    protected void shuffle(T value, boolean isRetract) {
        int[] targetChannels = this.recordISelector.selectChannels(value);
        try {
            this.outputBuffer.emit(this.windowId, value, isRetract, targetChannels);
            this.outputMeter.mark();
        }
        catch (Exception e) {
            throw new GeaflowRuntimeException((Throwable)e);
        }
    }

    protected <KEY> void shuffle(KEY key, T value, boolean isRetract) {
        int[] targetChannels = this.recordISelector.selectChannels(key);
        try {
            this.outputBuffer.emit(this.windowId, value, isRetract, targetChannels);
            this.outputMeter.mark();
        }
        catch (Exception e) {
            throw new GeaflowRuntimeException((Throwable)e);
        }
    }
}

