package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
import java.io.IOException;
import java.io.Serializable;
import java.time.Instant;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn.class */
public class StorageApiFlushAndFinalizeDoFn extends DoFn<KV<String, Operation>, Void> {
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiFlushAndFinalizeDoFn.class);
    private final BigQueryServices bqServices;

    @Nullable
    private BigQueryServices.DatasetService datasetService = null;
    private final Counter flushOperationsSent = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsSent");
    private final Counter flushOperationsSucceeded = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsSucceeded");
    private final Counter flushOperationsFailed = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsFailed");
    private final Counter flushOperationsAlreadyExists = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsAlreadyExists");
    private final Counter flushOperationsInvalidArgument = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "flushOperationsInvalidArgument");
    private final Distribution flushLatencyDistribution = Metrics.distribution(StorageApiFlushAndFinalizeDoFn.class, "flushOperationLatencyMs");
    private final Counter finalizeOperationsSent = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsSent");
    private final Counter finalizeOperationsSucceeded = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsSucceeded");
    private final Counter finalizeOperationsFailed = Metrics.counter(StorageApiFlushAndFinalizeDoFn.class, "finalizeOperationsFailed");

    @DefaultSchema(JavaFieldSchema.class)
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiFlushAndFinalizeDoFn$Operation.class */
    static class Operation implements Comparable<Operation>, Serializable {
        final long flushOffset;
        final boolean finalizeStream;

        @SchemaCreate
        public Operation(long j, boolean z) {
            this.flushOffset = j;
            this.finalizeStream = z;
        }

        public boolean equals(@Nullable Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            Operation operation = (Operation) obj;
            return this.flushOffset == operation.flushOffset && this.finalizeStream == operation.finalizeStream;
        }

        public int hashCode() {
            return Objects.hash(Long.valueOf(this.flushOffset), Boolean.valueOf(this.finalizeStream));
        }

        @Override // java.lang.Comparable
        public int compareTo(Operation operation) {
            int compare = Long.compare(this.flushOffset, operation.flushOffset);
            if (compare == 0) {
                compare = Boolean.compare(this.finalizeStream, operation.finalizeStream);
            }
            return compare;
        }
    }

    public StorageApiFlushAndFinalizeDoFn(BigQueryServices bigQueryServices) {
        this.bqServices = bigQueryServices;
    }

    private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
        if (this.datasetService == null) {
            this.datasetService = this.bqServices.getDatasetService((BigQueryOptions) pipelineOptions.as(BigQueryOptions.class));
        }
        return this.datasetService;
    }

    @DoFn.Teardown
    public void onTeardown() {
        try {
            if (this.datasetService != null) {
                this.datasetService.close();
                this.datasetService = null;
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @DoFn.ProcessElement
    public void process(PipelineOptions pipelineOptions, @DoFn.Element KV<String, Operation> kv) throws Exception {
        String str = (String) kv.getKey();
        Operation operation = (Operation) kv.getValue();
        BigQueryServices.DatasetService datasetService = getDatasetService(pipelineOptions);
        long j = operation.flushOffset;
        if (j >= 0) {
            Instant now = Instant.now();
            RetryManager retryManager = new RetryManager(Duration.standardSeconds(1L), Duration.standardMinutes(1L), 3);
            retryManager.addOperation(context -> {
                try {
                    this.flushOperationsSent.inc();
                    return datasetService.flush(str, j);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }, iterable -> {
                ApiException error = ((RetryManager.Operation.Context) Iterables.getFirst(iterable, (Object) null)).getError();
                LOG.warn("Flush of stream " + str + " to offset " + j + " failed with " + error);
                this.flushOperationsFailed.inc();
                if (error instanceof ApiException) {
                    StatusCode.Code code = error.getStatusCode().getCode();
                    if (code.equals(StatusCode.Code.ALREADY_EXISTS)) {
                        this.flushOperationsAlreadyExists.inc();
                        return RetryManager.RetryType.DONT_RETRY;
                    }
                    if (code.equals(StatusCode.Code.INVALID_ARGUMENT)) {
                        this.flushOperationsInvalidArgument.inc();
                        return RetryManager.RetryType.DONT_RETRY;
                    }
                }
                return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
            }, context2 -> {
                this.flushOperationsSucceeded.inc();
            }, new RetryManager.Operation.Context());
            retryManager.run(true);
            this.flushLatencyDistribution.update(java.time.Duration.between(now, Instant.now()).toMillis());
        }
        if (operation.finalizeStream) {
            RetryManager retryManager2 = new RetryManager(Duration.standardSeconds(1L), Duration.standardMinutes(1L), 3);
            retryManager2.addOperation(context3 -> {
                this.finalizeOperationsSent.inc();
                return datasetService.finalizeWriteStream(str);
            }, iterable2 -> {
                LOG.warn("Finalize of stream " + str + " failed with " + ((RetryManager.Operation.Context) Iterables.getFirst(iterable2, (Object) null)).getError());
                this.finalizeOperationsFailed.inc();
                return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
            }, context4 -> {
                this.finalizeOperationsSucceeded.inc();
            }, new RetryManager.Operation.Context());
            retryManager2.run(true);
        }
    }
}
