package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/kafka/trogdor/workload/SchemaRegistryWorker.class */
public class SchemaRegistryWorker implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(SchemaRegistryWorker.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    private static final Time TIME = Time.SYSTEM;
    private final String id;
    private final SchemaRegistryWorkloadSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Future<?> statusUpdaterFuture;
    private ExecutorService workerExecutor;
    private ScheduledExecutorService statusUpdaterExecutor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private Platform platform;
    private Set<Integer> skippedIdentifiers;
    private AtomicLong totalCalls;
    private AtomicLong totalFailedCalls;
    private AtomicLong startTimeMs;

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/SchemaRegistryWorker$RegisterSchemaResponse.class */
    private static class RegisterSchemaResponse {
        private int id;

        @JsonCreator
        public RegisterSchemaResponse(@JsonProperty("id") int i) {
            this.id = i;
        }

        public int getId() {
            return this.id;
        }

        public void setId(int i) {
            this.id = i;
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/SchemaRegistryWorker$StatusData.class */
    public static class StatusData {
        private final long totalCalls;
        private final long totalFailedCalls;
        private final double callsPerSec;
        private final String schemaRegistryUrl;

        @JsonCreator
        StatusData(@JsonProperty("schemaRegistryUrl") String str, @JsonProperty("totalCalls") long j, @JsonProperty("totalFailedCalls") long j2, @JsonProperty("callsPerSec") double d) {
            this.schemaRegistryUrl = str;
            this.totalCalls = j;
            this.totalFailedCalls = j2;
            this.callsPerSec = d;
        }

        @JsonProperty
        public String schemaRegistryUrl() {
            return this.schemaRegistryUrl;
        }

        @JsonProperty
        public long totalCalls() {
            return this.totalCalls;
        }

        @JsonProperty
        public long totalFailedCalls() {
            return this.totalFailedCalls;
        }

        @JsonProperty
        public double callsPecSec() {
            return this.callsPerSec;
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/SchemaRegistryWorker$StatusUpdater.class */
    private class StatusUpdater implements Runnable {
        private StatusUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                SchemaRegistryWorker.this.status.update(JsonUtil.JSON_SERDE.valueToTree(new StatusData(SchemaRegistryWorker.this.spec.schemaRegistryUrl(), SchemaRegistryWorker.this.totalCalls.get(), SchemaRegistryWorker.this.totalFailedCalls.get(), (SchemaRegistryWorker.this.totalCalls.get() * 1000.0d) / (Time.SYSTEM.milliseconds() - SchemaRegistryWorker.this.startTimeMs.get()))));
            } catch (Exception e) {
                WorkerUtils.abort(SchemaRegistryWorker.log, "StatusUpdater", e, SchemaRegistryWorker.this.doneFuture);
            }
        }
    }

    /* loaded from: input_file:org/apache/kafka/trogdor/workload/SchemaRegistryWorker$Worker.class */
    class Worker implements Runnable {
        Worker() {
        }

        @Override // java.lang.Runnable
        public void run() {
            for (int i = 0; i < SchemaRegistryWorker.this.spec.numSchemas(); i++) {
                try {
                    String registerSchema = SchemaRegistryWorker.this.registerSchema(i);
                    boolean z = true;
                    SchemaRegistryWorker.log.info(registerSchema);
                    if (SchemaRegistryWorker.this.spec.parseResult()) {
                        try {
                            SchemaRegistryWorker.log.info("ID is {}", Integer.valueOf(((RegisterSchemaResponse) SchemaRegistryWorker.OBJECT_MAPPER.readValue(registerSchema, RegisterSchemaResponse.class)).getId()));
                        } catch (Exception e) {
                            SchemaRegistryWorker.this.skippedIdentifiers.add(Integer.valueOf(i));
                            z = false;
                        }
                    }
                    SchemaRegistryWorker.this.totalCalls.getAndIncrement();
                    if (!z) {
                        SchemaRegistryWorker.this.totalFailedCalls.getAndIncrement();
                    }
                } catch (Throwable th) {
                    WorkerUtils.abort(SchemaRegistryWorker.log, "RegisterSchemas", th, SchemaRegistryWorker.this.doneFuture);
                }
            }
            for (int i2 = 0; i2 < SchemaRegistryWorker.this.spec.numSchemas(); i2++) {
                if (!SchemaRegistryWorker.this.skippedIdentifiers.contains(Integer.valueOf(i2))) {
                    boolean z2 = true;
                    String deleteSchema = SchemaRegistryWorker.this.deleteSchema(i2);
                    SchemaRegistryWorker.log.info(deleteSchema);
                    if (SchemaRegistryWorker.this.spec.parseResult()) {
                        try {
                            Integer.parseInt(deleteSchema);
                        } catch (Exception e2) {
                            z2 = false;
                        }
                    }
                    SchemaRegistryWorker.this.totalCalls.getAndIncrement();
                    if (!z2) {
                        SchemaRegistryWorker.this.totalFailedCalls.getAndIncrement();
                    }
                }
            }
            SchemaRegistryWorker.this.doneFuture.complete("");
        }
    }

    public SchemaRegistryWorker(String str, SchemaRegistryWorkloadSpec schemaRegistryWorkloadSpec) {
        this.id = str;
        this.spec = schemaRegistryWorkloadSpec;
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void start(Platform platform, WorkerStatusTracker workerStatusTracker, KafkaFutureImpl<String> kafkaFutureImpl) {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("SchemaRegistryWorker is already running");
        }
        this.totalCalls = new AtomicLong(0L);
        this.totalFailedCalls = new AtomicLong(0L);
        this.startTimeMs = new AtomicLong(TIME.milliseconds());
        this.skippedIdentifiers = ConcurrentHashMap.newKeySet();
        log.info("{}: Activating SchemaRegistryWorker.", this.id);
        try {
            this.platform = platform;
            this.status = workerStatusTracker;
            this.doneFuture = kafkaFutureImpl;
            validateConfigs();
            this.workerExecutor = Executors.newFixedThreadPool(1, ThreadUtils.createThreadFactory("SchemaRegistryWorker%d", false));
            this.workerExecutor.submit(new Worker());
            this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory("StatusUpdaterWorkerThread%d", false));
            this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(new StatusUpdater(), 30L, 30L, TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            WorkerUtils.abort(log, "SchemaRegistryWorker", th, kafkaFutureImpl);
        }
    }

    private void validateConfigs() {
        if (this.spec.targetCallsPerSec() <= 0) {
            throw new ConfigException("Can't have targetCallsPerSec <= 0.");
        }
        if (this.spec.schemaRegistryUrl() == null || this.spec.schemaRegistryUrl().length() == 0) {
            throw new ConfigException("schemaRegistryUrl can't be empty.");
        }
        if (this.spec.numSchemas() <= 0) {
            throw new ConfigException("Can't have numSchemas <= 0.");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String registerSchema(int i) throws Throwable {
        String format = String.format("%s/subjects/test-subject-%d/versions", this.spec.schemaRegistryUrl(), Integer.valueOf(i));
        log.info("Registering new schema to test-subject-{} at {}", Integer.valueOf(i), format);
        return this.platform.runCommand(new String[]{"curl", "-X", "POST", "-H", "Content-Type:application/vnd.schemaregistry.v1+json", "--data", OBJECT_MAPPER.writeValueAsString(Collections.singletonMap("schema", String.format("{\"type\": \"record\", \"name\": \"test%d\", \"fields\": [{\"type\": \"string\", \"name\": \"f%d\"}]}", Integer.valueOf(i), Integer.valueOf(i)))), format});
    }

    /* JADX INFO: Access modifiers changed from: private */
    public String deleteSchema(int i) throws Throwable {
        String format = String.format("%s/subjects/test-subject-%d/versions/1", this.spec.schemaRegistryUrl(), Integer.valueOf(i));
        log.info("Deleting new schema to test-subject-{} at {}", Integer.valueOf(i), format);
        return this.platform.runCommand(new String[]{"curl", "-X", "DELETE", format});
    }

    @Override // org.apache.kafka.trogdor.task.TaskWorker
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("SchemaRegistryWorker is not running.");
        }
        log.info("{}: Deactivating SchemaRegistryWorker", this.id);
        this.doneFuture.complete("");
        this.statusUpdaterFuture.cancel(false);
        this.statusUpdaterExecutor.shutdown();
        this.statusUpdaterExecutor.awaitTermination(1L, TimeUnit.DAYS);
        this.statusUpdaterExecutor = null;
        this.workerExecutor.shutdownNow();
        this.workerExecutor.awaitTermination(1L, TimeUnit.DAYS);
        new StatusUpdater().run();
        this.workerExecutor = null;
        this.doneFuture = null;
        log.info("{}: Deactivated SchemaRegistryWorker.", this.id);
    }
}
