/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming.async;

import java.io.IOException;
import java.net.InetSocketAddress;
import net.nmoncho.shaded.com.google.common.annotations.VisibleForTesting;
import net.nmoncho.shaded.io.netty.channel.Channel;
import net.nmoncho.shaded.io.netty.channel.ChannelHandler;
import net.nmoncho.shaded.io.netty.channel.ChannelPipeline;
import net.nmoncho.shaded.io.netty.channel.EventLoop;
import net.nmoncho.shaded.io.netty.util.concurrent.Future;
import org.apache.cassandra.locator.InetAddressAndPort;
import org.apache.cassandra.net.ConnectionCategory;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.net.OutboundConnectionInitiator;
import org.apache.cassandra.net.OutboundConnectionSettings;
import org.apache.cassandra.streaming.StreamingChannel;
import org.apache.cassandra.streaming.async.NettyStreamingChannel;

/**
 * {@link StreamingChannel.Factory} implementation that opens outbound streaming
 * connections through {@link OutboundConnectionInitiator} and wraps the resulting
 * Netty {@link Channel} in a {@link NettyStreamingChannel}.
 */
public class NettyStreamingConnectionFactory
implements StreamingChannel.Factory {
    /**
     * Number of connection attempts {@link #connect} makes before giving up.
     * Deliberately non-final so that tests can adjust it.
     */
    @VisibleForTesting
    public static int MAX_CONNECT_ATTEMPTS = 3;

    /**
     * Opens a streaming connection described by {@code template}, retrying on failure.
     *
     * <p>Each attempt initiates a streaming connection on an event loop taken from the
     * outbound streaming group and blocks the calling thread (uninterruptibly) until that
     * attempt completes. At least one attempt is always made; attempts stop once
     * {@link #MAX_CONNECT_ATTEMPTS} of them have failed.
     *
     * @param template         connection settings; streaming-category defaults are applied for every attempt
     * @param messagingVersion messaging version passed to the initiator and to the created channel
     * @param kind             kind of channel to create; for {@link StreamingChannel.Kind#CONTROL} the
     *                         created channel is also appended to the Netty pipeline under the name {@code "stream"}
     * @return the streaming channel wrapping the established connection
     * @throws IOException if every attempt failed; the cause is the failure of the last attempt
     */
    public static NettyStreamingChannel connect(OutboundConnectionSettings template, int messagingVersion, StreamingChannel.Kind kind) throws IOException {
        EventLoop eventLoop = MessagingService.instance().socketFactory.outboundStreamingGroup().next();
        int attempts = 0;
        while (true) {
            Future<OutboundConnectionInitiator.Result<OutboundConnectionInitiator.Result.StreamingSuccess>> result =
                    OutboundConnectionInitiator.initiateStreaming(eventLoop, template.withDefaults(ConnectionCategory.STREAMING), messagingVersion);
            result.awaitUninterruptibly();

            if (result.isSuccess()) {
                Channel channel = result.getNow().success().channel;
                NettyStreamingChannel streamingChannel = new NettyStreamingChannel(messagingVersion, channel, kind);
                if (kind == StreamingChannel.Kind.CONTROL) {
                    // Only control channels are installed as a handler in the pipeline.
                    ChannelPipeline pipeline = channel.pipeline();
                    pipeline.addLast("stream", (ChannelHandler)streamingChannel);
                }
                return streamingChannel;
            }

            // Use >= rather than an equality test: MAX_CONNECT_ATTEMPTS is mutable, and a
            // value <= 0 would otherwise never match and keep retrying indefinitely.
            if (++attempts >= MAX_CONNECT_ATTEMPTS) {
                throw new IOException("failed to connect to " + template.to + " for streaming data", result.cause());
            }
        }
    }

    /**
     * Creates a streaming channel to {@code to} by delegating to {@link #connect}.
     *
     * @param to               remote endpoint to connect to
     * @param messagingVersion messaging version to use for the connection
     * @param kind             kind of streaming channel to create
     * @return the connected streaming channel
     * @throws IOException if the connection could not be established
     */
    @Override
    public StreamingChannel create(InetSocketAddress to, int messagingVersion, StreamingChannel.Kind kind) throws IOException {
        return NettyStreamingConnectionFactory.connect(new OutboundConnectionSettings(InetAddressAndPort.getByAddress(to)), messagingVersion, kind);
    }

    /**
     * Creates a streaming channel to {@code to}, additionally passing {@code preferred}
     * to the connection settings, by delegating to {@link #connect}.
     *
     * @param to               remote endpoint to connect to
     * @param preferred        second address handed to {@link OutboundConnectionSettings}
     * @param messagingVersion messaging version to use for the connection
     * @param kind             kind of streaming channel to create
     * @return the connected streaming channel
     * @throws IOException if the connection could not be established
     */
    @Override
    public StreamingChannel create(InetSocketAddress to, InetSocketAddress preferred, int messagingVersion, StreamingChannel.Kind kind) throws IOException {
        return NettyStreamingConnectionFactory.connect(new OutboundConnectionSettings(InetAddressAndPort.getByAddress(to), InetAddressAndPort.getByAddress(preferred)), messagingVersion, kind);
    }
}

