/*
 * Decompiled with CFR 0.152.
 */
package de.otto.eventsourcing.query;

import de.otto.eventsourcing.event.Key;
import de.otto.eventsourcing.event.Payload;
import de.otto.eventsourcing.query.ConsumerRecordCallback;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class LatchedCallback
implements ConsumerRecordCallback {
    private static final Logger LOG = LoggerFactory.getLogger(LatchedCallback.class);
    private final CountDownLatch latch;
    private volatile ConsumerRecord<Key, Payload> lastEvent;

    public LatchedCallback() {
        this.latch = new CountDownLatch(1);
    }

    public LatchedCallback(int count) {
        this.latch = new CountDownLatch(count);
    }

    public void onFailure(Throwable ex) {
        LOG.debug("received onFailure {}", (Object)ex.getMessage());
    }

    public void onSuccess(ConsumerRecord<Key, Payload> result) {
        LOG.trace("Received onSuccess {}", result);
        this.lastEvent = result;
        this.latch.countDown();
    }

    public ConsumerRecord<Key, Payload> await() throws InterruptedException, TimeoutException {
        return this.await(1, TimeUnit.SECONDS);
    }

    public ConsumerRecord<Key, Payload> await(int timeout, TimeUnit timeUnit) throws InterruptedException, TimeoutException {
        LOG.trace("Waiting for latch...");
        if (this.latch.await(timeout, timeUnit)) {
            LOG.trace("...got it. Returning with {}", this.lastEvent);
            return this.lastEvent;
        }
        throw new TimeoutException(String.format("Did no receive onSuccess within %s%s", new Object[]{timeout, timeUnit}));
    }
}

