package org.apache.apex.malhar.flume.discovery;

import com.datatorrent.api.Component;
import com.datatorrent.api.Context;
import com.google.common.base.Throwables;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import javax.validation.constraints.NotNull;
import org.apache.apex.malhar.flume.discovery.Discovery;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryNTimes;
import org.apache.curator.utils.EnsurePath;
import org.apache.curator.x.discovery.ServiceDiscovery;
import org.apache.curator.x.discovery.ServiceDiscoveryBuilder;
import org.apache.curator.x.discovery.ServiceInstance;
import org.apache.curator.x.discovery.details.InstanceSerializer;
import org.apache.flume.conf.Configurable;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.ObjectReader;
import org.codehaus.jackson.map.ObjectWriter;
import org.codehaus.jackson.type.TypeReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery.class */
public class ZKAssistedDiscovery implements Discovery<byte[]>, Component<Context>, Configurable, Serializable {

    @NotNull
    private String connectionString;

    @NotNull
    private String basePath;
    private transient InstanceSerializerFactory instanceSerializerFactory;
    private transient CuratorFramework curatorFramework;
    private transient ServiceDiscovery<byte[]> discovery;
    private static final long serialVersionUID = 201401221145L;
    private static final Logger logger = LoggerFactory.getLogger(ZKAssistedDiscovery.class);

    @NotNull
    private String serviceName = "ApexFlume";
    private int conntectionRetrySleepMillis = 500;
    private int connectionRetryCount = 10;
    private int connectionTimeoutMillis = 1000;

    /* loaded from: input_file:org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery$InstanceSerializerFactory.class */
    public class InstanceSerializerFactory {
        private final ObjectReader objectReader;
        private final ObjectWriter objectWriter;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/apex/malhar/flume/discovery/ZKAssistedDiscovery$InstanceSerializerFactory$JacksonInstanceSerializer.class */
        public final class JacksonInstanceSerializer<T> implements InstanceSerializer<T> {
            private final TypeReference<ServiceInstance<T>> typeRef;
            private final ObjectWriter objectWriter;
            private final ObjectReader objectReader;

            JacksonInstanceSerializer(ObjectReader objectReader, ObjectWriter objectWriter, TypeReference<ServiceInstance<T>> typeReference) {
                this.objectReader = objectReader;
                this.objectWriter = objectWriter;
                this.typeRef = typeReference;
            }

            public ServiceInstance<T> deserialize(byte[] bArr) throws Exception {
                return (ServiceInstance) this.objectReader.withType(this.typeRef).readValue(bArr);
            }

            public byte[] serialize(ServiceInstance<T> serviceInstance) throws Exception {
                ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
                this.objectWriter.writeValue(byteArrayOutputStream, serviceInstance);
                return byteArrayOutputStream.toByteArray();
            }
        }

        InstanceSerializerFactory(ObjectReader objectReader, ObjectWriter objectWriter) {
            this.objectReader = objectReader;
            this.objectWriter = objectWriter;
        }

        public <T> InstanceSerializer<T> getInstanceSerializer(TypeReference<ServiceInstance<T>> typeReference) {
            return new JacksonInstanceSerializer(this.objectReader, this.objectWriter, typeReference);
        }
    }

    @Override // org.apache.apex.malhar.flume.discovery.Discovery
    public void unadvertise(Discovery.Service<byte[]> service) {
        doAdvertise(service, false);
    }

    @Override // org.apache.apex.malhar.flume.discovery.Discovery
    public void advertise(Discovery.Service<byte[]> service) {
        doAdvertise(service, true);
    }

    public void doAdvertise(Discovery.Service<byte[]> service, boolean z) {
        try {
            new EnsurePath(this.basePath).ensure(this.curatorFramework.getZookeeperClient());
            ServiceInstance<byte[]> zKAssistedDiscovery = getInstance(service);
            if (z) {
                this.discovery.registerService(zKAssistedDiscovery);
            } else {
                this.discovery.unregisterService(zKAssistedDiscovery);
            }
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    @Override // org.apache.apex.malhar.flume.discovery.Discovery
    public Collection<Discovery.Service<byte[]>> discover() {
        try {
            new EnsurePath(this.basePath).ensure(this.curatorFramework.getZookeeperClient());
            Collection<ServiceInstance> queryForInstances = this.discovery.queryForInstances(this.serviceName);
            ArrayList arrayList = new ArrayList(queryForInstances.size());
            for (final ServiceInstance serviceInstance : queryForInstances) {
                arrayList.add(new Discovery.Service<byte[]>() { // from class: org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery.1
                    @Override // org.apache.apex.malhar.flume.discovery.Discovery.Service
                    public String getHost() {
                        return serviceInstance.getAddress();
                    }

                    @Override // org.apache.apex.malhar.flume.discovery.Discovery.Service
                    public int getPort() {
                        return serviceInstance.getPort().intValue();
                    }

                    /* JADX WARN: Can't rename method to resolve collision */
                    @Override // org.apache.apex.malhar.flume.discovery.Discovery.Service
                    public byte[] getPayload() {
                        return (byte[]) serviceInstance.getPayload();
                    }

                    @Override // org.apache.apex.malhar.flume.discovery.Discovery.Service
                    public String getId() {
                        return serviceInstance.getId();
                    }

                    public String toString() {
                        return "{" + getId() + " => " + getHost() + ':' + getPort() + '}';
                    }
                });
            }
            return arrayList;
        } catch (Exception e) {
            throw Throwables.propagate(e);
        }
    }

    public String toString() {
        return "ZKAssistedDiscovery{serviceName=" + this.serviceName + ", connectionString=" + this.connectionString + ", basePath=" + this.basePath + ", connectionTimeoutMillis=" + this.connectionTimeoutMillis + ", connectionRetryCount=" + this.connectionRetryCount + ", conntectionRetrySleepMillis=" + this.conntectionRetrySleepMillis + '}';
    }

    public int hashCode() {
        return (47 * ((47 * ((47 * ((47 * ((47 * ((47 * 7) + this.serviceName.hashCode())) + this.connectionString.hashCode())) + this.basePath.hashCode())) + this.connectionTimeoutMillis)) + this.connectionRetryCount)) + this.conntectionRetrySleepMillis;
    }

    public boolean equals(Object obj) {
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        ZKAssistedDiscovery zKAssistedDiscovery = (ZKAssistedDiscovery) obj;
        return this.serviceName.equals(zKAssistedDiscovery.serviceName) && this.connectionString.equals(zKAssistedDiscovery.connectionString) && this.basePath.equals(zKAssistedDiscovery.basePath) && this.connectionTimeoutMillis == zKAssistedDiscovery.connectionTimeoutMillis && this.connectionRetryCount == zKAssistedDiscovery.connectionRetryCount && this.conntectionRetrySleepMillis == zKAssistedDiscovery.conntectionRetrySleepMillis;
    }

    ServiceInstance<byte[]> getInstance(Discovery.Service<byte[]> service) throws Exception {
        return ServiceInstance.builder().name(this.serviceName).address(service.getHost()).port(service.getPort()).id(service.getId()).payload(service.getPayload()).build();
    }

    private ServiceDiscovery<byte[]> getDiscovery(CuratorFramework curatorFramework) {
        return ServiceDiscoveryBuilder.builder(byte[].class).basePath(this.basePath).client(curatorFramework).serializer(this.instanceSerializerFactory.getInstanceSerializer(new TypeReference<ServiceInstance<byte[]>>() { // from class: org.apache.apex.malhar.flume.discovery.ZKAssistedDiscovery.2
        })).build();
    }

    InstanceSerializerFactory getInstanceSerializerFactory() {
        return this.instanceSerializerFactory;
    }

    public String getConnectionString() {
        return this.connectionString;
    }

    public void setConnectionString(String str) {
        this.connectionString = str;
    }

    public String getBasePath() {
        return this.basePath;
    }

    public void setBasePath(String str) {
        this.basePath = str;
    }

    public int getConnectionTimeoutMillis() {
        return this.connectionTimeoutMillis;
    }

    public void setConnectionTimeoutMillis(int i) {
        this.connectionTimeoutMillis = i;
    }

    public int getConnectionRetryCount() {
        return this.connectionRetryCount;
    }

    public void setConnectionRetryCount(int i) {
        this.connectionRetryCount = i;
    }

    public int getConntectionRetrySleepMillis() {
        return this.conntectionRetrySleepMillis;
    }

    public void setConntectionRetrySleepMillis(int i) {
        this.conntectionRetrySleepMillis = i;
    }

    public String getServiceName() {
        return this.serviceName;
    }

    public void setServiceName(String str) {
        this.serviceName = str;
    }

    public void configure(org.apache.flume.Context context) {
        this.serviceName = context.getString("serviceName", "ApexFlume");
        this.connectionString = context.getString("connectionString");
        this.basePath = context.getString("basePath");
        this.connectionTimeoutMillis = context.getInteger("connectionTimeoutMillis", 1000).intValue();
        this.connectionRetryCount = context.getInteger("connectionRetryCount", 10).intValue();
        this.conntectionRetrySleepMillis = context.getInteger("connectionRetrySleepMillis", 500).intValue();
    }

    public void setup(Context context) {
        ObjectMapper objectMapper = new ObjectMapper();
        this.instanceSerializerFactory = new InstanceSerializerFactory(objectMapper.reader(), objectMapper.writer());
        this.curatorFramework = CuratorFrameworkFactory.builder().connectionTimeoutMs(this.connectionTimeoutMillis).retryPolicy(new RetryNTimes(this.connectionRetryCount, this.conntectionRetrySleepMillis)).connectString(this.connectionString).build();
        this.curatorFramework.start();
        this.discovery = getDiscovery(this.curatorFramework);
        try {
            this.discovery.start();
        } catch (Exception e) {
            Throwables.propagate(e);
        }
    }

    public void teardown() {
        try {
            try {
                this.discovery.close();
                this.curatorFramework.close();
                this.curatorFramework = null;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } catch (Throwable th) {
            this.curatorFramework.close();
            this.curatorFramework = null;
            throw th;
        }
    }
}
