package com.bfw.flume.plugin.es;

import com.bfw.flume.plugin.es.util.ClassLoaderUtil;
import com.bfw.flume.plugin.es.util.TypeUtil;
import java.io.File;
import java.io.IOException;
import java.io.StringWriter;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.util.Map;
import java.util.Properties;
import java.util.regex.Pattern;
import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Sink;
import org.apache.flume.Transaction;
import org.apache.flume.conf.BatchSizeSupported;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.http.message.BasicHeader;
import org.codehaus.jackson.map.ObjectMapper;
import org.elasticsearch.client.Request;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;

/* loaded from: input_file:com/bfw/flume/plugin/es/ElasticSink.class */
public class ElasticSink extends AbstractSink implements Configurable, BatchSizeSupported {
    private static String docId;
    private static int batchSize;
    private static String filterName;
    private static String clusterName;
    private static HttpHost[] hostList;
    private static RestClient restClient;
    private static Method doFilter;
    private static Object filterObject;
    private static String indexType;
    private static String indexName;
    private static boolean enableAuthCache;
    private static CredentialsProvider credentialsProvider;
    private static final ObjectMapper MAPPER = new ObjectMapper();
    private static final Pattern COLON_REGEX = Pattern.compile(":");
    private static final Pattern COMMA_REGEX = Pattern.compile(",");
    public static final Pattern NUMBER_REGEX = Pattern.compile("^[0-9]+$");
    public static final Pattern IP_REGEX = Pattern.compile("^\\d+\\.\\d+\\.\\d+\\.\\d+$");
    private static final String DEFAULT_FILTER = "com.bfw.flume.plugin.es.filter.impl.DefaultSinkFilter";

    public synchronized void start() {
        super.start();
        RestClientBuilder builder = RestClient.builder(hostList);
        builder.setDefaultHeaders(new Header[]{new BasicHeader("Content-Type", "application/json;charset=UTF-8")});
        if (null == credentialsProvider) {
            restClient = builder.build();
        } else {
            builder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() { // from class: com.bfw.flume.plugin.es.ElasticSink.1
                public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpAsyncClientBuilder) {
                    if (!ElasticSink.enableAuthCache) {
                        httpAsyncClientBuilder.disableAuthCaching();
                    }
                    return httpAsyncClientBuilder.setDefaultCredentialsProvider(ElasticSink.credentialsProvider);
                }
            });
            restClient = builder.build();
        }
    }

    public synchronized void stop() {
        try {
            if (null != restClient) {
                restClient.close();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        super.stop();
    }

    public Sink.Status process() throws EventDeliveryException {
        Map map;
        Sink.Status status = Sink.Status.READY;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        int i = 0;
        while (true) {
            try {
                try {
                    if (i >= batchSize) {
                        break;
                    }
                    Event take = channel.take();
                    if (null == take) {
                        status = Sink.Status.BACKOFF;
                        break;
                    }
                    String trim = new String(take.getBody(), Charset.defaultCharset()).trim();
                    if (0 != trim.length() && null != (map = (Map) doFilter.invoke(filterObject, trim)) && 0 != map.size()) {
                        if (0 != docId.length()) {
                            String trim2 = map.getOrDefault(docId, "").toString().trim();
                            if (0 != trim2.length()) {
                                push(indexName, indexType, trim2, map);
                            }
                        }
                        pushByType(indexName, indexType, map);
                    }
                    i++;
                } catch (Throwable th) {
                    th.printStackTrace();
                    transaction.rollback();
                    Sink.Status status2 = Sink.Status.BACKOFF;
                    transaction.close();
                    return status2;
                }
            } catch (Throwable th2) {
                transaction.close();
                throw th2;
            }
        }
        transaction.commit();
        Sink.Status status3 = status;
        transaction.close();
        return status3;
    }

    public long getBatchSize() {
        return batchSize;
    }

    public void configure(Context context) {
        filterName = getParamValue(context, "filterName", "filter");
        clusterName = getParamValue(context, "clusterName", "ES-Cluster");
        batchSize = Integer.parseInt(getParamValue(context, "batchSize", "100"));
        enableAuthCache = Boolean.parseBoolean(getParamValue(context, "enableAuthCache", "true"));
        initHostAddress(context);
        try {
            addFilterClassPath();
            Properties properties = new Properties();
            try {
                System.out.println("INFO:====load filter config file:" + filterName + ".properties");
                properties.load(ClassLoaderUtil.getClassPathFileStream(filterName + ".properties"));
            } catch (Exception e) {
                e.printStackTrace();
            }
            try {
                String str = (String) properties.remove("type");
                if (null == str || 0 == str.trim().length()) {
                    str = DEFAULT_FILTER;
                    System.out.println("WARN:filterName==" + filterName + " the filter is empty or not found, the default filter will be used...");
                }
                System.out.println("INFO:====load filter class file:" + str);
                Class<?> cls = Class.forName(str);
                try {
                    filterObject = cls.newInstance();
                    try {
                        Method declaredMethod = cls.getDeclaredMethod("pluginConfig", Map.class);
                        if (null != declaredMethod) {
                            declaredMethod.invoke(filterObject, context.getParameters());
                        }
                    } catch (Exception e2) {
                        System.out.println("Warn: " + cls.getName() + " may not be initialized:contextConfig");
                    }
                    try {
                        initFilter(cls, properties);
                    } catch (IOException | ClassNotFoundException e3) {
                        System.out.println("Warn: " + cls.getName() + " may not be initialized:filterConfig");
                    }
                    try {
                        Method declaredMethod2 = cls.getDeclaredMethod("filterConfig", Properties.class);
                        if (null != declaredMethod2) {
                            declaredMethod2.invoke(filterObject, properties);
                        }
                    } catch (Exception e4) {
                        System.out.println("Warn: " + cls.getName() + " may not be initialized:filterConfig");
                    }
                    initFilterFace(cls);
                } catch (IllegalAccessException | InstantiationException e5) {
                    throw new RuntimeException("Error:filter object instance failure!!!");
                }
            } catch (ClassNotFoundException e6) {
                throw new RuntimeException(e6);
            }
        } catch (Exception e7) {
            throw new RuntimeException(e7);
        }
    }

    private static final void initHostAddress(Context context) {
        String[] split = COMMA_REGEX.split(getParamValue(context, "hostList", "127.0.0.1:9200"));
        hostList = new HttpHost[split.length];
        for (int i = 0; i < split.length; i++) {
            String trim = split[i].trim();
            if (0 != trim.length()) {
                String[] split2 = COLON_REGEX.split(trim);
                if (split2.length >= 2) {
                    String trim2 = split2[0].trim();
                    String trim3 = split2[1].trim();
                    if (IP_REGEX.matcher(trim2).matches() && NUMBER_REGEX.matcher(trim3).matches()) {
                        hostList[i] = new HttpHost(trim2, Integer.parseInt(trim3), "http");
                    }
                } else if (split2.length > 0) {
                    String trim4 = split2[0].trim();
                    if (NUMBER_REGEX.matcher(trim4).matches()) {
                        hostList[i] = new HttpHost("127.0.0.1", Integer.parseInt(trim4), "http");
                    } else if (IP_REGEX.matcher(trim4).matches()) {
                        hostList[i] = new HttpHost(trim4, 9200, "http");
                    }
                }
            }
        }
    }

    private static final void initFilterFace(Class<?> cls) {
        try {
            String str = (String) cls.getDeclaredMethod("getIndexName", new Class[0]).invoke(filterObject, new Object[0]);
            if (null != str) {
                String trim = str.trim();
                indexName = trim;
                if (0 != trim.length()) {
                    String str2 = (String) cls.getDeclaredMethod("getIndexType", new Class[0]).invoke(filterObject, new Object[0]);
                    if (null != str2) {
                        String trim2 = str2.trim();
                        indexType = trim2;
                        if (0 != trim2.length()) {
                            doFilter = cls.getDeclaredMethod("doFilter", String.class);
                            String str3 = null;
                            String str4 = null;
                            try {
                                str3 = (String) cls.getDeclaredMethod("getUsername", new Class[0]).invoke(filterObject, new Object[0]);
                                str4 = (String) cls.getDeclaredMethod("getPassword", new Class[0]).invoke(filterObject, new Object[0]);
                            } catch (Exception e) {
                                System.out.println("Warn:===authentication information not found, will login anonymously...");
                            }
                            if (null != str3 && null != str4) {
                                credentialsProvider = new BasicCredentialsProvider();
                                credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(str3, str4));
                            }
                            try {
                                String str5 = (String) cls.getDeclaredMethod("getDocId", new Class[0]).invoke(filterObject, new Object[0]);
                                docId = null == str5 ? null : str5.trim();
                                return;
                            } catch (Exception e2) {
                                System.out.println("Warn:===no docid value was obtained, the default generated value will be used...");
                                return;
                            }
                        }
                    }
                    throw new RuntimeException("indexType can not be NULL!!!");
                }
            }
            throw new RuntimeException("indexName can not be NULL!!!");
        } catch (Exception e3) {
            throw new RuntimeException(e3);
        }
    }

    private static final void initFilter(Class<?> cls, Properties properties) throws IOException, ClassNotFoundException {
        if (null == cls || 0 == properties.size()) {
            return;
        }
        for (Map.Entry entry : properties.entrySet()) {
            String trim = ((String) entry.getKey()).trim();
            if (0 != trim.length()) {
                Field field = null;
                try {
                    field = cls.getDeclaredField(trim);
                    field.setAccessible(true);
                } catch (NoSuchFieldException | SecurityException e) {
                    e.printStackTrace();
                }
                if (null != field) {
                    Object obj = null;
                    try {
                        obj = TypeUtil.toType((String) entry.getValue(), field.getType());
                    } catch (RuntimeException e2) {
                        e2.printStackTrace();
                    }
                    if (null != obj) {
                        try {
                            if ((field.getModifiers() & 8) == 0) {
                                field.set(filterObject, obj);
                            } else {
                                field.set(cls, obj);
                            }
                        } catch (IllegalAccessException | IllegalArgumentException e3) {
                            e3.printStackTrace();
                        }
                    }
                }
            }
        }
    }

    private static final void addFilterClassPath() throws URISyntaxException, IOException {
        File file = new File(new File(ElasticSink.class.getResource("/").toURI()).getParentFile(), "filter");
        if (!file.exists()) {
            file.mkdirs();
        }
        ClassLoaderUtil.addFileToCurrentClassPath(file, (Class<?>[]) new Class[]{ElasticSink.class});
    }

    private static final String getParamValue(Context context, String str, String str2) {
        String trim = context.getString(str, str2).trim();
        return trim.length() == 0 ? str2 : trim;
    }

    private static final void push(String str, Object obj) {
        push(str, null, null, obj);
    }

    private static final void pushById(String str, String str2, Object obj) {
        push(str, null, str2, obj);
    }

    private static final void pushByType(String str, String str2, Object obj) {
        push(str, str2, null, obj);
    }

    private static final void push(String str, String str2, String str3, Object obj) {
        StringBuilder sb = new StringBuilder("/");
        sb.append(str).append("/");
        sb.append(str2);
        if (null != str3 && 0 != str3.trim().length()) {
            sb.append("/").append(str3);
        }
        sb.append("?pretty");
        executePost(sb.toString(), obj, new BasicHeader("Content-Type", "application/json;charset=UTF-8"));
    }

    private static final Response executeGet(String str, Map<String, String> map, Header... headerArr) {
        return executeRequest(str, "GET", map, null, headerArr);
    }

    private static final Response executePost(String str, Object obj, Header... headerArr) {
        return executeRequest(str, "POST", null, obj, headerArr);
    }

    private static final Response executeRequest(String str, Map<String, String> map, Object obj, Header... headerArr) {
        return executeRequest(str, "POST", map, obj, headerArr);
    }

    private static final Response executeRequest(String str, String str2, Map<String, String> map, Object obj, Header... headerArr) {
        if (null == str || 0 == str.trim().length()) {
            return null;
        }
        if (null == str2 || 0 == str2.trim().length()) {
            str2 = "POST";
        }
        Request request = new Request(str2, str);
        if (null != map && 0 != map.size()) {
            request.addParameters(map);
        }
        if (null != headerArr && 0 != headerArr.length) {
            addHeader(request, headerArr);
        }
        StringEntity stringEntity = null;
        if (null != obj) {
            StringWriter stringWriter = new StringWriter();
            try {
                MAPPER.writeValue(stringWriter, obj);
                stringEntity = new StringEntity(stringWriter.toString(), ContentType.APPLICATION_JSON);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        if (null != stringEntity) {
            request.setEntity(stringEntity);
        }
        try {
            return restClient.performRequest(request);
        } catch (IOException e2) {
            e2.printStackTrace();
            return null;
        }
    }

    private static final void addHeader(Request request, Header... headerArr) {
        RequestOptions.Builder builder = request.getOptions().toBuilder();
        for (Header header : headerArr) {
            builder.addHeader(header.getName(), header.getValue());
        }
        request.setOptions(builder);
    }
}
