package org.apache.inlong.audit.service;

import com.google.gson.FieldNamingPolicy;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.inlong.audit.config.ElasticsearchConfig;
import org.apache.inlong.audit.db.entities.ESDataPo;
import org.apache.inlong.audit.protocol.AuditData;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.get.GetIndexRequest;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

@Service
/* loaded from: input_file:BOOT-INF/classes/org/apache/inlong/audit/service/ElasticsearchService.class */
public class ElasticsearchService implements InsertData, AutoCloseable {
    private static final Logger LOG = LoggerFactory.getLogger(ElasticsearchService.class);
    private static ScheduledExecutorService timerService = Executors.newScheduledThreadPool(1);
    private final Semaphore semaphore = new Semaphore(1);
    private List<ESDataPo> datalist = new ArrayList();

    @Autowired
    @Qualifier("restClient")
    private RestHighLevelClient client;

    @Autowired
    private ElasticsearchConfig esConfig;

    public void startTimerRoutine() {
        timerService.scheduleAtFixedRate(new Runnable() { // from class: org.apache.inlong.audit.service.ElasticsearchService.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ElasticsearchService.this.deleteTimeoutIndices();
                } catch (IOException e) {
                    ElasticsearchService.LOG.error("deleteTimeoutIndices has err: ", e);
                }
            }
        }, 1L, 1L, TimeUnit.DAYS);
        timerService.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.inlong.audit.service.ElasticsearchService.2
            @Override // java.lang.Runnable
            public void run() {
                try {
                    ElasticsearchService.this.bulkInsert();
                } catch (IOException e) {
                    ElasticsearchService.LOG.error("bulkInsert has err: ", e);
                }
            }
        }, this.esConfig.getBulkInterval(), this.esConfig.getBulkInterval(), TimeUnit.SECONDS);
    }

    public void insertData(ESDataPo eSDataPo) {
        if (this.datalist.size() >= this.esConfig.getBulkThreshold()) {
            try {
                if (bulkInsert()) {
                    LOG.info("success bulk insert {} docs", Integer.valueOf(this.esConfig.getBulkThreshold()));
                } else {
                    LOG.error("failed to bulk insert");
                }
            } catch (IOException e) {
                LOG.error("bulkInsert has err: ", e);
            }
        }
        try {
            this.semaphore.acquire();
            this.datalist.add(eSDataPo);
            this.semaphore.release();
        } catch (InterruptedException e2) {
            LOG.error("datalist semaphore has err: ", e2);
        }
    }

    protected boolean createIndex(String str) throws IOException {
        if (existsIndex(str)) {
            return true;
        }
        CreateIndexRequest createIndexRequest = new CreateIndexRequest(str);
        createIndexRequest.settings(Settings.builder().put("index.number_of_shards", this.esConfig.getShardsNum()).put("index.number_of_replicas", this.esConfig.getReplicaNum()));
        createIndexRequest.mapping("_doc", generateBuilder());
        boolean isAcknowledged = this.client.indices().create(createIndexRequest, RequestOptions.DEFAULT).isAcknowledged();
        if (isAcknowledged) {
            LOG.info("success creating index {}", str);
        } else {
            LOG.info("fail to create index {}", str);
        }
        return isAcknowledged;
    }

    protected boolean existsIndex(String str) throws IOException {
        GetIndexRequest getIndexRequest = new GetIndexRequest();
        getIndexRequest.indices(new String[]{str});
        return this.client.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
    }

    protected boolean bulkInsert() throws IOException {
        if (this.datalist.isEmpty()) {
            return true;
        }
        BulkRequest bulkRequest = new BulkRequest();
        try {
            this.semaphore.acquire();
            for (ESDataPo eSDataPo : this.datalist) {
                String str = new SimpleDateFormat("yyyyMMdd").format(eSDataPo.getLogTs()) + "_" + eSDataPo.getAuditId();
                GsonBuilder gsonBuilder = new GsonBuilder();
                gsonBuilder.setFieldNamingPolicy(FieldNamingPolicy.LOWER_CASE_WITH_UNDERSCORES).setDateFormat("yyyy-MM-dd HH:mm:ss");
                String json = gsonBuilder.create().toJson(eSDataPo);
                if (createIndex(str)) {
                    bulkRequest.add(this.esConfig.isEnableCustomDocId() ? new IndexRequest(str).type("_doc").id(eSDataPo.getDocId()).source(json, XContentType.JSON) : new IndexRequest(str).type("_doc").source(json, XContentType.JSON));
                } else {
                    LOG.error("fail to create index {}", str);
                }
            }
            BulkResponse bulk = this.client.bulk(bulkRequest, RequestOptions.DEFAULT);
            this.datalist.clear();
            this.semaphore.release();
            return bulk.status().equals(RestStatus.OK);
        } catch (InterruptedException e) {
            LOG.error("datalist semaphore has err: ", e);
            return false;
        }
    }

    protected void deleteTimeoutIndices() throws IOException {
        List<String> auditIdList = this.esConfig.getAuditIdList();
        if (auditIdList.isEmpty()) {
            return;
        }
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyyMMdd");
        Calendar calendar = Calendar.getInstance();
        calendar.add(5, -this.esConfig.getIndexDeleteDay());
        String format = simpleDateFormat.format(calendar.getTime());
        Iterator<String> it = auditIdList.iterator();
        while (it.hasNext()) {
            deleteSingleIndex(format + "_" + it.next());
        }
    }

    protected boolean deleteSingleIndex(String str) throws IOException {
        if (!existsIndex(str)) {
            return true;
        }
        boolean isAcknowledged = this.client.indices().delete(new DeleteIndexRequest(str), RequestOptions.DEFAULT).isAcknowledged();
        if (isAcknowledged) {
            LOG.info("success deleting index {}", str);
        } else {
            LOG.error("fail to delete index {}", str);
        }
        return isAcknowledged;
    }

    @Override // java.lang.AutoCloseable
    public void close() {
        try {
            bulkInsert();
        } catch (IOException e) {
            LOG.error("bulkInsert has err: ", e);
        }
        timerService.shutdown();
    }

    protected XContentBuilder generateBuilder() throws IOException {
        XContentBuilder jsonBuilder = XContentFactory.jsonBuilder();
        jsonBuilder.startObject();
        jsonBuilder.startObject("properties");
        jsonBuilder.startObject("audit_id");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("audit_tag");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("inlong_group_id");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("inlong_stream_id");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("docker_id");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("thread_id");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("ip");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("log_ts");
        jsonBuilder.field("type", "keyword");
        jsonBuilder.endObject();
        jsonBuilder.startObject("sdk_ts");
        jsonBuilder.field("type", "long");
        jsonBuilder.endObject();
        jsonBuilder.startObject("count");
        jsonBuilder.field("type", "long");
        jsonBuilder.endObject();
        jsonBuilder.startObject("size");
        jsonBuilder.field("type", "long");
        jsonBuilder.endObject();
        jsonBuilder.startObject("delay");
        jsonBuilder.field("type", "long");
        jsonBuilder.endObject();
        jsonBuilder.startObject("packet_id");
        jsonBuilder.field("type", "long");
        jsonBuilder.endObject();
        jsonBuilder.endObject();
        jsonBuilder.endObject();
        return jsonBuilder;
    }

    @Override // org.apache.inlong.audit.service.InsertData
    public void insert(AuditData auditData) {
        ESDataPo eSDataPo = new ESDataPo();
        eSDataPo.setIp(auditData.getIp());
        eSDataPo.setThreadId(auditData.getThreadId());
        eSDataPo.setDockerId(auditData.getDockerId());
        eSDataPo.setSdkTs(new Date(auditData.getSdkTs()).getTime());
        eSDataPo.setLogTs(new Date(auditData.getLogTs()));
        eSDataPo.setAuditId(auditData.getAuditId());
        eSDataPo.setAuditTag(auditData.getAuditTag());
        eSDataPo.setCount(auditData.getCount());
        eSDataPo.setDelay(auditData.getDelay());
        eSDataPo.setInlongGroupId(auditData.getInlongGroupId());
        eSDataPo.setInlongStreamId(auditData.getInlongStreamId());
        eSDataPo.setSize(auditData.getSize());
        eSDataPo.setPacketId(auditData.getPacketId());
        insertData(eSDataPo);
    }
}
