/*
 * Decompiled with CFR 0.152.
 */
package org.apache.beam.sdk.io.gcp.pubsub;

import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubTestClient;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubUnboundedSink;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.hash.Hashing;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(value=JUnit4.class)
public class PubsubUnboundedSinkTest
implements Serializable {
    private static final PubsubClient.TopicPath TOPIC = PubsubClient.topicPathFromName((String)"testProject", (String)"testTopic");
    private static final String DATA = "testData";
    private static final ImmutableMap<String, String> ATTRIBUTES = ImmutableMap.builder().put((Object)"a", (Object)"b").put((Object)"c", (Object)"d").build();
    private static final long TIMESTAMP = 1234L;
    private static final String TIMESTAMP_ATTRIBUTE = "timestamp";
    private static final String ID_ATTRIBUTE = "id";
    private static final int NUM_SHARDS = 10;
    @Rule
    public transient TestPipeline p = TestPipeline.create();

    private String getRecordId(String data) {
        return Hashing.murmur3_128().hashBytes(data.getBytes(StandardCharsets.UTF_8)).toString();
    }

    @Test
    public void saneCoder() throws Exception {
        PubsubClient.OutgoingMessage message = new PubsubClient.OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), (Map)ImmutableMap.of(), 1234L, this.getRecordId(DATA));
        CoderProperties.coderDecodeEncodeEqual((Coder)PubsubUnboundedSink.CODER, (Object)message);
        CoderProperties.coderSerializable((Coder)PubsubUnboundedSink.CODER);
    }

    @Test
    public void sendOneMessage() throws IOException {
        ImmutableList outgoing = ImmutableList.of((Object)new PubsubClient.OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, 1234L, this.getRecordId(DATA)));
        int batchSize = 1;
        int batchBytes = 1;
        try (PubsubTestClient.PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish((PubsubClient.TopicPath)TOPIC, (Iterable)outgoing, (Iterable)ImmutableList.of());){
            PubsubUnboundedSink sink = new PubsubUnboundedSink((PubsubClient.PubsubClientFactory)factory, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, batchSize, batchBytes, Duration.standardSeconds((long)2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC);
            ((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Iterable)ImmutableList.of((Object)DATA)))).apply((PTransform)ParDo.of((DoFn)new Stamp((Map)ATTRIBUTES)))).apply((PTransform)sink);
            this.p.run();
        }
    }

    @Test
    public void sendOneMessageWithoutAttributes() throws IOException {
        ImmutableList outgoing = ImmutableList.of((Object)new PubsubClient.OutgoingMessage(DATA.getBytes(StandardCharsets.UTF_8), null, 1234L, this.getRecordId(DATA)));
        try (PubsubTestClient.PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish((PubsubClient.TopicPath)TOPIC, (Iterable)outgoing, (Iterable)ImmutableList.of());){
            PubsubUnboundedSink sink = new PubsubUnboundedSink((PubsubClient.PubsubClientFactory)factory, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, 1, 1, Duration.standardSeconds((long)2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC);
            ((PCollection)((PCollection)this.p.apply((PTransform)Create.of((Iterable)ImmutableList.of((Object)DATA)))).apply((PTransform)ParDo.of((DoFn)new Stamp(null)))).apply((PTransform)sink);
            this.p.run();
        }
    }

    @Test
    public void sendMoreThanOneBatchByNumMessages() throws IOException {
        ArrayList<PubsubClient.OutgoingMessage> outgoing = new ArrayList<PubsubClient.OutgoingMessage>();
        ArrayList<String> data = new ArrayList<String>();
        int batchSize = 2;
        int batchBytes = 1000;
        for (int i = 0; i < batchSize * 10; ++i) {
            String str = String.valueOf(i);
            outgoing.add(new PubsubClient.OutgoingMessage(str.getBytes(StandardCharsets.UTF_8), (Map)ImmutableMap.of(), 1234L, this.getRecordId(str)));
            data.add(str);
        }
        try (PubsubTestClient.PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish((PubsubClient.TopicPath)TOPIC, outgoing, (Iterable)ImmutableList.of());){
            PubsubUnboundedSink sink = new PubsubUnboundedSink((PubsubClient.PubsubClientFactory)factory, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, batchSize, batchBytes, Duration.standardSeconds((long)2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC);
            ((PCollection)((PCollection)this.p.apply((PTransform)Create.of(data))).apply((PTransform)ParDo.of((DoFn)new Stamp()))).apply((PTransform)sink);
            this.p.run();
        }
    }

    @Test
    public void sendMoreThanOneBatchByByteSize() throws IOException {
        String str;
        ArrayList<PubsubClient.OutgoingMessage> outgoing = new ArrayList<PubsubClient.OutgoingMessage>();
        ArrayList<String> data = new ArrayList<String>();
        int batchSize = 100;
        int batchBytes = 10;
        for (int n = 0; n < batchBytes * 10; n += str.length()) {
            StringBuilder sb = new StringBuilder();
            for (int i = 0; i < batchBytes; ++i) {
                sb.append(String.valueOf(n));
            }
            str = sb.toString();
            outgoing.add(new PubsubClient.OutgoingMessage(str.getBytes(StandardCharsets.UTF_8), (Map)ImmutableMap.of(), 1234L, this.getRecordId(str)));
            data.add(str);
        }
        try (PubsubTestClient.PubsubTestClientFactory factory = PubsubTestClient.createFactoryForPublish((PubsubClient.TopicPath)TOPIC, outgoing, (Iterable)ImmutableList.of());){
            PubsubUnboundedSink sink = new PubsubUnboundedSink((PubsubClient.PubsubClientFactory)factory, (ValueProvider)ValueProvider.StaticValueProvider.of((Object)TOPIC), TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 10, batchSize, batchBytes, Duration.standardSeconds((long)2L), PubsubUnboundedSink.RecordIdMethod.DETERMINISTIC);
            ((PCollection)((PCollection)this.p.apply((PTransform)Create.of(data))).apply((PTransform)ParDo.of((DoFn)new Stamp()))).apply((PTransform)sink);
            this.p.run();
        }
    }

    private static class Stamp
    extends DoFn<String, PubsubMessage> {
        private final Map<String, String> attributes;

        private Stamp() {
            this((Map<String, String>)ImmutableMap.of());
        }

        private Stamp(Map<String, String> attributes) {
            this.attributes = attributes;
        }

        @DoFn.ProcessElement
        public void processElement(DoFn.ProcessContext c) {
            c.outputWithTimestamp((Object)new PubsubMessage(((String)c.element()).getBytes(StandardCharsets.UTF_8), this.attributes), new Instant(1234L));
        }
    }
}

