package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Random;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.KafkaThread;
import org.apache.kafka.common.utils.Shell;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker.class */
public class RestProxyProduceV3CurlWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(RestProxyProduceV3CurlWorker.class);
    private static final Time TIME = Time.SYSTEM;
    public static final List<String> MESSAGE_TYPES = Collections.unmodifiableList(Arrays.asList("BINARY", "JSON"));
    public static final ObjectMapper RESPONSE_OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
    public static final String STREAMING_PROCESS_TYPE = "STREAMING";
    public static final String NON_STREAMING_PROCESS_TYPE = "NON_STREAMING";
    private final String id;
    private final RestProxyProduceV3CurlWorkloadSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Future<?> statusUpdaterFuture;
    private ExecutorService workerExecutor;
    private ScheduledExecutorService statusUpdaterExecutor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    volatile List<RestProxyProduceV3Process> clientProcesses;

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$MessageProvider.class */
    private static class MessageProvider {
        private static final Random RNG = new Random();
        private static final String MESSAGE_TEMPLATE = "{ \"value\" : { \"type\" : \"%s\", \"data\" : \"%s\" }}\n";

        private MessageProvider() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static String getMessage(String str, int i) {
            if (!str.equals("BINARY")) {
                return str.equals("JSON") ? String.format(MESSAGE_TEMPLATE, "JSON", ((StringBuilder) RNG.ints(97, 123).limit(i).collect(StringBuilder::new, (v0, v1) -> {
                    v0.appendCodePoint(v1);
                }, (v0, v1) -> {
                    v0.append(v1);
                })).toString()) : "";
            }
            byte[] bArr = new byte[i];
            RNG.nextBytes(bArr);
            return String.format(MESSAGE_TEMPLATE, "BINARY", Base64.getEncoder().encodeToString(bArr));
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$ProcessCoordinator.class */
    class ProcessCoordinator implements Runnable {
        ProcessCoordinator() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                setup();
                ArrayList arrayList = new ArrayList();
                RestProxyProduceV3CurlWorker.this.clientProcesses = Collections.unmodifiableList((List) IntStream.range(0, RestProxyProduceV3CurlWorker.this.spec.numClients()).mapToObj(i -> {
                    try {
                        if (RestProxyProduceV3CurlWorker.this.spec.processType().equals(RestProxyProduceV3CurlWorker.STREAMING_PROCESS_TYPE)) {
                            RestProxyProduceV3CurlWorker.log.info("Creating Streaming process");
                            RestProxyProduceV3Process startStreamingProcess = RestProxyProduceV3CurlWorker.this.startStreamingProcess(getStreamingCommand());
                            Thread thread = new Thread(startStreamingProcess);
                            thread.start();
                            arrayList.add(thread);
                            return startStreamingProcess;
                        }
                        if (!RestProxyProduceV3CurlWorker.this.spec.processType().equals(RestProxyProduceV3CurlWorker.NON_STREAMING_PROCESS_TYPE)) {
                            return null;
                        }
                        RestProxyProduceV3CurlWorker.log.info("Creating Non Streaming process");
                        RestProxyProduceV3Process startProcess = RestProxyProduceV3CurlWorker.this.startProcess(getNonStreamingCommand());
                        Thread thread2 = new Thread(startProcess);
                        thread2.start();
                        arrayList.add(thread2);
                        return startProcess;
                    } catch (IOException e) {
                        WorkerUtils.abort(RestProxyProduceV3CurlWorker.log, toString(), e, RestProxyProduceV3CurlWorker.this.doneFuture);
                        return null;
                    }
                }).filter(restProxyProduceV3Process -> {
                    return restProxyProduceV3Process != null;
                }).collect(Collectors.toList()));
                Iterator it = arrayList.iterator();
                while (it.hasNext()) {
                    ((Thread) it.next()).join();
                }
            } catch (Throwable th) {
                WorkerUtils.abort(RestProxyProduceV3CurlWorker.log, toString(), th, RestProxyProduceV3CurlWorker.this.doneFuture);
            }
            RestProxyProduceV3CurlWorker.this.doneFuture.complete("");
        }

        private String[] getStreamingCommand() {
            String format = String.format("%s/v3/clusters/%s/topics/%s/records", RestProxyProduceV3CurlWorker.this.spec.restProxyUrl(), RestProxyProduceV3CurlWorker.this.spec.clusterId(), RestProxyProduceV3CurlWorker.this.spec.topicName());
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(Arrays.asList("curl", "-s", "-N", "-X", "POST", "-H", "Content-Type: application/json", "-H", "Transfer-Encoding: chunked"));
            if (!RestProxyProduceV3CurlWorker.this.spec.userCredential().isEmpty()) {
                arrayList.addAll(Arrays.asList("-u", RestProxyProduceV3CurlWorker.this.spec.userCredential()));
            }
            arrayList.addAll(Arrays.asList(format, "-T-"));
            return (String[]) arrayList.toArray(new String[0]);
        }

        private String[] getNonStreamingCommand() {
            String format = String.format("%s/v3/clusters/%s/topics/%s/records", RestProxyProduceV3CurlWorker.this.spec.restProxyUrl(), RestProxyProduceV3CurlWorker.this.spec.clusterId(), RestProxyProduceV3CurlWorker.this.spec.topicName());
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(Arrays.asList("curl", "-s", "-N", "-X", "POST", "-H", "Content-Type: application/json"));
            if (!RestProxyProduceV3CurlWorker.this.spec.userCredential().isEmpty()) {
                arrayList.addAll(Arrays.asList("-u", RestProxyProduceV3CurlWorker.this.spec.userCredential()));
            }
            arrayList.addAll(Arrays.asList(format, "-d"));
            return (String[]) arrayList.toArray(new String[0]);
        }

        private void setup() throws IOException {
            RestProxyProduceV3CurlWorker.log.info(this + " Creating topic {} with replication factor {}  and {} partitions.", new Object[]{RestProxyProduceV3CurlWorker.this.spec.topicName(), Integer.valueOf(RestProxyProduceV3CurlWorker.this.spec.numReplicas()), Integer.valueOf(RestProxyProduceV3CurlWorker.this.spec.numPartitions())});
            Shell.execCommand(getTopicCreateCommand());
        }

        private String[] getTopicCreateCommand() throws JsonProcessingException {
            String format = String.format("%s/v3/clusters/%s/topics", RestProxyProduceV3CurlWorker.this.spec.restProxyUrl(), RestProxyProduceV3CurlWorker.this.spec.clusterId());
            String writeValueAsString = new ObjectMapper().writeValueAsString(new HashMap<String, String>() { // from class: org.apache.kafka.trogdor.workload.RestProxyProduceV3CurlWorker.ProcessCoordinator.1
                {
                    put("topic_name", RestProxyProduceV3CurlWorker.this.spec.topicName());
                    if (RestProxyProduceV3CurlWorker.this.spec.numPartitions() != -1) {
                        put("partitions_count", String.valueOf(RestProxyProduceV3CurlWorker.this.spec.numPartitions()));
                    }
                    if (RestProxyProduceV3CurlWorker.this.spec.numReplicas() != -1) {
                        put("replication_factor", String.valueOf(RestProxyProduceV3CurlWorker.this.spec.numReplicas()));
                    }
                }
            });
            ArrayList arrayList = new ArrayList();
            arrayList.addAll(Arrays.asList("curl", "-X", "POST", "-H", "Content-Type:application/json", "-d", writeValueAsString));
            if (!RestProxyProduceV3CurlWorker.this.spec.userCredential().isEmpty()) {
                arrayList.addAll(Arrays.asList("-u", RestProxyProduceV3CurlWorker.this.spec.userCredential()));
            }
            arrayList.add(format);
            return (String[]) arrayList.toArray(new String[0]);
        }

        public String toString() {
            return "ProcessCoordinator{}";
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$ProduceResponse.class */
    public static class ProduceResponse {

        @JsonProperty("offset")
        private long offset = -1;

        @JsonProperty("error_code")
        private int errorCode = -1;

        private ProduceResponse() {
        }

        public void setOffset(long j) {
            this.offset = j;
        }

        public void setErrorCode(int i) {
            this.errorCode = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$ReaderThreadProvider.class */
    public static class ReaderThreadProvider {
        private ReaderThreadProvider() {
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static Thread getStderrReader(BufferedReader bufferedReader, String str, String str2, AtomicBoolean atomicBoolean) {
            KafkaThread nonDaemon = KafkaThread.nonDaemon("kafka-rest-produce-err-thread-" + str2, () -> {
                try {
                    String readLine = bufferedReader.readLine();
                    while (readLine != null) {
                        if (Thread.currentThread().isInterrupted() || atomicBoolean.get()) {
                            break;
                        }
                        RestProxyProduceV3CurlWorker.log.error(str + " clientid: " + str2 + " curl stderr:" + readLine);
                        readLine = bufferedReader.readLine();
                    }
                } catch (IOException e) {
                    RestProxyProduceV3CurlWorker.log.warn(str + " Error reading stderr", e);
                }
            });
            nonDaemon.start();
            return nonDaemon;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public static Thread getStdoutReader(BufferedReader bufferedReader, String str, String str2, AtomicBoolean atomicBoolean, AtomicLong atomicLong, AtomicLong atomicLong2, AtomicLong atomicLong3, AtomicLong atomicLong4, AtomicLong atomicLong5) {
            KafkaThread nonDaemon = KafkaThread.nonDaemon("kafka-rest-produce-out-thread-" + str2, () -> {
                try {
                    String readLine = bufferedReader.readLine();
                    while (readLine != null) {
                        if (Thread.currentThread().isInterrupted() || atomicBoolean.get()) {
                            break;
                        }
                        RestProxyProduceV3CurlWorker.log.trace(readLine);
                        atomicLong.getAndIncrement();
                        if (atomicLong2.get() == -1) {
                            atomicLong2.set(RestProxyProduceV3CurlWorker.TIME.milliseconds());
                        }
                        atomicLong3.set(RestProxyProduceV3CurlWorker.TIME.milliseconds());
                        ProduceResponse produceResponse = (ProduceResponse) RestProxyProduceV3CurlWorker.RESPONSE_OBJECT_MAPPER.readValue(readLine, ProduceResponse.class);
                        if (produceResponse.errorCode != -1) {
                            atomicLong4.getAndIncrement();
                        }
                        if (produceResponse.offset != -1) {
                            atomicLong5.getAndIncrement();
                        }
                        readLine = bufferedReader.readLine();
                    }
                } catch (IOException e) {
                    RestProxyProduceV3CurlWorker.log.warn(str + " error reading stdout", e);
                }
            });
            nonDaemon.start();
            return nonDaemon;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$RestProxyProduceV3NonStreamingProcess.class */
    public class RestProxyProduceV3NonStreamingProcess extends RestProxyProduceV3Process {
        public RestProxyProduceV3NonStreamingProcess(String str, String[] strArr) throws IOException {
            super();
            this.clientId = str;
            this.completed = new AtomicBoolean(false);
            this.execString = strArr;
        }

        private boolean produce(String str) throws IOException, InterruptedException {
            String[] strArr = (String[]) Arrays.copyOf(this.execString, this.execString.length + 1);
            strArr[this.execString.length] = str;
            ProcessBuilder processBuilder = new ProcessBuilder(strArr);
            for (Reader reader : Arrays.asList(this.errReader, this.outReader)) {
                if (reader != null) {
                    reader.close();
                }
            }
            for (Thread thread : Arrays.asList(this.errThread, this.outThread)) {
                if (thread != null) {
                    thread.interrupt();
                    try {
                        thread.join();
                    } catch (InterruptedException e) {
                    }
                }
            }
            try {
                this.process = processBuilder.start();
                this.errReader = new BufferedReader(new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8));
                this.outReader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8));
                this.errThread = ReaderThreadProvider.getStderrReader(this.errReader, toString(), this.clientId, this.completed);
                this.outThread = ReaderThreadProvider.getStdoutReader(this.outReader, toString(), this.clientId, this.completed, this.responses, this.responseStartTimeMs, this.responseEndTimeMs, this.failedCalls, this.completedCalls);
                this.process.waitFor(10000L, TimeUnit.MILLISECONDS);
                try {
                    this.errThread.join();
                } catch (InterruptedException e2) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " errThread interrupted while reading stream", e2);
                }
                try {
                    this.outThread.join();
                } catch (InterruptedException e3) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " outThread interrupted while reading stream", e3);
                }
                return this.process.exitValue() == 0;
            } finally {
                this.errThread.interrupt();
                this.outThread.interrupt();
                try {
                    this.errReader.close();
                } catch (IOException e4) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " error while closing stream", e4);
                }
                try {
                    this.outReader.close();
                } catch (IOException e5) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " error while closing stream", e5);
                }
            }
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Throttle throttle = new Throttle(WorkerUtils.perSecToPerPeriod(RestProxyProduceV3CurlWorker.this.spec.targetCallsPerSec(), RestProxyProduceV3CurlWorker.this.spec.throttlePeriodMs()), RestProxyProduceV3CurlWorker.this.spec.throttlePeriodMs());
                this.submissionsStartTimeMs.set(RestProxyProduceV3CurlWorker.TIME.milliseconds());
                for (int i = 0; i < RestProxyProduceV3CurlWorker.this.spec.numMessages(); i++) {
                    if (!produce(MessageProvider.getMessage(RestProxyProduceV3CurlWorker.this.spec.messageType(), RestProxyProduceV3CurlWorker.this.spec.messageSize()))) {
                        this.failedCalls.getAndIncrement();
                    }
                    this.submittedCalls.getAndIncrement();
                    throttle.increment();
                }
                this.submissionEndTimeMs.set(RestProxyProduceV3CurlWorker.TIME.milliseconds());
                completeProcess();
            } catch (Throwable th) {
                WorkerUtils.abort(RestProxyProduceV3CurlWorker.log, toString(), th, RestProxyProduceV3CurlWorker.this.doneFuture);
            }
        }

        protected void completeProcess() {
            this.completed.set(true);
        }

        public String toString() {
            return String.format("RestProxyProduceV3Process{clientId='%s'}", this.clientId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$RestProxyProduceV3Process.class */
    public abstract class RestProxyProduceV3Process implements Runnable {
        String[] execString;
        String clientId;
        Process process;
        BufferedReader errReader;
        BufferedReader outReader;
        Thread errThread;
        Thread outThread;
        AtomicBoolean completed;
        AtomicLong submissionsStartTimeMs;
        AtomicLong submissionEndTimeMs;
        AtomicLong responseStartTimeMs;
        AtomicLong responseEndTimeMs;
        AtomicLong submittedCalls;
        AtomicLong responses;
        AtomicLong completedCalls;
        AtomicLong failedCalls;

        private RestProxyProduceV3Process() {
            this.submissionsStartTimeMs = new AtomicLong(-1L);
            this.submissionEndTimeMs = new AtomicLong(-1L);
            this.responseStartTimeMs = new AtomicLong(-1L);
            this.responseEndTimeMs = new AtomicLong(-1L);
            this.submittedCalls = new AtomicLong();
            this.responses = new AtomicLong();
            this.completedCalls = new AtomicLong();
            this.failedCalls = new AtomicLong();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$RestProxyProduceV3StreamingProcess.class */
    public class RestProxyProduceV3StreamingProcess extends RestProxyProduceV3Process {
        private BufferedWriter inWriter;
        private Timer timeoutTimer;
        private final long processStartTime;

        public RestProxyProduceV3StreamingProcess(String str, String[] strArr) throws IOException {
            super();
            this.timeoutTimer = new Timer();
            this.clientId = str;
            this.completed = new AtomicBoolean(false);
            this.execString = strArr;
            this.processStartTime = RestProxyProduceV3CurlWorker.TIME.milliseconds();
            establishConnection();
        }

        private void establishConnection() throws IOException {
            ProcessBuilder processBuilder = new ProcessBuilder(this.execString);
            for (Reader reader : Arrays.asList(this.errReader, this.outReader)) {
                if (reader != null) {
                    reader.close();
                }
            }
            if (this.inWriter != null) {
                this.inWriter.close();
                this.inWriter = null;
            }
            for (Thread thread : Arrays.asList(this.errThread, this.outThread)) {
                if (thread != null) {
                    thread.interrupt();
                    try {
                        thread.join();
                    } catch (InterruptedException e) {
                    }
                }
            }
            this.process = processBuilder.start();
            this.timeoutTimer.cancel();
            this.timeoutTimer = null;
            this.timeoutTimer = new Timer();
            this.timeoutTimer.schedule(new ShellTimeoutTimerTask(this.process, this.completed), (this.processStartTime + RestProxyProduceV3CurlWorker.this.spec.durationMs()) - RestProxyProduceV3CurlWorker.TIME.milliseconds());
            this.errReader = new BufferedReader(new InputStreamReader(this.process.getErrorStream(), StandardCharsets.UTF_8));
            this.outReader = new BufferedReader(new InputStreamReader(this.process.getInputStream(), StandardCharsets.UTF_8));
            this.inWriter = new BufferedWriter(new OutputStreamWriter(this.process.getOutputStream(), StandardCharsets.UTF_8));
            this.errThread = ReaderThreadProvider.getStderrReader(this.errReader, toString(), this.clientId, this.completed);
            this.outThread = ReaderThreadProvider.getStdoutReader(this.outReader, toString(), this.clientId, this.completed, this.responses, this.responseStartTimeMs, this.responseEndTimeMs, this.failedCalls, this.completedCalls);
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                Throttle throttle = new Throttle(WorkerUtils.perSecToPerPeriod(RestProxyProduceV3CurlWorker.this.spec.targetCallsPerSec(), RestProxyProduceV3CurlWorker.this.spec.throttlePeriodMs()), RestProxyProduceV3CurlWorker.this.spec.throttlePeriodMs());
                this.submissionsStartTimeMs.set(RestProxyProduceV3CurlWorker.TIME.milliseconds());
                for (int i = 0; i < RestProxyProduceV3CurlWorker.this.spec.numMessages(); i++) {
                    if (!produce(MessageProvider.getMessage(RestProxyProduceV3CurlWorker.this.spec.messageType(), RestProxyProduceV3CurlWorker.this.spec.messageSize()))) {
                        this.failedCalls.getAndIncrement();
                    }
                    this.submittedCalls.getAndIncrement();
                    throttle.increment();
                }
                this.submissionEndTimeMs.set(RestProxyProduceV3CurlWorker.TIME.milliseconds());
                Throttle throttle2 = new Throttle(1, 1000);
                for (int i2 = 0; i2 < RestProxyProduceV3CurlWorker.this.spec.responseGracePeriodSeconds(); i2++) {
                    this.inWriter.write("\n");
                    this.inWriter.flush();
                    throttle2.increment();
                }
                completeProcess();
            } catch (Throwable th) {
                WorkerUtils.abort(RestProxyProduceV3CurlWorker.log, toString(), th, RestProxyProduceV3CurlWorker.this.doneFuture);
            }
        }

        private boolean produce(String str) {
            try {
                if (!this.process.isAlive()) {
                    establishConnection();
                }
                this.inWriter.write(str);
                this.inWriter.flush();
                return true;
            } catch (IOException e) {
                RestProxyProduceV3CurlWorker.log.info(this + " could not produce", e);
                return false;
            }
        }

        protected void completeProcess() {
            try {
                this.completed.set(true);
                this.process.destroy();
                try {
                    this.errThread.join();
                } catch (InterruptedException e) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " errThread interrupted while reading stream", e);
                }
                try {
                    this.outThread.join();
                } catch (InterruptedException e2) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " outThread interrupted while reading stream", e2);
                }
            } finally {
                if (this.timeoutTimer != null) {
                    this.timeoutTimer.cancel();
                }
                if (!this.completed.get()) {
                    this.errThread.interrupt();
                    this.outThread.interrupt();
                }
                try {
                    this.errReader.close();
                } catch (IOException e3) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " error while closing stream", e3);
                }
                try {
                    this.outReader.close();
                } catch (IOException e4) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " error while closing stream", e4);
                }
                try {
                    this.inWriter.close();
                } catch (IOException e5) {
                    RestProxyProduceV3CurlWorker.log.warn(this + " error while closing stream", e5);
                }
            }
        }

        public String toString() {
            return String.format("RestProxyProduceV3StreamingProcess{clientId='%s'}", this.clientId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$ShellTimeoutTimerTask.class */
    public static class ShellTimeoutTimerTask extends TimerTask {
        private final Process process;
        private final AtomicBoolean completed;

        public ShellTimeoutTimerTask(Process process, AtomicBoolean atomicBoolean) {
            this.process = process;
            this.completed = atomicBoolean;
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            try {
                this.process.exitValue();
            } catch (Exception e) {
                if (this.process == null || this.completed.get()) {
                    return;
                }
                this.process.destroy();
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$StatusData.class */
    public static class StatusData {
        private final long totalSubmittedCalls;
        private final long totalCompletedCalls;
        private final long totalFailedCalls;
        private final long submissionStartTime;
        private final long submissionEndTime;
        private final long responseStartTime;
        private final long responseEndTime;
        private final double submissionsPerSec;
        private final double responsesPerSec;
        private final String restProxyUrl;

        @JsonCreator
        StatusData(@JsonProperty("restProxyUrl") String str, @JsonProperty("responseEndTime") long j, @JsonProperty("responsesPerSec") double d, @JsonProperty("responseStartTime") long j2, @JsonProperty("submissionEndTime") long j3, @JsonProperty("submissionsPerSec") double d2, @JsonProperty("submissionStartTime") long j4, @JsonProperty("totalCompletedCalls") long j5, @JsonProperty("totalFailedCalls") long j6, @JsonProperty("totalSubmittedCalls") long j7) {
            this.restProxyUrl = str;
            this.totalSubmittedCalls = j7;
            this.totalCompletedCalls = j5;
            this.totalFailedCalls = j6;
            this.submissionsPerSec = d2;
            this.submissionStartTime = j4;
            this.submissionEndTime = j3;
            this.responseStartTime = j2;
            this.responseEndTime = j;
            this.responsesPerSec = d;
        }

        @JsonProperty
        public String restProxyUrl() {
            return this.restProxyUrl;
        }

        @JsonProperty
        public long totalSubmittedCalls() {
            return this.totalSubmittedCalls;
        }

        @JsonProperty
        public long totalCompletedCalls() {
            return this.totalCompletedCalls;
        }

        @JsonProperty
        public long totalFailedCalls() {
            return this.totalFailedCalls;
        }

        @JsonProperty
        public double submissionsPerSec() {
            return this.submissionsPerSec;
        }

        @JsonProperty
        public long submissionStartTime() {
            return this.submissionStartTime;
        }

        @JsonProperty
        public long submissionEndTime() {
            return this.submissionEndTime;
        }

        @JsonProperty
        public long responseStartTime() {
            return this.responseStartTime;
        }

        @JsonProperty
        public long responseEndTime() {
            return this.responseEndTime;
        }

        @JsonProperty
        public double responsesPerSec() {
            return this.responsesPerSec;
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/RestProxyProduceV3CurlWorker$StatusUpdater.class */
    private class StatusUpdater implements Runnable {
        private StatusUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (RestProxyProduceV3CurlWorker.this.clientProcesses == null || RestProxyProduceV3CurlWorker.this.clientProcesses.isEmpty()) {
                    return;
                }
                long j = Long.MAX_VALUE;
                long j2 = -1;
                long j3 = Long.MAX_VALUE;
                long j4 = -1;
                long j5 = 0;
                long j6 = 0;
                long j7 = 0;
                long j8 = 0;
                for (RestProxyProduceV3Process restProxyProduceV3Process : RestProxyProduceV3CurlWorker.this.clientProcesses) {
                    j = Math.min(restProxyProduceV3Process.submissionsStartTimeMs.get(), j);
                    j2 = Math.max(restProxyProduceV3Process.submissionEndTimeMs.get(), j2);
                    j3 = Math.min(restProxyProduceV3Process.responseStartTimeMs.get(), j3);
                    j4 = Math.max(restProxyProduceV3Process.responseEndTimeMs.get(), j4);
                    j5 += restProxyProduceV3Process.submittedCalls.get();
                    j6 += restProxyProduceV3Process.responses.get();
                    j7 += restProxyProduceV3Process.completedCalls.get();
                    j8 += restProxyProduceV3Process.failedCalls.get();
                }
                RestProxyProduceV3CurlWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree(new StatusData(RestProxyProduceV3CurlWorker.this.spec.restProxyUrl(), j4, (j6 * 1000.0d) / ((j4 == -1 ? Time.SYSTEM.milliseconds() : j4) - j3), j3, j2, (j5 * 1000.0d) / ((j2 == -1 ? Time.SYSTEM.milliseconds() : j2) - j), j, j7, j8, j5)));
            } catch (Exception e) {
                RestProxyProduceV3CurlWorker.log.warn(this + " failed to update status, this will be retried", e);
            }
        }

        public String toString() {
            return "StatusUpdater{}";
        }
    }

    public RestProxyProduceV3CurlWorker(String str, RestProxyProduceV3CurlWorkloadSpec restProxyProduceV3CurlWorkloadSpec) {
        this.id = (String) Objects.requireNonNull(str);
        this.spec = (RestProxyProduceV3CurlWorkloadSpec) Objects.requireNonNull(restProxyProduceV3CurlWorkloadSpec);
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException(this + " is already running");
        }
        log.info("{}: Activating with {}", this, this.spec);
        try {
            validateConfigs();
        } catch (ConfigException e) {
            WorkerUtils.abort(log, toString(), e, kafkaFutureImpl);
        }
        try {
            this.status = workerStatusTracker;
            this.doneFuture = kafkaFutureImpl;
            this.workerExecutor = Executors.newSingleThreadExecutor(ThreadUtils.createThreadFactory("RestProxyWorker%d", false));
            this.workerExecutor.submit(new ProcessCoordinator());
            this.statusUpdaterExecutor = Executors.newSingleThreadScheduledExecutor(ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
            this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(new StatusUpdater(), 30L, 30L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            this.workerExecutor.shutdown();
            this.workerExecutor = null;
            this.statusUpdaterExecutor.shutdown();
            this.statusUpdaterExecutor = null;
            WorkerUtils.abort(log, toString(), th, kafkaFutureImpl);
        }
    }

    private void validateConfigs() {
        if (this.spec.targetCallsPerSec() <= 0) {
            throw new ConfigException("Can't have targetCallsPerSec <= 0.");
        }
        if (this.spec.restProxyUrl() == null || this.spec.restProxyUrl().length() == 0) {
            throw new ConfigException("restProxyUrl can't be empty.");
        }
        if (this.spec.numMessages() <= 0) {
            throw new ConfigException("Can't have numMessages <= 0.");
        }
        if (this.spec.numClients() <= 0) {
            throw new ConfigException("Can't have numClients <= 0.");
        }
        if (this.spec.messageSize() < 1) {
            throw new ConfigException("Can't have messageSize < 1.");
        }
        if (!MESSAGE_TYPES.contains(this.spec.messageType())) {
            throw new ConfigException("Unknown message type: " + this.spec.messageType());
        }
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException(this + " is not running.");
        }
        log.info("{}: Deactivating", this);
        this.doneFuture.complete("");
        this.statusUpdaterFuture.cancel(false);
        this.statusUpdaterExecutor.shutdown();
        this.statusUpdaterExecutor.awaitTermination(1L, TimeUnit.MINUTES);
        this.statusUpdaterExecutor = null;
        this.workerExecutor.shutdownNow();
        this.workerExecutor.awaitTermination(1L, TimeUnit.MINUTES);
        new StatusUpdater().run();
        this.workerExecutor = null;
        this.doneFuture = null;
        log.info("{}: Deactivated.", this);
    }

    public String toString() {
        return String.format("RestProxyProduceV3Worker{id='%s'}", this.id);
    }

    protected RestProxyProduceV3Process startStreamingProcess(String[] strArr) throws IOException {
        return new RestProxyProduceV3StreamingProcess(UUID.randomUUID().toString(), strArr);
    }

    protected RestProxyProduceV3Process startProcess(String[] strArr) throws IOException {
        return new RestProxyProduceV3NonStreamingProcess(UUID.randomUUID().toString(), strArr);
    }
}
