package org.apache.gobblin.writer.test;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.eventbus.EventBus;
import com.google.common.eventbus.Subscribe;
import java.beans.ConstructorProperties;
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.gobblin.writer.test.TestingEventBuses;

/* loaded from: input_file:org/apache/gobblin/writer/test/TestingEventBusAsserter.class */
public class TestingEventBusAsserter implements Closeable {
    private final EventBus _eventBus;
    private final BlockingDeque<TestingEventBuses.Event> _events = new LinkedBlockingDeque();
    private long _defaultTimeoutValue = 1;
    private TimeUnit _defaultTimeoutUnit = TimeUnit.SECONDS;

    /* loaded from: input_file:org/apache/gobblin/writer/test/TestingEventBusAsserter$StaticMessage.class */
    public static class StaticMessage implements Function<TestingEventBuses.Event, String> {
        private final String message;

        public String apply(TestingEventBuses.Event event) {
            return this.message;
        }

        @ConstructorProperties({"message"})
        public StaticMessage(String str) {
            this.message = str;
        }
    }

    public TestingEventBusAsserter(String str) {
        this._eventBus = TestingEventBuses.getEventBus(str);
        this._eventBus.register(this);
    }

    @Subscribe
    public void processEvent(TestingEventBuses.Event event) {
        this._events.offer(event);
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        this._eventBus.unregister(this);
    }

    public BlockingDeque<TestingEventBuses.Event> getEvents() {
        return this._events;
    }

    public void clear() {
        this._events.clear();
    }

    public TestingEventBusAsserter withTimeout(long j, TimeUnit timeUnit) {
        this._defaultTimeoutValue = j;
        this._defaultTimeoutUnit = timeUnit;
        return this;
    }

    public TestingEventBuses.Event assertNext(Predicate<TestingEventBuses.Event> predicate, Function<TestingEventBuses.Event, String> function) throws InterruptedException, TimeoutException {
        TestingEventBuses.Event pollFirst = this._events.pollFirst(this._defaultTimeoutValue, this._defaultTimeoutUnit);
        if (null == pollFirst) {
            throw new TimeoutException();
        }
        if (predicate.apply(pollFirst)) {
            return pollFirst;
        }
        throw new AssertionError(function.apply(pollFirst));
    }

    public TestingEventBuses.Event assertNext(Predicate<TestingEventBuses.Event> predicate, String str) throws InterruptedException, TimeoutException {
        return assertNext(predicate, new StaticMessage(str));
    }

    public <T> TestingEventBuses.Event assertNextValue(final Predicate<T> predicate, Function<TestingEventBuses.Event, String> function) throws InterruptedException, TimeoutException {
        return assertNext(new Predicate<TestingEventBuses.Event>() { // from class: org.apache.gobblin.writer.test.TestingEventBusAsserter.1
            public boolean apply(@Nonnull TestingEventBuses.Event event) {
                return predicate.apply(event.getTypedValue());
            }
        }, function);
    }

    public <T> TestingEventBuses.Event assertNextValue(final Predicate<T> predicate, String str) throws InterruptedException, TimeoutException {
        return assertNext(new Predicate<TestingEventBuses.Event>() { // from class: org.apache.gobblin.writer.test.TestingEventBusAsserter.2
            public boolean apply(@Nonnull TestingEventBuses.Event event) {
                return predicate.apply(event.getTypedValue());
            }
        }, str);
    }

    public <T> TestingEventBuses.Event assertNextValueEq(final T t) throws InterruptedException, TimeoutException {
        return assertNextValue(Predicates.equalTo(t), new Function<TestingEventBuses.Event, String>() { // from class: org.apache.gobblin.writer.test.TestingEventBusAsserter.3
            public String apply(@Nonnull TestingEventBuses.Event event) {
                return "Event value mismatch: " + event.getValue() + " != " + t;
            }
        });
    }

    public <T> void assertNextValuesEq(Collection<T> collection) throws InterruptedException, TimeoutException {
        final HashSet hashSet = new HashSet(collection);
        Predicate<T> predicate = new Predicate<T>() { // from class: org.apache.gobblin.writer.test.TestingEventBusAsserter.4
            public boolean apply(@Nonnull T t) {
                if (!hashSet.contains(t)) {
                    return false;
                }
                hashSet.remove(t);
                return true;
            }
        };
        while (hashSet.size() > 0) {
            assertNextValue(predicate, new Function<TestingEventBuses.Event, String>() { // from class: org.apache.gobblin.writer.test.TestingEventBusAsserter.5
                public String apply(@Nonnull TestingEventBuses.Event event) {
                    return "Event value " + event.getValue() + " not in set " + hashSet;
                }
            });
        }
    }
}
