/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.beats;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.event.transport.EventException;
import org.apache.nifi.event.transport.EventServer;
import org.apache.nifi.event.transport.configuration.ShutdownQuietPeriod;
import org.apache.nifi.event.transport.configuration.ShutdownTimeout;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.listen.EventBatcher;
import org.apache.nifi.processor.util.listen.FlowFileEventBatch;
import org.apache.nifi.processor.util.listen.ListenerProperties;
import org.apache.nifi.processors.beats.protocol.BatchMessage;
import org.apache.nifi.processors.beats.server.BeatsMessageServerFactory;
import org.apache.nifi.remote.io.socket.NetworkUtils;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"beats", "logstash", "elasticsearch", "log"})
@CapabilityDescription(value="Receive messages encoded using the Elasticsearch Beats protocol and write decoded JSON")
@WritesAttributes(value={@WritesAttribute(attribute="beats.sender", description="Internet Protocol address of the message sender"), @WritesAttribute(attribute="beats.port", description="TCP port on which the Processor received messages"), @WritesAttribute(attribute="beats.sequencenumber", description="The sequence number of the message included for batches containing single messages"), @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is application/json")})
public class ListenBeats
extends AbstractProcessor {
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL_CONTEXT_SERVICE").displayName("SSL Context Service").description("SSL Context Service is required to enable TLS for socket connections").required(false).identifiesControllerService(RestrictedSSLContextService.class).build();
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("Client Auth").displayName("Client Authentication").description("Client authentication policy when TLS is enabled").required(false).dependsOn(SSL_CONTEXT_SERVICE, new AllowableValue[0]).allowableValues((Enum[])ClientAuth.values()).defaultValue(ClientAuth.REQUIRED.name()).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Messages received successfully will be sent out this relationship.").build();
    private static final List<PropertyDescriptor> DESCRIPTORS = Collections.unmodifiableList(Arrays.asList(ListenerProperties.NETWORK_INTF_NAME, ListenerProperties.PORT, ListenerProperties.RECV_BUFFER_SIZE, ListenerProperties.MAX_MESSAGE_QUEUE_SIZE, ListenerProperties.MAX_SOCKET_BUFFER_SIZE, ListenerProperties.CHARSET, ListenerProperties.MAX_BATCH_SIZE, ListenerProperties.MESSAGE_DELIMITER, ListenerProperties.WORKER_THREADS, SSL_CONTEXT_SERVICE, CLIENT_AUTH));
    private static final Set<Relationship> RELATIONSHIPS = Collections.singleton(REL_SUCCESS);
    protected volatile BlockingQueue<BatchMessage> events;
    protected volatile BlockingQueue<BatchMessage> errorEvents;
    protected volatile EventServer eventServer;
    protected volatile byte[] messageDemarcatorBytes;
    protected volatile EventBatcher<BatchMessage> eventBatcher;

    public final Set<Relationship> getRelationships() {
        return RELATIONSHIPS;
    }

    public List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return DESCRIPTORS;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        int workerThreads = context.getProperty(ListenerProperties.WORKER_THREADS).asInteger();
        int socketBufferSize = context.getProperty(ListenerProperties.MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        String networkInterface = context.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue();
        InetAddress address = NetworkUtils.getInterfaceAddress((String)networkInterface);
        Charset charset = Charset.forName(context.getProperty(ListenerProperties.CHARSET).getValue());
        int port = context.getProperty(ListenerProperties.PORT).evaluateAttributeExpressions().asInteger();
        this.events = new LinkedBlockingQueue<BatchMessage>(context.getProperty(ListenerProperties.MAX_MESSAGE_QUEUE_SIZE).asInteger());
        this.errorEvents = new LinkedBlockingQueue<BatchMessage>();
        String msgDemarcator = this.getMessageDemarcator(context);
        this.messageDemarcatorBytes = msgDemarcator.getBytes(charset);
        BeatsMessageServerFactory eventFactory = new BeatsMessageServerFactory(this.getLogger(), address, port, this.events);
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null) {
            String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
            ClientAuth clientAuth = ClientAuth.valueOf((String)clientAuthValue);
            SSLContext sslContext = sslContextService.createContext();
            eventFactory.setSslContext(sslContext);
            eventFactory.setClientAuth(clientAuth);
        }
        eventFactory.setSocketReceiveBuffer(socketBufferSize);
        eventFactory.setWorkerThreads(workerThreads);
        eventFactory.setThreadNamePrefix(String.format("%s[%s]", ((Object)((Object)this)).getClass().getSimpleName(), this.getIdentifier()));
        eventFactory.setShutdownQuietPeriod(ShutdownQuietPeriod.QUICK.getDuration());
        eventFactory.setShutdownTimeout(ShutdownTimeout.QUICK.getDuration());
        try {
            this.eventServer = eventFactory.getEventServer();
        }
        catch (EventException e) {
            this.getLogger().error("Failed to bind to [{}:{}]", new Object[]{address, port, e});
        }
    }

    public int getListeningPort() {
        return this.eventServer.getListeningPort();
    }

    public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException {
        EventBatcher<BatchMessage> eventBatcher = this.getEventBatcher();
        int batchSize = context.getProperty(ListenerProperties.MAX_BATCH_SIZE).asInteger();
        Map batches = eventBatcher.getBatches(session, batchSize, this.messageDemarcatorBytes);
        this.processEvents(session, batches);
    }

    @OnStopped
    public void shutdown() {
        if (this.eventServer == null) {
            this.getLogger().warn("Event Server not configured");
        } else {
            this.eventServer.shutdown();
        }
        this.eventBatcher = null;
    }

    private void processEvents(ProcessSession session, Map<String, FlowFileEventBatch<BatchMessage>> batches) {
        for (Map.Entry<String, FlowFileEventBatch<BatchMessage>> entry : batches.entrySet()) {
            FlowFile flowFile = entry.getValue().getFlowFile();
            List events = entry.getValue().getEvents();
            if (flowFile.getSize() == 0L || events.size() == 0) {
                session.remove(flowFile);
                continue;
            }
            Map<String, String> attributes = this.getAttributes(entry.getValue());
            flowFile = session.putAllAttributes(flowFile, attributes);
            session.transfer(flowFile, REL_SUCCESS);
            String transitUri = this.getTransitUri(entry.getValue());
            session.getProvenanceReporter().receive(flowFile, transitUri);
        }
    }

    private String getTransitUri(FlowFileEventBatch<BatchMessage> batch) {
        List events = batch.getEvents();
        String sender = ((BatchMessage)((Object)events.get(0))).getSender();
        return String.format("beats://%s:%d", sender, this.getListeningPort());
    }

    private Map<String, String> getAttributes(FlowFileEventBatch<BatchMessage> batch) {
        List events = batch.getEvents();
        String sender = ((BatchMessage)((Object)events.get(0))).getSender();
        LinkedHashMap<String, String> attributes = new LinkedHashMap<String, String>();
        attributes.put(BeatsAttributes.SENDER.key(), sender);
        attributes.put(BeatsAttributes.PORT.key(), String.valueOf(this.getListeningPort()));
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        if (events.size() == 1) {
            attributes.put(BeatsAttributes.SEQUENCE_NUMBER.key(), String.valueOf(((BatchMessage)((Object)events.get(0))).getSequenceNumber()));
        }
        return attributes;
    }

    private String getMessageDemarcator(ProcessContext context) {
        return context.getProperty(ListenerProperties.MESSAGE_DELIMITER).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
    }

    private EventBatcher<BatchMessage> getEventBatcher() {
        if (this.eventBatcher == null) {
            this.eventBatcher = new EventBatcher<BatchMessage>(this.getLogger(), this.events, this.errorEvents){

                protected String getBatchKey(BatchMessage event) {
                    return event.getSender();
                }
            };
        }
        return this.eventBatcher;
    }

    private static enum BeatsAttributes implements FlowFileAttributeKey
    {
        SENDER("beats.sender"),
        PORT("beats.port"),
        SEQUENCE_NUMBER("beats.sequencenumber");

        private final String key;

        private BeatsAttributes(String key) {
            this.key = key;
        }

        public String key() {
            return this.key;
        }
    }
}

