package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.TimestampedValue;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.class */
public class PubsubUnboundedSinkTest implements Serializable {
    private static final String DATA = "testData";
    private static final long TIMESTAMP = 1234;
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String ID_ATTRIBUTE = "id";
    private static final int NUM_SHARDS = 10;

    @Rule
    public transient TestPipeline p = TestPipeline.create();
    private static final PubsubClient.TopicPath TOPIC = PubsubClient.topicPathFromName("testProject", "testTopic");
    private static final ImmutableMap<String, String> ATTRIBUTES = ImmutableMap.builder().put("a", "b").put("c", "d").build();

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest$Stamp.class */
    private static class Stamp extends DoFn<String, PubsubMessage> {
        private final Map<String, String> attributes;

        private Stamp() {
            this((Map<String, String>) ImmutableMap.of());
        }

        private Stamp(Map<String, String> map) {
            this.attributes = map;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<String, PubsubMessage>.ProcessContext processContext) {
            processContext.outputWithTimestamp(new PubsubMessage(((String) processContext.element()).getBytes(StandardCharsets.UTF_8), this.attributes), new Instant(PubsubUnboundedSinkTest.TIMESTAMP));
        }
    }

    private String getRecordId(String str) {
        return Hashing.murmur3_128().hashBytes(str.getBytes(StandardCharsets.UTF_8)).toString();
    }

    @Test
    public void saneCoder() throws Exception {
        CoderProperties.coderDecodeEncodeEqual(PubsubUnboundedSink.CODER, PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(), TIMESTAMP, getRecordId(DATA), (String) null));
        CoderProperties.coderSerializable(PubsubUnboundedSink.CODER);
    }

    @Test
    public void sendOneMessage() throws IOException {
        PubsubTestClient.PubsubTestClientFactory createFactoryForPublish = PubsubTestClient.createFactoryForPublish(TOPIC, ImmutableList.of(PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).putAllAttributes(ATTRIBUTES).build(), TIMESTAMP, getRecordId(DATA), (String) null)), ImmutableList.of());
        Throwable th = null;
        try {
            try {
                this.p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp(ATTRIBUTES))).apply(new PubsubUnboundedSink(createFactoryForPublish, ValueProvider.StaticValueProvider.of(TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, 1, 1, Duration.standardSeconds(2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC, (String) null));
                this.p.run();
                if (createFactoryForPublish != null) {
                    $closeResource(null, createFactoryForPublish);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createFactoryForPublish != null) {
                $closeResource(th, createFactoryForPublish);
            }
            throw th3;
        }
    }

    @Test
    public void sendOneMessageWithoutAttributes() throws IOException {
        PubsubTestClient.PubsubTestClientFactory createFactoryForPublish = PubsubTestClient.createFactoryForPublish(TOPIC, ImmutableList.of(PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(DATA)).build(), TIMESTAMP, getRecordId(DATA), (String) null)), ImmutableList.of());
        Throwable th = null;
        try {
            try {
                this.p.apply(Create.of(ImmutableList.of(DATA))).apply(ParDo.of(new Stamp(null))).apply(new PubsubUnboundedSink(createFactoryForPublish, ValueProvider.StaticValueProvider.of(TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, 1, 1, Duration.standardSeconds(2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC, (String) null));
                this.p.run();
                if (createFactoryForPublish != null) {
                    $closeResource(null, createFactoryForPublish);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createFactoryForPublish != null) {
                $closeResource(th, createFactoryForPublish);
            }
            throw th3;
        }
    }

    @Test
    public void testDynamicTopics() throws IOException {
        ImmutableList of = ImmutableList.of(PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("testData0")).build(), TIMESTAMP, getRecordId("testData0"), "topic1"), PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("testData1")).build(), 1235L, getRecordId("testData1"), "topic1"), PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("testData2")).build(), 1236L, getRecordId("testData2"), "topic2"), PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8("testData3")).build(), 1237L, getRecordId("testData3"), "topic2"));
        PubsubTestClient.PubsubTestClientFactory createFactoryForPublish = PubsubTestClient.createFactoryForPublish((PubsubClient.TopicPath) null, of, ImmutableList.of());
        Throwable th = null;
        try {
            try {
                this.p.apply(Create.timestamped((List) of.stream().map(outgoingMessage -> {
                    return TimestampedValue.of(new PubsubMessage(outgoingMessage.getMessage().getData().toByteArray(), (Map) null).withTopic(outgoingMessage.topic()), Instant.ofEpochMilli(outgoingMessage.getTimestampMsSinceEpoch()));
                }).collect(Collectors.toList())).withCoder(new PubsubMessageWithTopicCoder())).apply(new PubsubUnboundedSink(createFactoryForPublish, (ValueProvider) null, TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, 1, 1, Duration.standardSeconds(2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC, (String) null));
                this.p.run();
                if (createFactoryForPublish != null) {
                    $closeResource(null, createFactoryForPublish);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createFactoryForPublish != null) {
                $closeResource(th, createFactoryForPublish);
            }
            throw th3;
        }
    }

    @Test
    public void sendMoreThanOneBatchByNumMessages() throws IOException {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (int i = 0; i < 2 * NUM_SHARDS; i++) {
            String valueOf = String.valueOf(i);
            arrayList.add(PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(valueOf)).build(), TIMESTAMP, getRecordId(valueOf), (String) null));
            arrayList2.add(valueOf);
        }
        PubsubTestClient.PubsubTestClientFactory createFactoryForPublish = PubsubTestClient.createFactoryForPublish(TOPIC, arrayList, ImmutableList.of());
        Throwable th = null;
        try {
            try {
                this.p.apply(Create.of(arrayList2)).apply(ParDo.of(new Stamp())).apply(new PubsubUnboundedSink(createFactoryForPublish, ValueProvider.StaticValueProvider.of(TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, 2, 1000, Duration.standardSeconds(2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC, (String) null));
                this.p.run();
                if (createFactoryForPublish != null) {
                    $closeResource(null, createFactoryForPublish);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createFactoryForPublish != null) {
                $closeResource(th, createFactoryForPublish);
            }
            throw th3;
        }
    }

    @Test
    public void sendMoreThanOneBatchByByteSize() throws IOException {
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        int i = 0;
        while (true) {
            int i2 = i;
            if (i2 >= NUM_SHARDS * NUM_SHARDS) {
                break;
            }
            StringBuilder sb = new StringBuilder();
            for (int i3 = 0; i3 < NUM_SHARDS; i3++) {
                sb.append(String.valueOf(i2));
            }
            String sb2 = sb.toString();
            arrayList.add(PubsubClient.OutgoingMessage.of(PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(sb2)).build(), TIMESTAMP, getRecordId(sb2), (String) null));
            arrayList2.add(sb2);
            i = i2 + sb2.length();
        }
        PubsubTestClient.PubsubTestClientFactory createFactoryForPublish = PubsubTestClient.createFactoryForPublish(TOPIC, arrayList, ImmutableList.of());
        Throwable th = null;
        try {
            try {
                this.p.apply(Create.of(arrayList2)).apply(ParDo.of(new Stamp())).apply(new PubsubUnboundedSink(createFactoryForPublish, ValueProvider.StaticValueProvider.of(TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, 100, NUM_SHARDS, Duration.standardSeconds(2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC, (String) null));
                this.p.run();
                if (createFactoryForPublish != null) {
                    $closeResource(null, createFactoryForPublish);
                }
            } catch (Throwable th2) {
                th = th2;
                throw th2;
            }
        } catch (Throwable th3) {
            if (createFactoryForPublish != null) {
                $closeResource(th, createFactoryForPublish);
            }
            throw th3;
        }
    }

    private static /* synthetic */ void $closeResource(Throwable th, AutoCloseable autoCloseable) {
        if (th == null) {
            autoCloseable.close();
            return;
        }
        try {
            autoCloseable.close();
        } catch (Throwable th2) {
            th.addSuppressed(th2);
        }
    }
}
