package com.ibm.streamsx.topology.internal.functional.ops;

import com.ibm.streams.flow.handlers.StreamHandler;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.StreamingData;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streamsx.topology.internal.functional.FunctionalHelper;
import com.ibm.streamsx.topology.internal.spljava.SPLMapping;

/* loaded from: input_file:com/ibm/streamsx/topology/internal/functional/ops/FunctionQueueableFunctor.class */
/**
 * Functional operator base that routes incoming tuples and punctuation
 * through a {@link StreamHandler}.
 *
 * <p>The handler is selected once, during {@link #initialize(OperatorContext)}:
 * it is either this operator itself or a {@code FunctionalQueue} wrapping this
 * operator, depending on the configured queue size and on whether input
 * port 0 is connected to a PE port.
 */
abstract class FunctionQueueableFunctor extends FunctionFunctor implements StreamHandler<Object> {
    /** Value of the optional {@code queueSize} parameter; non-positive means no queue is created. */
    private int queueSize;

    /** Mapping used to convert tuples arriving on input port 0 into objects. */
    private SPLMapping<?> inputMapping;

    /** Destination for converted tuples and punctuation; assigned in {@code initialize}. */
    private StreamHandler<Object> handler;

    /**
     * Initializes the parent operator, resolves the input mapping for port 0
     * and then selects the handler that will receive tuples and marks.
     *
     * @param operatorContext context passed to the parent and, when a queue is used, to the queue
     * @throws Exception propagated from the parent initialization or from handler setup
     */
    @Override // com.ibm.streamsx.topology.internal.functional.ops.FunctionFunctor
    public synchronized void initialize(OperatorContext operatorContext) throws Exception {
        super.initialize(operatorContext);
        inputMapping = FunctionalHelper.getInputMapping(this, 0);
        handler = chooseHandler(operatorContext);
    }

    /**
     * Picks the handler for this operator: the operator itself when no queue
     * is requested or when input port 0 is connected to a PE port, otherwise a
     * new {@code FunctionalQueue} that wraps this operator.
     */
    private StreamHandler<Object> chooseHandler(OperatorContext operatorContext) throws Exception {
        // The port is only inspected when a positive queue size was configured.
        if (getQueueSize() > 0 && !getInput(0).isConnectedToPEPort()) {
            return new FunctionalQueue(operatorContext, getQueueSize(), this);
        }
        return this;
    }

    /**
     * Converts the incoming tuple with the input mapping and passes the
     * resulting object to the selected handler.
     *
     * @param streamingInput port the tuple arrived on (not used here)
     * @param tuple tuple to convert and forward
     * @throws Exception propagated from the conversion or from the handler
     */
    public final void process(StreamingInput<Tuple> streamingInput, Tuple tuple) throws Exception {
        Object converted = inputMapping.convertFrom(tuple);
        handler.tuple(converted);
    }

    /**
     * Notifies the selected handler of the punctuation first, then delegates
     * to the parent's punctuation processing.
     *
     * @param streamingInput port the punctuation arrived on
     * @param punctuation the mark that was received
     * @throws Exception propagated from the handler or from the parent
     */
    @Override // com.ibm.streamsx.topology.internal.functional.ops.FunctionFunctor
    public final void processPunctuation(StreamingInput<Tuple> streamingInput, StreamingData.Punctuation punctuation) throws Exception {
        handler.mark(punctuation);
        super.processPunctuation(streamingInput, punctuation);
    }

    /**
     * @return the configured queue size
     */
    public int getQueueSize() {
        return queueSize;
    }

    /**
     * Sets the queue size; declared as an optional operator parameter.
     *
     * @param i requested queue size; a value of zero or less means no queue is created at initialization
     */
    @Parameter(optional = true)
    public void setQueueSize(int i) {
        queueSize = i;
    }
}
