package org.apache.qpid.server.transport;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.security.AccessControlContext;
import java.security.AccessControlException;
import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Stream;
import javax.security.auth.Subject;
import javax.security.auth.SubjectDomainCombiner;
import org.apache.qpid.server.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.Outcome;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.ContextProvider;
import org.apache.qpid.server.model.NamedAddressSpace;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.TaskExecutorProvider;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.security.auth.AuthenticatedPrincipal;
import org.apache.qpid.server.security.auth.sasl.SaslSettings;
import org.apache.qpid.server.session.AbstractAMQPSession;
import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.transport.AMQPConnection;
import org.apache.qpid.server.transport.AbstractAMQPConnection;
import org.apache.qpid.server.transport.network.NetworkConnection;
import org.apache.qpid.server.transport.network.Ticker;
import org.apache.qpid.server.txn.FlowToDiskTransactionObserver;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.TransactionObserver;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.util.FixedKeyMapCreator;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.virtualhost.QueueManagingVirtualHost;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/qpid/server/transport/AbstractAMQPConnection.class */
public abstract class AbstractAMQPConnection<C extends AbstractAMQPConnection<C, T>, T> extends AbstractConfiguredObject<C> implements ProtocolEngine, AMQPConnection<C>, EventLoggerProvider, SaslSettings {
    // Error texts for the open/idle transaction timeout conditions.
    private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
    private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
    // Collaborators and identity; all assigned exactly once in the constructor.
    private final Broker<?> _broker;
    private final ServerNetworkConnection _network;
    private final AmqpPort<?> _port;
    private final Transport _transport;
    private final Protocol _protocol;
    private final long _connectionId;
    private final AggregateTicker _aggregateTicker;
    // Connection subject: seeded with a ConnectionPrincipal in the constructor and extended by
    // setSubject(...) and setAddressSpace(...).
    private final Subject _subject;
    // Actions executed by performDeleteTasks(); managed through addDeleteTask/removeDeleteTask.
    private final List<Action<? super C>> _connectionCloseTaskList;
    private final LogSubject _logSubject;
    // Both providers start out as the broker and are replaced in setAddressSpace(...) when the
    // address space implements the corresponding interface.
    private volatile ContextProvider _contextProvider;
    private volatile EventLoggerProvider _eventLoggerProvider;
    // Client-supplied details, written only through the matching setters below.
    private String _clientProduct;
    private String _clientVersion;
    private String _remoteProcessPid;
    private String _clientId;
    // Set to true by stopConnection(); never reset in the visible code.
    private volatile boolean _stopped;
    // Per-connection statistics counters.
    private final AtomicLong _messagesIn;
    private final AtomicLong _messagesOut;
    private final AtomicLong _transactedMessagesIn;
    private final AtomicLong _transactedMessagesOut;
    private final AtomicLong _bytesIn;
    private final AtomicLong _bytesOut;
    private final AtomicLong _localTransactionBegins;
    private final AtomicLong _localTransactionRollbacks;
    private final AtomicLong _localTransactionOpens;
    // _transportClosedFuture is completed by markTransportClosed(); its listener (installed in the
    // constructor) completes _modelTransportRendezvousFuture and starts closeAsync().
    private final SettableFuture<Void> _transportClosedFuture;
    private final SettableFuture<Void> _modelTransportRendezvousFuture;
    // Null until setAddressSpace(...) is called.
    private volatile NamedAddressSpace _addressSpace;
    // Activity timestamps in epoch milliseconds (System.currentTimeMillis() / creation time).
    private volatile long _lastReadTime;
    private volatile long _lastWriteTime;
    private volatile long _lastMessageInboundTime;
    private volatile long _lastMessageOutboundTime;
    // Raised by updateLastMessageOutboundTime() and consumed by the next updateLastWriteTime().
    private volatile boolean _messagesWritten;
    private volatile AccessControlContext _accessControllerContext;
    // Thread registered through setIOThread(...); compared against the current thread in isIOThread().
    private volatile Thread _ioThread;
    // Starts as the broker; replaced in setAddressSpace(...) when the address space is a StatisticsGatherer.
    private volatile StatisticsGatherer _statisticsGatherer;
    private volatile boolean _messageAuthorizationRequired;
    // Initialised to 2147483647 (Integer.MAX_VALUE) and recomputed by updateMaxMessageSize().
    private final AtomicLong _maxMessageSize;
    private volatile int _messageCompressionThreshold;
    // Both values below are assigned in onOpen().
    private volatile TransactionObserver _transactionObserver;
    private long _maxUncommittedInMemorySize;
    private final Map<ServerTransaction, Set<Ticker>> _transactionTickers;
    public static final FixedKeyMapCreator PUBLISH_ACTION_MAP_CREATOR = new FixedKeyMapCreator("routingKey", "immediate");
    private static final Logger LOGGER = LoggerFactory.getLogger(AbstractAMQPConnection.class);

    /* loaded from: input_file:org/apache/qpid/server/transport/AbstractAMQPConnection$SlowConnectionOpenTicker.class */
    /**
     * Ticker that guards against connections which take too long to finish opening.
     * <p>
     * The deadline is the connection's creation time plus the allowed time plus every scheduling
     * delay reported through {@link #notifySchedulingDelay(long)}. When the deadline has passed and
     * the connection is still opening, the network connection is closed; if opening has completed,
     * the ticker removes itself from the aggregate ticker and from the network's scheduling-delay
     * listeners.
     */
    private class SlowConnectionOpenTicker implements Ticker, SchedulingDelayNotificationListener {
        private final long _allowedTime;
        private volatile long _accumulatedSchedulingDelay;

        /**
         * @param allowedTime time in milliseconds, counted from connection creation, within which
         *                    the connection must have finished opening
         */
        SlowConnectionOpenTicker(long allowedTime) {
            this._allowedTime = allowedTime;
        }

        /**
         * Computes the milliseconds remaining until the deadline.
         * <p>
         * The value is clamped to the {@code int} range instead of being narrowed with a plain
         * cast: a remaining time above {@link Integer#MAX_VALUE} ms (about 24.8 days) would
         * otherwise wrap around, possibly to a negative number, and make {@link #tick(long)}
         * treat the deadline as already expired.
         *
         * @param currentTime current time in epoch milliseconds
         * @return remaining milliseconds, zero or negative once the deadline has passed
         */
        @Override // org.apache.qpid.server.transport.network.Ticker
        public int getTimeToNextTick(long currentTime) {
            long deadline = AbstractAMQPConnection.this.getCreatedTime().getTime() + this._allowedTime + this._accumulatedSchedulingDelay;
            long remaining = deadline - currentTime;
            return (int) Math.max(Integer.MIN_VALUE, Math.min(Integer.MAX_VALUE, remaining));
        }

        /**
         * Enforces the deadline.
         *
         * @param currentTime current time in epoch milliseconds
         * @return the value of {@link #getTimeToNextTick(long)} for {@code currentTime}
         */
        @Override // org.apache.qpid.server.transport.network.Ticker
        public int tick(long currentTime) {
            int timeToNextTick = getTimeToNextTick(currentTime);
            if (timeToNextTick <= 0) {
                if (AbstractAMQPConnection.this.isOpeningInProgress()) {
                    AbstractAMQPConnection.LOGGER.warn("Connection has taken more than {} ms to establish.  Closing as possible DoS.", this._allowedTime);
                    AbstractAMQPConnection.this.getEventLogger().message(ConnectionMessages.IDLE_CLOSE("Protocol connection is not established within timeout period", true));
                    AbstractAMQPConnection.this._network.close();
                } else {
                    // Opening completed in time: this ticker has nothing left to watch.
                    AbstractAMQPConnection.this._aggregateTicker.removeTicker(this);
                    AbstractAMQPConnection.this._network.removeSchedulingDelayNotificationListeners(this);
                }
            }
            return timeToNextTick;
        }

        /**
         * Extends the deadline by a positive scheduling delay; non-positive values are ignored.
         *
         * @param schedulingDelay reported delay in milliseconds
         */
        @Override // org.apache.qpid.server.transport.SchedulingDelayNotificationListener
        public void notifySchedulingDelay(long schedulingDelay) {
            if (schedulingDelay > 0) {
                // NOTE(review): += on a volatile is a non-atomic read-modify-write; presumably
                // notifications arrive from a single thread - confirm against the caller.
                this._accumulatedSchedulingDelay += schedulingDelay;
            }
        }
    }

    /**
     * Creates the connection model object as a child of the given port and marks it ACTIVE.
     * <p>
     * The broker initially acts as event-logger provider, context provider and statistics
     * gatherer. The connection subject is seeded with a {@link ConnectionPrincipal} for this
     * connection, and a listener is installed on the transport-closed future which completes the
     * model/transport rendezvous future, starts {@code closeAsync()} and logs the close afterwards.
     *
     * @param broker                  owning broker
     * @param serverNetworkConnection underlying network connection
     * @param amqpPort                port on which the connection was accepted (parent object)
     * @param transport               transport of the connection
     * @param protocol                AMQP protocol version of the connection
     * @param j                       connection id
     * @param aggregateTicker         ticker to which this connection's tickers are added
     */
    public AbstractAMQPConnection(Broker<?> broker, ServerNetworkConnection serverNetworkConnection, AmqpPort<?> amqpPort, Transport transport, Protocol protocol, long j, AggregateTicker aggregateTicker) {
        super(amqpPort, createAttributes(j, serverNetworkConnection));
        this._subject = new Subject();
        this._connectionCloseTaskList = new CopyOnWriteArrayList<>();
        this._messagesIn = new AtomicLong();
        this._messagesOut = new AtomicLong();
        this._transactedMessagesIn = new AtomicLong();
        this._transactedMessagesOut = new AtomicLong();
        this._bytesIn = new AtomicLong();
        this._bytesOut = new AtomicLong();
        this._localTransactionBegins = new AtomicLong();
        this._localTransactionRollbacks = new AtomicLong();
        this._localTransactionOpens = new AtomicLong();
        this._transportClosedFuture = SettableFuture.create();
        this._modelTransportRendezvousFuture = SettableFuture.create();
        this._maxMessageSize = new AtomicLong(Integer.MAX_VALUE);
        this._transactionTickers = new ConcurrentHashMap<>();
        this._broker = broker;
        this._eventLoggerProvider = broker;
        this._contextProvider = broker;
        this._statisticsGatherer = broker;
        this._network = serverNetworkConnection;
        this._port = amqpPort;
        this._transport = transport;
        this._protocol = protocol;
        this._connectionId = j;
        this._aggregateTicker = aggregateTicker;
        this._subject.getPrincipals().add(new ConnectionPrincipal(this));
        updateAccessControllerContext();
        this._transportClosedFuture.addListener(() -> {
            // A Void future can only be completed with an untyped null.
            this._modelTransportRendezvousFuture.set(null);
            doAfter(closeAsync(), this::logConnectionClose);
        }, getTaskExecutor());
        setState(State.ACTIVE);
        this._logSubject = new ConnectionLogSubject(this);
    }

    /**
     * Builds the initial attribute map of the connection model object.
     * <p>
     * The name has the form {@code "[<connectionId>] <remoteAddress>"}, where the remote address
     * is the string form of the network connection's remote address with every {@code '/'}
     * removed. The object is marked non-durable.
     *
     * @param j                 connection id
     * @param networkConnection network connection supplying the remote address
     * @return mutable attribute map containing the name and durable attributes
     */
    private static Map<String, Object> createAttributes(long j, NetworkConnection networkConnection) {
        Map<String, Object> attributes = new HashMap<>();
        // String.valueOf tolerates a null remote address; replace() avoids the regex engine.
        String remoteAddress = String.valueOf(networkConnection.getRemoteAddress()).replace("/", "");
        attributes.put("name", "[" + j + "] " + remoteAddress);
        attributes.put(ConfiguredObject.DURABLE, false);
        return attributes;
    }

    /**
     * Derives an access control context from the caller's current context and the given subject.
     *
     * @param subject subject to combine into the context; when {@code null} the context is
     *                created without a domain combiner
     * @return a new context based on {@link AccessController#getContext()}
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public final AccessControlContext getAccessControlContextFromSubject(Subject subject) {
        final AccessControlContext callerContext = AccessController.getContext();
        // The explicit PrivilegedAction cast is required: a bare lambda is ambiguous between the
        // PrivilegedAction and PrivilegedExceptionAction overloads of doPrivileged.
        return AccessController.doPrivileged((PrivilegedAction<AccessControlContext>) () -> {
            if (subject == null) {
                return new AccessControlContext(callerContext, null);
            }
            return new AccessControlContext(callerContext, new SubjectDomainCombiner(subject));
        });
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r4v2, types: [org.apache.qpid.server.transport.AbstractAMQPConnection, long] */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    public void onOpen() {
        super.onOpen();
        this._aggregateTicker.addTicker(new SlowConnectionOpenTicker(((Long) this._port.getContextValue(Long.class, Port.CONNECTION_MAXIMUM_AUTHENTICATION_DELAY)).longValue()));
        ?? time = getCreatedTime().getTime();
        this._lastMessageOutboundTime = time;
        this._lastMessageInboundTime = time;
        time._lastWriteTime = this;
        this._lastReadTime = this;
        this._maxUncommittedInMemorySize = ((Long) getContextValue(Long.class, Connection.MAX_UNCOMMITTED_IN_MEMORY_SIZE)).longValue();
        this._transactionObserver = this._maxUncommittedInMemorySize < 0 ? FlowToDiskTransactionObserver.NOOP_TRANSACTION_OBSERVER : new FlowToDiskTransactionObserver(this._maxUncommittedInMemorySize, this._logSubject, this._eventLoggerProvider.getEventLogger());
    }

    /** Returns the broker supplied to the constructor. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public Broker<?> getBroker() {
        return _broker;
    }

    /** Returns the underlying network connection supplied to the constructor. */
    public final ServerNetworkConnection getNetwork() {
        return _network;
    }

    /** Returns the AMQP port supplied to the constructor. */
    @Override // org.apache.qpid.server.transport.AMQPConnection, org.apache.qpid.server.model.Connection
    public final AmqpPort<?> getPort() {
        return _port;
    }

    /** Returns the transport supplied to the constructor. */
    @Override // org.apache.qpid.server.model.Connection
    public final Transport getTransport() {
        return _transport;
    }

    /** Returns the transport information reported by the network connection. */
    @Override // org.apache.qpid.server.model.Connection
    public String getTransportInfo() {
        final String transportInfo = _network.getTransportInfo();
        return transportInfo;
    }

    /** Returns the protocol supplied to the constructor. */
    @Override // org.apache.qpid.server.model.Connection
    public Protocol getProtocol() {
        return _protocol;
    }

    /** Returns the aggregate ticker supplied to the constructor. */
    @Override // org.apache.qpid.server.transport.ProtocolEngine, org.apache.qpid.server.transport.AMQPConnection
    public AggregateTicker getAggregateTicker() {
        return _aggregateTicker;
    }

    /** Returns the later of the last read time and the last write time as a {@link Date}. */
    @Override // org.apache.qpid.server.model.Connection
    public final Date getLastIoTime() {
        final long lastRead = getLastReadTime();
        final long lastWrite = getLastWriteTime();
        return new Date(Math.max(lastRead, lastWrite));
    }

    /** Returns the time of the last read, in epoch milliseconds. */
    @Override // org.apache.qpid.server.transport.network.TransportActivity
    public final long getLastReadTime() {
        return _lastReadTime;
    }

    /** Stamps the last read time with the current wall-clock time. */
    private void updateLastReadTime() {
        final long now = System.currentTimeMillis();
        _lastReadTime = now;
    }

    /** Returns the time of the last write, in epoch milliseconds. */
    @Override // org.apache.qpid.server.transport.network.TransportActivity
    public final long getLastWriteTime() {
        return _lastWriteTime;
    }

    /**
     * Stamps the last write time with the current time. If a message was flagged as written
     * since the previous call, the flag is cleared and the last outbound message time receives
     * the same timestamp.
     */
    public final void updateLastWriteTime() {
        final long now = System.currentTimeMillis();
        _lastWriteTime = now;
        if (!_messagesWritten) {
            return;
        }
        _messagesWritten = false;
        _lastMessageOutboundTime = now;
    }

    /** Copies the last read time into the last inbound message time. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void updateLastMessageInboundTime() {
        final long lastRead = _lastReadTime;
        _lastMessageInboundTime = lastRead;
    }

    /**
     * Flags that a message has been written; the outbound message timestamp itself is set by the
     * next call to {@code updateLastWriteTime()}.
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void updateLastMessageOutboundTime() {
        _messagesWritten = true;
    }

    /** Returns the last inbound message time as a new {@link Date}. */
    @Override // org.apache.qpid.server.model.Connection
    public Date getLastInboundMessageTime() {
        final long inbound = _lastMessageInboundTime;
        return new Date(inbound);
    }

    /** Returns the last outbound message time as a new {@link Date}. */
    @Override // org.apache.qpid.server.model.Connection
    public Date getLastOutboundMessageTime() {
        final long outbound = _lastMessageOutboundTime;
        return new Date(outbound);
    }

    /** Returns the later of the last inbound and last outbound message times. */
    @Override // org.apache.qpid.server.model.Connection
    public Date getLastMessageTime() {
        final long inbound = _lastMessageInboundTime;
        final long outbound = _lastMessageOutboundTime;
        return new Date(Math.max(inbound, outbound));
    }

    /** Returns the connection id supplied to the constructor. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public final long getConnectionId() {
        return _connectionId;
    }

    /**
     * Returns the string form of the network connection's remote address
     * ({@code "null"} when the address is {@code null}).
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public String getRemoteAddressString() {
        final SocketAddress remote = _network.getRemoteAddress();
        return String.valueOf(remote);
    }

    /** Marks the connection as stopped. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public final void stopConnection() {
        _stopped = true;
    }

    /** Returns whether {@code stopConnection()} has been called. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public boolean isConnectionStopped() {
        return _stopped;
    }

    /** Returns the name of the address space, or {@code null} when no address space is set. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public final String getAddressSpaceName() {
        return getAddressSpace() == null ? null : getAddressSpace().getName();
    }

    /** Returns the client version last passed to {@code setClientVersion}, or {@code null}. */
    @Override // org.apache.qpid.server.model.Connection
    public String getClientVersion() {
        return _clientVersion;
    }

    /** Returns the remote process id last passed to {@code setRemoteProcessPid}, or {@code null}. */
    @Override // org.apache.qpid.server.model.Connection
    public String getRemoteProcessPid() {
        return _remoteProcessPid;
    }

    /**
     * Forwards the scheduler to the network connection when it is a
     * {@link NonBlockingConnection}; does nothing for any other network implementation.
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void pushScheduler(NetworkConnectionScheduler networkConnectionScheduler) {
        if (!(_network instanceof NonBlockingConnection)) {
            return;
        }
        final NonBlockingConnection nonBlocking = (NonBlockingConnection) _network;
        nonBlocking.pushScheduler(networkConnectionScheduler);
    }

    /**
     * Pops the scheduler from the network connection when it is a
     * {@link NonBlockingConnection}.
     *
     * @return the popped scheduler, or {@code null} for any other network implementation
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public NetworkConnectionScheduler popScheduler() {
        return _network instanceof NonBlockingConnection
                ? ((NonBlockingConnection) _network).popScheduler()
                : null;
    }

    /** Returns the client product last passed to {@code setClientProduct}, or {@code null}. */
    @Override // org.apache.qpid.server.model.Connection
    public String getClientProduct() {
        return _clientProduct;
    }

    /**
     * Recomputes the effective maximum message size as the smaller of the limit configured on
     * the port and the limit configured on the current context provider.
     */
    protected void updateMaxMessageSize() {
        final long portLimit = getMaxMessageSize(getPort());
        final long contextLimit = getMaxMessageSize(_contextProvider);
        _maxMessageSize.set(Math.min(portLimit, contextLimit));
    }

    /**
     * Reads the {@code Connection.MAX_MESSAGE_SIZE} context value from the given provider.
     *
     * @param contextProvider provider to query
     * @return the configured value when it is positive; otherwise {@link Integer#MAX_VALUE},
     *         which is also returned (after a warning is logged) when the lookup throws an
     *         {@link IllegalArgumentException} or {@link NullPointerException}
     */
    private long getMaxMessageSize(ContextProvider contextProvider) {
        try {
            final int configured = ((Integer) contextProvider.getContextValue(Integer.class, Connection.MAX_MESSAGE_SIZE)).intValue();
            return configured > 0 ? configured : Integer.MAX_VALUE;
        } catch (IllegalArgumentException | NullPointerException e) {
            LOGGER.warn("Context variable {} has invalid value and cannot be used to restrict maximum message size", Connection.MAX_MESSAGE_SIZE, e);
        }
        return Integer.MAX_VALUE;
    }

    /** Returns the effective maximum message size last computed by {@code updateMaxMessageSize()}. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public long getMaxMessageSize() {
        final long limit = _maxMessageSize.get();
        return limit;
    }

    /** Appends an action to the list executed by {@code performDeleteTasks()}. */
    @Override // org.apache.qpid.server.util.Deletable
    public void addDeleteTask(Action<? super C> action) {
        _connectionCloseTaskList.add(action);
    }

    /** Removes a previously added action from the list executed by {@code performDeleteTasks()}. */
    @Override // org.apache.qpid.server.util.Deletable
    public void removeDeleteTask(Action<? super C> action) {
        _connectionCloseTaskList.remove(action);
    }

    /**
     * Runs every registered delete task against this connection, in registration order.
     * <p>
     * When the calling thread is not already running as the connection subject, the method
     * re-enters itself through {@code runAsSubject} so that the tasks execute as that subject.
     */
    public void performDeleteTasks() {
        if (!runningAsSubject()) {
            runAsSubject(() -> {
                performDeleteTasks();
                return null;
            });
            return;
        }
        // Actions are typed against C; the self-type bound of the class makes this cast the
        // way to hand "this" to them.
        @SuppressWarnings("unchecked")
        final C self = (C) this;
        for (Action<? super C> task : _connectionCloseTaskList) {
            task.performAction(self);
        }
    }

    /** Returns the client id last passed to {@code setClientId}, or {@code null}. */
    @Override // org.apache.qpid.server.model.Connection
    public String getClientId() {
        return _clientId;
    }

    /** Returns the remote address reported by the network connection. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public final SocketAddress getRemoteSocketAddress() {
        final SocketAddress remote = _network.getRemoteAddress();
        return remote;
    }

    /**
     * Records one delivered message.
     *
     * @param j size added to the outbound byte counter and reported to the statistics gatherer
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void registerMessageDelivered(long j) {
        // Connection-level counters first, then the shared gatherer.
        _messagesOut.incrementAndGet();
        _bytesOut.addAndGet(j);
        _statisticsGatherer.registerMessageDelivered(j);
    }

    /**
     * Records one received message and refreshes the last inbound message time.
     *
     * @param j size added to the inbound byte counter and reported to the statistics gatherer
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void registerMessageReceived(long j) {
        updateLastMessageInboundTime();
        // Connection-level counters first, then the shared gatherer.
        _messagesIn.incrementAndGet();
        _bytesIn.addAndGet(j);
        _statisticsGatherer.registerMessageReceived(j);
    }

    /** Counts one transacted outbound message locally and on the statistics gatherer. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void registerTransactedMessageDelivered() {
        _transactedMessagesOut.incrementAndGet();
        _statisticsGatherer.registerTransactedMessageDelivered();
    }

    /** Counts one transacted inbound message locally and on the statistics gatherer. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void registerTransactedMessageReceived() {
        _transactedMessagesIn.incrementAndGet();
        _statisticsGatherer.registerTransactedMessageReceived();
    }

    /** Stores the client product string returned by {@code getClientProduct()}. */
    public void setClientProduct(String str) {
        _clientProduct = str;
    }

    /** Stores the client version string returned by {@code getClientVersion()}. */
    public void setClientVersion(String str) {
        _clientVersion = str;
    }

    /** Stores the remote process id string returned by {@code getRemoteProcessPid()}. */
    public void setRemoteProcessPid(String str) {
        _remoteProcessPid = str;
    }

    /** Stores the client id string returned by {@code getClientId()}. */
    public void setClientId(String str) {
        _clientId = str;
    }

    /** Records the thread that {@code isIOThread()} compares the current thread against. */
    @Override // org.apache.qpid.server.transport.ProtocolEngine
    public void setIOThread(Thread thread) {
        _ioThread = thread;
    }

    /** Returns whether the calling thread is the one registered through {@code setIOThread}. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public boolean isIOThread() {
        final Thread caller = Thread.currentThread();
        return caller == _ioThread;
    }

    /**
     * Runs the given task on the IO thread.
     * <p>
     * When called on the IO thread the task runs inline and an already-completed future is
     * returned; an exception thrown by the task then propagates to the caller. Otherwise the
     * task is handed to {@code addAsyncTask} and the returned future completes when it has run,
     * or fails with the {@link RuntimeException} the task threw.
     *
     * @param runnable task to execute
     * @return future signalling completion of the task
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public ListenableFuture<Void> doOnIOThreadAsync(Runnable runnable) {
        if (isIOThread()) {
            runnable.run();
            return Futures.immediateFuture(null);
        }
        final SettableFuture<Void> result = SettableFuture.create();
        addAsyncTask(ignored -> {
            try {
                runnable.run();
                result.set(null);
            } catch (RuntimeException e) {
                result.setException(e);
            }
        });
        return result;
    }

    /**
     * Entry point for data read from the network.
     * <p>
     * Updates the last read time and passes the buffer to {@code onReceive} inside the
     * connection's access control context. A {@link StoreException} raised while processing is
     * rethrown as a {@link ServerScopedRuntimeException} when the address space is still active
     * and as a {@link ConnectionScopedRuntimeException} otherwise.
     *
     * @param qpidByteBuffer buffer holding the received bytes
     */
    @Override // org.apache.qpid.server.transport.ProtocolEngine
    public final void received(QpidByteBuffer qpidByteBuffer) {
        // The explicit PrivilegedAction cast is required: a bare lambda is ambiguous between the
        // PrivilegedAction and PrivilegedExceptionAction overloads of doPrivileged.
        AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
            updateLastReadTime();
            try {
                onReceive(qpidByteBuffer);
                return null;
            } catch (StoreException e) {
                if (getAddressSpace().isActive()) {
                    throw new ServerScopedRuntimeException(e);
                }
                throw new ConnectionScopedRuntimeException(e);
            }
        }, getAccessControllerContext());
    }

    /**
     * Processes a buffer of received bytes. Invoked by {@code received(...)} inside the
     * connection's access control context, after the last read time has been updated.
     */
    protected abstract void onReceive(QpidByteBuffer qpidByteBuffer);

    /**
     * Queues an action for later execution. Used by {@code doOnIOThreadAsync(...)} when the
     * caller is not on the IO thread.
     */
    protected abstract void addAsyncTask(Action<? super T> action);

    /**
     * Reports whether the connection is still being opened. Consulted by
     * {@code SlowConnectionOpenTicker} once the allowed opening time has elapsed.
     */
    protected abstract boolean isOpeningInProgress();

    /**
     * Executes the action as the connection subject via {@link Subject#doAs}.
     *
     * @param privilegedAction action to run
     * @return the value returned by the action
     */
    protected <T> T runAsSubject(PrivilegedAction<T> privilegedAction) {
        final T outcome = Subject.doAs(_subject, privilegedAction);
        return outcome;
    }

    /** Returns whether the subject of the current access control context equals the connection subject. */
    private boolean runningAsSubject() {
        final Subject current = Subject.getSubject(AccessController.getContext());
        return _subject.equals(current);
    }

    /** Returns the connection subject created in the constructor. */
    @Override // org.apache.qpid.server.transport.ProtocolEngine, org.apache.qpid.server.transport.AMQPConnection
    public Subject getSubject() {
        return _subject;
    }

    /**
     * Returns the task executor of the address space when it is a {@link TaskExecutorProvider};
     * otherwise falls back to the superclass's child executor.
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject, org.apache.qpid.server.model.ConfiguredObject
    public TaskExecutor getChildExecutor() {
        final NamedAddressSpace space = getAddressSpace();
        if (space instanceof TaskExecutorProvider) {
            return ((TaskExecutorProvider) space).getTaskExecutor();
        }
        return super.getChildExecutor();
    }

    /** Always returns {@code true}. */
    @Override // org.apache.qpid.server.model.Connection
    public boolean isIncoming() {
        final boolean incoming = true;
        return incoming;
    }

    /** Always returns {@code null}; this implementation does not expose a local address here. */
    @Override // org.apache.qpid.server.model.Connection
    public String getLocalAddress() {
        final String none = null;
        return none;
    }

    /** Returns the name of the authorized principal, or {@code null} when there is none. */
    @Override // org.apache.qpid.server.model.Connection
    public String getPrincipal() {
        final Principal authorized = getAuthorizedPrincipal();
        return authorized == null ? null : authorized.getName();
    }

    /** Returns the same value as {@code getRemoteAddressString()}. */
    @Override // org.apache.qpid.server.model.Connection
    public String getRemoteAddress() {
        final String remote = getRemoteAddressString();
        return remote;
    }

    /** Always returns {@code null}. */
    @Override // org.apache.qpid.server.model.Connection
    public String getRemoteProcessName() {
        final String none = null;
        return none;
    }

    /** Returns the {@link Session} children of this connection. */
    @Override // org.apache.qpid.server.model.Connection
    public Collection<Session> getSessions() {
        final Collection<Session> sessions = getChildren(Session.class);
        return sessions;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Handles deletion by delegating to {@code closeAsyncIfNotAlreadyClosing()}. */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    public ListenableFuture<Void> onDelete() {
        final ListenableFuture<Void> closed = closeAsyncIfNotAlreadyClosing();
        return closed;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /** Prepares for close by delegating to {@code closeAsyncIfNotAlreadyClosing()}. */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    public ListenableFuture<Void> beforeClose() {
        final ListenableFuture<Void> closed = closeAsyncIfNotAlreadyClosing();
        return closed;
    }

    /**
     * Resets the transaction observer, if one has been created, and reports immediate completion.
     *
     * @return an already-completed future
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    protected ListenableFuture<Void> onClose() {
        // Read the volatile field once so the null check and the call see the same instance.
        final TransactionObserver observer = this._transactionObserver;
        if (observer != null) {
            observer.reset();
        }
        // An untyped null lets the compiler infer ListenableFuture<Void>.
        return Futures.immediateFuture(null);
    }

    /**
     * Requests a management-initiated connection close unless the model/transport rendezvous
     * future is already done.
     *
     * @return the model/transport rendezvous future
     */
    private ListenableFuture<Void> closeAsyncIfNotAlreadyClosing() {
        final boolean alreadyDone = _modelTransportRendezvousFuture.isDone();
        if (!alreadyDone) {
            sendConnectionCloseAsync(AMQPConnection.CloseReason.MANAGEMENT, "Connection closed by external action");
        }
        return _modelTransportRendezvousFuture;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Rejects every attempt to create a child through the model.
     *
     * @throws IllegalStateException    when the requested child class is {@link Session}
     * @throws IllegalArgumentException for any other child class
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    public <C extends ConfiguredObject> ListenableFuture<C> addChildAsync(Class<C> cls, Map<String, Object> map) {
        if (cls != Session.class) {
            throw new IllegalArgumentException("Cannot create a child of class " + cls.getSimpleName());
        }
        throw new IllegalStateException();
    }

    /** Returns the current value of the inbound byte counter. */
    @Override // org.apache.qpid.server.model.Connection
    public long getBytesIn() {
        final long total = _bytesIn.get();
        return total;
    }

    /** Returns the current value of the outbound byte counter. */
    @Override // org.apache.qpid.server.model.Connection
    public long getBytesOut() {
        final long total = _bytesOut.get();
        return total;
    }

    /** Returns the current value of the inbound message counter. */
    @Override // org.apache.qpid.server.model.Connection
    public long getMessagesIn() {
        final long total = _messagesIn.get();
        return total;
    }

    /** Returns the current value of the outbound message counter. */
    @Override // org.apache.qpid.server.model.Connection
    public long getMessagesOut() {
        final long total = _messagesOut.get();
        return total;
    }

    /** Returns the current value of the transacted inbound message counter. */
    @Override // org.apache.qpid.server.model.Connection
    public long getTransactedMessagesIn() {
        final long total = _transactedMessagesIn.get();
        return total;
    }

    /** Returns the current value of the transacted outbound message counter. */
    @Override // org.apache.qpid.server.model.Connection
    public long getTransactedMessagesOut() {
        final long total = _transactedMessagesOut.get();
        return total;
    }

    /**
     * Resets the connection statistics.
     * <p>
     * Both last-message timestamps are set to the current time; the byte, message, transacted
     * message, local-transaction-begin and local-transaction-rollback counters are zeroed; and
     * {@code resetStatistics()} is invoked on every child session that is an
     * {@link AbstractAMQPSession}. The open-local-transaction counter is left untouched.
     */
    @Override // org.apache.qpid.server.model.Connection
    public void resetStatistics() {
        // One clock read keeps the two timestamps identical.
        final long now = System.currentTimeMillis();
        this._lastMessageInboundTime = now;
        this._lastMessageOutboundTime = now;
        this._bytesIn.set(0L);
        this._bytesOut.set(0L);
        this._messagesIn.set(0L);
        this._messagesOut.set(0L);
        this._transactedMessagesIn.set(0L);
        this._transactedMessagesOut.set(0L);
        this._localTransactionBegins.set(0L);
        this._localTransactionRollbacks.set(0L);
        getChildren(Session.class).stream()
                .filter(AbstractAMQPSession.class::isInstance)
                .map(AbstractAMQPSession.class::cast)
                .forEach(AbstractAMQPSession::resetStatistics);
    }

    /** Returns the context last computed by {@code updateAccessControllerContext()}. */
    public AccessControlContext getAccessControllerContext() {
        return _accessControllerContext;
    }

    /** Recomputes the cached access control context from the current connection subject. */
    public final void updateAccessControllerContext() {
        final AccessControlContext refreshed = getAccessControlContextFromSubject(getSubject());
        _accessControllerContext = refreshed;
    }

    /**
     * Emits the connection OPEN operational log message as the connection subject.
     * <p>
     * The local address is rendered as {@code hostAddress:port} for an
     * {@link InetSocketAddress} and with {@code toString()} for any other address type.
     */
    private void logConnectionOpen() {
        runAsSubject(() -> {
            final SocketAddress local = _network.getLocalAddress();
            final String localText;
            if (local instanceof InetSocketAddress) {
                final InetSocketAddress inet = (InetSocketAddress) local;
                localText = inet.getAddress().getHostAddress() + ":" + inet.getPort();
            } else {
                localText = local.toString();
            }
            // The trailing boolean flags tell the message which optional parts are present.
            final LogMessage openMessage = ConnectionMessages.OPEN(getPort().getName(), localText, getProtocol().getProtocolVersion(), getClientId(), getClientVersion(), getClientProduct(), getTransport().isSecure(), getClientId() != null, getClientVersion() != null, getClientProduct() != null);
            getEventLogger().message(openMessage);
            return null;
        });
    }

    /**
     * Emits, as the connection subject, a CLOSE message (carrying the close cause when one is
     * available) for an orderly close, or a DROPPED_CONNECTION message otherwise.
     */
    private void logConnectionClose() {
        runAsSubject(() -> {
            final String cause = getCloseCause();
            final EventLogger logger = getEventLogger();
            final LogMessage closeMessage = isOrderlyClose()
                    ? ConnectionMessages.CLOSE(cause, cause != null)
                    : ConnectionMessages.DROPPED_CONNECTION();
            logger.message(closeMessage);
            return null;
        });
    }

    /**
     * Installs idle-timeout tickers and configures the network's idle limits.
     *
     * @param j  write-idle delay; when positive a {@link ServerIdleWriteTimeoutTicker} is added
     *           and the network's maximum write idle time is set
     * @param j2 read-idle delay; when positive a {@link ServerIdleReadTimeoutTicker} is added
     *           and the network's maximum read idle time is set
     */
    protected void initialiseHeartbeating(long j, long j2) {
        final boolean writeHeartbeat = j > 0;
        final boolean readHeartbeat = j2 > 0;
        if (writeHeartbeat) {
            // The ticker takes the delay narrowed to an int.
            _aggregateTicker.addTicker(new ServerIdleWriteTimeoutTicker(this, (int) j));
            _network.setMaxWriteIdleMillis(j);
        }
        if (readHeartbeat) {
            _aggregateTicker.addTicker(new ServerIdleReadTimeoutTicker(_network, this, (int) j2));
            _network.setMaxReadIdleMillis(j2);
        }
    }

    /**
     * Reports whether the connection was closed in an orderly fashion. Used by
     * {@code logConnectionClose()} to choose between the CLOSE and DROPPED_CONNECTION messages.
     */
    protected abstract boolean isOrderlyClose();

    /**
     * Returns the cause of the close, or {@code null} when none is known. The value is included
     * in the CLOSE message written by {@code logConnectionClose()}.
     */
    protected abstract String getCloseCause();

    /** Returns the number of entries in the collection returned by {@code getSessionModels()}. */
    @Override // org.apache.qpid.server.model.Connection
    public int getSessionCount() {
        final int count = getSessionModels().size();
        return count;
    }

    /**
     * Completes the transport-closed future, which triggers the listener installed in the
     * constructor.
     */
    protected void markTransportClosed() {
        // A Void future can only be completed with an untyped null.
        this._transportClosedFuture.set(null);
    }

    /** Returns the log subject created for this connection in the constructor. */
    public LogSubject getLogSubject() {
        return _logSubject;
    }

    /** Returns the event logger of the current event-logger provider. */
    @Override // org.apache.qpid.server.logging.EventLoggerProvider
    public EventLogger getEventLogger() {
        final EventLoggerProvider provider = _eventLoggerProvider;
        return provider.getEventLogger();
    }

    /**
     * Verifies that a message's user id matches the name of the authorized principal.
     * <p>
     * The check is skipped when the user id is {@code null} or blank after trimming, or when
     * message authorization is not required.
     *
     * @param str user id carried by the message
     * @throws AccessControlException when the user id differs from the authorized principal's name
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public final void checkAuthorizedMessagePrincipal(String str) {
        if (str == null || "".equals(str.trim()) || !_messageAuthorizationRequired) {
            return;
        }
        // NOTE(review): getAuthorizedPrincipal() is null-checked by other callers in this class
        // but not here - confirm it cannot be null by the time messages are checked.
        if (!getAuthorizedPrincipal().getName().equals(str)) {
            throw new AccessControlException("The user id of the message '" + str + "' is not valid on a connection authenticated as  " + getAuthorizedPrincipal().getName());
        }
    }

    /** Returns the address space set through {@code setAddressSpace}, or {@code null} before that. */
    @Override // org.apache.qpid.server.model.Connection
    public NamedAddressSpace getAddressSpace() {
        return _addressSpace;
    }

    /** Returns the current context provider (the broker until an address space replaces it). */
    public ContextProvider getContextProvider() {
        return _contextProvider;
    }

    /**
     * Attaches the connection to an address space and refreshes everything derived from it.
     * <p>
     * The address space replaces the broker as event-logger provider, context provider and
     * statistics gatherer for each of those interfaces it implements. The maximum message size,
     * the message-authorization flag and the compression threshold are then re-read from the
     * (possibly new) context provider, the address space's principal is added to the connection
     * subject, the access control context is rebuilt, and the connection open is logged.
     *
     * @param namedAddressSpace address space the connection is being attached to
     */
    public void setAddressSpace(NamedAddressSpace namedAddressSpace) {
        this._addressSpace = namedAddressSpace;
        if (namedAddressSpace instanceof EventLoggerProvider) {
            this._eventLoggerProvider = (EventLoggerProvider) namedAddressSpace;
        }
        if (namedAddressSpace instanceof ContextProvider) {
            this._contextProvider = (ContextProvider) namedAddressSpace;
        }
        if (namedAddressSpace instanceof StatisticsGatherer) {
            this._statisticsGatherer = (StatisticsGatherer) namedAddressSpace;
        }
        // Must run after the context provider swap above so the new provider's limit is used.
        updateMaxMessageSize();
        this._messageAuthorizationRequired = ((Boolean) this._contextProvider.getContextValue(Boolean.class, Broker.BROKER_MSG_AUTH)).booleanValue();
        this._messageCompressionThreshold = ((Integer) this._contextProvider.getContextValue(Integer.class, Broker.MESSAGE_COMPRESSION_THRESHOLD_SIZE)).intValue();
        // A non-positive threshold is replaced by Integer.MAX_VALUE.
        if (this._messageCompressionThreshold <= 0) {
            this._messageCompressionThreshold = Integer.MAX_VALUE;
        }
        getSubject().getPrincipals().add(namedAddressSpace.getPrincipal());
        // The subject changed, so the cached access control context has to be rebuilt.
        updateAccessControllerContext();
        logConnectionOpen();
    }

    /** Returns the compression threshold established by {@code setAddressSpace}. */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public int getMessageCompressionThreshold() {
        return _messageCompressionThreshold;
    }

    /** Returns the uncommitted in-memory size limit read in {@code onOpen()}. */
    @Override // org.apache.qpid.server.model.Connection
    public long getMaxUncommittedInMemorySize() {
        return _maxUncommittedInMemorySize;
    }

    /**
     * Renders the connection as {@code <remoteAddress>(<principalName>)}, using {@code "?"}
     * when there is no authorized principal.
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    public String toString() {
        final Principal authorized = getAuthorizedPrincipal();
        final String principalName = authorized == null ? "?" : authorized.getName();
        return getNetwork().getRemoteAddress() + "(" + principalName + ")";
    }

    /**
     * Looks up the authenticated principal in this connection's subject.
     *
     * @return the result of
     *         {@code AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject};
     *         callers in this class treat {@code null} as "no authorized principal"
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public Principal getAuthorizedPrincipal() {
        final Subject connectionSubject = getSubject();
        return AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(connectionSubject);
    }

    /**
     * Merges the given subject into this connection's own subject.
     *
     * <p>All principals, private credentials and public credentials of {@code subject} are added
     * to the subject returned by {@code getSubject()}; nothing is removed. The access controller
     * context is refreshed afterwards.
     *
     * @param subject the subject whose principals and credentials are to be added
     * @throws IllegalArgumentException if {@code subject} is {@code null}
     */
    public void setSubject(Subject subject) {
        if (subject == null) {
            throw new IllegalArgumentException("subject cannot be null");
        }
        final Subject target = getSubject();
        target.getPrincipals().addAll(subject.getPrincipals());
        target.getPrivateCredentials().addAll(subject.getPrivateCredentials());
        target.getPublicCredentials().addAll(subject.getPublicCredentials());
        updateAccessControllerContext();
    }

    /**
     * Creates a new local transaction against the message store of the current address space.
     *
     * <p>Both the "begins" and the "opens" transaction counters are incremented before the
     * transaction is constructed. The transaction is given a callback that reads this
     * connection's last read time, the connection's transaction observer, and a flag that is
     * {@code true} for every protocol except AMQP 1.0.
     *
     * @return the newly created transaction
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public LocalTransaction createLocalTransaction() {
        _localTransactionBegins.incrementAndGet();
        _localTransactionOpens.incrementAndGet();
        return new LocalTransaction(
                getAddressSpace().getMessageStore(),
                () -> getLastReadTime(),
                _transactionObserver,
                getProtocol() != Protocol.AMQP_1_0);
    }

    /**
     * Installs the timeout tickers for a transaction, based on the timeouts configured on the
     * current address space.
     *
     * <p>Nothing happens unless the address space is a {@code QueueManagingVirtualHost}. For each
     * of the four timeouts (open/idle, warn/close) that is greater than zero, a
     * {@code TransactionTimeoutTicker} is created: the "warn" tickers log an event through the
     * virtual host's event logger, the "close" tickers invoke {@code action} with the matching
     * error constant. The created tickers are added to the aggregate ticker and remembered in
     * {@code _transactionTickers} under {@code serverTransaction}.
     *
     * @param serverTransaction the transaction to monitor
     * @param action            callback performed by the "close" tickers
     * @param j                 passed as the second constructor argument to every
     *                          {@code TransactionTimeoutTicker}
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void registerTransactionTickers(ServerTransaction serverTransaction, Action<String> action, long j) {
        final NamedAddressSpace addressSpace = getAddressSpace();
        if (!(addressSpace instanceof QueueManagingVirtualHost)) {
            return;
        }
        final QueueManagingVirtualHost virtualHost = (QueueManagingVirtualHost) addressSpace;
        final EventLogger eventLogger = virtualHost.getEventLogger();
        final Set<Ticker> tickers = new LinkedHashSet<>(4);

        // Each timeout is read once so the value checked is the value handed to the ticker.
        // The bound method references (serverTransaction::...) already throw
        // NullPointerException for a null transaction, so no explicit null check is needed.
        final long openWarn = virtualHost.getStoreTransactionOpenTimeoutWarn();
        if (openWarn > 0) {
            tickers.add(new TransactionTimeoutTicker(openWarn, j, serverTransaction::getTransactionStartTime,
                    age -> eventLogger.message(getLogSubject(), ConnectionMessages.OPEN_TXN(age))));
        }
        final long openClose = virtualHost.getStoreTransactionOpenTimeoutClose();
        if (openClose > 0) {
            tickers.add(new TransactionTimeoutTicker(openClose, j, serverTransaction::getTransactionStartTime,
                    age -> action.performAction(OPEN_TRANSACTION_TIMEOUT_ERROR)));
        }
        final long idleWarn = virtualHost.getStoreTransactionIdleTimeoutWarn();
        if (idleWarn > 0) {
            tickers.add(new TransactionTimeoutTicker(idleWarn, j, serverTransaction::getTransactionUpdateTime,
                    age -> eventLogger.message(getLogSubject(), ConnectionMessages.IDLE_TXN(age))));
        }
        final long idleClose = virtualHost.getStoreTransactionIdleTimeoutClose();
        if (idleClose > 0) {
            tickers.add(new TransactionTimeoutTicker(idleClose, j, serverTransaction::getTransactionUpdateTime,
                    age -> action.performAction(IDLE_TRANSACTION_TIMEOUT_ERROR)));
        }

        if (!tickers.isEmpty()) {
            for (Ticker ticker : tickers) {
                getAggregateTicker().addTicker(ticker);
            }
            notifyWork();
        }
        // Stored even when empty, so a ticker set exists for every registered transaction.
        _transactionTickers.put(serverTransaction, tickers);
    }

    /**
     * Removes the timeout tickers that were registered for the given transaction.
     *
     * <p>Nothing happens unless the address space is a {@code QueueManagingVirtualHost}. The
     * ticker set stored for {@code serverTransaction} is removed from {@code _transactionTickers}
     * and each of its tickers is removed from the aggregate ticker. If no ticker set is stored
     * for the transaction (for example because it was never registered or has already been
     * unregistered) the call is a no-op instead of failing with a
     * {@link NullPointerException}.
     *
     * @param serverTransaction the transaction whose tickers are to be removed
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void unregisterTransactionTickers(ServerTransaction serverTransaction) {
        if (!(getAddressSpace() instanceof QueueManagingVirtualHost)) {
            return;
        }
        final Set<Ticker> tickers = _transactionTickers.remove(serverTransaction);
        if (tickers == null) {
            // Nothing registered for this transaction; remove() signals that with null.
            return;
        }
        for (Ticker ticker : tickers) {
            getAggregateTicker().removeTicker(ticker);
        }
    }

    /**
     * Emits a connection "operation" message to this connection's event logger.
     *
     * @param str the value passed to {@code ConnectionMessages.OPERATION}
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    protected void logOperation(String str) {
        final EventLogger logger = getEventLogger();
        logger.message(ConnectionMessages.OPERATION(str));
    }

    /**
     * Returns the host name of the local end of the network connection.
     *
     * @return the host name of the local {@link InetSocketAddress}
     * @throws IllegalArgumentException if the local address is not an {@link InetSocketAddress}
     */
    @Override // org.apache.qpid.server.security.auth.sasl.SaslSettings
    public String getLocalFQDN() {
        final SocketAddress local = getNetwork().getLocalAddress();
        if (!(local instanceof InetSocketAddress)) {
            throw new IllegalArgumentException("Unsupported socket address class: " + local);
        }
        return ((InetSocketAddress) local).getHostName();
    }

    /**
     * Returns the peer principal reported by the underlying network connection.
     *
     * @return the value of {@code getNetwork().getPeerPrincipal()}
     */
    @Override // org.apache.qpid.server.security.auth.sasl.SaslSettings
    public Principal getExternalPrincipal() {
        return getNetwork()
                .getPeerPrincipal();
    }

    /**
     * Finds the earliest start time among the open local transactions of this connection.
     *
     * <p>Only transactions that are instances of {@code LocalTransaction} and report a start
     * time greater than zero are taken into account.
     *
     * @return the smallest such start time as a {@link Date}, or {@code null} when no
     *         transaction qualifies
     */
    @Override // org.apache.qpid.server.model.Connection
    public Date getOldestTransactionStartTime() {
        long oldest = Long.MAX_VALUE;
        for (Iterator<ServerTransaction> txns = getOpenTransactions(); txns.hasNext(); ) {
            final ServerTransaction txn = txns.next();
            if (!(txn instanceof LocalTransaction)) {
                continue;
            }
            final long startTime = txn.getTransactionStartTime();
            if (startTime > 0) {
                oldest = Math.min(oldest, startTime);
            }
        }
        // Long.MAX_VALUE doubles as the "nothing found" marker.
        return oldest == Long.MAX_VALUE ? null : new Date(oldest);
    }

    /**
     * Returns the number of local transaction begins counted so far.
     *
     * @return the current value of the {@code _localTransactionBegins} counter
     */
    @Override // org.apache.qpid.server.model.Connection
    public long getLocalTransactionBegins() {
        return _localTransactionBegins.get();
    }

    /**
     * Returns the current count of open local transactions.
     *
     * @return the current value of the {@code _localTransactionOpens} counter
     */
    @Override // org.apache.qpid.server.model.Connection
    public long getLocalTransactionOpen() {
        return _localTransactionOpens.get();
    }

    /**
     * Returns the number of local transaction rollbacks counted so far.
     *
     * @return the current value of the {@code _localTransactionRollbacks} counter
     */
    @Override // org.apache.qpid.server.model.Connection
    public long getLocalTransactionRollbacks() {
        return _localTransactionRollbacks.get();
    }

    /**
     * Adds one to the {@code _localTransactionRollbacks} counter.
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void incrementTransactionRollbackCounter() {
        _localTransactionRollbacks.incrementAndGet();
    }

    /**
     * Subtracts one from the {@code _localTransactionOpens} counter.
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void decrementTransactionOpenCounter() {
        _localTransactionOpens.decrementAndGet();
    }

    /**
     * Adds one to the {@code _localTransactionOpens} counter.
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void incrementTransactionOpenCounter() {
        _localTransactionOpens.incrementAndGet();
    }

    /**
     * Adds one to the {@code _localTransactionBegins} counter.
     */
    @Override // org.apache.qpid.server.transport.AMQPConnection
    public void incrementTransactionBeginCounter() {
        _localTransactionBegins.incrementAndGet();
    }

    /**
     * Logs the creation of this connection by delegating to {@code logConnectionOpen()}.
     *
     * @param map     not used by this implementation
     * @param outcome not used by this implementation
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    protected void logCreated(Map<String, Object> map, Outcome outcome) {
        // Both arguments are deliberately ignored; the open message is all that is logged.
        logConnectionOpen();
    }

    /**
     * Logs the deletion of this connection as a {@code ConnectionMessages.MODEL_DELETE} event
     * against this connection's log subject.
     *
     * @param outcome not used by this implementation
     */
    @Override // org.apache.qpid.server.model.AbstractConfiguredObject
    protected void logDeleted(Outcome outcome) {
        final EventLogger logger = getEventLogger();
        logger.message(_logSubject, ConnectionMessages.MODEL_DELETE());
    }
}
