package org.apache.pulsar.functions.worker.rest.api;

import java.net.URI;
import java.util.Collection;
import java.util.LinkedList;
import java.util.function.Supplier;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.functions.proto.Function;
import org.apache.pulsar.functions.proto.InstanceCommunication;
import org.apache.pulsar.functions.utils.SinkConfigUtils;
import org.apache.pulsar.functions.utils.Utils;
import org.apache.pulsar.functions.worker.FunctionMetaDataManager;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.rest.RestException;
import org.apache.pulsar.functions.worker.rest.api.ComponentImpl;
import org.apache.pulsar.shade.javax.ws.rs.WebApplicationException;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.pulsar.common.io.SinkConfig;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.ExceptionInformation;
import org.apache.pulsar.shade.org.apache.pulsar.common.policies.data.SinkStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/SinkImpl.class */
public class SinkImpl extends ComponentImpl {
    private static final Logger log = LoggerFactory.getLogger(SinkImpl.class);

    /* loaded from: input_file:org/apache/pulsar/functions/worker/rest/api/SinkImpl$GetSinkStatus.class */
    private class GetSinkStatus extends ComponentImpl.GetStatus<SinkStatus, SinkStatus.SinkInstanceStatus.SinkInstanceStatusData> {
        private GetSinkStatus() {
            super();
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notScheduledInstance() {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            sinkInstanceStatusData.setError("Sink has not been scheduled");
            return sinkInstanceStatusData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData fromFunctionStatusProto(InstanceCommunication.FunctionStatus functionStatus, String str) {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(functionStatus.getRunning());
            sinkInstanceStatusData.setError(functionStatus.getFailureException());
            sinkInstanceStatusData.setNumRestarts(functionStatus.getNumRestarts());
            sinkInstanceStatusData.setNumReadFromPulsar(functionStatus.getNumReceived());
            sinkInstanceStatusData.setNumSystemExceptions(functionStatus.getNumSystemExceptions() + functionStatus.getNumUserExceptions() + functionStatus.getNumSourceExceptions());
            LinkedList linkedList = new LinkedList();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionInformation : functionStatus.getLatestUserExceptionsList()) {
                ExceptionInformation exceptionInformation2 = new ExceptionInformation();
                exceptionInformation2.setTimestampMs(exceptionInformation.getMsSinceEpoch());
                exceptionInformation2.setExceptionString(exceptionInformation.getExceptionString());
                linkedList.add(exceptionInformation2);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionInformation3 : functionStatus.getLatestSystemExceptionsList()) {
                ExceptionInformation exceptionInformation4 = new ExceptionInformation();
                exceptionInformation4.setTimestampMs(exceptionInformation3.getMsSinceEpoch());
                exceptionInformation4.setExceptionString(exceptionInformation3.getExceptionString());
                linkedList.add(exceptionInformation4);
            }
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionInformation5 : functionStatus.getLatestSourceExceptionsList()) {
                ExceptionInformation exceptionInformation6 = new ExceptionInformation();
                exceptionInformation6.setTimestampMs(exceptionInformation5.getMsSinceEpoch());
                exceptionInformation6.setExceptionString(exceptionInformation5.getExceptionString());
                linkedList.add(exceptionInformation6);
            }
            sinkInstanceStatusData.setLatestSystemExceptions(linkedList);
            sinkInstanceStatusData.setNumSinkExceptions(functionStatus.getNumSinkExceptions());
            LinkedList linkedList2 = new LinkedList();
            for (InstanceCommunication.FunctionStatus.ExceptionInformation exceptionInformation7 : functionStatus.getLatestSinkExceptionsList()) {
                ExceptionInformation exceptionInformation8 = new ExceptionInformation();
                exceptionInformation8.setTimestampMs(exceptionInformation7.getMsSinceEpoch());
                exceptionInformation8.setExceptionString(exceptionInformation7.getExceptionString());
                linkedList2.add(exceptionInformation8);
            }
            sinkInstanceStatusData.setLatestSinkExceptions(linkedList2);
            sinkInstanceStatusData.setNumWrittenToSink(functionStatus.getNumSuccessfullyProcessed());
            sinkInstanceStatusData.setLastReceivedTime(functionStatus.getLastInvocationTime());
            sinkInstanceStatusData.setWorkerId(str);
            return sinkInstanceStatusData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData notRunning(String str, String str2) {
            SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
            sinkInstanceStatusData.setRunning(false);
            if (str2 != null) {
                sinkInstanceStatusData.setError(str2);
            }
            sinkInstanceStatusData.setWorkerId(str);
            return sinkInstanceStatusData;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus getStatus(String str, String str2, String str3, Collection<Function.Assignment> collection, URI uri) throws PulsarAdminException {
            SinkStatus sinkStatus = new SinkStatus();
            for (Function.Assignment assignment : collection) {
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData componentInstanceStatus = SinkImpl.this.worker().getWorkerConfig().getWorkerId().equals(assignment.getWorkerId()) ? getComponentInstanceStatus(str, str2, str3, assignment.getInstance().getInstanceId(), null) : SinkImpl.this.worker().getFunctionAdmin().sink().getSinkStatus(assignment.getInstance().getFunctionMetaData().getFunctionDetails().getTenant(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getNamespace(), assignment.getInstance().getFunctionMetaData().getFunctionDetails().getName(), assignment.getInstance().getInstanceId());
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(assignment.getInstance().getInstanceId());
                sinkInstanceStatus.setStatus(componentInstanceStatus);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus2 -> {
                if (sinkInstanceStatus2.getStatus().isRunning()) {
                    sinkStatus.numRunning++;
                }
            });
            return sinkStatus;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus getStatusExternal(String str, String str2, String str3, int i) {
            SinkStatus sinkStatus = new SinkStatus();
            for (int i2 = 0; i2 < i; i2++) {
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData componentInstanceStatus = getComponentInstanceStatus(str, str2, str3, i2, null);
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(i2);
                sinkInstanceStatus.setStatus(componentInstanceStatus);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            sinkStatus.setNumInstances(sinkStatus.instances.size());
            sinkStatus.getInstances().forEach(sinkInstanceStatus2 -> {
                if (sinkInstanceStatus2.getStatus().isRunning()) {
                    sinkStatus.numRunning++;
                }
            });
            return sinkStatus;
        }

        /* JADX WARN: Can't rename method to resolve collision */
        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public SinkStatus emptyStatus(int i) {
            SinkStatus sinkStatus = new SinkStatus();
            sinkStatus.setNumInstances(i);
            sinkStatus.setNumRunning(0);
            for (int i2 = 0; i2 < i; i2++) {
                SinkStatus.SinkInstanceStatus sinkInstanceStatus = new SinkStatus.SinkInstanceStatus();
                sinkInstanceStatus.setInstanceId(i2);
                SinkStatus.SinkInstanceStatus.SinkInstanceStatusData sinkInstanceStatusData = new SinkStatus.SinkInstanceStatus.SinkInstanceStatusData();
                sinkInstanceStatusData.setRunning(false);
                sinkInstanceStatusData.setError("Sink has not been scheduled");
                sinkInstanceStatus.setStatus(sinkInstanceStatusData);
                sinkStatus.addInstance(sinkInstanceStatus);
            }
            return sinkStatus;
        }

        @Override // org.apache.pulsar.functions.worker.rest.api.ComponentImpl.GetStatus
        public /* bridge */ /* synthetic */ SinkStatus getStatus(String str, String str2, String str3, Collection collection, URI uri) throws PulsarAdminException {
            return getStatus(str, str2, str3, (Collection<Function.Assignment>) collection, uri);
        }
    }

    public SinkImpl(Supplier<WorkerService> supplier) {
        super(supplier, Utils.ComponentType.SINK);
    }

    public SinkStatus.SinkInstanceStatus.SinkInstanceStatusData getSinkInstanceStatus(String str, String str2, String str3, String str4, URI uri) {
        componentInstanceStatusRequestValidate(str, str2, str3, Integer.parseInt(str4));
        try {
            return new GetSinkStatus().getComponentInstanceStatus(str, str2, str3, Integer.parseInt(str4), uri);
        } catch (WebApplicationException e) {
            throw e;
        } catch (Exception e2) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{str, str2, str3, e2});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e2.getMessage());
        }
    }

    public SinkStatus getSinkStatus(String str, String str2, String str3, URI uri) {
        componentStatusRequestValidate(str, str2, str3);
        try {
            return new GetSinkStatus().getComponentStatus(str, str2, str3, uri);
        } catch (WebApplicationException e) {
            throw e;
        } catch (Exception e2) {
            log.error("{}/{}/{} Got Exception Getting Status", new Object[]{str, str2, str3, e2});
            throw new RestException(Response.Status.INTERNAL_SERVER_ERROR, e2.getMessage());
        }
    }

    public SinkConfig getSinkInfo(String str, String str2, String str3) {
        if (!isWorkerServiceAvailable()) {
            throwUnavailableException();
        }
        try {
            validateGetFunctionRequestParams(str, str2, str3, this.componentType);
            FunctionMetaDataManager functionMetaDataManager = worker().getFunctionMetaDataManager();
            if (!functionMetaDataManager.containsFunction(str, str2, str3)) {
                log.error("{} does not exist @ /{}/{}/{}", new Object[]{this.componentType, str, str2, str3});
                throw new RestException(Response.Status.NOT_FOUND, String.format(this.componentType + " %s doesn't exist", str3));
            }
            Function.FunctionMetaData functionMetaData = functionMetaDataManager.getFunctionMetaData(str, str2, str3);
            if (calculateSubjectType(functionMetaData).equals(this.componentType)) {
                return SinkConfigUtils.convertFromDetails(functionMetaData.getFunctionDetails());
            }
            log.error("{}/{}/{} is not a {}", new Object[]{str, str2, str3, this.componentType});
            throw new RestException(Response.Status.NOT_FOUND, String.format(this.componentType + " %s doesn't exist", str3));
        } catch (IllegalArgumentException e) {
            log.error("Invalid get {} request @ /{}/{}/{}", new Object[]{this.componentType, str, str2, str3, e});
            throw new RestException(Response.Status.BAD_REQUEST, e.getMessage());
        }
    }
}
