/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.v3;

import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.broker.admin.impl.TransactionsBase;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.shade.io.swagger.annotations.Api;
import org.apache.pulsar.shade.io.swagger.annotations.ApiOperation;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponse;
import org.apache.pulsar.shade.io.swagger.annotations.ApiResponses;
import org.apache.pulsar.shade.javax.ws.rs.Consumes;
import org.apache.pulsar.shade.javax.ws.rs.DefaultValue;
import org.apache.pulsar.shade.javax.ws.rs.Encoded;
import org.apache.pulsar.shade.javax.ws.rs.GET;
import org.apache.pulsar.shade.javax.ws.rs.POST;
import org.apache.pulsar.shade.javax.ws.rs.Path;
import org.apache.pulsar.shade.javax.ws.rs.PathParam;
import org.apache.pulsar.shade.javax.ws.rs.Produces;
import org.apache.pulsar.shade.javax.ws.rs.QueryParam;
import org.apache.pulsar.shade.javax.ws.rs.container.AsyncResponse;
import org.apache.pulsar.shade.javax.ws.rs.container.Suspended;
import org.apache.pulsar.shade.javax.ws.rs.core.Response;
import org.apache.pulsar.shade.org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.shade.org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * REST resource exposing the transaction admin endpoints under {@code /transactions}.
 *
 * <p>Every endpoint first calls {@code checkTransactionCoordinatorEnabled()} and then
 * delegates to an {@code internal*} method inherited from {@link TransactionsBase}.
 * Endpoints come in two shapes:
 * <ul>
 *   <li>those that hand the {@link AsyncResponse} to the base class, which is then
 *       responsible for resuming it; and</li>
 *   <li>those that receive a future from the base class and resume the
 *       {@link AsyncResponse} here, with the result on success or through
 *       {@code resumeAsyncResponseExceptionally} on failure.</li>
 * </ul>
 */
@Path(value="/transactions")
@Produces(value={"application/json"})
@Consumes(value={"application/json"})
@Api(value="/transactions", description="Transactions admin apis", tags={"transactions"})
public class Transactions
extends TransactionsBase {
    private static final Logger log = LoggerFactory.getLogger(Transactions.class);

    /**
     * Gets transaction coordinator stats.
     *
     * @param asyncResponse suspended response, passed on to the base class
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param coordinatorId optional {@code coordinatorId} query parameter; {@code null} when absent
     */
    @GET
    @Path(value="/coordinatorStats")
    @ApiOperation(value="Get transaction coordinator stats.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=404, message="Transaction coordinator not found"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getCoordinatorStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @QueryParam(value="coordinatorId") Integer coordinatorId) {
        checkTransactionCoordinatorEnabled();
        internalGetCoordinatorStats(asyncResponse, authoritative, coordinatorId);
    }

    /**
     * Gets the state of one transaction in a topic's transaction buffer.
     *
     * <p>{@code mostSigBits} and {@code leastSigBits} are parsed with {@link Long#parseLong(String)};
     * a non-numeric value raises {@link NumberFormatException}, which is caught below and
     * forwarded to {@code resumeAsyncResponseExceptionally} like any other synchronous failure.
     *
     * @param asyncResponse suspended response, resumed with the stats or with the failure
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param mostSigBits   most significant bits of the transaction id, as text
     * @param leastSigBits  least significant bits of the transaction id, as text
     */
    @GET
    @Path(value="/transactionInBufferStats/{tenant}/{namespace}/{topic}/{mostSigBits}/{leastSigBits}")
    @ApiOperation(value="Get transaction state in transaction buffer.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getTransactionInBufferStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @PathParam(value="mostSigBits") String mostSigBits, @PathParam(value="leastSigBits") String leastSigBits) {
        try {
            checkTransactionCoordinatorEnabled();
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetTransactionInBufferStats(authoritative, Long.parseLong(mostSigBits), Long.parseLong(leastSigBits))
                    .thenAccept(asyncResponse::resume)
                    .exceptionally(ex -> {
                        // Redirects are not logged as errors.
                        if (!isRedirectException(ex)) {
                            log.error("[{}] Failed to get transaction state in transaction buffer {}", clientAppId(), topicName, ex);
                        }
                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                        return null;
                    });
        }
        catch (Exception ex2) {
            resumeAsyncResponseExceptionally(asyncResponse, ex2);
        }
    }

    /**
     * Gets the state of one transaction in a subscription's pending ack.
     *
     * <p>{@code mostSigBits} and {@code leastSigBits} are parsed with {@link Long#parseLong(String)};
     * a non-numeric value raises {@link NumberFormatException}, which is caught below and
     * forwarded to {@code resumeAsyncResponseExceptionally}.
     *
     * @param asyncResponse suspended response, resumed with the stats or with the failure
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param mostSigBits   most significant bits of the transaction id, as text
     * @param leastSigBits  least significant bits of the transaction id, as text
     * @param subName       subscription name path segment
     */
    @GET
    @Path(value="/transactionInPendingAckStats/{tenant}/{namespace}/{topic}/{subName}/{mostSigBits}/{leastSigBits}")
    @ApiOperation(value="Get transaction state in pending ack.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getTransactionInPendingAckStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @PathParam(value="mostSigBits") String mostSigBits, @PathParam(value="leastSigBits") String leastSigBits, @PathParam(value="subName") String subName) {
        try {
            checkTransactionCoordinatorEnabled();
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetTransactionInPendingAckStats(authoritative, Long.parseLong(mostSigBits), Long.parseLong(leastSigBits), subName)
                    .thenAccept(asyncResponse::resume)
                    .exceptionally(ex -> {
                        // Redirects are not logged as errors.
                        if (!isRedirectException(ex)) {
                            log.error("[{}] Failed to get transaction state in pending ack {}", clientAppId(), topicName, ex);
                        }
                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                        return null;
                    });
        }
        catch (Exception ex2) {
            resumeAsyncResponseExceptionally(asyncResponse, ex2);
        }
    }

    /**
     * Gets the transaction buffer stats of a topic.
     *
     * @param asyncResponse suspended response, resumed with the stats or with the failure
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param lowWaterMarks value of the {@code lowWaterMarks} query parameter (default {@code false}),
     *                      passed through to the base class
     */
    @GET
    @Path(value="/transactionBufferStats/{tenant}/{namespace}/{topic}")
    @ApiOperation(value="Get transaction buffer stats in topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getTransactionBufferStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @QueryParam(value="lowWaterMarks") @DefaultValue(value="false") boolean lowWaterMarks) {
        try {
            checkTransactionCoordinatorEnabled();
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetTransactionBufferStats(authoritative, lowWaterMarks)
                    .thenAccept(asyncResponse::resume)
                    .exceptionally(ex -> {
                        // Redirects are not logged as errors.
                        if (!isRedirectException(ex)) {
                            log.error("[{}] Failed to get transaction buffer stats in topic {}", clientAppId(), topicName, ex);
                        }
                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                        return null;
                    });
        }
        catch (Exception ex2) {
            resumeAsyncResponseExceptionally(asyncResponse, ex2);
        }
    }

    /**
     * Gets the transaction pending ack stats of a subscription.
     *
     * @param asyncResponse suspended response, resumed with the stats or with the failure
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param subName       subscription name path segment
     * @param lowWaterMarks value of the {@code lowWaterMarks} query parameter (default {@code false}),
     *                      passed through to the base class
     */
    @GET
    @Path(value="/pendingAckStats/{tenant}/{namespace}/{topic}/{subName}")
    @ApiOperation(value="Get transaction pending ack stats in topic.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic or subName doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getPendingAckStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @PathParam(value="subName") String subName, @QueryParam(value="lowWaterMarks") @DefaultValue(value="false") boolean lowWaterMarks) {
        try {
            checkTransactionCoordinatorEnabled();
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetPendingAckStats(authoritative, subName, lowWaterMarks)
                    .thenAccept(asyncResponse::resume)
                    .exceptionally(ex -> {
                        // Redirects are not logged as errors.
                        if (!isRedirectException(ex)) {
                            log.error("[{}] Failed to get transaction pending ack stats in topic {}", clientAppId(), topicName, ex);
                        }
                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                        return null;
                    });
        }
        catch (Exception ex2) {
            resumeAsyncResponseExceptionally(asyncResponse, ex2);
        }
    }

    /**
     * Gets the metadata of one transaction.
     *
     * <p>{@code mostSigBits} is parsed with {@link Integer#parseInt(String)} and
     * {@code leastSigBits} with {@link Long#parseLong(String)}. There is no try/catch here,
     * so a {@link NumberFormatException} from either parse propagates out of this method.
     *
     * @param asyncResponse suspended response, passed on to the base class
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param mostSigBits   most significant bits of the transaction id, as text (must fit an {@code int})
     * @param leastSigBits  least significant bits of the transaction id, as text
     */
    @GET
    @Path(value="/transactionMetadata/{mostSigBits}/{leastSigBits}")
    @ApiOperation(value="Get transaction metadata")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic or coordinator or transaction doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getTransactionMetadata(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="mostSigBits") String mostSigBits, @PathParam(value="leastSigBits") String leastSigBits) {
        checkTransactionCoordinatorEnabled();
        internalGetTransactionMetadata(asyncResponse, authoritative, Integer.parseInt(mostSigBits), Long.parseLong(leastSigBits));
    }

    /**
     * Gets slow transactions.
     *
     * <p>{@code timeout} is parsed with {@link Long#parseLong(String)}; there is no try/catch
     * here, so a {@link NumberFormatException} propagates out of this method.
     *
     * @param asyncResponse suspended response, passed on to the base class
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param timeout       timeout path segment, as text; its unit is not established in this class
     * @param coordinatorId optional {@code coordinatorId} query parameter; {@code null} when absent
     */
    @GET
    @Path(value="/slowTransactions/{timeout}")
    @ApiOperation(value="Get slow transactions.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic or coordinator or transaction doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getSlowTransactions(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="timeout") String timeout, @QueryParam(value="coordinatorId") Integer coordinatorId) {
        checkTransactionCoordinatorEnabled();
        internalGetSlowTransactions(asyncResponse, authoritative, Long.parseLong(timeout), coordinatorId);
    }

    /**
     * Gets the internal stats of one transaction coordinator.
     *
     * <p>{@code coordinatorId} is parsed with {@link Integer#parseInt(String)}; there is no
     * try/catch here, so a {@link NumberFormatException} propagates out of this method.
     *
     * @param asyncResponse suspended response, passed on to the base class
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param coordinatorId coordinator id path segment, as text
     * @param metadata      value of the {@code metadata} query parameter (default {@code false}),
     *                      passed through to the base class
     */
    @GET
    @Path(value="/coordinatorInternalStats/{coordinatorId}")
    @ApiOperation(value="Get coordinator internal stats.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=404, message="Transaction coordinator not found"), @ApiResponse(code=405, message="Broker don't use MLTransactionMetadataStore!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getCoordinatorInternalStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="coordinatorId") String coordinatorId, @QueryParam(value="metadata") @DefaultValue(value="false") boolean metadata) {
        checkTransactionCoordinatorEnabled();
        internalGetCoordinatorInternalStats(asyncResponse, authoritative, metadata, Integer.parseInt(coordinatorId));
    }

    /**
     * Gets the internal stats of a subscription's pending ack.
     *
     * <p>Asynchronous failures are unwrapped with {@code FutureUtil.unwrapCompletionException}
     * and mapped here rather than through {@code resumeAsyncResponseExceptionally}:
     * {@code ServiceUnitNotReadyException} becomes 503, {@code NotAllowedException} becomes 405,
     * {@code SubscriptionNotFoundException} becomes 404, and anything else is wrapped in a
     * plain {@link RestException}.
     *
     * @param asyncResponse suspended response, resumed with the stats or with the failure
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param subName       subscription name path segment
     * @param metadata      value of the {@code metadata} query parameter (default {@code false}),
     *                      passed through to the base class
     */
    @GET
    @Path(value="/pendingAckInternalStats/{tenant}/{namespace}/{topic}/{subName}")
    @ApiOperation(value="Get transaction pending ack internal stats.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic or subscription name doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=405, message="Pending ack handle don't use managedLedger!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getPendingAckInternalStats(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @PathParam(value="subName") String subName, @QueryParam(value="metadata") @DefaultValue(value="false") boolean metadata) {
        try {
            checkTransactionCoordinatorEnabled();
            validateTopicName(tenant, namespace, encodedTopic);
            internalGetPendingAckInternalStats(authoritative, subName, metadata)
                    .thenAccept(asyncResponse::resume)
                    .exceptionally(ex -> {
                        // Redirects are not logged as errors.
                        if (!isRedirectException(ex)) {
                            log.error("[{}] Failed to get pending ack internal stats {}", clientAppId(), topicName, ex);
                        }
                        Throwable cause = FutureUtil.unwrapCompletionException(ex);
                        if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
                            asyncResponse.resume(new RestException(Response.Status.SERVICE_UNAVAILABLE, cause));
                        } else if (cause instanceof BrokerServiceException.NotAllowedException) {
                            asyncResponse.resume(new RestException(Response.Status.METHOD_NOT_ALLOWED, cause));
                        } else if (cause instanceof BrokerServiceException.SubscriptionNotFoundException) {
                            asyncResponse.resume(new RestException(Response.Status.NOT_FOUND, cause));
                        } else {
                            asyncResponse.resume(new RestException(cause));
                        }
                        return null;
                    });
        }
        catch (Exception ex2) {
            resumeAsyncResponseExceptionally(asyncResponse, ex2);
        }
    }

    /**
     * Changes the number of transaction coordinator replicas.
     *
     * <p>On success the response is resumed with an empty 204 (no content) response.
     *
     * @param asyncResponse suspended response, resumed with 204 or with the failure
     * @param replicas      requested replica count, taken from the request body
     */
    @POST
    @Path(value="/transactionCoordinator/replicas")
    @ApiResponses(value={@ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=406, message="The number of replicas should be more than the current number of transaction coordinator replicas"), @ApiResponse(code=401, message="This operation requires super-user access")})
    public void scaleTransactionCoordinators(@Suspended AsyncResponse asyncResponse, int replicas) {
        try {
            checkTransactionCoordinatorEnabled();
            internalScaleTransactionCoordinators(replicas)
                    .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
                    .exceptionally(e -> {
                        resumeAsyncResponseExceptionally(asyncResponse, e);
                        return null;
                    });
        }
        catch (Exception e2) {
            // Pass the exception as the trailing argument so the cause reaches the log.
            log.warn("{} Failed to update the scale of transaction coordinators", clientAppId(), e2);
            resumeAsyncResponseExceptionally(asyncResponse, e2);
        }
    }

    /**
     * Gets the stats of one position in a subscription's pending ack.
     *
     * <p>The position is built from {@code ledgerId} and {@code entryId}; {@code batchIndex}
     * is passed through to the base class unchanged and is {@code null} when the query
     * parameter is absent.
     *
     * @param asyncResponse suspended response, resumed with the stats or with the failure
     * @param authoritative value of the {@code authoritative} query parameter (default {@code false})
     * @param tenant        tenant path segment
     * @param namespace     namespace path segment
     * @param encodedTopic  topic path segment, still URL-encoded
     * @param subName       subscription name path segment
     * @param ledgerId      ledger id path segment
     * @param entryId       entry id path segment
     * @param batchIndex    optional {@code batchIndex} query parameter
     */
    @GET
    @Path(value="/positionStatsInPendingAck/{tenant}/{namespace}/{topic}/{subName}/{ledgerId}/{entryId}")
    @ApiOperation(value="Get position stats in pending ack.")
    @ApiResponses(value={@ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Tenant or cluster or namespace or topic or subscription name doesn't exist"), @ApiResponse(code=503, message="This Broker is not configured with transactionCoordinatorEnabled=true."), @ApiResponse(code=307, message="Topic is not owned by this broker!"), @ApiResponse(code=405, message="Pending ack handle don't use managedLedger!"), @ApiResponse(code=400, message="Topic is not a persistent topic!"), @ApiResponse(code=409, message="Concurrent modification")})
    public void getPositionStatsInPendingAck(@Suspended AsyncResponse asyncResponse, @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @PathParam(value="tenant") String tenant, @PathParam(value="namespace") String namespace, @PathParam(value="topic") @Encoded String encodedTopic, @PathParam(value="subName") String subName, @PathParam(value="ledgerId") Long ledgerId, @PathParam(value="entryId") Long entryId, @QueryParam(value="batchIndex") Integer batchIndex) {
        try {
            checkTransactionCoordinatorEnabled();
            validateTopicName(tenant, namespace, encodedTopic);
            PositionImpl position = new PositionImpl(ledgerId, entryId);
            internalGetPositionStatsPendingAckStats(authoritative, subName, position, batchIndex)
                    .thenAccept(asyncResponse::resume)
                    .exceptionally(ex -> {
                        log.warn("{} Failed to check position [{}] stats for topic [{}], subscription [{}]", clientAppId(), position, topicName, subName, ex);
                        resumeAsyncResponseExceptionally(asyncResponse, ex);
                        return null;
                    });
        }
        catch (Exception ex2) {
            log.warn("Failed to get position stats in pending ack", ex2);
            resumeAsyncResponseExceptionally(asyncResponse, ex2);
        }
    }
}

