/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.processors.beats;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import javax.net.ssl.SSLContext;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.flowfile.attributes.FlowFileAttributeKey;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.util.listen.AbstractListenEventBatchingProcessor;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferPool;
import org.apache.nifi.processor.util.listen.dispatcher.ByteBufferSource;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.dispatcher.SocketChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.response.ChannelResponder;
import org.apache.nifi.processor.util.listen.response.ChannelResponse;
import org.apache.nifi.processors.beats.event.BeatsEvent;
import org.apache.nifi.processors.beats.event.BeatsEventFactory;
import org.apache.nifi.processors.beats.frame.BeatsEncoder;
import org.apache.nifi.processors.beats.handler.BeatsSocketChannelHandlerFactory;
import org.apache.nifi.processors.beats.response.BeatsChannelResponse;
import org.apache.nifi.processors.beats.response.BeatsResponse;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.RestrictedSSLContextService;
import org.apache.nifi.ssl.SSLContextService;

@InputRequirement(value=InputRequirement.Requirement.INPUT_FORBIDDEN)
@Tags(value={"listen", "beats", "tcp", "logs"})
@CapabilityDescription(value="Listens for messages sent by libbeat compatible clients (e.g. filebeats, metricbeats, etc) using Libbeat's 'output.logstash', writing its JSON formatted payload to the content of a FlowFile.This processor replaces the now deprecated ListenLumberjack")
@WritesAttributes(value={@WritesAttribute(attribute="beats.sender", description="The sending host of the messages."), @WritesAttribute(attribute="beats.port", description="The sending port the messages were received over."), @WritesAttribute(attribute="beats.sequencenumber", description="The sequence number of the message. Only included if <Batch Size> is 1."), @WritesAttribute(attribute="mime.type", description="The mime.type of the content which is application/json")})
@SeeAlso(classNames={"org.apache.nifi.processors.standard.ParseSyslog"})
public class ListenBeats
extends AbstractListenEventBatchingProcessor<BeatsEvent> {
    public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder().name("SSL_CONTEXT_SERVICE").displayName("SSL Context Service").description("The Controller Service to use in order to obtain an SSL Context. If this property is set, messages will be received over a secure connection.").required(false).identifiesControllerService(RestrictedSSLContextService.class).build();
    public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder().name("Client Auth").displayName("Client Auth").description("The client authentication policy to use for the SSL Context. Only used if an SSL Context Service is provided.").required(false).allowableValues((Enum[])ClientAuth.values()).defaultValue(ClientAuth.REQUIRED.name()).build();
    private volatile BeatsEncoder beatsEncoder;

    protected List<PropertyDescriptor> getAdditionalProperties() {
        return Arrays.asList(MAX_CONNECTIONS, SSL_CONTEXT_SERVICE, CLIENT_AUTH);
    }

    protected Collection<ValidationResult> customValidate(ValidationContext validationContext) {
        ArrayList<ValidationResult> results = new ArrayList<ValidationResult>();
        SSLContextService sslContextService = (SSLContextService)validationContext.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null && !sslContextService.isTrustStoreConfigured()) {
            results.add(new ValidationResult.Builder().explanation("The context service must have a truststore  configured for the beats forwarder client to work correctly").valid(false).subject(SSL_CONTEXT_SERVICE.getName()).build());
        }
        String clientAuth = validationContext.getProperty(CLIENT_AUTH).getValue();
        if (sslContextService != null && StringUtils.isBlank((CharSequence)clientAuth)) {
            results.add(new ValidationResult.Builder().explanation("Client Auth must be provided when using TLS/SSL").valid(false).subject("Client Auth").build());
        }
        return results;
    }

    @OnScheduled
    public void onScheduled(ProcessContext context) throws IOException {
        super.onScheduled(context);
        this.beatsEncoder = new BeatsEncoder();
    }

    protected ChannelDispatcher createDispatcher(ProcessContext context, BlockingQueue<BeatsEvent> events) throws IOException {
        BeatsEventFactory eventFactory = new BeatsEventFactory();
        BeatsSocketChannelHandlerFactory handlerFactory = new BeatsSocketChannelHandlerFactory();
        int maxConnections = context.getProperty(MAX_CONNECTIONS).asInteger();
        int bufferSize = context.getProperty(RECV_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        Charset charSet = Charset.forName(context.getProperty(CHARSET).getValue());
        ByteBufferPool byteBufferSource = new ByteBufferPool(maxConnections, bufferSize);
        SSLContext sslContext = null;
        ClientAuth clientAuth = null;
        SSLContextService sslContextService = (SSLContextService)context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class);
        if (sslContextService != null) {
            String clientAuthValue = context.getProperty(CLIENT_AUTH).getValue();
            sslContext = sslContextService.createContext();
            clientAuth = ClientAuth.valueOf((String)clientAuthValue);
        }
        return new SocketChannelDispatcher((EventFactory)eventFactory, handlerFactory, (ByteBufferSource)byteBufferSource, events, this.getLogger(), maxConnections, sslContext, clientAuth, charSet);
    }

    protected String getBatchKey(BeatsEvent event) {
        return event.getSender();
    }

    protected void respond(BeatsEvent event, BeatsResponse beatsResponse) {
        BeatsChannelResponse response = new BeatsChannelResponse(this.beatsEncoder, beatsResponse);
        ChannelResponder responder = event.getResponder();
        responder.addResponse((ChannelResponse)response);
        try {
            responder.respond();
        }
        catch (IOException e) {
            this.getLogger().error("Error sending response for transaction {} due to {}", new Object[]{event.getSeqNumber(), e.getMessage()}, (Throwable)e);
        }
    }

    protected void postProcess(ProcessContext context, ProcessSession session, List<BeatsEvent> events) {
        session.commitAsync(() -> {
            for (BeatsEvent event : events) {
                this.respond(event, BeatsResponse.ok(event.getSeqNumber()));
            }
        });
    }

    protected String getTransitUri(AbstractListenEventBatchingProcessor.FlowFileEventBatch batch) {
        String sender = ((BeatsEvent)((Object)batch.getEvents().get(0))).getSender();
        String senderHost = sender.startsWith("/") && sender.length() > 1 ? sender.substring(1) : sender;
        String transitUri = "beats" + "://" + senderHost + ":" + this.port;
        return transitUri;
    }

    protected Map<String, String> getAttributes(AbstractListenEventBatchingProcessor.FlowFileEventBatch batch) {
        List events = batch.getEvents();
        String sender = ((BeatsEvent)((Object)events.get(0))).getSender();
        int numAttributes = events.size() == 1 ? 5 : 4;
        HashMap<String, String> attributes = new HashMap<String, String>(numAttributes);
        attributes.put(beatsAttributes.SENDER.key(), sender);
        attributes.put(beatsAttributes.PORT.key(), String.valueOf(this.port));
        attributes.put(CoreAttributes.MIME_TYPE.key(), "application/json");
        if (events.size() == 1) {
            attributes.put(beatsAttributes.SEQNUMBER.key(), String.valueOf(((BeatsEvent)((Object)events.get(0))).getSeqNumber()));
        }
        return attributes;
    }

    public static enum beatsAttributes implements FlowFileAttributeKey
    {
        SENDER("beats.sender"),
        PORT("beats.port"),
        SEQNUMBER("beats.sequencenumber");

        private final String key;

        private beatsAttributes(String key) {
            this.key = key;
        }

        public String key() {
            return this.key;
        }
    }
}

