/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.trogdor.workload;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.JsonNode;
import java.util.Collections;
import java.util.HashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.internals.KafkaFutureImpl;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.utils.ThreadUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.trogdor.common.JsonUtil;
import org.apache.kafka.trogdor.common.Platform;
import org.apache.kafka.trogdor.common.WorkerUtils;
import org.apache.kafka.trogdor.task.TaskWorker;
import org.apache.kafka.trogdor.task.WorkerStatusTracker;
import org.apache.kafka.trogdor.workload.AclWorkloadSpec;
import org.apache.kafka.trogdor.workload.Throttle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class AclBenchWorker
implements TaskWorker {
    private static final Logger log = LoggerFactory.getLogger(AclBenchWorker.class);
    private static final int THROTTLE_PERIOD_MS = 100;
    private static final Time TIME = Time.SYSTEM;
    private static final AtomicLong COUNTER = new AtomicLong(0L);
    private final String id;
    private final AclWorkloadSpec spec;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Future<?> statusUpdaterFuture;
    private ExecutorService workerExecutor;
    private ScheduledExecutorService statusUpdaterExecutor;
    private WorkerStatusTracker status;
    private KafkaFutureImpl<String> doneFuture;
    private long totalCalls;
    private long startTimeMs;

    public AclBenchWorker(String id, AclWorkloadSpec spec) {
        this.id = id;
        this.spec = spec;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void start(Platform platform, WorkerStatusTracker status, KafkaFutureImpl<String> doneFuture) {
        if (!this.running.compareAndSet(false, true)) {
            throw new IllegalStateException("AclWorkloadSpec is already running.");
        }
        AclBenchWorker aclBenchWorker = this;
        synchronized (aclBenchWorker) {
            this.totalCalls = 0L;
            this.startTimeMs = TIME.milliseconds();
        }
        log.info("{}: Activating AclWorkloadSpec.", (Object)this.id);
        try {
            this.status = status;
            this.doneFuture = doneFuture;
            this.validateConfigs();
            this.workerExecutor = Executors.newFixedThreadPool(this.spec.noOfThreads(), ThreadUtils.createThreadFactory((String)"AclBenchWorker%d", (boolean)false));
            for (int i = 0; i < this.spec.noOfThreads(); ++i) {
                this.workerExecutor.submit(new Worker());
            }
            this.statusUpdaterExecutor = Executors.newScheduledThreadPool(1, ThreadUtils.createThreadFactory((String)"StatusUpdaterWorkerThread%d", (boolean)false));
            this.statusUpdaterFuture = this.statusUpdaterExecutor.scheduleAtFixedRate(new StatusUpdater(), 30L, 10L, TimeUnit.MILLISECONDS);
        }
        catch (Throwable e) {
            WorkerUtils.abort(log, "AclBenchWorker", e, doneFuture);
        }
    }

    private void validateConfigs() {
        if (this.spec.targetOperationsPerSec() <= 0) {
            throw new ConfigException("Can't have targetOperationsPerSec <= 0.");
        }
    }

    private int runAclOperations(AdminClient adminClient, long index) throws ExecutionException, InterruptedException {
        AclBinding aclBinding = new AclBinding(new ResourcePattern(ResourceType.TOPIC, "Topic-" + index, PatternType.PREFIXED), new AccessControlEntry("User:User-" + index, "*", AclOperation.WRITE, AclPermissionType.ALLOW));
        log.debug("Adding aclBinding: {} ", (Object)aclBinding);
        adminClient.createAcls(Collections.singletonList(aclBinding)).all().get();
        if (this.spec.aclDeletes()) {
            log.debug("Deleting aclBinding: {} ", (Object)aclBinding);
            adminClient.deleteAcls(Collections.singletonList(aclBinding.toFilter())).all().get();
            return 2;
        }
        return 1;
    }

    private AdminClient createAdminClient() {
        HashMap<String, String> props = new HashMap<String, String>();
        props.put("bootstrap.servers", this.spec.bootstrapServers());
        if (this.spec.adminJaasConfig() != null && !this.spec.adminJaasConfig().isEmpty()) {
            props.put("sasl.jaas.config", this.spec.adminJaasConfig());
        }
        if (this.spec.saslMechanism() != null && !this.spec.saslMechanism().isEmpty()) {
            props.put("sasl.mechanism", this.spec.saslMechanism());
        }
        if (this.spec.securityProtocol() != null && !this.spec.securityProtocol().isEmpty()) {
            props.put("security.protocol", this.spec.securityProtocol());
        }
        if (this.spec.saslMechanism() != null && this.spec.saslMechanism().equals("OAUTHBEARER")) {
            props.put("sasl.login.callback.handler.class", "io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler");
        }
        return AdminClient.create(props);
    }

    @Override
    public void stop(Platform platform) throws Exception {
        if (!this.running.compareAndSet(true, false)) {
            throw new IllegalStateException("AclBenchWorker is not running.");
        }
        log.info("{}: Deactivating AclBenchWorker.", (Object)this.id);
        this.doneFuture.complete((Object)"");
        this.statusUpdaterFuture.cancel(false);
        this.statusUpdaterExecutor.shutdown();
        this.statusUpdaterExecutor.awaitTermination(1L, TimeUnit.DAYS);
        this.statusUpdaterExecutor = null;
        this.workerExecutor.shutdownNow();
        this.workerExecutor.awaitTermination(1L, TimeUnit.DAYS);
        new StatusUpdater().run();
        this.workerExecutor = null;
        this.doneFuture = null;
        long lastTimeMs = Time.SYSTEM.milliseconds();
        double callsPerSec = (double)this.totalCalls * 1000.0 / (double)(lastTimeMs - this.startTimeMs);
        log.info("Achieved CallsPerSec : {}, minSupportedOpsPerSec : {}", (Object)callsPerSec, (Object)this.spec.minSupportedOpsPerSec());
        if (callsPerSec < (double)this.spec.minSupportedOpsPerSec()) {
            throw new RuntimeException("Minimum supported operations/sec is " + this.spec.minSupportedOpsPerSec() + " , but got " + callsPerSec);
        }
        log.info("{}: Deactivated AclBenchWorker.", (Object)this.id);
    }

    public static class StatusData {
        private final double callsPerSec;
        private final double totalCalls;

        @JsonCreator
        StatusData(@JsonProperty(value="totalCalls") double totalCalls, @JsonProperty(value="callsPerSec") double callsPerSec) {
            this.callsPerSec = callsPerSec;
            this.totalCalls = totalCalls;
        }

        @JsonProperty
        public double callsPerSec() {
            return this.callsPerSec;
        }

        @JsonProperty
        public double totalCalls() {
            return this.totalCalls;
        }
    }

    private class StatusUpdater
    implements Runnable {
        private StatusUpdater() {
        }

        @Override
        public void run() {
            try {
                long lastTimeMs = Time.SYSTEM.milliseconds();
                JsonNode node = JsonUtil.JSON_SERDE.valueToTree((Object)new StatusData(AclBenchWorker.this.totalCalls, (double)AclBenchWorker.this.totalCalls * 1000.0 / (double)(lastTimeMs - AclBenchWorker.this.startTimeMs)));
                AclBenchWorker.this.status.update(node);
            }
            catch (Exception e) {
                WorkerUtils.abort(log, "StatusUpdater", e, (KafkaFutureImpl<String>)AclBenchWorker.this.doneFuture);
            }
        }
    }

    class Worker
    implements Runnable {
        Worker() {
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         * Enabled aggressive block sorting
         * Enabled unnecessary exception pruning
         * Enabled aggressive exception aggregation
         */
        @Override
        public void run() {
            int perPeriod = WorkerUtils.perSecToPerPeriod((float)AclBenchWorker.this.spec.targetOperationsPerSec() / (float)AclBenchWorker.this.spec.noOfThreads(), 100L);
            Throttle throttle = new Throttle(perPeriod, 100);
            AdminClient adminClient = null;
            try {
                adminClient = AclBenchWorker.this.createAdminClient();
                while (!AclBenchWorker.this.doneFuture.isDone()) {
                    throttle.increment();
                    if (AclBenchWorker.this.spec.aclDeletes()) {
                        throttle.increment();
                    }
                    int noOfOps = AclBenchWorker.this.runAclOperations(adminClient, COUNTER.incrementAndGet());
                    AclBenchWorker aclBenchWorker = AclBenchWorker.this;
                    synchronized (aclBenchWorker) {
                        AclBenchWorker.this.totalCalls = AclBenchWorker.this.totalCalls + (long)noOfOps;
                    }
                }
                return;
            }
            catch (Throwable e) {
                Utils.closeQuietly((AutoCloseable)adminClient, (String)"AdminClient");
                WorkerUtils.abort(log, "AclBenchWorker#Worker", e, (KafkaFutureImpl<String>)AclBenchWorker.this.doneFuture);
            }
        }
    }
}

