package com.ibm.streamsx.topology.internal.tester.ops;

import com.ibm.streams.operator.AbstractOperator;
import com.ibm.streams.operator.OperatorContext;
import com.ibm.streams.operator.StreamingData;
import com.ibm.streams.operator.StreamingInput;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.encoding.BinaryEncoding;
import com.ibm.streams.operator.internal.model.MethodParameters;
import com.ibm.streams.operator.internal.model.ShadowClass;
import com.ibm.streams.operator.model.InputPortSet;
import com.ibm.streams.operator.model.Libraries;
import com.ibm.streams.operator.model.Parameter;
import com.ibm.streams.operator.model.PrimitiveOperator;
import com.ibm.streams.operator.samples.patterns.TupleConsumer;
import com.ibm.streamsx.topology.internal.tester.tcp.TCPTestClient;
import com.ibm.streamsx.topology.internal.tester.tcp.TestTuple;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import org.apache.mina.core.future.WriteFuture;

@InputPortSet
@PrimitiveOperator
@Libraries({"opt/apache-mina-2.0.2/dist/*"})
/* loaded from: input_file:com/ibm/streamsx/topology/internal/tester/ops/TesterSink.class */
public class TesterSink extends TupleConsumer {
    public static final String KIND = "com.ibm.streamsx.topology.testing::TesterSink";
    private String host;
    private int port;
    private BinaryEncoding[] encoders;
    private TCPTestClient[] clients;

    @ShadowClass("com.ibm.streamsx.topology.internal.tester.ops.TesterSink")
    @InputPortSet
    @Libraries({"opt/apache-mina-2.0.2/dist/*"})
    @PrimitiveOperator(namespace = "com.ibm.streamsx.topology.testing")
    /* loaded from: input_file:com/ibm/streamsx/topology/internal/tester/ops/TesterSink$StreamsModel.class */
    public class StreamsModel extends AbstractOperator {
        @MethodParameters({"batchSize"})
        @Parameter(optional = true, description = "Set the batch size.")
        public void setBatchSize(int i) {
        }

        @MethodParameters({"port"})
        @Parameter
        public void setPort(int i) {
        }

        @MethodParameters({"host"})
        @Parameter
        public void setHost(String str) {
        }
    }

    public void initialize(OperatorContext operatorContext) throws Exception {
        super.initialize(operatorContext);
        setBatchSize(1);
        setPreserveOrder(true);
        InetSocketAddress inetSocketAddress = new InetSocketAddress(getHost(), getPort());
        this.clients = new TCPTestClient[operatorContext.getNumberOfStreamingInputs()];
        this.encoders = new BinaryEncoding[operatorContext.getNumberOfStreamingInputs()];
        for (StreamingInput streamingInput : operatorContext.getStreamingInputs()) {
            TCPTestClient tCPTestClient = new TCPTestClient(inetSocketAddress);
            tCPTestClient.connect();
            this.clients[streamingInput.getPortNumber()] = tCPTestClient;
            this.encoders[streamingInput.getPortNumber()] = streamingInput.getStreamSchema().newNativeBinaryEncoding();
        }
    }

    protected boolean processBatch(Queue<TupleConsumer.BatchedTuple> queue) throws Exception {
        ArrayList arrayList = new ArrayList(queue.size());
        for (TupleConsumer.BatchedTuple batchedTuple : queue) {
            int portNumber = batchedTuple.getStream().getPortNumber();
            TCPTestClient tCPTestClient = this.clients[portNumber];
            BinaryEncoding binaryEncoding = this.encoders[portNumber];
            byte[] bArr = new byte[(int) binaryEncoding.getEncodedSize(batchedTuple.getTuple())];
            binaryEncoding.encodeTuple(batchedTuple.getTuple(), ByteBuffer.wrap(bArr));
            arrayList.add(tCPTestClient.writeTuple(new TestTuple(Integer.valueOf(portNumber), bArr)));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((WriteFuture) it.next()).await();
        }
        return false;
    }

    public void processPunctuation(StreamingInput<Tuple> streamingInput, StreamingData.Punctuation punctuation) throws Exception {
        super.processPunctuation(streamingInput, punctuation);
        if (punctuation == StreamingData.Punctuation.FINAL_MARKER) {
            int portNumber = streamingInput.getPortNumber();
            this.clients[portNumber].writeTuple(new TestTuple(Integer.valueOf(portNumber), new byte[0])).await();
        }
    }

    public int getPort() {
        return this.port;
    }

    @Parameter
    public void setPort(int i) {
        this.port = i;
    }

    public String getHost() {
        return this.host;
    }

    @Parameter
    public void setHost(String str) {
        this.host = str;
    }

    public void shutdown() throws Exception {
        for (TCPTestClient tCPTestClient : this.clients) {
            tCPTestClient.close();
        }
        super.shutdown();
    }
}
