package com.ibm.streamsx.topology.internal.tester.tcp;

import com.ibm.streams.flow.declare.InputPortDeclaration;
import com.ibm.streams.flow.declare.OperatorGraph;
import com.ibm.streams.flow.declare.OperatorGraphFactory;
import com.ibm.streams.flow.declare.OperatorInvocation;
import com.ibm.streams.flow.declare.OutputPortDeclaration;
import com.ibm.streams.flow.handlers.StreamHandler;
import com.ibm.streams.flow.javaprimitives.JavaOperatorTester;
import com.ibm.streams.flow.javaprimitives.JavaTestableGraph;
import com.ibm.streams.operator.Tuple;
import com.ibm.streams.operator.samples.operators.PassThrough;
import com.ibm.streamsx.topology.TStream;
import com.ibm.streamsx.topology.builder.BOperatorInvocation;
import com.ibm.streamsx.topology.context.StreamsContext;
import com.ibm.streamsx.topology.generator.operator.OpProperties;
import com.ibm.streamsx.topology.internal.streams.InvokeCancel;
import com.ibm.streamsx.topology.internal.tester.ConditionTesterImpl;
import com.ibm.streamsx.topology.internal.tester.TesterRuntime;
import com.ibm.streamsx.topology.internal.tester.conditions.UserCondition;
import com.ibm.streamsx.topology.internal.tester.conditions.handlers.HandlerTesterRuntime;
import com.ibm.streamsx.topology.internal.tester.ops.TesterSink;
import com.ibm.streamsx.topology.tester.Condition;
import java.math.BigInteger;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

/* loaded from: input_file:com/ibm/streamsx/topology/internal/tester/tcp/TCPTesterRuntime.class */
/**
 * Tester runtime that receives tuples from the application under test over
 * TCP and replays them into a locally executed collector graph, where the
 * registered stream handlers observe them.
 *
 * <p>Flow, as wired by this class:
 * <ol>
 *   <li>{@link #finalizeTester} starts a {@link TCPTestServer}, adds a
 *       {@link TesterSink} operator to the topology configured with the
 *       server's host and port, and connects every tested stream to it.</li>
 *   <li>For each tested stream a {@code PassThrough} operator is declared in
 *       the local collector graph; its input is fed by a
 *       {@link TestTupleInjector} and its output is observed by the stream's
 *       handlers.</li>
 *   <li>Each {@link TestTuple} arriving at the server is routed to the
 *       injector registered under the tuple's tester id.</li>
 * </ol>
 */
public class TCPTesterRuntime extends HandlerTesterRuntime {
    private OperatorGraph collectorGraph;
    private JavaTestableGraph localCollector;
    private Future<JavaTestableGraph> localRunningCollector;
    private TCPTestServer tcpServer;
    private BOperatorInvocation testerSinkOp;
    private final Map<TStream<?>, StreamTester> testers;
    private final StreamsContext.Type contextType;

    /**
     * Injectors keyed by tester id. Wrapped in a synchronized map because it
     * is read from the TCP server's message callback as well as populated
     * during set-up.
     */
    private final Map<Integer, TestTupleInjector> injectors;

    /**
     * Per-stream bookkeeping: the tester id assigned when the stream was
     * connected to the tester sink, plus the input and output ports of the
     * {@code PassThrough} operator declared for it in the collector graph.
     */
    public static class StreamTester {
        final int testerId;
        final InputPortDeclaration input;
        final OutputPortDeclaration output;

        /**
         * Declares a {@code PassThrough} operator in {@code operatorGraph}
         * whose single input and single output both use the stream's type.
         *
         * @param operatorGraph collector graph the operator is added to
         * @param i tester id for the stream
         * @param tStream stream being tested
         */
        StreamTester(OperatorGraph operatorGraph, int i, TStream<?> tStream) {
            this.testerId = i;
            OperatorInvocation passThrough = operatorGraph.addOperator(PassThrough.class);
            this.input = passThrough.addInput(tStream.output()._type());
            this.output = passThrough.addOutput(tStream.output()._type());
        }
    }

    /**
     * @param type context type the test runs under; it selects the server
     *        mode in {@link #finalizeTester} and the job-cancel step in
     *        {@link #shutdown}
     * @param conditionTesterImpl tester passed through to the superclass
     */
    public TCPTesterRuntime(StreamsContext.Type type, ConditionTesterImpl conditionTesterImpl) {
        super(conditionTesterImpl);
        this.testers = new HashMap<>();
        this.injectors = Collections.synchronizedMap(new HashMap<>());
        this.contextType = type;
    }

    /**
     * Completes the superclass set-up, then starts the TCP server, adds the
     * tester sink, builds the local collector graph (one {@link StreamTester}
     * per non-null handled stream) and registers injectors and handlers.
     */
    @Override
    public void finalizeTester(Map<TStream<?>, Set<StreamHandler<Tuple>>> map, Map<TStream<?>, Set<UserCondition<?>>> map2) throws Exception {
        super.finalizeTester(map, map2);
        addTCPServerAndSink();
        this.collectorGraph = OperatorGraphFactory.newGraph();
        for (TStream<?> stream : this.handlers.keySet()) {
            if (stream == null) {
                continue;
            }
            int testerId = connectToTesterSink(stream);
            this.testers.put(stream, new StreamTester(this.collectorGraph, testerId, stream));
        }
        this.localCollector = new JavaOperatorTester().executable(this.collectorGraph);
        setupTestHandlers();
    }

    /**
     * Starts executing the local collector graph. {@link #finalizeTester}
     * must have run first so that the collector exists.
     */
    @Override
    public void start(Object obj) {
        assert this.localCollector != null;
        this.localRunningCollector = this.localCollector.execute();
    }

    /**
     * Starts the TCP server on port 0 and adds the tester sink pointing at
     * the address returned by the server's {@code start()}.
     */
    private void addTCPServerAndSink() throws Exception {
        boolean standalone = this.contextType == StreamsContext.Type.STANDALONE_TESTER;
        this.tcpServer = new TCPTestServer(0, standalone, new IoHandlerAdapter() {
            @Override
            public void messageReceived(IoSession ioSession, Object obj) throws Exception {
                // Route the tuple to the injector registered for its tester id.
                TestTuple testTuple = (TestTuple) obj;
                TCPTesterRuntime.this.injectors.get(testTuple.getTesterId()).tuple(testTuple.getTupleData());
            }
        });
        addTesterSink(this.tcpServer.start());
    }

    /**
     * Releases everything this runtime started.
     *
     * <p>The steps run in order and each one runs exactly once, even if an
     * earlier step throws: cancel the submitted job (distributed tester
     * only), shut down the TCP server, cancel the local collector, cancel
     * {@code future}. If several steps throw, the exception from the last
     * failing step is the one propagated.
     *
     * <p>The server and the collector are skipped when they were never
     * created, so that shutting down after a failed set-up does not raise a
     * {@code NullPointerException} that hides the original failure.
     *
     * @param future future of the submitted application; for the distributed
     *        tester its result is cast to the {@link BigInteger} job id
     * @param testState final state of the test (not used here)
     */
    @Override
    public void shutdown(Future<?> future, TesterRuntime.TestState testState) throws Exception {
        try {
            if (this.contextType == StreamsContext.Type.DISTRIBUTED_TESTER) {
                BigInteger jobId = (BigInteger) future.get();
                new InvokeCancel(jobId).invoke(false);
            }
        } finally {
            try {
                if (this.tcpServer != null) {
                    this.tcpServer.shutdown();
                }
            } finally {
                try {
                    if (this.localRunningCollector != null) {
                        this.localRunningCollector.cancel(true);
                    }
                } finally {
                    future.cancel(true);
                }
            }
        }
    }

    /**
     * Adds the {@link TesterSink} operator to the topology, parameterised
     * with the host and port of {@code inetSocketAddress}.
     */
    private void addTesterSink(InetSocketAddress inetSocketAddress) {
        Map<String, Object> params = new HashMap<>();
        params.put("host", inetSocketAddress.getHostString());
        params.put("port", Integer.valueOf(inetSocketAddress.getPort()));
        this.testerSinkOp = topology().builder().addOperator("TesterTCP" + inetSocketAddress.getPort(), TesterSink.KIND, params);
        this.testerSinkOp.setModel("spl", OpProperties.LANGUAGE_JAVA);
    }

    /**
     * Connects {@code tStream} to the tester sink.
     *
     * @return index of the resulting connection, used as the stream's tester id
     */
    private int connectToTesterSink(TStream<?> tStream) {
        return tStream.connectTo(this.testerSinkOp, true, null).index();
    }

    /**
     * For every non-null handled stream, registers a tuple injector on the
     * stream's collector input under its tester id and attaches each of the
     * stream's handlers to the collector output.
     */
    private void setupTestHandlers() throws Exception {
        for (TStream<?> stream : this.handlers.keySet()) {
            if (stream == null) {
                continue;
            }
            Set<StreamHandler<Tuple>> streamHandlers = this.handlers.get(stream);
            StreamTester tester = this.testers.get(stream);
            this.injectors.put(Integer.valueOf(tester.testerId), new TestTupleInjector(this.localCollector.getInputTester(tester.input)));
            for (StreamHandler<Tuple> handler : streamHandlers) {
                this.localCollector.registerStreamHandler(tester.output, handler);
            }
        }
    }

    /**
     * Reports the current test state. For the standalone tester this
     * delegates to {@link #checkStandaloneTestState}; for any other context
     * type it sleeps for one second and then derives the state from the
     * conditions.
     */
    @Override
    public TesterRuntime.TestState checkTestState(StreamsContext<?> streamsContext, Map<String, Object> map, Future<?> future, Condition<?> condition) throws Exception {
        if (streamsContext.getType() == StreamsContext.Type.STANDALONE_TESTER) {
            return checkStandaloneTestState(future, condition);
        }
        TimeUnit.SECONDS.sleep(1L);
        return testStateFromConditions(false, true);
    }

    /**
     * Waits up to one second for the standalone application to complete.
     *
     * @return {@code FAIL} if the application completed with a non-zero
     *         result; {@code NO_PROGRESS} if it is still running and
     *         {@code condition} is not valid; otherwise the state derived
     *         from the conditions
     */
    private TesterRuntime.TestState checkStandaloneTestState(Future<?> future, Condition<?> condition) throws Exception {
        try {
            int exitCode = ((Integer) future.get(1L, TimeUnit.SECONDS)).intValue();
            if (exitCode != 0) {
                return TesterRuntime.TestState.FAIL;
            }
            return testStateFromConditions(true, true);
        } catch (TimeoutException e) {
            // Still running after the wait: only evaluate the conditions once
            // the overall condition has become valid.
            if (condition.valid()) {
                return testStateFromConditions(true, true);
            }
            return TesterRuntime.TestState.NO_PROGRESS;
        }
    }
}
