package org.apache.uima.ducc.ps.service.protocol.builtin;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.uima.UIMAFramework;
import org.apache.uima.ducc.ps.net.iface.IMetaTask;
import org.apache.uima.ducc.ps.net.iface.IMetaTaskTransaction;
import org.apache.uima.ducc.ps.net.impl.MetaTaskTransaction;
import org.apache.uima.ducc.ps.net.impl.TransactionId;
import org.apache.uima.ducc.ps.service.IService;
import org.apache.uima.ducc.ps.service.errors.IServiceErrorHandler;
import org.apache.uima.ducc.ps.service.errors.ServiceException;
import org.apache.uima.ducc.ps.service.errors.ServiceInitializationException;
import org.apache.uima.ducc.ps.service.processor.IProcessResult;
import org.apache.uima.ducc.ps.service.processor.IServiceProcessor;
import org.apache.uima.ducc.ps.service.protocol.INoTaskAvailableStrategy;
import org.apache.uima.ducc.ps.service.protocol.IServiceProtocolHandler;
import org.apache.uima.ducc.ps.service.transport.IServiceTransport;
import org.apache.uima.ducc.ps.service.transport.TransportException;
import org.apache.uima.ducc.ps.service.transport.XStreamUtils;
import org.apache.uima.ducc.ps.service.utils.Utils;
import org.apache.uima.util.Level;
import org.apache.uima.util.Logger;

/* loaded from: input_file:org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler.class */
public class DefaultServiceProtocolHandler implements IServiceProtocolHandler {
    Logger logger;
    private volatile boolean initError;
    private volatile boolean running;
    private volatile boolean quiescing;
    private IServiceTransport transport;
    private IServiceProcessor processor;
    private INoTaskAvailableStrategy noTaskStrategy;
    private CountDownLatch initLatch;
    private CountDownLatch stopLatch;
    private CountDownLatch startLatch;
    private IService service;
    private static ReentrantLock initLock = new ReentrantLock();
    private static AtomicInteger idGenerator = new AtomicInteger();
    private Thread retryThread;

    /* loaded from: input_file:org/apache/uima/ducc/ps/service/protocol/builtin/DefaultServiceProtocolHandler$Builder.class */
    public static class Builder {
        private IServiceTransport transport;
        private IServiceProcessor processor;
        private INoTaskAvailableStrategy strategy;
        private CountDownLatch initLatch;
        private CountDownLatch stopLatch;
        private IService service;

        public Builder withTransport(IServiceTransport iServiceTransport) {
            this.transport = iServiceTransport;
            return this;
        }

        public Builder withProcessor(IServiceProcessor iServiceProcessor) {
            this.processor = iServiceProcessor;
            return this;
        }

        public Builder withInitCompleteLatch(CountDownLatch countDownLatch) {
            this.initLatch = countDownLatch;
            return this;
        }

        public Builder withDoneLatch(CountDownLatch countDownLatch) {
            this.stopLatch = countDownLatch;
            return this;
        }

        public Builder withNoTaskStrategy(INoTaskAvailableStrategy iNoTaskAvailableStrategy) {
            this.strategy = iNoTaskAvailableStrategy;
            return this;
        }

        public Builder withService(IService iService) {
            this.service = iService;
            return this;
        }

        public DefaultServiceProtocolHandler build() {
            return new DefaultServiceProtocolHandler(this);
        }
    }

    private DefaultServiceProtocolHandler(Builder builder) {
        this.logger = UIMAFramework.getLogger(DefaultServiceProtocolHandler.class);
        this.initError = false;
        this.running = false;
        this.quiescing = false;
        this.retryThread = null;
        this.initLatch = builder.initLatch;
        this.stopLatch = builder.stopLatch;
        this.service = builder.service;
        this.transport = builder.transport;
        this.processor = builder.processor;
        this.noTaskStrategy = builder.strategy;
    }

    private void waitForAllThreadsToInitialize() {
        try {
            this.initLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    private void initialize() throws ServiceInitializationException {
        boolean z;
        this.startLatch = new CountDownLatch(1);
        try {
            try {
                initLock.lock();
                if (this.initError) {
                    if (z) {
                        return;
                    } else {
                        return;
                    }
                }
                this.processor.initialize();
                this.initLatch.countDown();
                initLock.unlock();
                if (this.initError) {
                    return;
                }
                waitForAllThreadsToInitialize();
            } catch (Throwable th) {
                this.initError = true;
                this.running = false;
                th.printStackTrace();
                this.logger.log(Level.WARNING, "ProtocolHandler initialize() failed -", th);
                throw new ServiceInitializationException("Thread:" + Thread.currentThread().getName() + " Failed initialization - " + th);
            }
        } finally {
            this.initLatch.countDown();
            initLock.unlock();
            if (!this.initError) {
                waitForAllThreadsToInitialize();
            }
        }
    }

    @Override // org.apache.uima.ducc.ps.service.protocol.IServiceProtocolHandler
    public boolean initialized() {
        return !this.initError;
    }

    private IMetaTaskTransaction sendAndReceive(IMetaTaskTransaction iMetaTaskTransaction) throws Exception {
        if (IMetaTaskTransaction.Type.Get.equals(iMetaTaskTransaction.getType())) {
            new TransactionId(idGenerator.addAndGet(1), 0);
        } else {
            iMetaTaskTransaction.getTransactionId().next();
        }
        iMetaTaskTransaction.setRequesterProcessName(this.service.getType());
        this.transport.addRequestorInfo(iMetaTaskTransaction);
        try {
            IMetaTaskTransaction dispatch = this.transport.dispatch(XStreamUtils.marshall(iMetaTaskTransaction));
            if (Objects.isNull(dispatch)) {
                throw new TransportException("Received invalid content (null) in response from client - rejecting request");
            }
            return dispatch;
        } catch (Exception e) {
            if (this.running) {
                throw e;
            }
            throw new TransportException("Service stopping - rejecting request");
        }
    }

    private IMetaTaskTransaction callEnd(IMetaTaskTransaction iMetaTaskTransaction) throws Exception {
        iMetaTaskTransaction.setType(IMetaTaskTransaction.Type.End);
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, "ProtocolHandler calling END");
        }
        return sendAndReceive(iMetaTaskTransaction);
    }

    private IMetaTaskTransaction callAck(IMetaTaskTransaction iMetaTaskTransaction) throws Exception {
        iMetaTaskTransaction.setType(IMetaTaskTransaction.Type.Ack);
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, "ProtocolHandler calling ACK");
        }
        return sendAndReceive(iMetaTaskTransaction);
    }

    private synchronized IMetaTaskTransaction callGet(IMetaTaskTransaction iMetaTaskTransaction) throws Exception {
        iMetaTaskTransaction.setType(IMetaTaskTransaction.Type.Get);
        if (this.logger.isLoggable(Level.FINE)) {
            this.logger.log(Level.FINE, "ProtocolHandler calling GET");
        }
        IMetaTaskTransaction iMetaTaskTransaction2 = null;
        boolean z = true;
        while (this.running) {
            iMetaTaskTransaction2 = sendAndReceive(iMetaTaskTransaction);
            if (iMetaTaskTransaction2.getMetaTask() != null && iMetaTaskTransaction2.getMetaTask().getUserSpaceTask() != null) {
                break;
            }
            if (z) {
                if (this.logger.isLoggable(Level.FINE)) {
                    this.logger.log(Level.FINE, "Process Thread:" + Thread.currentThread().getId() + " - Driver is out of tasks - waiting for awhile (" + this.noTaskStrategy.getWaitTimeInMillis() + " ms) and will try again ");
                }
                z = false;
            }
            this.noTaskStrategy.handleNoTaskSupplied();
        }
        return iMetaTaskTransaction2;
    }

    private void awaitStart() throws ServiceInitializationException {
        try {
            this.startLatch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ServiceInitializationException("Thread interrupted while awaiting start()");
        }
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // java.util.concurrent.Callable
    public String call() throws ServiceInitializationException, ServiceException {
        IMetaTaskTransaction callGet;
        initialize();
        awaitStart();
        if (this.logger.isLoggable(Level.INFO)) {
            this.logger.log(Level.INFO, ".............. Thread " + Thread.currentThread().getId() + " ready to process");
        }
        while (this.running) {
            try {
                callGet = callGet(new MetaTaskTransaction());
            } catch (IllegalStateException e) {
            } catch (TransportException e2) {
            } catch (Exception e3) {
                this.logger.log(Level.WARNING, "", e3);
            }
            if (!this.running && !this.quiescing) {
                break;
            }
            if (!Objects.isNull(callGet) && (this.running || this.quiescing)) {
                this.logger.log(Level.FINE, ".............. Thread " + Thread.currentThread().getId() + " processing new task");
                if (!Objects.isNull(callGet.getMetaTask())) {
                    Object userSpaceTask = callGet.getMetaTask().getUserSpaceTask();
                    IMetaTaskTransaction callAck = callAck(callGet);
                    if (!this.running && !this.quiescing) {
                        break;
                    }
                    IProcessResult process = this.processor.process((String) userSpaceTask);
                    IServiceErrorHandler.Action action = IServiceErrorHandler.Action.CONTINUE;
                    String error = process.getError();
                    if (process.terminateProcess()) {
                        action = IServiceErrorHandler.Action.TERMINATE;
                    } else if (Objects.isNull(error)) {
                        callAck.getMetaTask().setPerformanceMetrics(process.getResult());
                    }
                    if (Objects.nonNull(error)) {
                        IMetaTask metaTask = callAck.getMetaTask();
                        if (Objects.isNull(System.getProperty("ducc.deploy.JpType"))) {
                            metaTask.setUserSpaceException(error);
                        } else {
                            this.logger.log(Level.INFO, "Sending Exception to JD:\n" + process.getExceptionObject());
                            metaTask.setUserSpaceException(serializeError(process.getExceptionObject()));
                        }
                    }
                    callEnd(callAck);
                    if (this.running && IServiceErrorHandler.Action.TERMINATE.equals(action)) {
                        this.logger.log(Level.WARNING, "Processor Failure - Action=Terminate");
                        new Thread(new Runnable() { // from class: org.apache.uima.ducc.ps.service.protocol.builtin.DefaultServiceProtocolHandler.1
                            @Override // java.lang.Runnable
                            public void run() {
                                DefaultServiceProtocolHandler.this.delegateStop();
                            }
                        }).start();
                        this.running = false;
                    }
                } else if (this.running) {
                    this.logger.log(Level.INFO, ".............. Thread " + Thread.currentThread().getId() + " GET returned null MetaTask while service is in a running state - this is unexpected");
                }
            } else {
                break;
            }
        }
        this.stopLatch.countDown();
        System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(getClass()) + ".call() >>>>>>>>>> Thread [" + Thread.currentThread().getId() + "]  ProtocolHandler stopped requesting new tasks - Stopping processor");
        this.logger.log(Level.INFO, "ProtocolHandler stopped requesting new tasks - Stopping processor");
        if (this.processor != null) {
            this.processor.stop();
        }
        return String.valueOf(Thread.currentThread().getId());
    }

    private byte[] serializeError(Throwable th) throws Exception {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        ObjectOutputStream objectOutputStream = new ObjectOutputStream(byteArrayOutputStream);
        try {
            try {
                objectOutputStream.writeObject(th);
                objectOutputStream.close();
                return byteArrayOutputStream.toByteArray();
            } catch (Exception e) {
                try {
                    this.logger.log(Level.WARNING, "Unable to Serialize " + th.getClass().getName() + " - Will Stringify It Instead");
                } catch (Exception e2) {
                }
                throw e;
            }
        } catch (Throwable th2) {
            objectOutputStream.close();
            throw th2;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void delegateStop() {
        this.service.quiesceAndStop();
    }

    @Override // org.apache.uima.ducc.ps.service.Lifecycle
    public void stop() {
        this.quiescing = false;
        this.running = false;
        try {
            if (this.retryThread != null) {
                this.retryThread.interrupt();
            }
        } catch (Exception e) {
        }
        if (this.logger.isLoggable(Level.INFO)) {
            this.logger.log(Level.INFO, getClass().getName() + " stop() called");
        }
    }

    @Override // org.apache.uima.ducc.ps.service.Lifecycle
    public void quiesceAndStop() {
        System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(getClass()) + ".queisceAndStop()");
        this.logger.log(Level.INFO, getClass().getName() + " quiesceAndStop() called");
        this.transport.stop(true);
        this.quiescing = true;
        this.running = false;
        try {
            if (this.retryThread != null) {
                this.retryThread.interrupt();
            }
        } catch (Exception e) {
        }
        try {
            this.stopLatch.await();
        } catch (Exception e2) {
        }
        System.out.println(Utils.getTimestamp() + ">>>>>>> " + Utils.getShortClassname(getClass()) + ".queisceAndStop() All process threads completed quiesce");
        this.logger.log(Level.INFO, getClass().getName() + " All process threads completed quiesce");
    }

    @Override // org.apache.uima.ducc.ps.service.Lifecycle
    public void start() {
        this.running = true;
        this.startLatch.countDown();
    }

    @Override // org.apache.uima.ducc.ps.service.protocol.IServiceProtocolHandler
    public void setServiceProcessor(IServiceProcessor iServiceProcessor) {
        this.processor = iServiceProcessor;
    }

    @Override // org.apache.uima.ducc.ps.service.protocol.IServiceProtocolHandler
    public void setTransport(IServiceTransport iServiceTransport) {
        this.transport = iServiceTransport;
    }
}
