package org.apache.giraph.rexster.io;

import java.io.BufferedWriter;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.net.HttpURLConnection;
import java.nio.charset.Charset;
import org.apache.giraph.conf.GiraphConfiguration;
import org.apache.giraph.conf.ImmutableClassesGiraphConfiguration;
import org.apache.giraph.graph.Vertex;
import org.apache.giraph.io.VertexOutputFormat;
import org.apache.giraph.io.VertexWriter;
import org.apache.giraph.rexster.conf.GiraphRexsterConstants;
import org.apache.giraph.rexster.utils.RexsterUtils;
import org.apache.giraph.zk.ZooKeeperExt;
import org.apache.giraph.zk.ZooKeeperManager;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.json.JSONException;
import org.json.JSONObject;

/* loaded from: input_file:org/apache/giraph/rexster/io/RexsterVertexOutputFormat.class */
public class RexsterVertexOutputFormat<I extends WritableComparable, V extends Writable, E extends Writable> extends VertexOutputFormat<I, V, E> {
    /** Class-wide log4j logger; also used by the nested vertex writer. */
    private static final Logger LOG = Logger.getLogger(RexsterVertexOutputFormat.class);

    /* loaded from: input_file:org/apache/giraph/rexster/io/RexsterVertexOutputFormat$NullOutputCommitter.class */
    private static class NullOutputCommitter extends OutputCommitter {
        private NullOutputCommitter() {
        }

        public void abortTask(TaskAttemptContext taskAttemptContext) {
        }

        public void cleanupJob(JobContext jobContext) {
        }

        public void commitTask(TaskAttemptContext taskAttemptContext) {
        }

        public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) {
            return false;
        }

        public void setupJob(JobContext jobContext) {
        }

        public void setupTask(TaskAttemptContext taskAttemptContext) {
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/giraph/rexster/io/RexsterVertexOutputFormat$RexsterVertexWriter.class */
    public class RexsterVertexWriter extends VertexWriter<I, V, E> implements Watcher {
        // Suffix appended to the job's ZooKeeper path to form the barrier znode path.
        private static final String BARRIER_PATH = "/_rexsterBarrier";
        // Name of the JSON array that holds the vertices of one request. Not referenced
        // by name in the visible code; startConnection() writes the same text inline.
        private static final String JSON_ARRAY_KEY = "tx";
        // HTTP connection currently being written to; replaced on every startConnection().
        private HttpURLConnection rexsterConn;
        // Character stream wrapped around the output stream of rexsterConn.
        private BufferedWriter rexsterBufferedStream;
        // Number of vertices written to one connection before it is closed and reopened.
        private int txsize;
        // Configured label: sent in the request header and used as the JSON key for the vertex id.
        private String vlabel;
        // Id of the vertex most recently passed to getVertex().
        private I vertexId;
        // True until the first vertex of the current request is written; decides whether
        // a "," separator is emitted in front of a vertex.
        private boolean isFirstElement = true;
        // ZooKeeper client created in initialize() and used by the barrier helpers.
        private ZooKeeperExt zk = null;
        // Monitor shared by process() (notify) and checkBarrier() (timed wait).
        private final Object lock = new Object();
        // Number of vertices written to the current connection so far.
        private int txcounter = 0;

        /* JADX INFO: Access modifiers changed from: protected */
        /**
         * Creates an unconfigured writer. The connection, the ZooKeeper client and
         * the configured label/transaction size are only set up by
         * {@link #initialize(TaskAttemptContext)}.
         */
        public RexsterVertexWriter() {
        }

        /**
         * Reads the writer settings from the configuration, opens the first output
         * connection and then creates the ZooKeeper client, registering this writer
         * as its watcher.
         *
         * @param taskAttemptContext task context handed to the ZooKeeper client
         * @throws IOException if opening the output connection fails
         * @throws InterruptedException propagated from the connection or ZooKeeper setup
         */
        public void initialize(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            ImmutableClassesGiraphConfiguration giraphConf = getConf();

            // Both settings must be known before the first request header is written.
            this.txsize = GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_V_TXSIZE.get(giraphConf);
            this.vlabel = GiraphRexsterConstants.GIRAPH_REXSTER_VLABEL.get(giraphConf);
            startConnection();

            this.zk = new ZooKeeperExt(
                    giraphConf.getZookeeperList(),
                    giraphConf.getZooKeeperSessionTimeout(),
                    giraphConf.getZookeeperOpsMaxAttempts(),
                    giraphConf.getZookeeperOpsRetryWaitMsecs(),
                    this,
                    taskAttemptContext);
        }

        /**
         * Finishes the current request and then passes through the ZooKeeper
         * barrier: makes sure the barrier znode exists, registers this task attempt
         * under it, and waits until enough attempts have registered.
         *
         * @param taskAttemptContext supplies the task attempt id and progress reporting
         * @throws IOException if finishing the request fails
         * @throws InterruptedException if a barrier step fails or the wait is interrupted
         */
        public void close(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
            stopConnection();

            // Job-specific ZooKeeper directory under which the barrier znode lives.
            StringBuilder jobDirBuilder = new StringBuilder();
            jobDirBuilder.append(ZooKeeperManager.getBasePath(getConf()));
            jobDirBuilder.append("/_hadoopBsp/");
            jobDirBuilder.append(getConf().get("mapred.job.id", "Unknown Job"));
            String jobDir = jobDirBuilder.toString();

            String attemptName = taskAttemptContext.getTaskAttemptID().toString();

            prepareBarrier(jobDir);
            enterBarrier(jobDir, attemptName);
            checkBarrier(jobDir, taskAttemptContext);
        }

        /**
         * Serialises one vertex as a JSON object and appends it to the current
         * request. Once {@code txsize} vertices have been written, the current
         * request is finished and a new connection is opened before writing.
         *
         * @param vertex vertex to write
         * @throws IOException if writing to, finishing or reopening the connection fails
         * @throws InterruptedException if the JSON object cannot be built; the
         *         originating {@link JSONException} is attached as the cause
         */
        public void writeVertex(Vertex<I, V, E> vertex) throws IOException, InterruptedException {
            if (this.txcounter == this.txsize) {
                // Current request is full: reset the per-request state, then roll over.
                this.txcounter = 0;
                this.isFirstElement = true;
                stopConnection();
                startConnection();
            }
            try {
                JSONObject json = getVertex(vertex);
                json.accumulate("_type", "vertex");
                // getVertex() recorded the id, so getVertexId() refers to this vertex.
                json.accumulate(this.vlabel, getVertexId().toString());

                // Every array element except the first is preceded by a comma.
                String separator = this.isFirstElement ? "" : ",";
                this.isFirstElement = false;

                this.rexsterBufferedStream.write(separator + json);
                this.txcounter++;
            } catch (JSONException e) {
                // InterruptedException has no (message, cause) constructor.
                InterruptedException failure =
                        new InterruptedException("Error writing the vertex: " + e.getMessage());
                failure.initCause(e);
                throw failure;
            }
        }

        /**
         * Watcher callback. On a {@code NodeChildrenChanged} event it wakes a thread
         * waiting on the barrier lock; every other event type is ignored.
         *
         * @param watchedEvent event delivered by ZooKeeper
         */
        public void process(WatchedEvent watchedEvent) {
            boolean childrenChanged =
                    watchedEvent.getType() == Watcher.Event.EventType.NodeChildrenChanged;
            if (!childrenChanged) {
                return;
            }
            if (RexsterVertexOutputFormat.LOG.isDebugEnabled()) {
                RexsterVertexOutputFormat.LOG.debug("signal: number of children changed.");
            }
            synchronized (this.lock) {
                this.lock.notify();
            }
        }

        /**
         * Creates the persistent barrier znode under the given job path. An
         * already-existing znode is not an error and is only logged.
         *
         * @param str job-specific ZooKeeper path under which the barrier znode is created
         * @throws InterruptedException if ZooKeeper reports any other failure (the
         *         {@link KeeperException} is attached as the cause) or the call is interrupted
         */
        private void prepareBarrier(String str) throws InterruptedException {
            try {
                this.zk.createExt(str + BARRIER_PATH, (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, false);
            } catch (KeeperException.NodeExistsException e) {
                // Must precede the general KeeperException handler: it is a subclass,
                // and an existing znode is tolerated rather than treated as a failure.
                if (RexsterVertexOutputFormat.LOG.isInfoEnabled()) {
                    RexsterVertexOutputFormat.LOG.info("rexster barrier znode already exists.");
                }
            } catch (KeeperException e) {
                // InterruptedException has no (message, cause) constructor.
                InterruptedException failure = new InterruptedException(
                        "RexsterVertexOutputFormat: error while creating the barrier: " + e.getMessage());
                failure.initCause(e);
                throw failure;
            }
        }

        /**
         * Registers this task at the barrier by creating an ephemeral child znode
         * named {@code str2} under the barrier znode. An already-existing child is
         * not an error and is only logged.
         *
         * @param str job-specific ZooKeeper path that contains the barrier znode
         * @param str2 name of the child znode to create (the caller passes the task attempt id)
         * @throws InterruptedException if ZooKeeper reports any other failure (the
         *         {@link KeeperException} is attached as the cause) or the call is interrupted
         */
        private void enterBarrier(String str, String str2) throws InterruptedException {
            try {
                this.zk.createExt(str + BARRIER_PATH + "/" + str2, (byte[]) null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, false);
            } catch (KeeperException.NodeExistsException e) {
                // Must precede the general KeeperException handler: it is a subclass,
                // and an existing znode is tolerated rather than treated as a failure.
                if (RexsterVertexOutputFormat.LOG.isInfoEnabled()) {
                    RexsterVertexOutputFormat.LOG.info("rexster barrier znode already exists.");
                }
            } catch (KeeperException e) {
                // InterruptedException has no (message, cause) constructor.
                InterruptedException failure = new InterruptedException(
                        "RexsterVertexOutputFormat: error while creating the barrier: " + e.getMessage());
                failure.initCause(e);
                throw failure;
            }
        }

        /**
         * Blocks until the barrier znode has at least {@code getMapTasks() - 1}
         * children. Between checks it waits on the barrier lock, for at most the
         * configured timeout, and then reports progress to the framework.
         *
         * @param str job-specific ZooKeeper path that contains the barrier znode
         * @param taskAttemptContext used to report progress while waiting
         * @throws InterruptedException if the wait is interrupted or ZooKeeper fails
         *         (the {@link KeeperException} is then attached as the cause)
         */
        private void checkBarrier(String str, TaskAttemptContext taskAttemptContext) throws InterruptedException {
            // NOTE(review): one fewer than the number of map tasks - presumably one
            // task does not take part in the barrier; confirm.
            long mapTasks = getConf().getMapTasks() - 1;
            int i = GiraphRexsterConstants.GIRAPH_REXSTER_OUTPUT_WAIT_TIMEOUT.get(getConf());
            String barrierPath = str + BARRIER_PATH;
            try {
                while (this.zk.getChildrenExt(barrierPath, true, false, false).size() < mapTasks) {
                    // process() notifies on child changes; the timed wait bounds the
                    // delay before the children are counted again.
                    synchronized (this.lock) {
                        this.lock.wait(i);
                    }
                    taskAttemptContext.progress();
                }
            } catch (KeeperException e) {
                // InterruptedException has no (message, cause) constructor.
                InterruptedException failure =
                        new InterruptedException("Error while checking the barrier:" + e.getMessage());
                failure.initCause(e);
                throw failure;
            }
        }

        /**
         * Opens a new output connection, wraps its output stream in a UTF-8
         * buffered writer and writes the opening of the request body: the label
         * entry followed by the start of the {@code "tx"} array.
         *
         * @throws IOException if the connection or its stream cannot be opened or written
         * @throws InterruptedException propagated from opening the connection
         */
        private void startConnection() throws IOException, InterruptedException {
            HttpURLConnection connection = RexsterUtils.Vertex.openOutputConnection(getConf());
            this.rexsterConn = connection;

            OutputStreamWriter utf8Writer =
                    new OutputStreamWriter(connection.getOutputStream(), Charset.forName("UTF-8"));
            this.rexsterBufferedStream = new BufferedWriter(utf8Writer);

            // Assemble the header once; the characters match the piecewise writes.
            StringBuilder header = new StringBuilder();
            header.append("{ ");
            header.append("\"vlabel\" : \"").append(this.vlabel).append("\",");
            header.append("\"tx\"");
            header.append(" : [ ");
            this.rexsterBufferedStream.write(header.toString());
        }

        /**
         * Terminates the JSON request body, flushes and closes the writer, and then
         * hands the connection to {@code RexsterUtils.Vertex.handleResponse}.
         *
         * <p>The writer is closed even when writing the footer or flushing fails,
         * so a failed request does not leave the stream open.
         *
         * @throws IOException if writing, flushing or closing the stream fails
         * @throws InterruptedException propagated from the response handling
         */
        private void stopConnection() throws IOException, InterruptedException {
            BufferedWriter out = this.rexsterBufferedStream;
            try {
                out.write(" ] }");
                out.flush();
            } finally {
                out.close();
            }
            // Only reached when the whole body was sent and the stream closed cleanly.
            RexsterUtils.Vertex.handleResponse(this.rexsterConn);
        }

        /**
         * Builds the JSON object for a vertex and records the vertex id so that
         * {@link #getVertexId()} can return it afterwards. The object holds the
         * string form of the vertex value under the key {@code "value"}.
         *
         * @param vertex vertex to convert
         * @return a new JSON object containing the vertex value
         * @throws JSONException if the value cannot be stored in the JSON object
         */
        protected JSONObject getVertex(Vertex<I, V, E> vertex) throws JSONException {
            this.vertexId = (I) vertex.getId();
            String valueText = vertex.getValue().toString();
            return new JSONObject().accumulate("value", valueText);
        }

        /**
         * Returns the id recorded by the most recent {@code getVertex} call.
         *
         * @return the stored vertex id; {@code null} if no id has been stored yet
         */
        protected I getVertexId() {
            return this.vertexId;
        }
    }

    /**
     * Creates a new, not yet initialised {@link RexsterVertexWriter}. The task
     * context argument is not used here.
     *
     * <p>NOTE(review): the {@code mo8} prefix looks decompiler-generated; the
     * comment below records {@code createVertexWriter} as the original name.
     * Confirm against the call sites before renaming.
     *
     * @param taskAttemptContext task context (unused)
     * @return a fresh vertex writer
     */
    @Override
    /* renamed from: createVertexWriter, reason: merged with bridge method [inline-methods] */
    public RexsterVertexOutputFormat<I, V, E>.RexsterVertexWriter mo8createVertexWriter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new RexsterVertexWriter();
    }

    /**
     * Validates the job configuration: an edge output format must also be
     * configured, and the Rexster hostname option must have a value.
     *
     * @param jobContext job whose configuration is checked
     * @throws IOException declared by the signature; not thrown directly by this method
     * @throws InterruptedException if either requirement is not met
     */
    public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException {
        GiraphConfiguration conf = new GiraphConfiguration(jobContext.getConfiguration());

        boolean edgeOutputConfigured = conf.hasEdgeOutputFormat();
        if (!edgeOutputConfigured) {
            String problem = "Rexster OutputFormat usage requires both Edge and Vertex OutputFormat's.";
            LOG.error(problem);
            throw new InterruptedException(problem);
        }

        boolean hostnameMissing = GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME.get(conf) == null;
        if (!hostnameMissing) {
            return;
        }
        throw new InterruptedException(GiraphRexsterConstants.GIRAPH_REXSTER_HOSTNAME.getKey() + " is a mandatory parameter.");
    }

    /**
     * Returns a committer whose hooks all do nothing. The task context argument
     * is not used here.
     *
     * @param taskAttemptContext task context (unused)
     * @return a new {@code NullOutputCommitter}
     */
    public OutputCommitter getOutputCommitter(TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
        return new NullOutputCommitter();
    }
}
