/*
 * Decompiled with CFR 0.152.
 */
package net.wicp.tams.common.doris.constant;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Serializable;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Calendar;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import net.wicp.tams.common.doris.bean.DorisConfig;
import net.wicp.tams.common.doris.bean.RespContent;
import net.wicp.tams.common.doris.exception.StreamLoadException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DorisStreamLoadBe
implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(DorisStreamLoadBe.class);
    private static final long serialVersionUID = 1L;
    private static final List<String> DORIS_SUCCESS_STATUS = new ArrayList<String>(Arrays.asList("Success", "Publish Timeout"));
    private static final String loadUrlPattern = "http://%s/api/%s/%s/_stream_load?";
    private final String authEncoding;
    private final DorisConfig config;
    private final List<String> hostsBe;
    Random random = new Random();

    public DorisStreamLoadBe(DorisConfig config) {
        this.config = config;
        this.authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", config.getUsername(), config.getPassword()).getBytes(StandardCharsets.UTF_8));
        this.hostsBe = Lists.newArrayList((Object[])config.getHostsBe().split(","));
    }

    private HttpURLConnection getConnection(String urlStr, String label) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", this.config.getUsername(), this.config.getPassword()).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("column_separator", ",");
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("strip_outer_array", "true");
        conn.addRequestProperty("merge_type", "APPEND");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    private HttpURLConnection getConnectionDelete(String urlStr, String label) throws IOException {
        URL url = new URL(urlStr);
        HttpURLConnection conn = (HttpURLConnection)url.openConnection();
        conn.setInstanceFollowRedirects(false);
        conn.setRequestMethod("PUT");
        String authEncoding = Base64.getEncoder().encodeToString(String.format("%s:%s", this.config.getUsername(), this.config.getPassword()).getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + authEncoding);
        conn.addRequestProperty("Expect", "100-continue");
        conn.addRequestProperty("Content-Type", "text/plain; charset=UTF-8");
        conn.addRequestProperty("label", label);
        conn.addRequestProperty("column_separator", ",");
        conn.addRequestProperty("format", "json");
        conn.addRequestProperty("strip_outer_array", "true");
        conn.addRequestProperty("merge_type", "DELETE");
        conn.setDoOutput(true);
        conn.setDoInput(true);
        return conn;
    }

    public void flushAndRetry(String db, String tb, String data) throws Exception {
        for (int i = 0; i <= 5; ++i) {
            try {
                this.loadJsonArrayAppend(data, db, tb);
                break;
            }
            catch (Exception e) {
                log.info(" stream load retry times :" + i + " " + e.getMessage());
                if (i >= 5) {
                    throw e;
                }
                try {
                    Thread.sleep(1000L);
                    continue;
                }
                catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    throw new IOException("unable to flush; interrupted while doing another attempt", e);
                }
            }
        }
    }

    public LoadResponse loadJsonArrayAppend(String value, String db, String tb) throws Exception {
        Calendar calendar = Calendar.getInstance();
        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s", calendar.get(1), calendar.get(2) + 1, calendar.get(5), calendar.get(11), calendar.get(12), calendar.get(13), UUID.randomUUID().toString().replaceAll("-", ""));
        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            String line;
            String url = this.getLoadUrlStr(db, tb);
            beConn = this.getConnection(url, label);
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(value.getBytes());
            bos.close();
            int status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream)beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            if (status != 200) {
                log.error("stream load error={},url={}", (Object)respMsg, (Object)url);
                throw new StreamLoadException(respMsg);
            }
            ObjectMapper obj = new ObjectMapper();
            RespContent respContent = new RespContent();
            try {
                respContent = (RespContent)obj.readValue(response.toString(), RespContent.class);
                if (!DORIS_SUCCESS_STATUS.contains(respContent.getStatus())) {
                    log.error("stream load error={},url={}", (Object)respContent.getMessage(), (Object)url);
                    throw new StreamLoadException("stream load error:" + respContent.getMessage() + ",errorUrl:" + respContent.getErrorURL());
                }
            }
            catch (IOException | StreamLoadException e) {
                log.error("ori resp result:{}", (Object)respContent.toString());
                throw new StreamLoadException(e);
            }
            LoadResponse loadResponse = new LoadResponse(status, respMsg, response.toString());
            return loadResponse;
        }
        catch (Exception e) {
            e.printStackTrace();
            String err = "failed to load audit via AuditLoader plugin with label: " + label;
            log.error(err, (Throwable)e);
            throw e;
        }
        finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public LoadResponse loadJsonArrayDelete(String value, String db, String tb) {
        Calendar calendar = Calendar.getInstance();
        String label = String.format("audit_%s%02d%02d_%02d%02d%02d_%s", calendar.get(1), calendar.get(2) + 1, calendar.get(5), calendar.get(11), calendar.get(12), calendar.get(13), UUID.randomUUID().toString().replaceAll("-", ""));
        HttpURLConnection feConn = null;
        HttpURLConnection beConn = null;
        try {
            String line;
            beConn = this.getConnectionDelete(this.getLoadUrlStr(db, tb), label);
            BufferedOutputStream bos = new BufferedOutputStream(beConn.getOutputStream());
            bos.write(value.getBytes());
            bos.close();
            int status = beConn.getResponseCode();
            String respMsg = beConn.getResponseMessage();
            InputStream stream = (InputStream)beConn.getContent();
            BufferedReader br = new BufferedReader(new InputStreamReader(stream));
            StringBuilder response = new StringBuilder();
            while ((line = br.readLine()) != null) {
                response.append(line);
            }
            LoadResponse loadResponse = new LoadResponse(status, respMsg, response.toString());
            return loadResponse;
        }
        catch (Exception e) {
            e.printStackTrace();
            String err = "failed to load audit via AuditLoader plugin with label: " + label;
            log.error(err, (Throwable)e);
            LoadResponse loadResponse = new LoadResponse(-1, e.getMessage(), err);
            return loadResponse;
        }
        finally {
            if (feConn != null) {
                feConn.disconnect();
            }
            if (beConn != null) {
                beConn.disconnect();
            }
        }
    }

    private String getLoadUrlStr(String db, String tb) {
        return String.format(loadUrlPattern, this.getBeHostPortRotation(), db, tb);
    }

    private String getBeHostPortRotation() {
        int nextInt = this.random.nextInt(this.hostsBe.size());
        return this.hostsBe.get(nextInt) + ":" + this.config.getHttpPortBe();
    }

    public static class LoadResponse {
        public int status;
        public String respMsg;
        public String respContent;

        public LoadResponse(int status, String respMsg, String respContent) {
            this.status = status;
            this.respMsg = respMsg;
            this.respContent = respContent;
        }

        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append("status: ").append(this.status);
            sb.append(", resp msg: ").append(this.respMsg);
            sb.append(", resp content: ").append(this.respContent);
            return sb.toString();
        }
    }
}

