package org.apache.beam.sdk.io.gcp.pubsub;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIOReadTest;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollectionRowTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.tuple.Pair;
import org.joda.time.Instant;
import org.joda.time.format.ISODateTimeFormat;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteSchemaTransformProviderIT.class */
/**
 * Integration test for {@link PubsubWriteSchemaTransformProvider}.
 *
 * <p>Before the tests run, a topic and a subscription with a timestamp-suffixed name are created
 * through a {@link PubsubClient}; both are deleted again afterwards. The test writes a single row
 * to the topic with the transform under test and then pulls from the subscription to check what
 * was published.
 */
public class PubsubWriteSchemaTransformProviderIT {
    private static final TestPubsubOptions TEST_PUBSUB_OPTIONS = TestPipeline.testingPipelineOptions().as(TestPubsubOptions.class);

    static {
        // The test polls the subscription after calling run(), so run() must not wait for the
        // pipeline to finish. NOTE(review): presumably that is what this flag controls - confirm.
        TEST_PUBSUB_OPTIONS.setBlockOnRun(false);
    }

    private static final String HAS_NO_SCHEMA = "has-no-schema";
    private static PubsubClient pubsubClient;
    private static PubsubClient.TopicPath hasNoSchemaTopic;
    private static PubsubClient.SubscriptionPath hasNoSchemaSubscription;
    private static final Instant TIMESTAMP = Instant.now();
    /** Suffix appended to resource names; derived from the class-load time in milliseconds. */
    private static final String RESOURCE_NAME_POSTFIX = "-" + TIMESTAMP.getMillis();
    private static final int ACK_DEADLINE_SECONDS = 60;
    /** Upper bound, in seconds, on how long the test keeps pulling for the published message. */
    private static final int AWAIT_TERMINATED_SECONDS = 30;
    private static final AutoValueSchema AUTO_VALUE_SCHEMA = new AutoValueSchema();
    private static final TypeDescriptor<PubsubWriteSchemaTransformConfiguration> CONFIGURATION_TYPE_DESCRIPTOR =
            TypeDescriptor.of(PubsubWriteSchemaTransformConfiguration.class);
    private static final SerializableFunction<PubsubWriteSchemaTransformConfiguration, Row> TO_ROW_FN =
            AUTO_VALUE_SCHEMA.toRowFunction(CONFIGURATION_TYPE_DESCRIPTOR);

    /** Name of the message attribute that is read back and compared with the row's timestamp. */
    private static final String EVENT_TIMESTAMP_ATTRIBUTE = "event_timestamp";
    /** Text published as the message payload; the test only checks that it round-trips. */
    private static final String PAYLOAD_TEXT = "aaa";

    @Rule
    public transient TestPipeline pipeline = TestPipeline.create();
    private final Schema.Field timestampField = Schema.Field.of("timestamp", Schema.FieldType.DATETIME);
    private final Schema.Field payloadBytesField = Schema.Field.of("payload", Schema.FieldType.BYTES);

    /**
     * Creates the Pub/Sub client plus the topic and subscription shared by all tests.
     *
     * @throws IOException if the client, the topic, or the subscription cannot be created
     */
    @BeforeClass
    public static void setUp() throws IOException {
        String project = TEST_PUBSUB_OPTIONS.as(PubsubOptions.class).getProject();
        pubsubClient = PubsubGrpcClient.FACTORY.newClient((String) null, (String) null, TEST_PUBSUB_OPTIONS);
        String resourceName = HAS_NO_SCHEMA + RESOURCE_NAME_POSTFIX;
        hasNoSchemaTopic = PubsubClient.topicPathFromName(project, resourceName);
        hasNoSchemaSubscription = PubsubClient.subscriptionPathFromName(project, resourceName);
        pubsubClient.createTopic(hasNoSchemaTopic);
        pubsubClient.createSubscription(hasNoSchemaTopic, hasNoSchemaSubscription, ACK_DEADLINE_SECONDS);
    }

    /**
     * Deletes the subscription and the topic and closes the client.
     *
     * <p>Each step runs even if an earlier one throws, so a failed delete does not leave the
     * remaining resources or the client behind.
     *
     * @throws IOException if a delete or the close fails
     */
    @AfterClass
    public static void tearDown() throws IOException {
        // JUnit 4 still invokes @AfterClass when @BeforeClass failed, so the client may be unset.
        if (pubsubClient == null) {
            return;
        }
        try {
            try {
                if (hasNoSchemaSubscription != null) {
                    pubsubClient.deleteSubscription(hasNoSchemaSubscription);
                }
            } finally {
                if (hasNoSchemaTopic != null) {
                    pubsubClient.deleteTopic(hasNoSchemaTopic);
                }
            }
        } finally {
            pubsubClient.close();
        }
    }

    /**
     * Writes one row holding a bytes payload and a timestamp, then checks that the pulled message
     * carries the same payload text and an {@code event_timestamp} attribute equal to the ISO
     * date-time rendering of the row's timestamp.
     *
     * @throws IOException if pulling from the subscription or cancelling the pipeline fails
     */
    @Test
    public void testWritePayloadBytes() throws IOException {
        Instant timestamp = Instant.ofEpochMilli(100000L);
        Schema schema = Schema.of(payloadBytesField, timestampField);
        List<Row> input = Collections.singletonList(
                Row.withSchema(schema).attachValues(PAYLOAD_TEXT.getBytes(StandardCharsets.UTF_8), timestamp));

        PubsubWriteSchemaTransformConfiguration configuration =
                PubsubWriteSchemaTransformConfiguration.builder()
                        .setSource(
                                PubsubWriteSchemaTransformConfiguration.sourceConfigurationBuilder()
                                        .setPayloadFieldName(payloadBytesField.getName())
                                        .setTimestampFieldName(timestampField.getName())
                                        .build())
                        .setTopic(hasNoSchemaTopic.getPath())
                        .setTarget(PubsubWriteSchemaTransformConfiguration.targetConfigurationBuilder().build())
                        .build();
        Row configurationRow = TO_ROW_FN.apply(configuration);

        PCollectionRowTuple.of("input", pipeline.apply(Create.of(input).withRowSchema(schema)))
                .apply(new PubsubWriteSchemaTransformProvider().from(configurationRow).buildTransform());

        PipelineResult job = pipeline.run(TEST_PUBSUB_OPTIONS);
        List<Pair<String, Map<String, String>>> received;
        try {
            received = pullMessages(input.size());
        } finally {
            // Cancel even when pulling throws; otherwise the job started above keeps running.
            job.cancel();
        }

        Assert.assertFalse(String.format("messages pulled from %s should not be empty", hasNoSchemaSubscription.getPath()), received.isEmpty());
        Pair<String, Map<String, String>> first = received.get(0);
        byte[] expectedPayload = Objects.requireNonNull(input.get(0).getBytes(payloadBytesField.getName()));
        Assert.assertEquals(new String(expectedPayload, StandardCharsets.UTF_8), first.getLeft());
        Assert.assertEquals(
                ISODateTimeFormat.dateTime().print(timestamp),
                first.getRight().get(EVENT_TIMESTAMP_ATTRIBUTE));
    }

    /**
     * Pulls from the test subscription until exactly {@code expectedCount} messages have been
     * collected or {@link #AWAIT_TERMINATED_SECONDS} have elapsed, whichever comes first.
     *
     * @param expectedCount number of collected messages at which pulling stops
     * @return one pair per pulled message: the payload decoded as UTF-8, and a single-entry map
     *     holding the message's {@code event_timestamp} attribute (empty string when absent)
     * @throws IOException if a pull request fails
     */
    private static List<Pair<String, Map<String, String>>> pullMessages(int expectedCount) throws IOException {
        List<Pair<String, Map<String, String>>> messages = new ArrayList<>();
        Instant now = Instant.now();
        Instant deadline = Instant.ofEpochMilli(now.getMillis() + AWAIT_TERMINATED_SECONDS * 1000L);
        while (now.isBefore(deadline)) {
            for (PubsubClient.IncomingMessage incoming : pubsubClient.pull(0L, hasNoSchemaSubscription, 1, true)) {
                String eventTimestamp =
                        incoming.message().getAttributesMap().getOrDefault(EVENT_TIMESTAMP_ATTRIBUTE, "");
                messages.add(
                        Pair.of(
                                incoming.message().getData().toStringUtf8(),
                                ImmutableMap.of(EVENT_TIMESTAMP_ATTRIBUTE, eventTimestamp)));
            }
            if (messages.size() == expectedCount) {
                break;
            }
            now = Instant.now();
        }
        return messages;
    }
}
