package com.github.thorbenkuck.netcom2.network.shared.clients;

import com.github.thorbenkuck.netcom2.exceptions.ClientCreationFailedException;
import com.github.thorbenkuck.netcom2.interfaces.Mutex;
import com.github.thorbenkuck.netcom2.network.client.DefaultSynchronize;
import com.github.thorbenkuck.netcom2.network.interfaces.Logging;
import com.github.thorbenkuck.netcom2.network.interfaces.ReceivingService;
import com.github.thorbenkuck.netcom2.network.interfaces.SendingService;
import com.github.thorbenkuck.netcom2.network.shared.Callback;
import com.github.thorbenkuck.netcom2.network.shared.Pipeline;
import com.github.thorbenkuck.netcom2.network.shared.PipelineCondition;
import com.github.thorbenkuck.netcom2.network.shared.QueuedPipeline;
import com.github.thorbenkuck.netcom2.network.shared.Session;
import com.github.thorbenkuck.netcom2.network.shared.Synchronize;
import com.github.thorbenkuck.netcom2.utility.NetCom2Utils;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.function.Consumer;

/* loaded from: input_file:com/github/thorbenkuck/netcom2/network/shared/clients/AbstractConnection.class */
public abstract class AbstractConnection implements Connection, Mutex {
    private final Socket socket;
    private boolean setup;
    private boolean started;
    private Session session;
    private Class<?> key;
    protected SendingService sendingService;
    protected ReceivingService receivingService;
    private final BlockingQueue<Object> toSend = new LinkedBlockingQueue();
    private final Pipeline<Connection> disconnectedPipeline = new QueuedPipeline();
    private final Semaphore semaphore = new Semaphore(1);
    private ExecutorService threadPool = NetCom2Utils.getNetComExecutorService();
    protected Logging logging = Logging.unified();

    /* loaded from: input_file:com/github/thorbenkuck/netcom2/network/shared/clients/AbstractConnection$DefaultReceiveCallback.class */
    private class DefaultReceiveCallback implements Callback<Object> {
        private DefaultReceiveCallback() {
        }

        @Override // com.github.thorbenkuck.netcom2.network.shared.Callback
        public boolean isRemovable() {
            return !AbstractConnection.this.started;
        }

        @Override // java.util.function.Consumer
        public void accept(Object obj) {
            AbstractConnection.this.receivedObject(obj);
        }

        public String toString() {
            return "DefaultReceiveCallback{removable=" + (!AbstractConnection.this.started) + "}";
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractConnection(Socket socket, SendingService sendingService, ReceivingService receivingService, Session session, Class<?> cls) {
        this.socket = socket;
        this.sendingService = sendingService;
        this.receivingService = receivingService;
        this.session = session;
        this.key = cls;
    }

    protected abstract void beforeSend(Object obj);

    abstract void receivedObject(Object obj);

    protected abstract void onClose();

    protected abstract void afterSend(Object obj);

    @Override // com.github.thorbenkuck.netcom2.network.interfaces.Loggable
    public void setLogging(Logging logging) {
        this.logging.debug("Overriding set Logging ..");
        this.logging = logging;
        logging.debug("Overrode Logging!");
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void close() throws IOException {
        this.logging.debug("Closing Connection " + this);
        this.logging.trace("Requesting soft-stop of set ReceivingService ..");
        this.receivingService.softStop();
        this.logging.trace("Requesting soft-stop of set SendingService ..");
        this.sendingService.softStop();
        this.logging.trace("Requesting soft-stop of ThreadPool ..");
        this.logging.info("Sending Service will be shut down forcefully! Expect an InterruptedException!");
        this.sendingService.notifyAll();
        this.logging.trace("Shutting down socket ..");
        this.socket.close();
        this.logging.debug("Successfully shut down Connection " + this);
        onClose();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void setup() {
        this.logging.debug("Connection setup for " + this.socket);
        try {
            this.logging.trace("SendingService setup..");
            synchronized (this) {
                this.sendingService.setup(this.socket.getOutputStream(), this.toSend);
                this.logging.trace("SendingService was successfully setup!");
                this.logging.trace("ReceivingService setup..");
                this.receivingService.setup(this, getSession());
            }
            this.logging.trace("ReceivingService was successfully setup!");
            this.logging.trace("Adding Call-Back-Hook to ReceivingService");
            this.receivingService.addReceivingCallback(new DefaultReceiveCallback());
            this.setup = true;
        } catch (IOException e) {
            try {
                this.logging.warn("Encountered Exception while ConnectionSetup!");
                this.logging.catching(e);
                close();
            } catch (IOException e2) {
                e.addSuppressed(e2);
                this.logging.fatal("Encountered Exception while cleaning up over a previously encountered Exception!", e2);
            }
            throw new ClientCreationFailedException(e);
        }
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void removeOnDisconnectedConsumer(Consumer<Connection> consumer) {
        this.logging.debug("Removed DisconnectedConsumer(" + consumer + ") from Connection " + this);
        this.disconnectedPipeline.remove(consumer);
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void write(Object obj) {
        if (!this.setup) {
            throw new IllegalStateException("Connection has to be setup to beforeSend objects!");
        }
        this.logging.trace("Running write in new Thread to write " + obj + " ..");
        this.threadPool.submit(() -> {
            this.logging.trace("notifying of new Object to send ..");
            beforeSend(obj);
            this.logging.trace("Offering object " + obj + " to write..");
            this.toSend.offer(obj);
            this.logging.trace("notifying of new Object extracted of thread ..");
            afterSend(obj);
        });
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void addObjectSendListener(Callback<Object> callback) {
        this.logging.trace("Adding SendCallback " + callback + " to " + this);
        this.sendingService.addSendDoneCallback(callback);
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void addObjectReceivedListener(Callback<Object> callback) {
        this.logging.trace("Adding ReceiveCallback " + callback + " to " + this);
        this.receivingService.addReceivingCallback(callback);
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void setThreadPool(ExecutorService executorService) {
        this.logging.error("This operation is not yet supported!");
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public Synchronize startListening() {
        if (!this.setup) {
            throw new IllegalStateException("Connection has to be setup to listen!");
        }
        if (this.started) {
            throw new IllegalStateException("Cannot startListening to an already listening Connection");
        }
        DefaultSynchronize defaultSynchronize = new DefaultSynchronize();
        this.logging.debug("Starting to listen to: " + this);
        this.threadPool.submit(() -> {
            try {
                this.logging.trace("Awaiting Synchronization of ReceivingService");
                this.receivingService.started().synchronize();
                this.logging.trace("Awaiting Synchronization of SendingService");
                this.sendingService.setConnectionIDSupplier(this::toString);
                this.sendingService.started().synchronize();
            } catch (InterruptedException e) {
                this.logging.catching(e);
            }
            this.logging.info("Synchronization complete! Connection is now listening.");
            this.started = true;
            this.logging.trace("Releasing awaiting Threads..");
            defaultSynchronize.goOn();
        });
        this.logging.trace("Executing ReceivingService ..");
        this.threadPool.submit(this.receivingService);
        this.logging.trace("Executing SendingService ..");
        this.threadPool.submit(this.sendingService);
        return defaultSynchronize;
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public PipelineCondition<Connection> addOnDisconnectedConsumer(Consumer<Connection> consumer) {
        this.logging.debug("Added DisconnectedConsumer(" + consumer + ") for Connection " + this);
        return this.disconnectedPipeline.addLast(consumer);
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public final InputStream getInputStream() throws IOException {
        return this.socket.getInputStream();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public final OutputStream getOutputStream() throws IOException {
        return this.socket.getOutputStream();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public BlockingQueue<Object> getSendInterface() {
        return this.toSend;
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public final Session getSession() {
        return this.session;
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void setSession(Session session) {
        this.logging.debug("Overriding Session for " + this);
        this.receivingService.setSession(session);
        this.session = session;
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public String getFormattedAddress() {
        return getInetAddress() + ":" + getPort();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public int getPort() {
        return this.socket.getPort();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public InetAddress getInetAddress() {
        return this.socket.getInetAddress();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public boolean isActive() {
        return this.socket.isConnected();
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public Class<?> getKey() {
        return this.key;
    }

    @Override // com.github.thorbenkuck.netcom2.network.shared.clients.Connection
    public void setKey(Class<?> cls) {
        this.key = cls;
    }

    public boolean equals(Object obj) {
        return obj != null && obj.getClass().equals(AbstractConnection.class) && ((AbstractConnection) obj).socket.equals(this.socket);
    }

    public String toString() {
        return "Connection{" + this.key.getSimpleName() + "," + getFormattedAddress() + "}";
    }

    @Override // com.github.thorbenkuck.netcom2.interfaces.Mutex
    public void acquire() throws InterruptedException {
        this.semaphore.acquire();
    }

    protected void finalize() throws Throwable {
        Logging.unified().debug("Connection ist collected by the GC ..");
        Iterator it = this.toSend.iterator();
        while (it.hasNext()) {
            Logging.unified().warn("LeftOver-Object " + it.next() + " at dead connection!");
        }
        super.finalize();
    }

    @Override // com.github.thorbenkuck.netcom2.interfaces.Mutex
    public void release() {
        this.semaphore.release();
    }
}
