package com.addthis.meshy.service.stream;

import com.addthis.basis.util.Bytes;
import com.addthis.basis.util.Parameter;
import com.addthis.meshy.ChannelMaster;
import com.addthis.meshy.ChannelState;
import com.addthis.meshy.Meshy;
import com.addthis.meshy.MeshyConstants;
import com.addthis.meshy.SourceHandler;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.jboss.netty.buffer.ChannelBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/addthis/meshy/service/stream/StreamSource.class */
/**
 * Source side of a mesh file stream: sends an "open" request describing the
 * file to read, then feeds whatever the remote side returns into a
 * {@link SourceInputStream}.
 *
 * <p>Outbound control messages are single bytes: {@code 1} requests more data
 * and {@code 3} requests a close. Inbound frames start with a mode byte:
 * {@code 1} carries payload bytes, {@code 2} carries a failure payload and
 * {@code 3} signals close.
 *
 * <p>The local {@link SourceInputStream} is only created when the target uuid
 * equals {@link MeshyConstants#LINK_ALL} or the node uuid; otherwise
 * {@link #getInputStream()} returns {@code null}.
 * NOTE(review): presumably the stream-less case is used when this source only
 * relays data for another peer — confirm against callers.
 */
public class StreamSource extends SourceHandler {
    private static final int DEFAULT_MAX_SEND = (Parameter.intValue("meshy.stream.buffer", 1) * 1024) * 1024;
    private static final boolean FETCH_ON_OPEN = Parameter.boolValue("meshy.stream.prefetch", false);
    protected static final Logger log = LoggerFactory.getLogger(StreamSource.class);
    /** Local sink for received data; {@code null} when no local stream was created. */
    private final SourceInputStream stream;
    private final String fileName;
    private final String nodeUuid;
    /** Number of "more data" requests sent so far (diagnostics only). */
    private int moreRequests;
    /** Running count of received bytes, including a fixed per-frame amount (diagnostics only). */
    private int recvBytes;

    /**
     * Opens a stream with no extra parameters, targeting {@link MeshyConstants#LINK_ALL}.
     *
     * @param channelMaster mesh endpoint used to send the request
     * @param nodeUuid      uuid written into the open request
     * @param fileName      name of the file written into the open request
     * @param bufferSize    send buffer size in bytes; values {@code <= 0} select the default
     * @throws IOException declared by the delegate constructor
     */
    public StreamSource(ChannelMaster channelMaster, String nodeUuid, String fileName, int bufferSize) throws IOException {
        this(channelMaster, MeshyConstants.LINK_ALL, nodeUuid, fileName, null, bufferSize);
    }

    /**
     * Opens a stream with optional parameters, targeting {@link MeshyConstants#LINK_ALL}.
     *
     * @param channelMaster mesh endpoint used to send the request
     * @param nodeUuid      uuid written into the open request
     * @param fileName      name of the file written into the open request
     * @param params        optional key/value parameters appended to the request; may be {@code null}
     * @param bufferSize    send buffer size in bytes; values {@code <= 0} select the default
     * @throws IOException declared by the delegate constructor
     */
    public StreamSource(ChannelMaster channelMaster, String nodeUuid, String fileName, Map<String, String> params, int bufferSize) throws IOException {
        this(channelMaster, MeshyConstants.LINK_ALL, nodeUuid, fileName, params, bufferSize);
    }

    /**
     * Opens a stream and immediately sends the open request.
     *
     * <p>The request is encoded as: one byte ({@code 0} without params, {@code 4}
     * with params), the node uuid, the file name, the buffer size, and, when
     * params are present, the entry count followed by each key and value.
     *
     * @param channelMaster mesh endpoint used to send the request
     * @param targetUuid    routing target handed to {@link SourceHandler}
     *                      (presumably a peer uuid or {@link MeshyConstants#LINK_ALL} — confirm)
     * @param nodeUuid      uuid written into the open request
     * @param fileName      name of the file written into the open request
     * @param params        optional key/value parameters appended to the request; may be {@code null}
     * @param bufferSize    send buffer size in bytes; values {@code <= 0} select the default
     * @throws IOException      declared for callers; failures while encoding or sending the
     *                          request are rethrown wrapped in a {@link RuntimeException}
     * @throws RuntimeException if encoding or sending the open request fails
     */
    public StreamSource(ChannelMaster channelMaster, String targetUuid, String nodeUuid, String fileName, Map<String, String> params, int bufferSize) throws IOException {
        super(channelMaster, StreamTarget.class, targetUuid);
        this.moreRequests = 0;
        this.recvBytes = 0;
        this.fileName = fileName;
        this.nodeUuid = nodeUuid;
        final int sendBuffer = bufferSize <= 0 ? DEFAULT_MAX_SEND : bufferSize;
        // Compare uuids by value: an equal uuid held in a different String
        // instance must still get a local stream. Identity matches still pass.
        final boolean needsLocalStream = sameUuid(targetUuid, MeshyConstants.LINK_ALL) || sameUuid(targetUuid, nodeUuid);
        this.stream = needsLocalStream ? new SourceInputStream(this, sendBuffer) : null;
        log.debug("{} new stream={} file={} sendBuffer={} prefetch={}", new Object[]{this, this.stream, fileName, sendBuffer, FETCH_ON_OPEN});
        try {
            ByteArrayOutputStream request = new ByteArrayOutputStream();
            request.write(params == null ? 0 : 4);
            Bytes.writeString(nodeUuid, request);
            Bytes.writeString(fileName, request);
            Bytes.writeInt(sendBuffer, request);
            if (params != null) {
                Bytes.writeInt(params.size(), request);
                for (Map.Entry<String, String> param : params.entrySet()) {
                    Bytes.writeString(param.getKey(), request);
                    Bytes.writeString(param.getValue(), request);
                }
            }
            if (!send(request.toByteArray())) {
                log.warn("Failed to send stream init data for {}", this);
            }
            // Optional prefetch: ask for the first chunk right away when enabled.
            if (this.stream != null && FETCH_ON_OPEN) {
                this.stream.requestMoreData();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Null-safe value equality for uuid strings; also true when both are the same reference. */
    private static boolean sameUuid(String left, String right) {
        return left == right || (left != null && left.equals(right));
    }

    @Override // com.addthis.meshy.SourceHandler
    public String toString() {
        return super.toString() + ";fileName=" + this.fileName + ";nodeUuid=" + this.nodeUuid + ";more=" + this.moreRequests + ";recv=" + this.recvBytes + ";stream=" + (this.stream != null ? this.stream.toStatus() : "null");
    }

    /**
     * Returns the stream that receives the remote data.
     *
     * @return the local input stream, or {@code null} when none was created for this source
     */
    public SourceInputStream getInputStream() {
        log.trace("{} get input stream", this);
        return this.stream;
    }

    /** Sends a single-byte "more data" request and counts it. */
    public void requestMoreData() {
        log.trace("{} send request more", this);
        send(new byte[]{1});
        this.moreRequests++;
    }

    /** Sends a single-byte close request and then marks the send side complete. */
    public void requestClose() {
        log.trace("{} send close", this);
        send(new byte[]{3});
        sendComplete();
    }

    /**
     * Decodes one inbound frame and forwards it to the local stream.
     *
     * @param channelState  channel the frame arrived on (unused here)
     * @param length        number of bytes to read from {@code channelBuffer}
     * @param channelBuffer buffer holding the frame
     * @throws Exception if reading the frame or feeding the stream fails
     */
    @Override // com.addthis.meshy.SourceHandler
    public void receive(ChannelState channelState, int length, ChannelBuffer channelBuffer) throws Exception {
        ByteArrayInputStream frame = new ByteArrayInputStream(Meshy.getBytes(length, channelBuffer));
        int mode = frame.read();
        log.trace("{} recv mode={} len={}", new Object[]{this, mode, length});
        if (this.stream == null) {
            // No local stream was created for this source (see constructor), so
            // there is nowhere to feed the frame. Drop it instead of throwing a
            // NullPointerException, mirroring the guard in channelClosed().
            log.warn("{} dropping frame mode={} because no local stream exists", this, mode);
            return;
        }
        switch (mode) {
            // Data frame. The constant name is kept as found in this file; its
            // value is 1 per the inline note.
            case StreamService.STREAM_BYTE_OVERHEAD /* 1 */:
                byte[] payload = Bytes.readBytes(frame, frame.available());
                // NOTE(review): the extra 13 is presumably per-frame protocol overhead — confirm.
                this.recvBytes += payload.length + 13;
                this.stream.feed(payload);
                return;
            case 2:
                // Failure frame: marker first, then the remaining bytes as the failure payload.
                this.stream.feed(StreamService.FAIL_BYTES);
                this.stream.feed(Bytes.readFully(frame));
                return;
            case 3:
                this.stream.feed(StreamService.CLOSE_BYTES);
                return;
            default:
                log.warn("source unknown mode: {}", mode);
                return;
        }
    }

    /** Called when the remote side finished sending; completes our send side in turn. */
    @Override // com.addthis.meshy.SourceHandler
    public void receiveComplete() throws Exception {
        log.trace("{} recv send complete", this);
        sendComplete();
    }

    /**
     * Reports a lost channel to the local stream, if one exists, as a failure
     * marker followed by the channel-lost error payload.
     *
     * @param channelState the channel that closed (unused here)
     */
    @Override // com.addthis.meshy.SourceHandler
    public void channelClosed(ChannelState channelState) {
        if (this.stream == null) {
            return;
        }
        this.stream.feed(StreamService.FAIL_BYTES);
        this.stream.feed(Bytes.toBytes(StreamService.ERROR_CHANNEL_LOST));
    }
}
