package org.apache.beam.runners.samza.translation;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.beam.runners.core.serialization.Base64Serializer;
import org.apache.beam.runners.samza.runtime.OpMessage;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.samza.Partition;
import org.apache.samza.SamzaException;
import org.apache.samza.config.Config;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemFactory;
import org.apache.samza.system.SystemProducer;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamPartition;

/* loaded from: input_file:org/apache/beam/runners/samza/translation/SamzaTestStreamSystemFactory.class */
/**
 * A Samza {@link SystemFactory} that exposes the events of a Beam {@link TestStream} as a
 * consume-only Samza system.
 *
 * <p>The {@link TestStream} and the function used to decode it are read from the system-scoped
 * configuration (see {@link #getConsumer(String, Config, MetricsRegistry)}). Producing to this
 * system is not supported.
 */
public class SamzaTestStreamSystemFactory implements SystemFactory {
    /** Placeholder offset used for every envelope and every metadata entry of this system. */
    private static final String DUMMY_OFFSET = "0";

    /**
     * {@link SystemAdmin} for the test stream system. Every offset it reports is the same
     * placeholder value, and every stream is described as having a single partition (id 0).
     */
    public static class SamzaTestStreamSystemAdmin implements SystemAdmin {
        /**
         * Maps every requested partition to the placeholder offset.
         *
         * @param offsets the current offsets; only the keys are used
         * @return a map with the same keys, each mapped to the placeholder offset
         */
        public Map<SystemStreamPartition, String> getOffsetsAfter(Map<SystemStreamPartition, String> offsets) {
            return offsets.keySet().stream()
                    .collect(Collectors.toMap(Function.identity(), ssp -> DUMMY_OFFSET));
        }

        /**
         * Builds metadata for each requested stream, describing it as a single partition (id 0)
         * whose three offsets are all the placeholder offset.
         *
         * @param streamNames the names of the streams to describe
         * @return a map from stream name to its single-partition metadata
         */
        public Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames) {
            return streamNames.stream()
                    .collect(Collectors.toMap(Function.identity(), streamName -> {
                        Map<Partition, SystemStreamMetadata.SystemStreamPartitionMetadata> partitionMetadata =
                                Collections.singletonMap(
                                        new Partition(0),
                                        new SystemStreamMetadata.SystemStreamPartitionMetadata(
                                                DUMMY_OFFSET, DUMMY_OFFSET, DUMMY_OFFSET));
                        return new SystemStreamMetadata(streamName, partitionMetadata);
                    }));
        }

        /**
         * Compares two offsets. All offsets of this system are treated as equal.
         *
         * @return always {@code 0}
         */
        public Integer offsetComparator(String offset1, String offset2) {
            return 0;
        }
    }

    /**
     * {@link SystemConsumer} that converts the events of a {@link TestStream} into Samza
     * {@link IncomingMessageEnvelope}s.
     *
     * @param <T> the element type of the test stream
     */
    public static class SamzaTestStreamSystemConsumer<T> implements SystemConsumer {
        TestStream<T> testStream;

        /**
         * @param testStream the test stream whose events are served by {@link #poll(Set, long)}
         */
        public SamzaTestStreamSystemConsumer(TestStream<T> testStream) {
            this.testStream = testStream;
        }

        /** No-op. */
        public void start() {
        }

        /** No-op. */
        public void stop() {
        }

        /** No-op: the partition and offset are ignored. */
        public void register(SystemStreamPartition systemStreamPartition, String offset) {
        }

        /**
         * Translates the test stream's events, in order, into envelopes for the first partition
         * returned by the iterator of {@code systemStreamPartitions}; any other partitions in the
         * set are ignored.
         *
         * <ul>
         *   <li>Each element of an element event becomes a message envelope holding the value in
         *       the global window at the element's timestamp.
         *   <li>Each watermark event becomes a watermark envelope. A watermark equal to
         *       {@link BoundedWindow#TIMESTAMP_MAX_VALUE} is additionally followed by an
         *       end-of-stream envelope, after which no further events are translated.
         * </ul>
         *
         * <p>Every call starts again from the first event of the test stream.
         *
         * @param systemStreamPartitions the partitions being polled
         * @param timeout unused
         * @return a single-entry map from the chosen partition to its envelopes, or an empty map
         *     if {@code systemStreamPartitions} is empty
         * @throws UnsupportedOperationException if the stream contains a processing-time event
         * @throws SamzaException if the stream contains an event of an unrecognized type
         */
        public Map<SystemStreamPartition, List<IncomingMessageEnvelope>> poll(Set<SystemStreamPartition> systemStreamPartitions, long timeout) {
            if (systemStreamPartitions.isEmpty()) {
                // Nothing was requested, so there is no partition to attach envelopes to.
                return Collections.emptyMap();
            }
            SystemStreamPartition ssp = systemStreamPartitions.iterator().next();
            List<IncomingMessageEnvelope> messages = new ArrayList<>();

            for (TestStream.Event<T> event : this.testStream.getEvents()) {
                TestStream.EventType type = event.getType();
                if (TestStream.EventType.ELEMENT.equals(type)) {
                    for (TimestampedValue<T> element : ((TestStream.ElementEvent<T>) event).getElements()) {
                        WindowedValue<T> windowedValue =
                                WindowedValue.timestampedValueInGlobalWindow(element.getValue(), element.getTimestamp());
                        // The envelope carries no key.
                        messages.add(new IncomingMessageEnvelope(ssp, DUMMY_OFFSET, null, OpMessage.ofElement(windowedValue)));
                    }
                } else if (TestStream.EventType.WATERMARK.equals(type)) {
                    long watermarkMillis = ((TestStream.WatermarkEvent<T>) event).getWatermark().getMillis();
                    messages.add(IncomingMessageEnvelope.buildWatermarkEnvelope(ssp, watermarkMillis));
                    if (watermarkMillis == BoundedWindow.TIMESTAMP_MAX_VALUE.getMillis()) {
                        // The maximum watermark marks the end of the stream; stop translating.
                        messages.add(IncomingMessageEnvelope.buildEndOfStreamEnvelope(ssp));
                        break;
                    }
                } else if (TestStream.EventType.PROCESSING_TIME.equals(type)) {
                    throw new UnsupportedOperationException("Advancing Processing time is not supported by the Samza Runner.");
                } else {
                    throw new SamzaException("Unknown event type " + type);
                }
            }
            return ImmutableMap.of(ssp, messages);
        }
    }

    /**
     * Creates a consumer for the {@link TestStream} stored in the configuration under the
     * {@code systems.<systemName>.} prefix.
     *
     * @param systemName the name of the Samza system, used to scope the configuration
     * @param config the job configuration
     * @param metricsRegistry unused
     * @return a consumer serving the decoded test stream
     */
    public SystemConsumer getConsumer(String systemName, Config config, MetricsRegistry metricsRegistry) {
        String systemPrefix = String.format("systems.%s.", systemName);
        Config scopedConfig = config.subset(systemPrefix, true);
        return new SamzaTestStreamSystemConsumer<>(getTestStream(scopedConfig));
    }

    /**
     * Not supported: this system can only be consumed from.
     *
     * @throws UnsupportedOperationException always
     */
    public SystemProducer getProducer(String systemName, Config config, MetricsRegistry metricsRegistry) {
        throw new UnsupportedOperationException("SamzaTestStreamSystem doesn't support producing");
    }

    /**
     * @return a new {@link SamzaTestStreamSystemAdmin}; both arguments are unused
     */
    public SystemAdmin getAdmin(String systemName, Config config) {
        return new SamzaTestStreamSystemAdmin();
    }

    /**
     * Rebuilds the {@link TestStream} from the given configuration: the Base64-serialized decoder
     * function stored under {@code SamzaTestStreamTranslator.TEST_STREAM_DECODER} is deserialized
     * and applied to the string stored under {@code SamzaTestStreamTranslator.ENCODED_TEST_STREAM}.
     *
     * <p>NOTE(review): a missing key yields {@code null} from {@code config.get}, which is passed
     * on unchecked; presumably both keys are always written by the translator - confirm.
     *
     * @param config the system-scoped configuration holding both entries
     * @return the decoded test stream
     */
    private static <T> TestStream<T> getTestStream(Config config) {
        // The serialized form carries no generic type information, so this conversion is unchecked.
        @SuppressWarnings("unchecked")
        SerializableFunction<String, TestStream<T>> testStreamDecoder =
                Base64Serializer.deserializeUnchecked(
                        config.get(SamzaTestStreamTranslator.TEST_STREAM_DECODER), SerializableFunction.class);
        return testStreamDecoder.apply(config.get(SamzaTestStreamTranslator.ENCODED_TEST_STREAM));
    }
}
