package org.apache.beam.sdk.io.gcp.spanner;

import com.google.cloud.spanner.DatabaseAdminClient;
import com.google.cloud.spanner.DatabaseId;
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Options;
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Spanner;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.spanner.Statement;
import java.io.Serializable;
import java.util.Collections;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.spanner.SpannerIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.testing.TestPipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicate;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Predicates;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.hamcrest.MatcherAssert;
import org.hamcrest.Matchers;
import org.hamcrest.TypeSafeMatcher;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
/* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT.class */
public class SpannerWriteIT {
    private static final int MAX_DB_NAME_LENGTH = 30;

    @Rule
    public final transient TestPipeline p = TestPipeline.create();

    @Rule
    public transient ExpectedException thrown = ExpectedException.none();
    private Spanner spanner;
    private DatabaseAdminClient databaseAdminClient;
    private SpannerTestPipelineOptions options;
    private String databaseName;
    private String project;

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT$DivBy2.class */
    private static class DivBy2 implements Predicate<Long>, Serializable {
        private DivBy2() {
        }

        public boolean apply(Long l) {
            return l.longValue() % 2 == 0;
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT$GenerateMutations.class */
    private static class GenerateMutations extends DoFn<Long, Mutation> {
        private final String table;
        private final int valueSize = 100;
        private final Predicate<Long> injectError;

        public GenerateMutations(String str, Predicate<Long> predicate) {
            this.valueSize = 100;
            this.table = str;
            this.injectError = predicate;
        }

        public GenerateMutations(String str) {
            this(str, Predicates.alwaysFalse());
        }

        @DoFn.ProcessElement
        public void processElement(DoFn<Long, Mutation>.ProcessContext processContext) {
            Mutation.WriteBuilder newInsertOrUpdateBuilder = Mutation.newInsertOrUpdateBuilder(this.table);
            Long l = (Long) processContext.element();
            newInsertOrUpdateBuilder.set("Key").to(l);
            newInsertOrUpdateBuilder.set("Value").to(this.injectError.apply(l) ? null : RandomUtils.randomAlphaNumeric(100));
            processContext.output(newInsertOrUpdateBuilder.build());
        }
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT$SpannerTestPipelineOptions.class */
    public interface SpannerTestPipelineOptions extends TestPipelineOptions {
        @Description("Project that hosts Spanner instance")
        String getInstanceProjectId();

        void setInstanceProjectId(String str);

        @Default.String("beam-test")
        @Description("Instance ID to write to in Spanner")
        String getInstanceId();

        void setInstanceId(String str);

        @Default.String("beam-testdb")
        @Description("Database ID prefix to write to in Spanner")
        String getDatabaseIdPrefix();

        void setDatabaseIdPrefix(String str);

        @Default.String("users")
        @Description("Table name")
        String getTable();

        void setTable(String str);
    }

    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/spanner/SpannerWriteIT$StackTraceContainsString.class */
    static class StackTraceContainsString extends TypeSafeMatcher<Exception> {
        private String str;

        public StackTraceContainsString(String str) {
            this.str = str;
        }

        public void describeTo(org.hamcrest.Description description) {
            description.appendText("stack trace contains string '" + this.str + "'");
        }

        /* JADX INFO: Access modifiers changed from: protected */
        public boolean matchesSafely(Exception exc) {
            return Throwables.getStackTraceAsString(exc).contains(this.str);
        }
    }

    @Before
    public void setUp() throws Exception {
        PipelineOptionsFactory.register(SpannerTestPipelineOptions.class);
        this.options = TestPipeline.testingPipelineOptions().as(SpannerTestPipelineOptions.class);
        this.project = this.options.getInstanceProjectId();
        if (this.project == null) {
            this.project = this.options.as(GcpOptions.class).getProject();
        }
        this.spanner = SpannerOptions.newBuilder().setProjectId(this.project).build().getService();
        this.databaseName = generateDatabaseName();
        this.databaseAdminClient = this.spanner.getDatabaseAdminClient();
        this.databaseAdminClient.dropDatabase(this.options.getInstanceId(), this.databaseName);
        this.databaseAdminClient.createDatabase(this.options.getInstanceId(), this.databaseName, Collections.singleton("CREATE TABLE " + this.options.getTable() + " (  Key           INT64,  Value         STRING(MAX) NOT NULL,) PRIMARY KEY (Key)")).get();
    }

    private String generateDatabaseName() {
        return this.options.getDatabaseIdPrefix() + "-" + RandomUtils.randomAlphaNumeric(29 - this.options.getDatabaseIdPrefix().length());
    }

    @Test
    public void testWrite() throws Exception {
        this.p.apply(GenerateSequence.from(0L).to(100)).apply(ParDo.of(new GenerateMutations(this.options.getTable()))).apply(SpannerIO.write().withProjectId(this.project).withInstanceId(this.options.getInstanceId()).withDatabaseId(this.databaseName));
        PipelineResult run = this.p.run();
        run.waitUntilFinish();
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.DONE));
        MatcherAssert.assertThat(Long.valueOf(countNumberOfRecords()), Matchers.equalTo(Long.valueOf(100)));
    }

    @Test
    public void testSequentialWrite() throws Exception {
        this.p.apply("second step", GenerateSequence.from(100).to(2 * 100)).apply("Gen mutations", ParDo.of(new GenerateMutations(this.options.getTable()))).apply(Wait.on(new PCollection[]{this.p.apply("first step", GenerateSequence.from(0L).to(100)).apply(ParDo.of(new GenerateMutations(this.options.getTable()))).apply(SpannerIO.write().withProjectId(this.project).withInstanceId(this.options.getInstanceId()).withDatabaseId(this.databaseName)).getOutput()})).apply("write to table2", SpannerIO.write().withProjectId(this.project).withInstanceId(this.options.getInstanceId()).withDatabaseId(this.databaseName));
        PipelineResult run = this.p.run();
        run.waitUntilFinish();
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.DONE));
        MatcherAssert.assertThat(Long.valueOf(countNumberOfRecords()), Matchers.equalTo(Long.valueOf(2 * 100)));
    }

    @Test
    public void testReportFailures() throws Exception {
        this.p.apply(GenerateSequence.from(0L).to(2 * 100)).apply(ParDo.of(new GenerateMutations(this.options.getTable(), new DivBy2()))).apply(SpannerIO.write().withProjectId(this.project).withInstanceId(this.options.getInstanceId()).withDatabaseId(this.databaseName).withFailureMode(SpannerIO.FailureMode.REPORT_FAILURES));
        PipelineResult run = this.p.run();
        run.waitUntilFinish();
        MatcherAssert.assertThat(run.getState(), Matchers.is(PipelineResult.State.DONE));
        MatcherAssert.assertThat(Long.valueOf(countNumberOfRecords()), Matchers.equalTo(Long.valueOf(100)));
    }

    @Test
    public void testFailFast() throws Exception {
        this.thrown.expect(new StackTraceContainsString("SpannerException"));
        this.thrown.expect(new StackTraceContainsString("Value must not be NULL in table users"));
        this.p.apply(GenerateSequence.from(0L).to(2 * 100)).apply(ParDo.of(new GenerateMutations(this.options.getTable(), new DivBy2()))).apply(SpannerIO.write().withProjectId(this.project).withInstanceId(this.options.getInstanceId()).withDatabaseId(this.databaseName));
        this.p.run().waitUntilFinish();
    }

    @After
    public void tearDown() throws Exception {
        this.databaseAdminClient.dropDatabase(this.options.getInstanceId(), this.databaseName);
        this.spanner.close();
    }

    private long countNumberOfRecords() {
        ResultSet executeQuery = this.spanner.getDatabaseClient(DatabaseId.of(this.project, this.options.getInstanceId(), this.databaseName)).singleUse().executeQuery(Statement.of("SELECT COUNT(*) FROM " + this.options.getTable()), new Options.QueryOption[0]);
        MatcherAssert.assertThat(Boolean.valueOf(executeQuery.next()), Matchers.is(true));
        long j = executeQuery.getLong(0);
        MatcherAssert.assertThat(Boolean.valueOf(executeQuery.next()), Matchers.is(false));
        return j;
    }
}
