package org.apache.beam.runners.samza.adapter;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.io.Source;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.joda.time.Instant;
import org.junit.Assert;

/* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers.class */
public class TestSourceHelpers {

    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$ElementEvent.class */
    /**
     * An {@link Event} that carries a single element together with the timestamp
     * assigned to it.
     *
     * @param <T> type of the carried element
     */
    static class ElementEvent<T> implements Event<T> {
        /** The element value carried by this event. */
        final T element;
        /** The timestamp recorded for {@link #element}. */
        final Instant timestamp;

        /**
         * Creates an event for the given element/timestamp pair.
         *
         * @param elementValue the element to carry
         * @param elementTimestamp the timestamp to associate with the element
         */
        private ElementEvent(T elementValue, Instant elementTimestamp) {
            this.timestamp = elementTimestamp;
            this.element = elementValue;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$Event.class */
    /**
     * Marker interface implemented by every event kind declared in this helper class
     * (element, exception, latch, watermark and "no element" events). It declares no
     * methods of its own.
     *
     * @param <T> element type of the source the event belongs to
     */
    public interface Event<T> {}

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$ExceptionEvent.class */
    /**
     * An {@link Event} that carries an {@link IOException}.
     *
     * @param <T> element type of the source the event belongs to
     */
    public static class ExceptionEvent<T> implements Event<T> {
        /** The exception carried by this event. */
        final IOException exception;

        /**
         * Creates an event wrapping the given exception.
         *
         * @param cause the exception to carry
         */
        private ExceptionEvent(IOException cause) {
            this.exception = cause;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$LatchEvent.class */
    /**
     * An {@link Event} that carries a {@link CountDownLatch}.
     *
     * <p>NOTE(review): how the latch is used (awaited or counted down) is decided by the
     * consumer of the event, which is not visible in this class.
     *
     * @param <T> element type of the source the event belongs to
     */
    public static class LatchEvent<T> implements Event<T> {
        /** The latch carried by this event. */
        final CountDownLatch latch;

        /**
         * Creates an event wrapping the given latch.
         *
         * @param sharedLatch the latch to carry
         */
        private LatchEvent(CountDownLatch sharedLatch) {
            this.latch = sharedLatch;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$NoElementEvent.class */
    /**
     * An {@link Event} with no payload; {@code SourceBuilder#noElements()} appends one
     * of these to the event list.
     *
     * @param <T> element type of the source the event belongs to
     */
    public static class NoElementEvent<T> implements Event<T> {
        /** Creates a payload-free event. */
        NoElementEvent() {
            // Nothing to initialize: this event has no state.
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$SourceBuilder.class */
    /**
     * Fluent builder that records an ordered list of {@link Event}s and leaves the
     * construction of the concrete {@link Source} to subclasses.
     *
     * <p>Events are appended in call order. Elements added via {@link #addElements} are
     * stamped with the builder's current timestamp, which starts at
     * {@link BoundedWindow#TIMESTAMP_MIN_VALUE} and can only be moved forward through
     * {@link #setTimestamp}.
     *
     * @param <T> element type produced by the source
     * @param <W> concrete source type returned by the build method
     */
    public abstract static class SourceBuilder<T, W extends Source<T>> {
        /** Events recorded so far, in insertion order. */
        private final List<Event<T>> events = new ArrayList<>();
        /** Timestamp applied to elements added from now on. */
        private Instant currentTimestamp = BoundedWindow.TIMESTAMP_MIN_VALUE;

        /**
         * Appends one {@link ElementEvent} per given element, each stamped with the
         * current timestamp.
         *
         * @param tArr elements to add, in order
         * @return this builder
         */
        @SafeVarargs
        public final SourceBuilder<T, W> addElements(T... tArr) {
            for (T t : tArr) {
                this.events.add(new ElementEvent<>(t, this.currentTimestamp));
            }
            return this;
        }

        /**
         * Appends an {@link ExceptionEvent} carrying the given exception.
         *
         * @param iOException exception to record
         * @return this builder
         */
        public SourceBuilder<T, W> addException(IOException iOException) {
            this.events.add(new ExceptionEvent<>(iOException));
            return this;
        }

        /**
         * Appends a {@link LatchEvent} carrying the given latch.
         *
         * @param countDownLatch latch to record
         * @return this builder
         */
        public SourceBuilder<T, W> addLatch(CountDownLatch countDownLatch) {
            this.events.add(new LatchEvent<>(countDownLatch));
            return this;
        }

        /**
         * Sets the timestamp used for elements added after this call. The new timestamp
         * must not be earlier than the current one; otherwise the JUnit assertion fails.
         *
         * @param instant new current timestamp
         * @return this builder
         */
        public SourceBuilder<T, W> setTimestamp(Instant instant) {
            Assert.assertTrue("Expected " + instant + " to be greater than or equal to " + this.currentTimestamp, instant.isEqual(this.currentTimestamp) || instant.isAfter(this.currentTimestamp));
            this.currentTimestamp = instant;
            return this;
        }

        /**
         * Appends a {@link WatermarkEvent} carrying the given watermark.
         *
         * @param instant watermark to record
         * @return this builder
         */
        public SourceBuilder<T, W> advanceWatermarkTo(Instant instant) {
            this.events.add(new WatermarkEvent<>(instant));
            return this;
        }

        /**
         * Appends a {@link NoElementEvent}.
         *
         * @return this builder
         */
        public SourceBuilder<T, W> noElements() {
            this.events.add(new NoElementEvent<>());
            return this;
        }

        /**
         * Returns the recorded events. This is the builder's own list, not a copy, so
         * later builder calls are visible through the returned reference.
         *
         * @return the live list of recorded events
         */
        /* JADX INFO: Access modifiers changed from: protected */
        public List<Event<T>> getEvents() {
            return this.events;
        }

        /**
         * Builds the concrete source; implemented by subclasses.
         *
         * @return the built source
         */
        /* renamed from: build */
        public abstract W mo2build();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/samza/adapter/TestSourceHelpers$WatermarkEvent.class */
    /**
     * An {@link Event} that carries a watermark value.
     *
     * @param <T> element type of the source the event belongs to
     */
    public static class WatermarkEvent<T> implements Event<T> {
        /** The watermark carried by this event. */
        final Instant watermark;

        /**
         * Creates an event for the given watermark.
         *
         * @param watermarkInstant the watermark to carry
         */
        private WatermarkEvent(Instant watermarkInstant) {
            this.watermark = watermarkInstant;
        }
    }

    /** Private constructor: this class is not meant to be instantiated from outside. */
    private TestSourceHelpers() {
        // Intentionally empty.
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds an {@link IncomingMessageEnvelope} whose message is an element
     * {@link OpMessage} wrapping {@code str2} as a timestamped value in the global
     * window. The envelope key is {@code null}.
     *
     * @param systemStreamPartition partition the envelope is attributed to
     * @param str offset string passed to the envelope
     * @param str2 element value to wrap
     * @param instant timestamp attached to the element
     * @return the constructed envelope
     */
    public static IncomingMessageEnvelope createElementMessage(SystemStreamPartition systemStreamPartition, String str, String str2, Instant instant) {
        // Typed as Object so the same (partition, offset, key, message) constructor is selected.
        final Object key = null;
        final Object message = OpMessage.ofElement(WindowedValue.timestampedValueInGlobalWindow(str2, instant));
        return new IncomingMessageEnvelope(systemStreamPartition, str, key, message);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds a watermark envelope for the given partition, using the millisecond value
     * of {@code instant} as the watermark.
     *
     * @param systemStreamPartition partition the envelope is attributed to
     * @param instant watermark instant; converted via {@code getMillis()}
     * @return the watermark envelope
     */
    public static IncomingMessageEnvelope createWatermarkMessage(SystemStreamPartition systemStreamPartition, Instant instant) {
        final long watermarkMillis = instant.getMillis();
        return IncomingMessageEnvelope.buildWatermarkEnvelope(systemStreamPartition, watermarkMillis);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Builds an end-of-stream envelope for the given partition.
     *
     * @param systemStreamPartition partition the envelope is attributed to
     * @return the end-of-stream envelope
     */
    public static IncomingMessageEnvelope createEndOfStreamMessage(SystemStreamPartition systemStreamPartition) {
        final IncomingMessageEnvelope endOfStream = IncomingMessageEnvelope.buildEndOfStreamEnvelope(systemStreamPartition);
        return endOfStream;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Invokes {@code callable} and verifies that it throws an exception whose cause
     * chain contains a throwable equal to {@code exc}.
     *
     * <p>If the callable completes normally, the test is failed via
     * {@link Assert#fail(String)}. If an exception is thrown but neither it nor any of
     * its causes {@code equals(exc)}, {@link Assert#assertEquals(Object, Object)} is
     * called with the expected and the caught exception to report the mismatch.
     *
     * @param exc the exception expected somewhere in the thrown exception's cause chain
     * @param callable the code under test
     * @param <T> result type of the callable (the result is ignored)
     * @throws Exception declared for signature compatibility
     */
    public static <T> void expectWrappedException(Exception exc, Callable<T> callable) throws Exception {
        try {
            callable.call();
            Assert.fail("Expected exception (" + exc + "), but no exception was thrown");
        } catch (Exception e) {
            // Walk from the caught exception down through its causes looking for a match.
            for (Throwable link = e; link != null; link = link.getCause()) {
                if (link.equals(exc)) {
                    return;
                }
            }
            // No link matched: report expected vs. the outermost caught exception.
            Assert.assertEquals(exc, e);
        }
    }
}
