package com.datatorrent.bufferserver.server;

import com.datatorrent.bufferserver.internal.DataList;
import com.datatorrent.bufferserver.internal.FastDataList;
import com.datatorrent.bufferserver.internal.LogicalNode;
import com.datatorrent.bufferserver.packet.MessageType;
import com.datatorrent.bufferserver.packet.PayloadTuple;
import com.datatorrent.bufferserver.packet.PublishRequestTuple;
import com.datatorrent.bufferserver.packet.PurgeRequestTuple;
import com.datatorrent.bufferserver.packet.ResetRequestTuple;
import com.datatorrent.bufferserver.packet.SubscribeRequestTuple;
import com.datatorrent.bufferserver.packet.Tuple;
import com.datatorrent.bufferserver.storage.Storage;
import com.datatorrent.common.util.NameableThreadFactory;
import com.datatorrent.netlet.AbstractLengthPrependerClient;
import com.datatorrent.netlet.AbstractServer;
import com.datatorrent.netlet.DefaultEventLoop;
import com.datatorrent.netlet.EventLoop;
import com.datatorrent.netlet.Listener;
import com.datatorrent.netlet.WriteOnlyLengthPrependerClient;
import com.datatorrent.netlet.util.VarInt;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/datatorrent/bufferserver/server/Server.class */
public class Server extends AbstractServer {
    public static final int DEFAULT_BUFFER_SIZE = 67108864;
    public static final int DEFAULT_NUMBER_OF_CACHED_BLOCKS = 8;
    private final int port;
    private String identity;
    private Storage storage;
    private final EventLoop eventloop;
    private final ExecutorService serverHelperExecutor;
    private final ExecutorService storageHelperExecutor;
    private volatile CountDownLatch latch;
    private byte[] authToken;
    private static final boolean BACK_PRESSURE_ENABLED;
    private final ConcurrentHashMap<String, DataList> publisherBuffers;
    private final ConcurrentHashMap<String, LogicalNode> subscriberGroups;
    private final ConcurrentHashMap<String, AbstractLengthPrependerClient> publisherChannels;
    private final int blockSize;
    private final int numberOfCacheBlocks;
    private static final Logger logger;

    /* renamed from: com.datatorrent.bufferserver.server.Server$4, reason: invalid class name */
    /* loaded from: input_file:com/datatorrent/bufferserver/server/Server$4.class */
    static /* synthetic */ class AnonymousClass4 {
        static final /* synthetic */ int[] $SwitchMap$com$datatorrent$bufferserver$packet$MessageType = new int[MessageType.values().length];

        static {
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.PUBLISHER_REQUEST.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.SUBSCRIBER_REQUEST.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.PURGE_REQUEST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$com$datatorrent$bufferserver$packet$MessageType[MessageType.RESET_REQUEST.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/bufferserver/server/Server$AuthClient.class */
    class AuthClient extends com.datatorrent.bufferserver.client.AuthClient {
        boolean ignore;

        AuthClient() {
        }

        public void onMessage(byte[] bArr, int i, int i2) {
            if (this.ignore) {
                return;
            }
            authenticateMessage(bArr, i, i2);
            unregistered(this.key);
            UnidentifiedClient unidentifiedClient = new UnidentifiedClient();
            this.key.attach(unidentifiedClient);
            this.key.interestOps(1);
            unidentifiedClient.registered(this.key);
            unidentifiedClient.connected();
            int i3 = (this.writeOffset - this.readOffset) - i2;
            if (i3 > 0) {
                unidentifiedClient.transferBuffer(bArr, this.readOffset + i2, i3);
            }
            this.ignore = true;
        }
    }

    /* loaded from: input_file:com/datatorrent/bufferserver/server/Server$Publisher.class */
    class Publisher extends SeedDataClient {
        private final DataList datalist;
        boolean dirty;
        private volatile boolean torndown;

        Publisher(DataList dataList, long j) {
            super(dataList.getBuffer(j), dataList.getPosition(), 1024);
            this.datalist = dataList;
        }

        public void onMessage(byte[] bArr, int i, int i2) {
            this.dirty = true;
        }

        public boolean resumeReadIfSuspended() {
            Server.this.eventloop.submit(new Runnable() { // from class: com.datatorrent.bufferserver.server.Server.Publisher.1
                @Override // java.lang.Runnable
                public void run() {
                    int interestOps = Publisher.this.key.interestOps();
                    if ((interestOps & 1) == 0) {
                        if (Publisher.this.readExt(0)) {
                            Server.logger.debug("Resuming read on key {} with attachment {}", Publisher.this.key, Publisher.this.key.attachment());
                            Publisher.this.key.interestOps(interestOps | 1);
                        } else {
                            Server.logger.debug("Keeping read on key {} with attachment {} suspended. ", new Object[]{Publisher.this.key, Publisher.this.key.attachment(), Publisher.this.datalist});
                            Publisher.this.datalist.notifyListeners();
                        }
                    }
                }
            });
            return true;
        }

        public void read(int i) {
            readExt(i);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /* JADX WARN: Failed to find 'out' block for switch in B:5:0x001a. Please report as an issue. */
        public boolean readExt(int i) {
            this.writeOffset += i;
            while (true) {
                if (this.size <= 0) {
                    int readSize = readSize();
                    this.size = readSize;
                    switch (readSize) {
                        case -1:
                            if (this.writeOffset == this.buffer.length) {
                                if (this.readOffset <= this.writeOffset - 5) {
                                    return true;
                                }
                                this.dirty = false;
                                this.datalist.flush(this.writeOffset);
                                return switchToNewBufferOrSuspendRead(this.buffer, this.readOffset, this.size + VarInt.getSize(this.size));
                            }
                            if (!this.dirty) {
                                return true;
                            }
                            this.dirty = false;
                            this.datalist.flush(this.writeOffset);
                            return true;
                        case NO_MESSAGE_VALUE:
                    }
                }
                if (this.writeOffset - this.readOffset < this.size) {
                    if (this.writeOffset != this.buffer.length) {
                        if (!this.dirty) {
                            return true;
                        }
                        this.dirty = false;
                        this.datalist.flush(this.writeOffset);
                        return true;
                    }
                    this.dirty = false;
                    this.datalist.flush(this.writeOffset);
                    if (switchToNewBufferOrSuspendRead(this.buffer, this.readOffset - VarInt.getSize(this.size), this.size + VarInt.getSize(this.size))) {
                        this.size = 0;
                        return true;
                    }
                    this.readOffset -= VarInt.getSize(this.size);
                    this.size = 0;
                    return false;
                }
                onMessage(this.buffer, this.readOffset, this.size);
                this.readOffset += this.size;
                this.size = 0;
            }
        }

        private boolean switchToNewBufferOrSuspendRead(byte[] bArr, int i, int i2) {
            if (switchToNewBuffer(bArr, i, i2)) {
                return true;
            }
            this.datalist.suspendRead(this);
            return false;
        }

        private boolean switchToNewBuffer(byte[] bArr, int i, int i2) {
            if (!this.datalist.isMemoryBlockAvailable() && (Server.this.storage != null || this.datalist.areSubscribersBehindByMax())) {
                return false;
            }
            byte[] newBuffer = this.datalist.newBuffer(i2);
            this.byteBuffer = ByteBuffer.wrap(newBuffer);
            if (bArr == null || bArr.length - i == 0) {
                this.writeOffset = 0;
            } else {
                this.writeOffset = bArr.length - i;
                System.arraycopy(this.buffer, i, newBuffer, 0, this.writeOffset);
                this.byteBuffer.position(this.writeOffset);
            }
            this.buffer = newBuffer;
            this.readOffset = 0;
            this.datalist.addBuffer(this.buffer);
            return true;
        }

        public void unregistered(SelectionKey selectionKey) {
            super.unregistered(selectionKey);
            teardown();
        }

        public void handleException(Exception exc, EventLoop eventLoop) {
            teardown();
            if (!(exc instanceof RejectedExecutionException) || !Server.this.serverHelperExecutor.isTerminated()) {
                super.handleException(exc, eventLoop);
            } else {
                Server.logger.warn("Terminated Executor Exception for {}.", this, exc);
                eventLoop.disconnect(this);
            }
        }

        public String toString() {
            return getClass().getName() + '@' + Integer.toHexString(hashCode()) + " {datalist=" + this.datalist + '}';
        }

        private void teardown() {
            if (this.torndown) {
                return;
            }
            this.torndown = true;
            if (Server.this.publisherChannels.containsValue(this)) {
                Iterator it = Server.this.publisherChannels.entrySet().iterator();
                while (true) {
                    if (it.hasNext()) {
                        if (((Map.Entry) it.next()).getValue() == this) {
                            it.remove();
                            break;
                        }
                    } else {
                        break;
                    }
                }
            }
            ArrayList arrayList = new ArrayList();
            String identifier = this.datalist.getIdentifier();
            for (LogicalNode logicalNode : Server.this.subscriberGroups.values()) {
                if (identifier.equals(logicalNode.getUpstream())) {
                    arrayList.add(logicalNode);
                }
            }
            Iterator it2 = arrayList.iterator();
            while (it2.hasNext()) {
                ((LogicalNode) it2.next()).boot();
            }
        }
    }

    /* loaded from: input_file:com/datatorrent/bufferserver/server/Server$SeedDataClient.class */
    abstract class SeedDataClient extends AbstractLengthPrependerClient {
        public SeedDataClient() {
        }

        public SeedDataClient(int i, int i2) {
            super(i, i2);
        }

        public SeedDataClient(byte[] bArr, int i, int i2) {
            super(bArr, i, i2);
        }

        public void transferBuffer(byte[] bArr, int i, int i2) {
            int i3;
            do {
                int length = this.buffer.length - this.writeOffset;
                if (i2 < length) {
                    length = i2;
                    this.byteBuffer.position(this.writeOffset + length);
                } else {
                    this.byteBuffer.position(this.buffer.length);
                }
                System.arraycopy(bArr, i, this.buffer, this.writeOffset, length);
                read(length);
                i += length;
                i3 = i2 - length;
                i2 = i3;
            } while (i3 > 0);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/datatorrent/bufferserver/server/Server$Subscriber.class */
    public class Subscriber extends WriteOnlyLengthPrependerClient {
        private LogicalNode ln;

        Subscriber(LogicalNode logicalNode, int i) {
            super(1048576, i == 0 ? 262144 : i);
            this.ln = logicalNode;
            logicalNode.addConnection(this);
        }

        public void connected() {
            super.connected();
            Server.this.serverHelperExecutor.submit(new Runnable() { // from class: com.datatorrent.bufferserver.server.Server.Subscriber.1
                @Override // java.lang.Runnable
                public void run() {
                    DataList dataList = (DataList) Server.this.publisherBuffers.get(Subscriber.this.ln.getUpstream());
                    if (dataList != null) {
                        dataList.addDataListener(Subscriber.this.ln);
                    } else {
                        Server.logger.error("Disconnecting {} with no matching data list.", this);
                        Subscriber.this.ln.boot();
                    }
                }
            });
        }

        public void unregistered(SelectionKey selectionKey) {
            Server.this.handleSubscriberTeardown(selectionKey);
            super.unregistered(selectionKey);
        }

        public String toString() {
            return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{ln=" + this.ln + "}";
        }
    }

    /* loaded from: input_file:com/datatorrent/bufferserver/server/Server$UnidentifiedClient.class */
    class UnidentifiedClient extends SeedDataClient {
        boolean ignore;

        UnidentifiedClient() {
            super();
        }

        public void onMessage(byte[] bArr, int i, int i2) {
            if (this.ignore) {
                return;
            }
            Tuple tuple = Tuple.getTuple(bArr, i, i2);
            switch (AnonymousClass4.$SwitchMap$com$datatorrent$bufferserver$packet$MessageType[tuple.getType().ordinal()]) {
                case PAYLOAD_VALUE:
                    unregistered(this.key);
                    Server.logger.info("Received publisher request: {}", tuple);
                    PublishRequestTuple publishRequestTuple = (PublishRequestTuple) tuple;
                    DataList handlePublisherRequest = Server.this.handlePublisherRequest(publishRequestTuple, this);
                    handlePublisherRequest.setAutoFlushExecutor(Server.this.serverHelperExecutor);
                    Publisher publisher = publishRequestTuple.getVersion().equals(Tuple.FAST_VERSION) ? new Publisher(handlePublisherRequest, (tuple.getBaseSeconds() << 32) | tuple.getWindowId()) { // from class: com.datatorrent.bufferserver.server.Server.UnidentifiedClient.1
                        {
                            Server server = Server.this;
                        }

                        public int readSize() {
                            if (this.writeOffset - this.readOffset < 2) {
                                return -1;
                            }
                            byte[] bArr2 = this.buffer;
                            int i3 = this.readOffset;
                            this.readOffset = i3 + 1;
                            short s = bArr2[i3];
                            byte[] bArr3 = this.buffer;
                            int i4 = this.readOffset;
                            this.readOffset = i4 + 1;
                            return s | (bArr3[i4] << 8);
                        }
                    } : new Publisher(handlePublisherRequest, (tuple.getBaseSeconds() << 32) | tuple.getWindowId());
                    this.key.attach(publisher);
                    this.key.interestOps(1);
                    publisher.registered(this.key);
                    int i3 = (this.writeOffset - this.readOffset) - i2;
                    if (i3 > 0) {
                        publisher.transferBuffer(this.buffer, this.readOffset + i2, i3);
                    }
                    this.ignore = true;
                    return;
                case RESET_WINDOW_VALUE:
                    unregistered(this.key.interestOps(0));
                    this.ignore = true;
                    Server.logger.info("Received subscriber request: {}", tuple);
                    Server.this.handleSubscriberRequest((SubscribeRequestTuple) tuple, this.key);
                    return;
                case BEGIN_WINDOW_VALUE:
                    Server.logger.info("Received purge request: {}", tuple);
                    try {
                        Server.this.handlePurgeRequest((PurgeRequestTuple) tuple, this);
                        return;
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                case END_WINDOW_VALUE:
                    Server.logger.info("Received reset all request: {}", tuple);
                    try {
                        Server.this.handleResetRequest((ResetRequestTuple) tuple, this);
                        return;
                    } catch (IOException e2) {
                        throw new RuntimeException(e2);
                    }
                default:
                    throw new RuntimeException("unexpected message: " + tuple.toString());
            }
        }
    }

    public Server(EventLoop eventLoop, int i) {
        this(eventLoop, i, DEFAULT_BUFFER_SIZE, 8);
    }

    public Server(EventLoop eventLoop, int i, int i2, int i3) {
        this.publisherBuffers = new ConcurrentHashMap<>(1, 0.75f, 1);
        this.subscriberGroups = new ConcurrentHashMap<>();
        this.publisherChannels = new ConcurrentHashMap<>();
        this.eventloop = eventLoop;
        this.port = i;
        this.blockSize = i2;
        this.numberOfCacheBlocks = i3;
        this.serverHelperExecutor = Executors.newSingleThreadExecutor(new NameableThreadFactory("ServerHelper"));
        this.storageHelperExecutor = new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(i3), new NameableThreadFactory("StorageHelper"), new ThreadPoolExecutor.CallerRunsPolicy());
    }

    public void setSpoolStorage(Storage storage) {
        this.storage = storage;
    }

    public void registered(SelectionKey selectionKey) {
        super.registered(selectionKey);
        logger.info("Server started listening at {}", getServerAddress());
        this.latch.countDown();
        this.latch = null;
    }

    public void unregistered(SelectionKey selectionKey) {
        Iterator<LogicalNode> it = this.subscriberGroups.values().iterator();
        while (it.hasNext()) {
            it.next().boot();
        }
        this.eventloop.submit(new Runnable() { // from class: com.datatorrent.bufferserver.server.Server.1
            @Override // java.lang.Runnable
            public void run() {
                Server.this.serverHelperExecutor.shutdown();
                Server.this.storageHelperExecutor.shutdown();
                try {
                    Server.this.serverHelperExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                    Server.this.storageHelperExecutor.awaitTermination(5000L, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    Server.logger.debug("Executor Termination", e);
                }
                Server.logger.info("Server stopped listening at {}", Server.this.getServerAddress());
                Server.this.latch.countDown();
                Server.this.latch = null;
            }
        });
    }

    public InetSocketAddress run() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.latch = countDownLatch;
        this.eventloop.start((String) null, this.port, this);
        try {
            countDownLatch.await();
            return (InetSocketAddress) getServerAddress();
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public InetSocketAddress run(long j) {
        if (j < 0) {
            throw new IllegalArgumentException(String.format("Wait time %d can't be negative", Long.valueOf(j)));
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.latch = countDownLatch;
        this.eventloop.start((String) null, this.port, this);
        long currentTimeMillis = System.currentTimeMillis() + j;
        while (countDownLatch.getCount() != 0 && j > 0 && countDownLatch.await(j, TimeUnit.MILLISECONDS)) {
            try {
                j = currentTimeMillis - System.currentTimeMillis();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
        return (InetSocketAddress) getServerAddress();
    }

    public void stop() {
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.latch = countDownLatch;
        this.eventloop.stop(this);
        try {
            try {
                countDownLatch.await();
                shutdownExecutors(countDownLatch.getCount() == 0);
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            shutdownExecutors(countDownLatch.getCount() == 0);
            throw th;
        }
    }

    public void stop(long j) {
        if (j < 0) {
            throw new IllegalArgumentException(String.format("Wait time %d can't be negative", Long.valueOf(j)));
        }
        CountDownLatch countDownLatch = new CountDownLatch(1);
        this.latch = countDownLatch;
        this.eventloop.stop(this);
        long currentTimeMillis = System.currentTimeMillis() + j;
        while (countDownLatch.getCount() != 0 && j > 0 && countDownLatch.await(j, TimeUnit.MILLISECONDS)) {
            try {
                try {
                    j = currentTimeMillis - System.currentTimeMillis();
                } catch (InterruptedException e) {
                    throw new RuntimeException(e);
                }
            } finally {
                shutdownExecutors(countDownLatch.getCount() == 0);
            }
        }
    }

    private void shutdownExecutors(boolean z) {
        if (z) {
            return;
        }
        logger.warn("Buffer server {} did not terminate.", this);
        try {
            if (!this.serverHelperExecutor.isTerminated()) {
                logger.warn("Forcing termination of {}", this.serverHelperExecutor);
                this.serverHelperExecutor.shutdownNow();
            }
            if (!this.storageHelperExecutor.isTerminated()) {
                logger.warn("Forcing termination of {}", this.storageHelperExecutor);
                this.storageHelperExecutor.shutdownNow();
            }
        } catch (RuntimeException e) {
            logger.error("Exception while terminating executors", e);
        }
    }

    public void setAuthToken(byte[] bArr) {
        this.authToken = bArr;
    }

    public static void main(String[] strArr) throws Exception {
        int parseInt = strArr.length > 0 ? Integer.parseInt(strArr[0]) : 0;
        DefaultEventLoop createEventLoop = DefaultEventLoop.createEventLoop("alone");
        Thread start = createEventLoop.start();
        new Server(createEventLoop, parseInt).run();
        start.join();
    }

    public String toString() {
        return getClass().getSimpleName() + '@' + Integer.toHexString(hashCode()) + "{address=" + getServerAddress() + "}";
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handlePurgeRequest(PurgeRequestTuple purgeRequestTuple, AbstractLengthPrependerClient abstractLengthPrependerClient) throws IOException {
        byte[] bytes;
        DataList dataList = this.publisherBuffers.get(purgeRequestTuple.getIdentifier());
        if (dataList == null) {
            bytes = ("Invalid identifier '" + purgeRequestTuple.getIdentifier() + "'").getBytes();
        } else {
            dataList.purge((purgeRequestTuple.getBaseSeconds() << 32) | purgeRequestTuple.getWindowId());
            bytes = ("Request sent for processing: " + purgeRequestTuple).getBytes();
        }
        byte[] serializedTuple = PayloadTuple.getSerializedTuple(0, bytes.length);
        System.arraycopy(bytes, 0, serializedTuple, serializedTuple.length - bytes.length, bytes.length);
        if (abstractLengthPrependerClient.write(serializedTuple)) {
            abstractLengthPrependerClient.write();
        } else {
            logger.error("Failed to deliver purge ack message. {} send buffers are full.", abstractLengthPrependerClient);
            throw new RuntimeException("Failed to deliver purge ack message. " + abstractLengthPrependerClient + "send buffers are full.");
        }
    }

    public void purge(long j) {
        Iterator<DataList> it = this.publisherBuffers.values().iterator();
        while (it.hasNext()) {
            it.next().purge(j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleResetRequest(ResetRequestTuple resetRequestTuple, AbstractLengthPrependerClient abstractLengthPrependerClient) throws IOException {
        byte[] bytes;
        DataList remove = this.publisherBuffers.remove(resetRequestTuple.getIdentifier());
        if (remove == null) {
            bytes = ("Invalid identifier '" + resetRequestTuple.getIdentifier() + "'").getBytes();
        } else {
            AbstractLengthPrependerClient remove2 = this.publisherChannels.remove(resetRequestTuple.getIdentifier());
            if (remove2 != null) {
                this.eventloop.disconnect(remove2);
            }
            remove.reset();
            bytes = ("Request sent for processing: " + resetRequestTuple).getBytes();
        }
        byte[] serializedTuple = PayloadTuple.getSerializedTuple(0, bytes.length);
        System.arraycopy(bytes, 0, serializedTuple, serializedTuple.length - bytes.length, bytes.length);
        if (abstractLengthPrependerClient.write(serializedTuple)) {
            abstractLengthPrependerClient.write();
        } else {
            logger.error("Failed to deliver reset ack message. {} send buffers are full.", abstractLengthPrependerClient);
            throw new RuntimeException("Failed to deliver reset ack message. " + abstractLengthPrependerClient + "send buffers are full.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleSubscriberRequest(final SubscribeRequestTuple subscribeRequestTuple, final SelectionKey selectionKey) {
        try {
            this.serverHelperExecutor.submit(new Runnable() { // from class: com.datatorrent.bufferserver.server.Server.2
                @Override // java.lang.Runnable
                public void run() {
                    String upstreamIdentifier = subscribeRequestTuple.getUpstreamIdentifier();
                    DataList dataList = (DataList) Server.this.publisherBuffers.get(upstreamIdentifier);
                    if (dataList == null) {
                        dataList = Tuple.FAST_VERSION.equals(subscribeRequestTuple.getVersion()) ? new FastDataList(upstreamIdentifier, Server.this.blockSize, Server.this.numberOfCacheBlocks, Server.BACK_PRESSURE_ENABLED) : new DataList(upstreamIdentifier, Server.this.blockSize, Server.this.numberOfCacheBlocks, Server.BACK_PRESSURE_ENABLED);
                        DataList dataList2 = (DataList) Server.this.publisherBuffers.putIfAbsent(upstreamIdentifier, dataList);
                        if (dataList2 != null) {
                            dataList = dataList2;
                        }
                    }
                    String identifier = subscribeRequestTuple.getIdentifier();
                    String streamType = subscribeRequestTuple.getStreamType();
                    long baseSeconds = (subscribeRequestTuple.getBaseSeconds() << 32) | subscribeRequestTuple.getWindowId();
                    LogicalNode logicalNode = new LogicalNode(identifier, upstreamIdentifier, streamType, dataList.newIterator(baseSeconds), baseSeconds, Server.this.eventloop);
                    int mask = subscribeRequestTuple.getMask();
                    if (mask != 0) {
                        for (int i : subscribeRequestTuple.getPartitions()) {
                            logicalNode.addPartition(Integer.valueOf(i).intValue(), mask);
                        }
                    }
                    LogicalNode logicalNode2 = (LogicalNode) Server.this.subscriberGroups.put(streamType, logicalNode);
                    if (logicalNode2 != null) {
                        logicalNode2.boot();
                    }
                    final Subscriber subscriber = new Subscriber(logicalNode, subscribeRequestTuple.getBufferSize());
                    Server.this.eventloop.submit(new Runnable() { // from class: com.datatorrent.bufferserver.server.Server.2.1
                        @Override // java.lang.Runnable
                        public void run() {
                            selectionKey.attach(subscriber);
                            subscriber.registered(selectionKey);
                            subscriber.connected();
                        }
                    });
                }
            });
        } catch (RejectedExecutionException e) {
            logger.error("Received subscriber request {} after server {} termination. Disconnecting {}.", new Object[]{subscribeRequestTuple, this, selectionKey.channel(), e});
            if (selectionKey.isValid()) {
                try {
                    selectionKey.channel().close();
                } catch (IOException e2) {
                    logger.error("Failed to close channel {}", selectionKey.channel(), e2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void handleSubscriberTeardown(SelectionKey selectionKey) {
        try {
            final Subscriber subscriber = (Subscriber) selectionKey.attachment();
            if (subscriber != null) {
                this.serverHelperExecutor.submit(new Runnable() { // from class: com.datatorrent.bufferserver.server.Server.3
                    @Override // java.lang.Runnable
                    public void run() {
                        try {
                            LogicalNode logicalNode = subscriber.ln;
                            if (logicalNode != null) {
                                logicalNode.removeChannel(subscriber);
                                if (logicalNode.getPhysicalNodeCount() == 0) {
                                    DataList dataList = (DataList) Server.this.publisherBuffers.get(logicalNode.getUpstream());
                                    if (dataList != null) {
                                        Server.logger.info("Removing ln {} from dl {}", logicalNode, dataList);
                                        dataList.removeDataListener(logicalNode);
                                    }
                                    Server.this.subscriberGroups.remove(logicalNode.getGroup(), logicalNode);
                                    logicalNode.getIterator().close();
                                }
                                subscriber.ln = null;
                            }
                        } catch (Throwable th) {
                            Server.logger.error("Buffer server {} failed to tear down subscriber {}.", new Object[]{Server.this, subscriber, th});
                        }
                    }

                    public String toString() {
                        return subscriber + " teardown task.";
                    }
                });
            } else {
                logger.error("Selection key {} has unexpected attachment {}.", selectionKey, selectionKey.attachment());
            }
        } catch (ClassCastException e) {
            logger.error("Selection key {} has unexpected attachment {}.", selectionKey, selectionKey.attachment());
        } catch (RejectedExecutionException e2) {
            logger.error("Subscriber {} teardown after server {} termination.", new Object[]{selectionKey.attachment(), this, e2});
        }
    }

    public DataList handlePublisherRequest(PublishRequestTuple publishRequestTuple, AbstractLengthPrependerClient abstractLengthPrependerClient) {
        String identifier = publishRequestTuple.getIdentifier();
        DataList dataList = this.publisherBuffers.get(identifier);
        if (dataList != null) {
            AbstractLengthPrependerClient put = this.publisherChannels.put(identifier, abstractLengthPrependerClient);
            if (put != null) {
                this.eventloop.disconnect(put);
            }
            try {
                dataList.rewind(publishRequestTuple.getBaseSeconds(), publishRequestTuple.getWindowId());
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } else {
            dataList = Tuple.FAST_VERSION.equals(publishRequestTuple.getVersion()) ? new FastDataList(identifier, this.blockSize, this.numberOfCacheBlocks, BACK_PRESSURE_ENABLED) : new DataList(identifier, this.blockSize, this.numberOfCacheBlocks, BACK_PRESSURE_ENABLED);
            DataList putIfAbsent = this.publisherBuffers.putIfAbsent(identifier, dataList);
            if (putIfAbsent != null) {
                dataList = putIfAbsent;
            }
        }
        dataList.setSecondaryStorage(this.storage, this.storageHelperExecutor);
        return dataList;
    }

    public Listener.ClientListener getClientConnection(SocketChannel socketChannel, ServerSocketChannel serverSocketChannel) {
        Listener.ClientListener clientListener;
        if (this.authToken == null) {
            clientListener = new UnidentifiedClient();
        } else {
            Listener.ClientListener authClient = new AuthClient();
            authClient.setToken(this.authToken);
            clientListener = authClient;
        }
        return clientListener;
    }

    public void handleException(Exception exc, EventLoop eventLoop) {
        if (!(exc instanceof RuntimeException)) {
            throw new RuntimeException(exc);
        }
        throw ((RuntimeException) exc);
    }

    static {
        BACK_PRESSURE_ENABLED = !Boolean.getBoolean("org.apache.apex.bufferserver.backpressure.disable");
        logger = LoggerFactory.getLogger(Server.class);
    }
}
