package com.datatorrent.stram.stream;

import com.datatorrent.api.StreamCodec;
import com.datatorrent.bufferserver.client.Publisher;
import com.datatorrent.bufferserver.packet.BeginWindowTuple;
import com.datatorrent.bufferserver.packet.DataTuple;
import com.datatorrent.bufferserver.packet.EndStreamTuple;
import com.datatorrent.bufferserver.packet.EndWindowTuple;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.bufferserver.packet.WindowIdTuple;
import com.datatorrent.bufferserver.util.Codec;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.stram.codec.StatefulStreamCodec;
import com.datatorrent.stram.engine.ByteCounterStream;
import com.datatorrent.stram.engine.StreamContext;
import com.datatorrent.stram.tuple.ResetWindowTuple;
import com.datatorrent.stram.tuple.Tuple;
import com.datatorrent.stram.webapp.asm.MethodSignatureVisitor;
import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/stram/stream/BufferServerPublisher.class */
/**
 * Publisher side of a buffer-server backed stream.
 *
 * <p>Each object handed to {@link #put(Object)} is serialized into a buffer-server packet and
 * written through the inherited {@code write} method. Control tuples ({@link Tuple} instances)
 * are mapped onto the matching packet type; every other object is treated as payload and is
 * serialized with the stream codec selected in {@link #setup(StreamContext)}.
 *
 * <p>The class keeps two counters: the number of objects passed to {@link #put(Object)} and the
 * number of bytes of the packets written for them (codec-state packets are not included in the
 * byte count).
 */
public class BufferServerPublisher extends Publisher implements ByteCounterStream {
    private static final Logger logger = LoggerFactory.getLogger(BufferServerPublisher.class);

    /** Pause between attempts while {@code write} reports that it did not accept the packet. */
    private static final long WRITE_RETRY_SLEEP_MILLIS = 5L;

    /** Codec used for payload when the configured codec is not stateful; otherwise {@code null}. */
    private StreamCodec<Object> serde;
    /** Bytes of the packets written by {@link #put(Object)} since the last reset. */
    private final AtomicLong publishedByteCount = new AtomicLong(0L);
    /** Event loop this publisher is connected through; assigned in {@link #activate(StreamContext)}. */
    private EventLoop eventloop;
    /** Number of {@link #put(Object)} invocations since the last reset. */
    private int count;
    /** Stateful codec used for payload when one is configured (or defaulted); otherwise {@code null}. */
    private StatefulStreamCodec<Object> statefulSerde;

    /**
     * Creates the publisher; both arguments are forwarded unchanged to the {@link Publisher}
     * constructor.
     *
     * @param str first argument of the {@code Publisher} constructor
     * @param i second argument of the {@code Publisher} constructor
     */
    public BufferServerPublisher(String str, int i) {
        super(str, i);
    }

    /**
     * Serializes {@code payload} and writes it, retrying until the write is accepted.
     *
     * <p>The call counter is incremented before any serialization happens, so it also counts
     * calls that end in an exception.
     *
     * @param payload a control {@link Tuple} or an arbitrary payload object
     * @throws UnsupportedOperationException if {@code payload} is a {@link Tuple} whose type has
     *         no packet mapping here
     * @throws RuntimeException wrapping an {@link InterruptedException} if the thread is
     *         interrupted while waiting to retry a write; the interrupt status is restored first
     */
    public void put(Object payload) {
        this.count++;
        byte[] packet = (payload instanceof Tuple)
                ? serializeControlTuple((Tuple) payload)
                : serializePayload(payload);
        writeWithRetry(packet);
        this.publishedByteCount.addAndGet(packet.length);
    }

    /** Maps a control tuple onto the serialized form of the matching buffer-server packet. */
    private byte[] serializeControlTuple(Tuple tuple) {
        switch (tuple.getType()) {
            case CHECKPOINT: {
                // A checkpoint discards whatever state the stateful codec has accumulated.
                if (this.statefulSerde != null) {
                    this.statefulSerde.resetState();
                }
                byte[] packet = WindowIdTuple.getSerializedTuple((int) tuple.getWindowId());
                // Overwrite the leading byte of the generic window-id packet.
                // NOTE(review): 10 is presumably the wire value of the checkpoint message type - confirm.
                packet[0] = 10;
                return packet;
            }
            case BEGIN_WINDOW:
                return BeginWindowTuple.getSerializedTuple((int) tuple.getWindowId());
            case END_WINDOW:
                return EndWindowTuple.getSerializedTuple((int) tuple.getWindowId());
            case END_STREAM:
                return EndStreamTuple.getSerializedTuple((int) tuple.getWindowId());
            case RESET_WINDOW: {
                ResetWindowTuple reset = (ResetWindowTuple) tuple;
                return com.datatorrent.bufferserver.packet.ResetWindowTuple.getSerializedTuple(
                        reset.getBaseSeconds(), reset.getIntervalMillis());
            }
            default:
                throw new UnsupportedOperationException("this data type is not handled in the stream");
        }
    }

    /**
     * Serializes a payload object with whichever codec {@link #setup(StreamContext)} selected.
     * With a stateful codec, any codec state produced for this object is written out as its own
     * packet before the payload packet is returned.
     */
    private byte[] serializePayload(Object payload) {
        if (this.statefulSerde == null) {
            return PayloadTuple.getSerializedTuple(this.serde.getPartition(payload), this.serde.toByteArray(payload));
        }
        StatefulStreamCodec.DataStatePair pair = this.statefulSerde.toDataStatePair(payload);
        if (pair.state != null) {
            // The state packet has to be written ahead of the data packet built from the same pair.
            // NOTE(review): 11 is presumably the wire value of the codec-state message type - confirm.
            writeWithRetry(DataTuple.getSerializedTuple((byte) 11, pair.state));
        }
        return PayloadTuple.getSerializedTuple(this.statefulSerde.getPartition(payload), pair.data);
    }

    /**
     * Calls {@code write} until it reports success, sleeping briefly between attempts.
     *
     * @throws RuntimeException wrapping the {@link InterruptedException} if the sleep is
     *         interrupted; the thread's interrupt status is restored before throwing
     */
    private void writeWithRetry(byte[] packet) {
        while (!write(packet)) {
            try {
                Thread.sleep(WRITE_RETRY_SLEEP_MILLIS);
            } catch (InterruptedException e) {
                // Restore the flag so code further up the stack can still observe the interruption.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }
    }

    /**
     * Sets the buffer-server token, connects to the buffer server through the event loop found
     * in the context, and activates the underlying publisher at the context's finished window id.
     *
     * @param streamContext source of the token, event loop, server address and window id
     */
    public void activate(StreamContext streamContext) {
        setToken((byte[]) streamContext.get(StreamContext.BUFFER_SERVER_TOKEN));
        InetSocketAddress configured = streamContext.getBufferServerAddress();
        this.eventloop = (EventLoop) streamContext.get(StreamContext.EVENT_LOOP);
        // An unresolved address is rebuilt from host name and port; constructing a new
        // InetSocketAddress this way attempts the name resolution again.
        InetSocketAddress target = configured.isUnresolved()
                ? new InetSocketAddress(configured.getHostName(), configured.getPort())
                : configured;
        this.eventloop.connect(target, this);
        logger.debug("Registering publisher: {} {} windowId={} server={}", new Object[]{streamContext.getSourceId(), streamContext.getId(), Codec.getStringWindowId(streamContext.getFinishedWindowId()), streamContext.getBufferServerAddress()});
        super.activate((String) null, streamContext.getFinishedWindowId());
    }

    /** Clears the token and disconnects this publisher from the event loop. */
    public void deactivate() {
        setToken(null);
        this.eventloop.disconnect(this);
    }

    /**
     * Always fails: this publisher does not accept incoming messages.
     *
     * @throws RuntimeException on every call
     */
    public void onMessage(byte[] bArr, int i, int i2) {
        throw new RuntimeException("OutputStream is not supposed to receive anything!");
    }

    /**
     * Selects the codec used for payload objects. With no codec configured in the context, a new
     * instance of the default codec is used (cast to {@link StatefulStreamCodec}); a configured
     * stateful codec is used through a new instance; any other codec is used as is.
     *
     * @param streamContext context holding the optional {@code CODEC} attribute
     */
    public void setup(StreamContext streamContext) {
        StreamCodec<Object> configured = (StreamCodec) streamContext.get(StreamContext.CODEC);
        if (configured == null) {
            this.statefulSerde = ((StatefulStreamCodec) StreamContext.CODEC.defaultValue).newInstance();
        } else if (configured instanceof StatefulStreamCodec) {
            this.statefulSerde = ((StatefulStreamCodec) configured).newInstance();
        } else {
            this.serde = configured;
        }
    }

    /** Nothing to release. */
    public void teardown() {
    }

    /**
     * Returns the number of bytes published since the last reset.
     *
     * @param reset when {@code true}, the counter is atomically set back to zero
     * @return the byte count before any reset
     */
    @Override // com.datatorrent.stram.engine.ByteCounterStream
    public long getByteCount(boolean reset) {
        return reset ? this.publishedByteCount.getAndSet(0L) : this.publishedByteCount.get();
    }

    /**
     * Returns the number of {@link #put(Object)} calls since the last reset.
     *
     * @param reset when {@code true}, the counter is set back to zero
     * @return the call count before any reset
     */
    public int getCount(boolean reset) {
        int current = this.count;
        if (reset) {
            this.count = 0;
        }
        return current;
    }
}
