/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.server.metrics;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutorFactory;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.java.util.metrics.StubServiceEmitter;

public class LatchableEmitter
extends StubServiceEmitter {
    private static final Logger log = new Logger(LatchableEmitter.class);
    public static final String TYPE = "latching";
    private final ScheduledExecutorService conditionEvaluateExecutor;
    private final Set<WaitCondition> waitConditions = new HashSet<WaitCondition>();
    private final ReentrantReadWriteLock eventReadWriteLock = new ReentrantReadWriteLock(true);

    public LatchableEmitter(String service, String host, ScheduledExecutorFactory executorFactory) {
        super(service, host);
        this.conditionEvaluateExecutor = executorFactory.create(1, "LatchingEmitter-eval-%d");
    }

    public void emit(Event event) {
        super.emit(event);
        this.triggerConditionEvaluations();
    }

    public void flush() {
        this.eventReadWriteLock.writeLock().lock();
        try {
            super.flush();
        }
        finally {
            this.eventReadWriteLock.writeLock().unlock();
        }
    }

    public void close() {
        this.flush();
    }

    public void waitForEvent(Predicate<Event> condition, long timeoutMillis) {
        WaitCondition waitCondition = new WaitCondition(condition);
        this.waitConditions.add(waitCondition);
        this.triggerConditionEvaluations();
        try {
            long awaitTime;
            long l = awaitTime = timeoutMillis >= 0L ? timeoutMillis : Long.MAX_VALUE;
            if (!waitCondition.countDownLatch.await(awaitTime, TimeUnit.MILLISECONDS)) {
                throw new ISE("Timed out waiting for event", new Object[0]);
            }
        }
        catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        finally {
            this.waitConditions.remove(waitCondition);
        }
    }

    public ServiceMetricEvent waitForEvent(UnaryOperator<EventMatcher> condition) {
        EventMatcher matcher = (EventMatcher)condition.apply(new EventMatcher());
        this.waitForEvent(event -> event instanceof ServiceMetricEvent && matcher.test((ServiceMetricEvent)event), -1L);
        return matcher.matchingEvent.get();
    }

    public void waitForEventAggregate(UnaryOperator<EventMatcher> condition, UnaryOperator<AggregateMatcher> aggregateCondition) {
        EventMatcher eventMatcher = (EventMatcher)condition.apply(new EventMatcher());
        AggregateMatcher aggregateMatcher = (AggregateMatcher)aggregateCondition.apply(new AggregateMatcher());
        this.waitForEvent(event -> event instanceof ServiceMetricEvent && eventMatcher.test((ServiceMetricEvent)event) && aggregateMatcher.test((ServiceMetricEvent)event), 300000L);
    }

    private void triggerConditionEvaluations() {
        if (this.conditionEvaluateExecutor == null) {
            throw new ISE("Cannot evaluate conditions as the 'conditionEvaluateExecutor' is null.", new Object[0]);
        }
        this.conditionEvaluateExecutor.submit(this::evaluateWaitConditions);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void evaluateWaitConditions() {
        this.eventReadWriteLock.readLock().lock();
        try {
            List<WaitCondition> conditionsToEvaluate = List.copyOf(this.waitConditions);
            if (conditionsToEvaluate.isEmpty()) {
                return;
            }
            List events = this.getEvents();
            for (WaitCondition condition : conditionsToEvaluate) {
                int currentNumberOfEvents = events.size();
                for (int i = condition.processedUntil; i < currentNumberOfEvents; ++i) {
                    if (!condition.predicate.test((Event)events.get(i))) continue;
                    condition.countDownLatch.countDown();
                }
                condition.processedUntil = currentNumberOfEvents;
            }
        }
        catch (Exception e) {
            log.error((Throwable)e, "Error while evaluating wait conditions", new Object[0]);
        }
        finally {
            this.eventReadWriteLock.readLock().unlock();
        }
    }

    private static class WaitCondition {
        private final Predicate<Event> predicate;
        private final CountDownLatch countDownLatch;
        private int processedUntil;

        private WaitCondition(Predicate<Event> predicate) {
            this.predicate = predicate;
            this.countDownLatch = new CountDownLatch(1);
        }
    }

    public static class EventMatcher
    implements Predicate<ServiceMetricEvent> {
        private String metricName;
        private Long metricValue;
        private final Map<String, Object> dimensions = new HashMap<String, Object>();
        private final AtomicReference<ServiceMetricEvent> matchingEvent = new AtomicReference();

        public EventMatcher hasMetricName(String metricName) {
            this.metricName = metricName;
            return this;
        }

        public EventMatcher hasValue(long metricValue) {
            this.metricValue = metricValue;
            return this;
        }

        public EventMatcher hasDimension(String dimension, Object value) {
            this.dimensions.put(dimension, value);
            return this;
        }

        @Override
        public boolean test(ServiceMetricEvent event) {
            if (this.metricName != null && !event.getMetric().equals(this.metricName)) {
                return false;
            }
            if (this.metricValue != null && event.getValue().longValue() != this.metricValue.longValue()) {
                return false;
            }
            boolean matches = this.dimensions.entrySet().stream().allMatch(dimValue -> event.getUserDims().getOrDefault(dimValue.getKey(), "").equals(dimValue.getValue()));
            if (matches) {
                this.matchingEvent.set(event);
                return true;
            }
            return false;
        }
    }

    public static class AggregateMatcher
    implements Predicate<ServiceMetricEvent> {
        private final List<ServiceMetricEvent> matchingEvents = new ArrayList<ServiceMetricEvent>();
        private long sumSoFar;
        private Long targetSum;
        private Long targetCount;

        public AggregateMatcher hasSumAtLeast(long sum) {
            this.targetSum = sum;
            return this;
        }

        public AggregateMatcher hasCountAtLeast(long count) {
            this.targetCount = count;
            return this;
        }

        @Override
        public boolean test(ServiceMetricEvent latestMatchingEvent) {
            this.matchingEvents.add(latestMatchingEvent);
            this.sumSoFar += latestMatchingEvent.getValue().longValue();
            if (this.targetSum != null && this.sumSoFar < this.targetSum) {
                return false;
            }
            return this.targetCount == null || (long)this.matchingEvents.size() >= this.targetCount;
        }
    }
}

