package org.apache.asterix.experiment.client;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.Socket;
import java.util.HashMap;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation;
import org.apache.commons.lang3.tuple.Pair;

/* loaded from: input_file:org/apache/asterix/experiment/client/SocketTweetGenerator.class */
public class SocketTweetGenerator {
    // Cached thread pool that runs the DataGenerator tasks submitted by start(). Its threads are
    // named "DataGeneratorThread: <n>", with n counting up from 0, and are marked as daemons.
    private final ExecutorService threadPool = Executors.newCachedThreadPool(new ThreadFactory() { // from class: org.apache.asterix.experiment.client.SocketTweetGenerator.1
        // Sequence number appended to each new thread's name.
        private final AtomicInteger count = new AtomicInteger();

        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(Runnable runnable) {
            Thread thread = new Thread(runnable, "DataGeneratorThread: " + this.count.getAndIncrement());
            // Daemon threads do not prevent the JVM from exiting.
            thread.setDaemon(true);
            return thread;
        }
    });
    // Added to a receiver's position in receiverAddresses to form that generator's partition id.
    private final int partitionRangeStart;
    // Forwarded to every DataGenerator; in TIME mode it becomes the generator's "duration" setting.
    // NOTE(review): the log messages describe it as seconds - confirm against the tweet generator.
    private final int dataGenDuration;
    // Forwarded to every DataGenerator; a value > 0 enables the second (ingestion + queries) phase
    // when an orchestrator connection exists.
    private final int queryGenDuration;
    // Read from the config's data interval; a value > 0 selects Mode.DATA, otherwise Mode.TIME.
    // Forwarded to every DataGenerator as its flushed-tweet interval.
    private final long startDataInterval;
    // Number of data intervals after which a DATA-mode generator stops its ingestion-only phase.
    private final int nDataIntervals;
    // Orchestrator endpoint; DataGenerator only connects to it in DATA mode when the host is non-null.
    private final String orchHost;
    private final int orchPort;
    // (host, port) of each receiver; start() launches one DataGenerator per entry.
    private final List<Pair<String, Integer>> receiverAddresses;
    // Optional; when non-null it is passed to the tweet generator together with the sample interval.
    private final String openStreetMapFilePath;
    // Base sample interval; each DataGenerator adds a partition-dependent offset to it.
    private final int locationSampleInterval;
    // Batch sizes handed to the tweet generator in the ingestion-only and ingestion + queries phases.
    private final int recordCountPerBatchDuringIngestionOnly;
    private final int recordCountPerBatchDuringQuery;
    // Pause between batches in each phase, passed to Thread.sleep (milliseconds); <= 0 means no pause.
    private final long dataGenSleepTimeDuringIngestionOnly;
    private final long dataGenSleepTimeDuringQuery;
    // Derived from startDataInterval in the constructor.
    private final Mode mode;

    /* loaded from: input_file:org/apache/asterix/experiment/client/SocketTweetGenerator$CircularByteArrayOutputStream.class */
    /**
     * An {@link OutputStream} that writes into a fixed 32768-byte ring buffer.
     *
     * <p>Bytes are stored at a moving write position that wraps back to the start of the buffer
     * when it reaches the end, so the newest bytes overwrite the oldest. The stream never grows
     * and never blocks.
     */
    private static class CircularByteArrayOutputStream extends OutputStream {
        /** Backing ring buffer. */
        private final byte[] buf = new byte[32768];
        /** Position in {@link #buf} where the next byte is stored; always in {@code [0, buf.length)}. */
        private int index = 0;

        /**
         * Copies {@code i2} bytes from {@code bArr}, starting at offset {@code i}, into the ring
         * buffer, wrapping around as often as needed.
         *
         * @param bArr source array
         * @param i    offset of the first byte to copy
         * @param i2   number of bytes to copy
         * @throws NullPointerException      if {@code bArr} is null
         * @throws IndexOutOfBoundsException if the offset/length pair does not fit in {@code bArr}
         */
        @Override // java.io.OutputStream
        public void write(byte[] bArr, int i, int i2) throws IOException {
            if (bArr == null) {
                throw new NullPointerException();
            }
            if (i < 0 || i > bArr.length || i2 < 0 || i + i2 > bArr.length || i + i2 < 0) {
                throw new IndexOutOfBoundsException();
            }
            int srcPos = i;
            int remaining = i2;
            while (remaining > 0) {
                // Copy up to the end of the ring, but never more than what is left to write;
                // copying the full free span unconditionally would read past the source array.
                int chunk = Math.min(remaining, this.buf.length - this.index);
                System.arraycopy(bArr, srcPos, this.buf, this.index, chunk);
                srcPos += chunk;
                remaining -= chunk;
                this.index = (this.index + chunk) % this.buf.length;
            }
        }

        /**
         * Stores the low-order byte of {@code i} at the write position and advances it, wrapping
         * to the start of the buffer after the last slot.
         *
         * @param i value whose low 8 bits are written
         */
        @Override // java.io.OutputStream
        public void write(int i) throws IOException {
            this.buf[this.index] = (byte) i;
            this.index = (this.index + 1) % this.buf.length;
        }
    }

    /* loaded from: input_file:org/apache/asterix/experiment/client/SocketTweetGenerator$DataGenerator.class */
    public static class DataGenerator implements Runnable {
        private static final Logger LOGGER = Logger.getLogger(DataGenerator.class.getName());
        // Generation mode: TIME bounds generation via the "duration" setting, DATA reports to the
        // orchestrator each time the flushed-tweet count reaches nextStopInterval.
        private final Mode m;
        // Released exactly once when run() finishes, whether it succeeded or failed.
        private final Semaphore sem;
        // Receiver endpoint the generated tweets are written to.
        private final String host;
        private final int port;
        // Partition id passed to the tweet generator and used as a prefix in log output.
        private final int partition;
        // "duration" setting for the tweet generator in TIME mode ("0" is used in DATA mode).
        private final int dataGenDuration;
        // Duration handed to resetDurationAndFlushedTweetCount for the ingestion + queries phase;
        // that phase only runs when this is > 0 and an orchestrator socket exists.
        private final int queryGenDuration;
        // Number of intervals after which the DATA-mode ingestion-only loop stops.
        private final int nDataIntervals;
        // Orchestrator endpoint; only connected to in DATA mode when orchHost is non-null.
        private final String orchHost;
        private final int orchPort;
        // Flushed-tweet count at which the next interval is reported; starts at dataSizeInterval
        // and grows by dataSizeInterval after every report.
        private long nextStopInterval;
        private final long dataSizeInterval;
        // Optional; when non-null it is passed to the tweet generator as "open-street-map-filepath".
        private final String openStreetMapFilePath;
        // Constructor argument plus a partition-dependent offset (see the constructor).
        private final int locationSampleInterval;
        // Batch sizes passed to setNextRecordBatch in the two phases.
        private final int recordCountPerBatchDuringIngestionOnly;
        private final int recordCountPerBatchDuringQuery;
        // Pause between batches in each phase, passed to Thread.sleep (milliseconds); <= 0 means no pause.
        private final long dataGenSleepTimeDuringIngestionOnly;
        private final long dataGenSleepTimeDuringQuery;
        // Number of data intervals reported so far.
        private int currentInterval = 0;
        // Always false here, so run() sends REACHED (never STOPPED) at interval boundaries and
        // does not wait for RESUME between intervals.
        private final boolean flagStopResume = false;

        public DataGenerator(Mode mode, Semaphore semaphore, String str, int i, int i2, int i3, int i4, int i5, long j, String str2, int i6, String str3, int i7, int i8, int i9, long j2, long j3) {
            this.m = mode;
            this.sem = semaphore;
            this.host = str;
            this.port = i;
            this.partition = i2;
            this.dataGenDuration = i3;
            this.queryGenDuration = i4;
            this.nDataIntervals = i5;
            this.dataSizeInterval = j;
            this.nextStopInterval = j;
            this.orchHost = str2;
            this.orchPort = i6;
            this.openStreetMapFilePath = str3;
            this.locationSampleInterval = i7 + ((i2 + 1) * (i2 <= 4 ? 7 : 9));
            this.recordCountPerBatchDuringIngestionOnly = i8;
            this.recordCountPerBatchDuringQuery = i9;
            this.dataGenSleepTimeDuringIngestionOnly = j2;
            this.dataGenSleepTimeDuringQuery = j3;
        }

        /* JADX WARN: Failed to calculate best type for var: r8v0 ??
        java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.calculateFromBounds(FixTypesVisitor.java:156)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.setBestType(FixTypesVisitor.java:133)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.deduceType(FixTypesVisitor.java:238)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.tryDeduceTypes(FixTypesVisitor.java:221)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
         */
        /* JADX WARN: Failed to calculate best type for var: r8v0 ??
        java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.InsnArg.getType()" because "changeArg" is null
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.moveListener(TypeUpdate.java:439)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.runListeners(TypeUpdate.java:232)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.requestUpdate(TypeUpdate.java:212)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeForSsaVar(TypeUpdate.java:183)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.updateTypeChecked(TypeUpdate.java:112)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:83)
        	at jadx.core.dex.visitors.typeinference.TypeUpdate.apply(TypeUpdate.java:56)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.calculateFromBounds(TypeInferenceVisitor.java:145)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.setBestType(TypeInferenceVisitor.java:123)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.lambda$runTypePropagation$2(TypeInferenceVisitor.java:101)
        	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.runTypePropagation(TypeInferenceVisitor.java:101)
        	at jadx.core.dex.visitors.typeinference.TypeInferenceVisitor.visit(TypeInferenceVisitor.java:75)
         */
        /* JADX WARN: Multi-variable type inference failed. Error: java.lang.NullPointerException: Cannot invoke "jadx.core.dex.instructions.args.RegisterArg.getSVar()" because the return value of "jadx.core.dex.nodes.InsnNode.getResult()" is null
        	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.collectRelatedVars(AbstractTypeConstraint.java:31)
        	at jadx.core.dex.visitors.typeinference.AbstractTypeConstraint.<init>(AbstractTypeConstraint.java:19)
        	at jadx.core.dex.visitors.typeinference.TypeSearch$1.<init>(TypeSearch.java:376)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.makeMoveConstraint(TypeSearch.java:376)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.makeConstraint(TypeSearch.java:361)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.collectConstraints(TypeSearch.java:341)
        	at java.base/java.util.ArrayList.forEach(ArrayList.java:1596)
        	at jadx.core.dex.visitors.typeinference.TypeSearch.run(TypeSearch.java:60)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.runMultiVariableSearch(FixTypesVisitor.java:116)
        	at jadx.core.dex.visitors.typeinference.FixTypesVisitor.visit(FixTypesVisitor.java:91)
         */
        /* JADX WARN: Not initialized variable reg: 8, insn: 0x044c: MOVE (r0 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) = (r8 I:??[int, float, boolean, short, byte, char, OBJECT, ARRAY]) A[TRY_LEAVE], block:B:112:0x044c */
        /* JADX WARN: Type inference failed for: r8v0, types: [java.net.Socket] */
        @Override // java.lang.Runnable
        public void run() {
            ?? r8;
            LOGGER.info("\nDataGen[" + this.partition + "] running with the following parameters: \ndataGenDuration : " + this.dataGenDuration + "\nqueryGenDuration : " + this.queryGenDuration + "\nnDataIntervals : " + this.nDataIntervals + "\ndataSizeInterval : " + this.dataSizeInterval + "\nrecordCountPerBatchDuringIngestionOnly : " + this.recordCountPerBatchDuringIngestionOnly + "\nrecordCountPerBatchDuringQuery : " + this.recordCountPerBatchDuringQuery + "\ndataGenSleepTimeDuringIngestionOnly : " + this.dataGenSleepTimeDuringIngestionOnly + "\ndataGenSleepTimeDuringQuery : " + this.dataGenSleepTimeDuringQuery + "\nlocationSampleInterval : " + this.locationSampleInterval);
            try {
                try {
                    try {
                        Socket socket = new Socket(this.host, this.port);
                        try {
                            Socket socket2 = null;
                            if (this.m == Mode.DATA && this.orchHost != null) {
                                socket2 = new Socket(this.orchHost, this.orchPort);
                            }
                            TweetGeneratorForSpatialIndexEvaluation tweetGeneratorForSpatialIndexEvaluation = null;
                            try {
                                HashMap hashMap = new HashMap();
                                hashMap.put("duration", String.valueOf(this.m == Mode.TIME ? String.valueOf(this.dataGenDuration) : "0"));
                                if (this.openStreetMapFilePath != null) {
                                    hashMap.put("open-street-map-filepath", this.openStreetMapFilePath);
                                    hashMap.put("location-sample-interval", String.valueOf(this.locationSampleInterval));
                                }
                                tweetGeneratorForSpatialIndexEvaluation = new TweetGeneratorForSpatialIndexEvaluation(hashMap, this.partition, "adm-string", socket.getOutputStream());
                                long currentTimeMillis = System.currentTimeMillis();
                                long j = currentTimeMillis;
                                int i = 0;
                                while (tweetGeneratorForSpatialIndexEvaluation.setNextRecordBatch(this.recordCountPerBatchDuringIngestionOnly)) {
                                    if (this.m == Mode.DATA && tweetGeneratorForSpatialIndexEvaluation.getNumFlushedTweets() >= this.nextStopInterval) {
                                        if (socket2 != null) {
                                            if (this.flagStopResume) {
                                                sendStopped(socket2);
                                            } else {
                                                sendReached(socket2);
                                            }
                                        }
                                        this.nextStopInterval += this.dataSizeInterval;
                                        int i2 = this.currentInterval + 1;
                                        this.currentInterval = i2;
                                        if (i2 >= this.nDataIntervals) {
                                            break;
                                        } else if (socket2 != null && this.flagStopResume) {
                                            receiveResume(socket2);
                                        }
                                    }
                                    long currentTimeMillis2 = System.currentTimeMillis();
                                    if (LOGGER.isLoggable(Level.INFO)) {
                                        i++;
                                        if ((i * this.recordCountPerBatchDuringIngestionOnly) % 100000 == 0) {
                                            System.out.println("DataGen[" + this.partition + "][During ingestion only][TimeToInsert100000] " + (currentTimeMillis2 - j) + " in milliseconds");
                                            i = 0;
                                            j = currentTimeMillis2;
                                        }
                                    }
                                    if (this.dataGenSleepTimeDuringIngestionOnly > 0) {
                                        Thread.sleep(this.dataGenSleepTimeDuringIngestionOnly);
                                    }
                                }
                                if (LOGGER.isLoggable(Level.INFO)) {
                                    LOGGER.info("DataGen[" + this.partition + "][During ingestion only][InsertCount] Num tweets flushed = " + tweetGeneratorForSpatialIndexEvaluation.getNumFlushedTweets() + " in " + ((System.currentTimeMillis() - currentTimeMillis) / 1000) + " seconds from " + InetAddress.getLocalHost() + " to " + this.host + ":" + this.port);
                                }
                                if (socket2 != null && this.queryGenDuration > 0) {
                                    receiveResume(socket2);
                                    tweetGeneratorForSpatialIndexEvaluation.resetDurationAndFlushedTweetCount(this.queryGenDuration);
                                    long currentTimeMillis3 = System.currentTimeMillis();
                                    int i3 = 0;
                                    while (tweetGeneratorForSpatialIndexEvaluation.setNextRecordBatch(this.recordCountPerBatchDuringQuery)) {
                                        long currentTimeMillis4 = System.currentTimeMillis();
                                        if (LOGGER.isLoggable(Level.INFO)) {
                                            i3++;
                                            if ((i3 * this.recordCountPerBatchDuringQuery) % 100000 == 0) {
                                                System.out.println("DataGen[" + this.partition + "][During ingestion + queries][TimeToInsert100000] " + (currentTimeMillis4 - currentTimeMillis3) + " in milliseconds");
                                                i3 = 0;
                                                currentTimeMillis3 = currentTimeMillis4;
                                            }
                                        }
                                        if (this.dataGenSleepTimeDuringQuery > 0) {
                                            Thread.sleep(this.dataGenSleepTimeDuringQuery);
                                        }
                                    }
                                    if (LOGGER.isLoggable(Level.INFO)) {
                                        LOGGER.info("DataGen[" + this.partition + "][During ingestion + queries][InsertCount] Num tweets flushed = " + tweetGeneratorForSpatialIndexEvaluation.getNumFlushedTweets() + " in " + this.queryGenDuration + " seconds from " + InetAddress.getLocalHost() + " to " + this.host + ":" + this.port);
                                    }
                                    sendReached(socket2);
                                }
                                if (socket2 != null) {
                                    socket2.close();
                                }
                                if (LOGGER.isLoggable(Level.INFO)) {
                                    LOGGER.info("Num tweets flushed = " + tweetGeneratorForSpatialIndexEvaluation.getNumFlushedTweets() + " in " + this.dataGenDuration + " seconds from " + InetAddress.getLocalHost() + " to " + this.host + ":" + this.port);
                                }
                                socket.close();
                            } catch (Throwable th) {
                                if (socket2 != null) {
                                    socket2.close();
                                }
                                if (LOGGER.isLoggable(Level.INFO)) {
                                    LOGGER.info("Num tweets flushed = " + tweetGeneratorForSpatialIndexEvaluation.getNumFlushedTweets() + " in " + this.dataGenDuration + " seconds from " + InetAddress.getLocalHost() + " to " + this.host + ":" + this.port);
                                }
                                throw th;
                            }
                        } catch (Throwable th2) {
                            th2.printStackTrace();
                            socket.close();
                        }
                        this.sem.release();
                    } catch (Throwable th3) {
                        System.err.println("Error connecting to " + this.host + ":" + this.port);
                        th3.printStackTrace();
                        this.sem.release();
                    }
                } catch (Throwable th4) {
                    r8.close();
                    throw th4;
                }
            } catch (Throwable th5) {
                this.sem.release();
                throw th5;
            }
        }

        /**
         * Writes the ordinal of {@code REACHED} to the given socket as a 4-byte int, flushes the
         * socket's output stream and logs the send at INFO level.
         *
         * @param socket connected socket to notify
         * @throws IOException if writing to or flushing the socket fails
         */
        private void sendReached(Socket socket) throws IOException {
            final OrchestratorDGProtocol message = OrchestratorDGProtocol.REACHED;
            DataOutputStream wire = new DataOutputStream(socket.getOutputStream());
            wire.writeInt(message.ordinal());
            socket.getOutputStream().flush();
            if (!LOGGER.isLoggable(Level.INFO)) {
                return;
            }
            LOGGER.info("Sent " + message + " to " + socket.getRemoteSocketAddress());
        }

        /**
         * Blocks until a 4-byte int arrives on the given socket and requires it to be the ordinal
         * of {@code RESUME}.
         *
         * @param socket connected socket to read from
         * @throws IOException           if reading from the socket fails
         * @throws IllegalStateException if the received value is not a known protocol ordinal, or
         *                               is a known message other than {@code RESUME}
         */
        private void receiveResume(Socket socket) throws IOException {
            int wireValue = new DataInputStream(socket.getInputStream()).readInt();
            OrchestratorDGProtocol[] knownMessages = OrchestratorDGProtocol.values();
            // Validate before indexing: a corrupt or unexpected value would otherwise surface as
            // a bare ArrayIndexOutOfBoundsException with no hint about the protocol failure.
            if (wireValue < 0 || wireValue >= knownMessages.length) {
                throw new IllegalStateException("Unknown protocol message received: " + wireValue);
            }
            OrchestratorDGProtocol orchestratorDGProtocol = knownMessages[wireValue];
            if (LOGGER.isLoggable(Level.INFO)) {
                LOGGER.info("Received " + orchestratorDGProtocol + " from " + socket.getRemoteSocketAddress());
            }
            if (orchestratorDGProtocol != OrchestratorDGProtocol.RESUME) {
                throw new IllegalStateException("Unknown protocol message received: " + orchestratorDGProtocol);
            }
        }

        /**
         * Writes the ordinal of {@code STOPPED} to the given socket as a 4-byte int, flushes the
         * socket's output stream and logs the send at INFO level.
         *
         * @param socket connected socket to notify
         * @throws IOException if writing to or flushing the socket fails
         */
        private void sendStopped(Socket socket) throws IOException {
            final OrchestratorDGProtocol message = OrchestratorDGProtocol.STOPPED;
            DataOutputStream wire = new DataOutputStream(socket.getOutputStream());
            wire.writeInt(message.ordinal());
            socket.getOutputStream().flush();
            if (!LOGGER.isLoggable(Level.INFO)) {
                return;
            }
            LOGGER.info("Sent " + message + " to " + socket.getRemoteSocketAddress());
        }
    }

    /* loaded from: input_file:org/apache/asterix/experiment/client/SocketTweetGenerator$Mode.class */
    /**
     * How a generation run is bounded. The outer constructor picks {@link #DATA} when the
     * configured data interval is greater than zero and {@link #TIME} otherwise.
     */
    private enum Mode {
        /** The data generation duration is passed to the tweet generator as its "duration" setting. */
        TIME,
        /**
         * The "duration" setting is "0"; the generator task instead tracks flushed-tweet
         * thresholds and may report them to an orchestrator.
         */
        DATA
    }

    public SocketTweetGenerator(SocketTweetGeneratorConfig socketTweetGeneratorConfig) {
        this.partitionRangeStart = socketTweetGeneratorConfig.getPartitionRangeStart();
        this.dataGenDuration = socketTweetGeneratorConfig.getDataGenDuration();
        this.queryGenDuration = socketTweetGeneratorConfig.getQueryGenDuration();
        this.startDataInterval = socketTweetGeneratorConfig.getDataInterval();
        this.nDataIntervals = socketTweetGeneratorConfig.getNIntervals();
        this.orchHost = socketTweetGeneratorConfig.getOrchestratorHost();
        this.orchPort = socketTweetGeneratorConfig.getOrchestratorPort();
        this.receiverAddresses = socketTweetGeneratorConfig.getAddresses();
        this.mode = this.startDataInterval > 0 ? Mode.DATA : Mode.TIME;
        this.openStreetMapFilePath = socketTweetGeneratorConfig.getOpenStreetMapFilePath();
        this.locationSampleInterval = socketTweetGeneratorConfig.getLocationSampleInterval();
        this.recordCountPerBatchDuringIngestionOnly = socketTweetGeneratorConfig.getRecordCountPerBatchDuringIngestionOnly();
        this.recordCountPerBatchDuringQuery = socketTweetGeneratorConfig.getRecordCountPerBatchDuringQuery();
        this.dataGenSleepTimeDuringIngestionOnly = socketTweetGeneratorConfig.getDataGenSleepTimeDuringIngestionOnly();
        this.dataGenSleepTimeDuringQuery = socketTweetGeneratorConfig.getDataGenSleepTimeDuringQuery();
    }

    /**
     * Submits one {@link DataGenerator} per receiver address to the thread pool and blocks until
     * every one of them has released the shared semaphore.
     *
     * <p>The semaphore starts at {@code 1 - n} permits for {@code n} receivers, so the single
     * {@code acquire()} at the end only succeeds after {@code n} releases. Partition ids are
     * assigned in address order, starting at {@code partitionRangeStart}.
     *
     * @throws Exception if the calling thread is interrupted while waiting
     */
    public void start() throws Exception {
        final int receiverCount = this.receiverAddresses.size();
        final Semaphore allDone = new Semaphore(1 - receiverCount);

        int offset = 0;
        for (Pair<String, Integer> address : this.receiverAddresses) {
            final String receiverHost = address.getLeft();
            final int receiverPort = address.getRight().intValue();
            final int partitionId = offset + this.partitionRangeStart;
            DataGenerator task = new DataGenerator(
                    this.mode,
                    allDone,
                    receiverHost,
                    receiverPort,
                    partitionId,
                    this.dataGenDuration,
                    this.queryGenDuration,
                    this.nDataIntervals,
                    this.startDataInterval,
                    this.orchHost,
                    this.orchPort,
                    this.openStreetMapFilePath,
                    this.locationSampleInterval,
                    this.recordCountPerBatchDuringIngestionOnly,
                    this.recordCountPerBatchDuringQuery,
                    this.dataGenSleepTimeDuringIngestionOnly,
                    this.dataGenSleepTimeDuringQuery);
            this.threadPool.submit(task);
            offset++;
        }

        allDone.acquire();
    }
}
