package org.apache.beam.it.gcp.datastream;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.datastream.v1.AvroFileFormat;
import com.google.cloud.datastream.v1.BigQueryDestinationConfig;
import com.google.cloud.datastream.v1.BigQueryProfile;
import com.google.cloud.datastream.v1.ConnectionProfile;
import com.google.cloud.datastream.v1.ConnectionProfileName;
import com.google.cloud.datastream.v1.CreateConnectionProfileRequest;
import com.google.cloud.datastream.v1.CreateStreamRequest;
import com.google.cloud.datastream.v1.DatastreamClient;
import com.google.cloud.datastream.v1.DatastreamSettings;
import com.google.cloud.datastream.v1.DeleteConnectionProfileRequest;
import com.google.cloud.datastream.v1.DeleteStreamRequest;
import com.google.cloud.datastream.v1.DestinationConfig;
import com.google.cloud.datastream.v1.GcsDestinationConfig;
import com.google.cloud.datastream.v1.GcsProfile;
import com.google.cloud.datastream.v1.JsonFileFormat;
import com.google.cloud.datastream.v1.LocationName;
import com.google.cloud.datastream.v1.MysqlProfile;
import com.google.cloud.datastream.v1.OracleProfile;
import com.google.cloud.datastream.v1.PostgresqlProfile;
import com.google.cloud.datastream.v1.SourceConfig;
import com.google.cloud.datastream.v1.StaticServiceIpConnectivity;
import com.google.cloud.datastream.v1.Stream;
import com.google.cloud.datastream.v1.StreamName;
import com.google.cloud.datastream.v1.UpdateStreamRequest;
import com.google.protobuf.Duration;
import com.google.protobuf.FieldMask;
import java.io.IOException;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import org.apache.beam.it.common.ResourceManager;
import org.apache.beam.it.gcp.datastream.JDBCSource;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/it/gcp/datastream/DatastreamResourceManager.class */
public final class DatastreamResourceManager implements ResourceManager {
    /** Logger used for the progress and failure messages emitted by this manager. */
    private static final Logger LOG = LoggerFactory.getLogger(DatastreamResourceManager.class);
    /** Field path placed in the update mask built by {@code updateStreamState}. */
    private static final String FIELD_STATE = "state";
    /** Client through which every Datastream request in this class is issued. */
    private final DatastreamClient datastreamClient;
    /** Location copied from the builder; combined with the project id to form resource names. */
    private final String location;
    /** Project id copied from the builder; combined with the location to form resource names. */
    private final String projectId;
    /** Ids of streams created by {@code createStream}; {@code cleanupAll} issues a delete for each. */
    private final Set<String> createdStreamIds;
    /** Ids of connection profiles created by this manager; {@code cleanupAll} issues a delete for each. */
    private final Set<String> createdConnectionProfileIds;

    /* loaded from: input_file:org/apache/beam/it/gcp/datastream/DatastreamResourceManager$Builder.class */
    /**
     * Builder for {@link DatastreamResourceManager}.
     *
     * <p>The constructor is private; instances are handed out by the enclosing class's static
     * {@code builder} factory.
     */
    public static final class Builder {
        private final String projectId;
        private final String location;
        private CredentialsProvider credentialsProvider;

        /** Stores the project id, location and initial credentials provider as given. */
        private Builder(String projectId, String location, CredentialsProvider credentialsProvider) {
            this.credentialsProvider = credentialsProvider;
            this.location = location;
            this.projectId = projectId;
        }

        /**
         * Replaces the credentials provider used when the manager creates its client.
         *
         * @param credentialsProvider the provider to use from now on
         * @return this builder, for chaining
         */
        public Builder setCredentialsProvider(CredentialsProvider credentialsProvider) {
            this.credentialsProvider = credentialsProvider;
            return this;
        }

        /**
         * Builds a manager from the values currently held by this builder.
         *
         * @return the new resource manager
         * @throws IOException if the manager's constructor fails while creating its client
         */
        public DatastreamResourceManager build() throws IOException {
            DatastreamResourceManager manager = new DatastreamResourceManager(this);
            return manager;
        }
    }

    /* loaded from: input_file:org/apache/beam/it/gcp/datastream/DatastreamResourceManager$DestinationOutputFormat.class */
    /** File formats that {@code buildGCSDestinationConfig} can select for a GCS destination. */
    enum DestinationOutputFormat {
        /** Makes {@code buildGCSDestinationConfig} set the Avro file format on the GCS config. */
        AVRO_FILE_FORMAT,
        /** Makes {@code buildGCSDestinationConfig} set the JSON file format on the GCS config. */
        JSON_FILE_FORMAT
    }

    /**
     * Creates a manager backed by a freshly created {@link DatastreamClient} that uses the
     * builder's credentials provider.
     *
     * @param builder source of the project id, location and credentials provider
     * @throws IOException if the client settings or the client cannot be created
     */
    private DatastreamResourceManager(Builder builder) throws IOException {
        this(newClient(builder.credentialsProvider), builder);
    }

    /** Creates a Datastream client whose settings carry the given credentials provider. */
    private static DatastreamClient newClient(CredentialsProvider credentialsProvider) throws IOException {
        DatastreamSettings settings =
                DatastreamSettings.newBuilder().setCredentialsProvider(credentialsProvider).build();
        return DatastreamClient.create(settings);
    }

    /**
     * Creates a manager around an existing client; public so tests can supply their own client.
     *
     * @param datastreamClient client used for every Datastream call made by this manager
     * @param builder source of the project id and location
     */
    @VisibleForTesting
    public DatastreamResourceManager(DatastreamClient datastreamClient, Builder builder) {
        this.datastreamClient = datastreamClient;
        this.location = builder.location;
        this.projectId = builder.projectId;
        // Diamond operator instead of raw HashSet: keeps the sets type-checked as Set<String>.
        this.createdStreamIds = Collections.synchronizedSet(new HashSet<>());
        this.createdConnectionProfileIds = Collections.synchronizedSet(new HashSet<>());
    }

    /**
     * Returns a new {@link Builder} after validating the project id and location.
     *
     * @param str project id; must not be null or empty
     * @param str2 location; must not be null or empty
     * @param credentialsProvider credentials provider handed to the builder as-is
     * @return a builder holding the three values
     * @throws IllegalArgumentException if the project id or the location is null or empty
     */
    public static Builder builder(String str, String str2, CredentialsProvider credentialsProvider) {
        boolean hasProjectId = !Strings.isNullOrEmpty(str);
        Preconditions.checkArgument(hasProjectId, "projectID can not be null or empty");

        boolean hasLocation = !Strings.isNullOrEmpty(str2);
        Preconditions.checkArgument(hasLocation, "location can not be null or empty");

        Builder result = new Builder(str, str2, credentialsProvider);
        return result;
    }

    /**
     * Creates a connection profile for a JDBC source and records its id for later cleanup.
     *
     * <p>The profile type (MySQL, Oracle or PostgreSQL) is chosen from {@code jDBCSource.type()};
     * hostname, username, password and port are copied from the source, and the database name is
     * additionally copied for PostgreSQL.
     *
     * @param str connection profile id, also used as the display name; must not be null or empty
     * @param jDBCSource source whose connection details populate the profile
     * @return the connection profile returned by the create operation
     * @throws IllegalArgumentException if the connection profile id is null or empty
     * @throws DatastreamResourceManagerException if the source type is not recognized, or if the
     *     create operation fails or the wait for it is interrupted
     */
    private synchronized ConnectionProfile createJDBCSourceConnectionProfile(String str, JDBCSource jDBCSource) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "connectionProfileId can not be null or empty");
        LOG.info("Creating JDBC Source Connection Profile {} in project {}.", str, this.projectId);
        ConnectionProfile.Builder profileBuilder = ConnectionProfile.newBuilder();
        JDBCSource.SourceType type = jDBCSource.type();
        try {
            switch (type) {
                case MYSQL:
                    profileBuilder.setMysqlProfile(
                            MysqlProfile.newBuilder()
                                    .setHostname(jDBCSource.hostname())
                                    .setUsername(jDBCSource.username())
                                    .setPassword(jDBCSource.password())
                                    .setPort(jDBCSource.port()));
                    break;
                case ORACLE:
                    profileBuilder.setOracleProfile(
                            OracleProfile.newBuilder()
                                    .setHostname(jDBCSource.hostname())
                                    .setUsername(jDBCSource.username())
                                    .setPassword(jDBCSource.password())
                                    .setPort(jDBCSource.port()));
                    break;
                case POSTGRESQL:
                    profileBuilder.setPostgresqlProfile(
                            PostgresqlProfile.newBuilder()
                                    .setHostname(jDBCSource.hostname())
                                    .setUsername(jDBCSource.username())
                                    .setPassword(jDBCSource.password())
                                    .setPort(jDBCSource.port())
                                    .setDatabase(((PostgresqlSource) jDBCSource).database()));
                    break;
                default:
                    throw new DatastreamResourceManagerException("Could not recognize JDBC source type " + type.name());
            }
            CreateConnectionProfileRequest request =
                    CreateConnectionProfileRequest.newBuilder()
                            .setParent(LocationName.of(this.projectId, this.location).toString())
                            .setConnectionProfile(
                                    profileBuilder
                                            .setDisplayName(str)
                                            .setStaticServiceIpConnectivity(StaticServiceIpConnectivity.getDefaultInstance())
                                            .build())
                            .setConnectionProfileId(str)
                            .build();
            ConnectionProfile connectionProfile = this.datastreamClient.createConnectionProfileAsync(request).get();
            // Only remember the id once creation succeeded, so cleanup never targets a missing profile.
            this.createdConnectionProfileIds.add(str);
            LOG.info("Successfully created JDBC Source Connection Profile {} in project {}.", str, this.projectId);
            return connectionProfile;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DatastreamResourceManagerException("Failed to create JDBC source connection profile. ", e);
        } catch (ExecutionException e) {
            throw new DatastreamResourceManagerException("Failed to create JDBC source connection profile. ", e);
        }
    }

    /**
     * Creates the JDBC source connection profile and returns a {@link SourceConfig} that points at it.
     *
     * <p>The connection profile is created first; the source-specific configuration is then attached
     * according to {@code jDBCSource.type()}.
     *
     * @param str connection profile id to create and reference
     * @param jDBCSource source providing the connection details and the source-specific config
     * @return the built source config
     * @throws DatastreamResourceManagerException if the source type is not recognized or the
     *     connection profile cannot be created
     */
    public synchronized SourceConfig buildSourceConfig(String str, JDBCSource jDBCSource) {
        createJDBCSourceConnectionProfile(str, jDBCSource);

        String profileName = ConnectionProfileName.format(this.projectId, this.location, str);
        SourceConfig.Builder configBuilder = SourceConfig.newBuilder();
        configBuilder.setSourceConnectionProfile(profileName);

        // NOTE(review): `mo23config` looks like a decompiler-assigned alias - confirm the accessor
        // name against JDBCSource before relying on it.
        switch (jDBCSource.type()) {
            case MYSQL:
                configBuilder.setMysqlSourceConfig(jDBCSource.mo23config());
                break;
            case ORACLE:
                configBuilder.setOracleSourceConfig(jDBCSource.mo23config());
                break;
            case POSTGRESQL:
                configBuilder.setPostgresqlSourceConfig(jDBCSource.mo23config());
                break;
            default:
                throw new DatastreamResourceManagerException("Could not recognize JDBC source type " + jDBCSource.type().name());
        }

        return configBuilder.build();
    }

    /**
     * Creates a connection profile for a GCS destination and records its id for later cleanup.
     *
     * @param str connection profile id, also used as the display name; must not be null or empty
     * @param str2 GCS bucket name; must not be null or empty
     * @param str3 GCS root path; must be non-null and either empty or start with {@code '/'}
     * @return the connection profile returned by the create operation
     * @throws IllegalArgumentException if any argument violates the constraints above
     * @throws DatastreamResourceManagerException if the create operation fails or the wait for it
     *     is interrupted
     */
    public synchronized ConnectionProfile createGCSDestinationConnectionProfile(String str, String str2, String str3) {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "connectionProfileId can not be null or empty");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str2), "gcsBucketName can not be null or empty");
        Preconditions.checkArgument(str3 != null, "gcsRootPath can not be null");
        Preconditions.checkArgument(str3.isEmpty() || str3.charAt(0) == '/', "gcsRootPath must either be an empty string or start with a '/'");
        LOG.info("Creating GCS Destination Connection Profile {} in project {}.", str, this.projectId);
        try {
            CreateConnectionProfileRequest request =
                    CreateConnectionProfileRequest.newBuilder()
                            .setParent(LocationName.of(this.projectId, this.location).toString())
                            .setConnectionProfile(
                                    ConnectionProfile.newBuilder()
                                            .setDisplayName(str)
                                            .setStaticServiceIpConnectivity(StaticServiceIpConnectivity.getDefaultInstance())
                                            .setGcsProfile(GcsProfile.newBuilder().setBucket(str2).setRootPath(str3)))
                            .setConnectionProfileId(str)
                            .build();
            ConnectionProfile connectionProfile = this.datastreamClient.createConnectionProfileAsync(request).get();
            // Only remember the id once creation succeeded, so cleanup never targets a missing profile.
            this.createdConnectionProfileIds.add(str);
            LOG.info("Successfully created GCS Destination Connection Profile {} in project {}.", str, this.projectId);
            return connectionProfile;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DatastreamResourceManagerException("Failed to create GCS destination connection profile. ", e);
        } catch (ExecutionException e) {
            // The message previously said "source" even though this method creates a destination profile.
            throw new DatastreamResourceManagerException("Failed to create GCS destination connection profile. ", e);
        }
    }

    /**
     * Builds a {@link DestinationConfig} that writes to GCS through an existing connection profile.
     *
     * <p>This method only assembles the config; it does not create the connection profile.
     *
     * @param str id of the destination connection profile to reference
     * @param str2 path set on the GCS destination config
     * @param destinationOutputFormat {@code AVRO_FILE_FORMAT} selects Avro; any other value selects JSON
     * @return the built destination config
     */
    public synchronized DestinationConfig buildGCSDestinationConfig(String str, String str2, DestinationOutputFormat destinationOutputFormat) {
        String profileName = ConnectionProfileName.format(this.projectId, this.location, str);
        DestinationConfig.Builder destination = DestinationConfig.newBuilder();
        destination.setDestinationConnectionProfile(profileName);

        GcsDestinationConfig.Builder gcsConfig = GcsDestinationConfig.newBuilder();
        gcsConfig.setPath(str2);
        if (destinationOutputFormat != DestinationOutputFormat.AVRO_FILE_FORMAT) {
            gcsConfig.setJsonFileFormat(JsonFileFormat.getDefaultInstance());
        } else {
            gcsConfig.setAvroFileFormat(AvroFileFormat.getDefaultInstance());
        }

        destination.setGcsDestinationConfig(gcsConfig);
        return destination.build();
    }

    /**
     * Creates a connection profile for a BigQuery destination and records its id for later cleanup.
     *
     * @param str connection profile id, also used as the display name; must not be null or empty
     * @return the connection profile returned by the create operation
     * @throws IllegalArgumentException if the connection profile id is null or empty
     * @throws DatastreamResourceManagerException if the create operation fails or the wait for it
     *     is interrupted
     */
    public synchronized ConnectionProfile createBQDestinationConnectionProfile(String str) {
        // Same id validation as the JDBC and GCS profile creators, so bad input fails before any request is sent.
        Preconditions.checkArgument(!Strings.isNullOrEmpty(str), "connectionProfileId can not be null or empty");
        LOG.info("Creating BQ Destination Connection Profile {} in project {}.", str, this.projectId);
        try {
            CreateConnectionProfileRequest request =
                    CreateConnectionProfileRequest.newBuilder()
                            .setParent(LocationName.of(this.projectId, this.location).toString())
                            .setConnectionProfile(
                                    ConnectionProfile.newBuilder()
                                            .setDisplayName(str)
                                            .setStaticServiceIpConnectivity(StaticServiceIpConnectivity.getDefaultInstance())
                                            .setBigqueryProfile(BigQueryProfile.newBuilder()))
                            .setConnectionProfileId(str)
                            .build();
            ConnectionProfile connectionProfile = this.datastreamClient.createConnectionProfileAsync(request).get();
            // Only remember the id once creation succeeded, so cleanup never targets a missing profile.
            this.createdConnectionProfileIds.add(str);
            LOG.info("Successfully created BQ Destination Connection Profile {} in project {}.", str, this.projectId);
            return connectionProfile;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DatastreamResourceManagerException("Failed to create BQ destination connection profile. ", e);
        } catch (ExecutionException e) {
            throw new DatastreamResourceManagerException("Failed to create BQ destination connection profile. ", e);
        }
    }

    /**
     * Builds a {@link DestinationConfig} that writes to a single BigQuery dataset through an
     * existing connection profile.
     *
     * <p>This method only assembles the config; it does not create the connection profile.
     *
     * @param str id of the destination connection profile to reference
     * @param j data freshness, in seconds
     * @param str2 id of the single target dataset
     * @return the built destination config
     */
    public synchronized DestinationConfig buildBQDestinationConfig(String str, long j, String str2) {
        String profileName = ConnectionProfileName.format(this.projectId, this.location, str);

        Duration freshness = Duration.newBuilder().setSeconds(j).build();
        BigQueryDestinationConfig.SingleTargetDataset.Builder targetDataset =
                BigQueryDestinationConfig.SingleTargetDataset.newBuilder().setDatasetId(str2);
        BigQueryDestinationConfig.Builder bigQueryConfig =
                BigQueryDestinationConfig.newBuilder()
                        .setDataFreshness(freshness)
                        .setSingleTargetDataset(targetDataset);

        return DestinationConfig.newBuilder()
                .setDestinationConnectionProfile(profileName)
                .setBigqueryDestinationConfig(bigQueryConfig)
                .build();
    }

    /**
     * Creates a stream from the given source and destination configs and records its id for later
     * cleanup.
     *
     * <p>The stream is created with the default backfill-all strategy and uses the stream id as
     * its display name.
     *
     * @param str stream id, also used as the display name
     * @param sourceConfig source configuration of the stream
     * @param destinationConfig destination configuration of the stream
     * @return the stream returned by the create operation
     * @throws DatastreamResourceManagerException if the create operation fails or the wait for it
     *     is interrupted
     */
    public synchronized Stream createStream(String str, SourceConfig sourceConfig, DestinationConfig destinationConfig) {
        LOG.info("Creating Stream {} in project {}.", str, this.projectId);
        try {
            Stream streamSpec =
                    Stream.newBuilder()
                            .setSourceConfig(sourceConfig)
                            .setDisplayName(str)
                            .setDestinationConfig(destinationConfig)
                            .setBackfillAll(Stream.BackfillAllStrategy.getDefaultInstance())
                            .build();
            CreateStreamRequest request =
                    CreateStreamRequest.newBuilder()
                            .setParent(LocationName.format(this.projectId, this.location))
                            .setStreamId(str)
                            .setStream(streamSpec)
                            .build();
            Stream stream = this.datastreamClient.createStreamAsync(request).get();
            // Only remember the id once creation succeeded, so cleanup never targets a missing stream.
            this.createdStreamIds.add(str);
            LOG.info("Successfully created Stream {} in project {}.", str, this.projectId);
            return stream;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DatastreamResourceManagerException("Failed to create stream. ", e);
        } catch (ExecutionException e) {
            throw new DatastreamResourceManagerException("Failed to create stream. ", e);
        }
    }

    /**
     * Updates the state of an existing stream.
     *
     * <p>The update mask contains only the {@code state} path, so only that field is sent for update.
     *
     * @param str id of the stream to update
     * @param state state to set on the stream
     * @return the stream returned by the update operation
     * @throws DatastreamResourceManagerException if the update operation fails or the wait for it
     *     is interrupted
     */
    public synchronized Stream updateStreamState(String str, Stream.State state) {
        LOG.info("Updating {}'s state to {} in project {}.", str, state.name(), this.projectId);
        try {
            UpdateStreamRequest request =
                    UpdateStreamRequest.newBuilder()
                            .setStream(
                                    Stream.newBuilder()
                                            .setName(StreamName.format(this.projectId, this.location, str))
                                            .setState(state))
                            .setUpdateMask(FieldMask.newBuilder().addPaths(FIELD_STATE))
                            .build();
            Stream stream = this.datastreamClient.updateStreamAsync(request).get();
            LOG.info("Successfully updated {}'s state to {} in project {}.", str, state.name(), this.projectId);
            return stream;
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            throw new DatastreamResourceManagerException("Failed to update stream. ", e);
        } catch (ExecutionException e) {
            throw new DatastreamResourceManagerException("Failed to update stream. ", e);
        }
    }

    /**
     * Starts a stream by updating its state to {@code RUNNING}.
     *
     * @param str id of the stream to start
     * @return the stream returned by {@code updateStreamState}
     */
    public synchronized Stream startStream(String str) {
        LOG.info("Starting Stream {} in project {}.", str, this.projectId);
        Stream started = updateStreamState(str, Stream.State.RUNNING);
        return started;
    }

    /**
     * Pauses a stream by updating its state to {@code PAUSED}.
     *
     * @param str id of the stream to pause
     * @return the stream returned by {@code updateStreamState}
     */
    public synchronized Stream pauseStream(String str) {
        LOG.info("Pausing Stream {} in project {}.", str, this.projectId);
        Stream paused = updateStreamState(str, Stream.State.PAUSED);
        return paused;
    }

    /**
     * Deletes every stream and connection profile recorded by this manager, then closes the client.
     *
     * <p>Cleanup is best-effort: a failure to delete one resource is logged with its cause and the
     * remaining resources are still attempted, so a single failure does not leave the others
     * behind. Streams are deleted before connection profiles. If the thread is interrupted while
     * waiting, the remaining cleanup is still attempted and the interrupt flag is restored before
     * this method returns or throws.
     *
     * @throws DatastreamResourceManagerException if any deletion or the client shutdown failed
     */
    public synchronized void cleanupAll() {
        LOG.info("Cleaning up Datastream resource manager.");
        boolean failed = false;
        boolean interrupted = false;
        try {
            boolean streamsDeleted = true;
            for (String streamId : this.createdStreamIds) {
                try {
                    this.datastreamClient.deleteStreamAsync(
                            DeleteStreamRequest.newBuilder()
                                    .setName(StreamName.format(this.projectId, this.location, streamId))
                                    .build())
                            .get();
                } catch (InterruptedException | ExecutionException e) {
                    // Remember the interruption but keep going; the flag is restored in the finally block.
                    interrupted |= e instanceof InterruptedException;
                    LOG.error("Failed to delete stream {}.", streamId, e);
                    streamsDeleted = false;
                }
            }
            if (streamsDeleted) {
                LOG.info("Successfully deleted stream(s). ");
            } else {
                failed = true;
            }

            boolean profilesDeleted = true;
            for (String connectionProfileId : this.createdConnectionProfileIds) {
                try {
                    this.datastreamClient.deleteConnectionProfileAsync(
                            DeleteConnectionProfileRequest.newBuilder()
                                    .setName(ConnectionProfileName.format(this.projectId, this.location, connectionProfileId))
                                    .build())
                            .get();
                } catch (InterruptedException | ExecutionException e) {
                    interrupted |= e instanceof InterruptedException;
                    LOG.error("Failed to delete connection profile {}.", connectionProfileId, e);
                    profilesDeleted = false;
                }
            }
            if (profilesDeleted) {
                LOG.info("Successfully deleted connection profile(s). ");
            } else {
                failed = true;
            }

            try {
                this.datastreamClient.close();
            } catch (Exception e) {
                LOG.error("Failed to close datastream client. ", e);
                failed = true;
            }
        } finally {
            if (interrupted) {
                // Waiting on a future clears the flag when it throws; put it back for the caller.
                Thread.currentThread().interrupt();
            }
        }
        if (failed) {
            throw new DatastreamResourceManagerException("Failed to delete resources. Check above for errors.");
        }
        LOG.info("Successfully cleaned up Datastream resource manager.");
    }
}
