package org.apache.giraph.rexster.io;

import com.ziclix.python.sql.pipe.csv.CSVString;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.edge.Edge;
import org.apache.giraph.io.EdgeOutputFormat;
import org.apache.giraph.io.EdgeWriter;
import org.apache.giraph.rexster.conf.GiraphRexsterConstants;
import org.apache.giraph.rexster.utils.RexsterUtils;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: input_file:org/apache/giraph/rexster/io/RexsterEdgeOutputFormat.class */
public class RexsterEdgeOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends EdgeOutputFormat<I, V, E> {
    private static final Logger LOG = Logger.getLogger(RexsterEdgeOutputFormat.class);

    /* loaded from: input_file:org/apache/giraph/rexster/io/RexsterEdgeOutputFormat$NullOutputCommitter.class */
    private static class NullOutputCommitter extends OutputCommitter {
        private NullOutputCommitter() {
        }

        public void abortTask(TaskAttemptContext taskAttemptContext) {
        }

        public void cleanupJob(JobContext jobContext) {
        }

        public void commitTask(TaskAttemptContext taskAttemptContext) {
        }

        public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
            return false;
        }

        public void setupJob(JobContext jobContext) {
        }

        public void setupTask(TaskAttemptContext taskAttemptContext) {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/giraph/rexster/io/RexsterEdgeOutputFormat$RexsterEdgeWriter.class */
    public class RexsterEdgeWriter extends EdgeWriter<I, V, E> {
        private static final String JSON_ARRAY_KEY = "tx";
        private HttpURLConnection rexsterConn;
        private BufferedWriter rexsterBufferedStream;
        private int txsize;
        private String vlabel;
        private boolean isFirstElement = true;
        private int txcounter = 0;
        private int backoffDelay = 0;
        private int backoffRetry = 0;

        /* JADX INFO: Access modifiers changed from: protected */
        public RexsterEdgeWriter() {
        }

        @Override // org.apache.giraph.io.EdgeWriter
        public void initialize(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            this.txsize = GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_E_TXSIZE.get(getConf());
            this.vlabel = GiraphRexsterConstants.GIRAPH_REXSTER_VLABEL.get(getConf());
            this.backoffDelay = GiraphRexsterConstants.GIRAPH_REXSTER_BACKOFF_DELAY.get(getConf());
            this.backoffRetry = GiraphRexsterConstants.GIRAPH_REXSTER_BACKOFF_RETRY.get(getConf());
            startConnection();
        }

        @Override // org.apache.giraph.io.EdgeWriter
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            stopConnection();
        }

        @Override // org.apache.giraph.io.EdgeWriter
        public void writeEdge(I i, V v, Edge<I, E> edge) throws IOException, InterruptedException {
            String str;
            if (this.txcounter == this.txsize) {
                this.txcounter = 0;
                this.isFirstElement = true;
                stopConnection();
                startConnection();
            }
            try {
                JSONObject edge2 = getEdge(i, v, edge);
                if (this.isFirstElement) {
                    this.isFirstElement = false;
                    str = "";
                } else {
                    str = CSVString.DELIMITER;
                }
                this.rexsterBufferedStream.write(str + edge2);
                this.txcounter++;
            } catch (JSONException e) {
                throw new InterruptedException("Error writing the edge: " + e.getMessage());
            }
        }

        private void startConnection() throws IOException, InterruptedException {
            this.rexsterConn = RexsterUtils.Edge.openOutputConnection(getConf());
            this.rexsterBufferedStream = new BufferedWriter(new OutputStreamWriter(this.rexsterConn.getOutputStream(), Charset.forName("UTF-8")));
            this.rexsterBufferedStream.write("{ ");
            this.rexsterBufferedStream.write("\"vlabel\" : \"" + this.vlabel + "\",");
            this.rexsterBufferedStream.write("\"delay\" : \"" + this.backoffDelay + "\",");
            this.rexsterBufferedStream.write("\"retry\" : \"" + this.backoffRetry + "\",");
            this.rexsterBufferedStream.write("\"tx\"");
            this.rexsterBufferedStream.write(" : [ ");
        }

        private void stopConnection() throws IOException, InterruptedException {
            this.rexsterBufferedStream.write(" ] }");
            this.rexsterBufferedStream.flush();
            this.rexsterBufferedStream.close();
            RexsterUtils.Edge.handleResponse(this.rexsterConn);
        }

        protected JSONObject getEdge(I i, V v, Edge<I, E> edge) throws JSONException {
            String obj = i.toString();
            String obj2 = edge.getTargetVertexId().toString();
            String obj3 = edge.mo2242getValue().toString();
            JSONObject jSONObject = new JSONObject();
            jSONObject.accumulate("_outV", obj);
            jSONObject.accumulate("_inV", obj2);
            jSONObject.accumulate("value", obj3);
            return jSONObject;
        }
    }

    @Override // org.apache.giraph.io.EdgeOutputFormat
    public RexsterEdgeOutputFormat<I, V, E>.RexsterEdgeWriter createEdgeWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new RexsterEdgeWriter();
    }

    @Override // org.apache.giraph.io.EdgeOutputFormat
    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
        GiraphConfiguration giraphConfiguration = new GiraphConfiguration(jobContext.getConfiguration());
        if (!giraphConfiguration.hasVertexOutputFormat()) {
            LOG.error("Rexster OutputFormat usage requires both Edge and Vertex OutputFormat's.");
            throw new InterruptedException("Rexster OutputFormat usage requires both Edge and Vertex OutputFormat's.");
        }
        if (GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME.get(giraphConfiguration) == null) {
            throw new InterruptedException(GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME.getKey() + " is a mandatory parameter.");
        }
    }

    @Override // org.apache.giraph.io.EdgeOutputFormat
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new NullOutputCommitter();
    }
}
