package org.apache.storm.sql.runtime.datasource.socket;

import java.net.URI;
import java.util.List;
import java.util.Properties;
import org.apache.storm.spout.Scheme;
import org.apache.storm.sql.runtime.DataSourcesProvider;
import org.apache.storm.sql.runtime.FieldInfo;
import org.apache.storm.sql.runtime.IOutputSerializer;
import org.apache.storm.sql.runtime.ISqlStreamsDataSource;
import org.apache.storm.sql.runtime.datasource.socket.bolt.SocketBolt;
import org.apache.storm.sql.runtime.datasource.socket.spout.SocketSpout;
import org.apache.storm.sql.runtime.utils.FieldInfoUtils;
import org.apache.storm.sql.runtime.utils.SerdeUtils;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.IRichSpout;

/* loaded from: input_file:org/apache/storm/sql/runtime/datasource/socket/SocketDataSourcesProvider.class */
/**
 * {@link DataSourcesProvider} registered under the {@code "socket"} scheme.
 *
 * <p>The data sources it constructs hand out a {@link SocketSpout} as producer and a
 * {@link SocketBolt} as consumer, both configured with the host and port taken from the
 * data source URI.
 */
public class SocketDataSourcesProvider implements DataSourcesProvider {

    /**
     * Returns the scheme name this provider reports for itself.
     *
     * @return the literal string {@code "socket"}
     */
    @Override
    public String scheme() {
        return "socket";
    }

    /**
     * Builds a streams data source from the host and port of the given URI.
     *
     * @param uri        location of the socket endpoint; it must carry an explicit port
     * @param str        forwarded to {@code SerdeUtils.getScheme}
     *                   (presumably the input format class name; confirm against callers)
     * @param str2       forwarded to {@code SerdeUtils.getSerializer}
     *                   (presumably the output format class name; confirm against callers)
     * @param properties forwarded unchanged to both {@code SerdeUtils} lookups
     * @param list       field descriptors; their names are extracted via
     *                   {@code FieldInfoUtils.getFieldNames} and given to both lookups
     * @return a data source bound to the URI's host and port
     * @throws RuntimeException if the URI does not define a port
     */
    @Override
    public ISqlStreamsDataSource constructStreams(URI uri, String str, String str2, Properties properties, List<FieldInfo> list) {
        final int endpointPort = uri.getPort();
        // URI.getPort() reports -1 when no port is present in the URI.
        if (endpointPort == -1) {
            throw new RuntimeException("Port information is not available. URI: " + uri);
        }
        final String endpointHost = uri.getHost();

        final List<String> columns = FieldInfoUtils.getFieldNames(list);
        // Resolve the scheme first and the serializer second, as the lookups were originally ordered.
        final Scheme inputScheme = SerdeUtils.getScheme(str, properties, columns);
        final IOutputSerializer outputSerializer = SerdeUtils.getSerializer(str2, properties, columns);

        return new SocketStreamsDataSource(endpointHost, endpointPort, inputScheme, outputSerializer);
    }

    /**
     * Data source holding the socket endpoint plus the scheme and serializer that are
     * passed to the spout and bolt it creates.
     */
    private static class SocketStreamsDataSource implements ISqlStreamsDataSource {
        private final String host;
        private final int port;
        private final Scheme scheme;
        private final IOutputSerializer serializer;

        /**
         * Stores the given values; no validation or connection attempt happens here.
         *
         * @param endpointHost     host handed to the spout and bolt
         * @param endpointPort     port handed to the spout and bolt
         * @param inputScheme      scheme handed to the spout
         * @param outputSerializer serializer handed to the bolt
         */
        SocketStreamsDataSource(String endpointHost, int endpointPort, Scheme inputScheme, IOutputSerializer outputSerializer) {
            this.host = endpointHost;
            this.port = endpointPort;
            this.scheme = inputScheme;
            this.serializer = outputSerializer;
        }

        /**
         * Creates a new {@link SocketSpout} on every call.
         *
         * @return a spout built from the stored scheme, host and port
         */
        @Override
        public IRichSpout getProducer() {
            return new SocketSpout(scheme, host, port);
        }

        /**
         * Creates a new {@link SocketBolt} on every call.
         *
         * @return a bolt built from the stored serializer, host and port
         */
        @Override
        public IRichBolt getConsumer() {
            return new SocketBolt(serializer, host, port);
        }
    }
}
