package org.apache.accumulo.tserver.replication;

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.security.tokens.AuthenticationToken;
import org.apache.accumulo.core.client.security.tokens.KerberosToken;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.clientImpl.ClientContext;
import org.apache.accumulo.core.clientImpl.ClientInfo;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.ClientProperty;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.crypto.CryptoEnvironmentImpl;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.protobuf.ProtobufUtil;
import org.apache.accumulo.core.replication.ReplicationTarget;
import org.apache.accumulo.core.replication.thrift.ReplicationServicer;
import org.apache.accumulo.core.replication.thrift.WalEdits;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.rpc.clients.ThriftClientTypes;
import org.apache.accumulo.core.securityImpl.thrift.TCredentials;
import org.apache.accumulo.core.singletons.SingletonReservation;
import org.apache.accumulo.core.spi.crypto.CryptoEnvironment;
import org.apache.accumulo.core.trace.TraceUtil;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.replication.ReplicaSystem;
import org.apache.accumulo.server.replication.ReplicaSystemHelper;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.tserver.log.DfsLogger;
import org.apache.accumulo.tserver.logger.LogFileKey;
import org.apache.accumulo.tserver.logger.LogFileValue;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.thrift.TException;
import org.apache.thrift.TServiceClient;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Deprecated
/* loaded from: input_file:org/apache/accumulo/tserver/replication/AccumuloReplicaSystem.class */
public class AccumuloReplicaSystem implements ReplicaSystem {
    private static final Logger log = LoggerFactory.getLogger(AccumuloReplicaSystem.class);
    private static final String RFILE_SUFFIX = ".rf";
    private String instanceName;
    private String zookeepers;
    private AccumuloConfiguration conf;
    private ServerContext context;

    /* JADX INFO: Access modifiers changed from: protected */
    @Deprecated
    /* loaded from: input_file:org/apache/accumulo/tserver/replication/AccumuloReplicaSystem$RFileClientExecReturn.class */
    public static class RFileClientExecReturn implements ThriftClientTypes.Exec<ReplicationStats, ReplicationServicer.Client> {
        protected RFileClientExecReturn() {
        }

        public ReplicationStats execute(ReplicationServicer.Client client) {
            return new ReplicationStats(0L, 0L, 0L);
        }
    }

    /**
     * Replaces the configuration this replica system reads its settings from.
     *
     * @param accumuloConfiguration the configuration to use from now on
     */
    protected void setConf(AccumuloConfiguration accumuloConfiguration) {
        conf = accumuloConfiguration;
    }

    /**
     * Builds the configuration string understood by {@code configure}: the two values joined
     * by a single comma.
     *
     * @param str the part placed before the comma (read back as the instance name)
     * @param str2 the part placed after the comma (read back as the zookeepers)
     * @return {@code str + "," + str2}
     */
    public static String buildConfiguration(String str, String str2) {
        return String.join(",", str, str2);
    }

    /**
     * Initialises this replica system from a {@code "instanceName,zookeepers"} string.
     * Everything before the first comma becomes the instance name; everything after it
     * (further commas included) becomes the zookeepers value.
     *
     * @param serverContext supplies the configuration and is retained for later use
     * @param str the configuration string; must not be null
     * @throws NullPointerException if {@code str} is null
     * @throws IllegalArgumentException if {@code str} contains no comma
     */
    public void configure(ServerContext serverContext, String str) {
        Objects.requireNonNull(str);
        final int comma = str.indexOf(',');
        if (comma < 0) {
            // NOTE(review): the one-second pause before failing looks like throttling for
            // callers that retry on a bad configuration -- confirm it is still wanted.
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller; the failure below is still raised.
                Thread.currentThread().interrupt();
            }
            throw new IllegalArgumentException("Expected comma in configuration string");
        }
        final String peerInstance = str.substring(0, comma);
        final String peerZookeepers = str.substring(comma + 1);
        instanceName = peerInstance;
        zookeepers = peerZookeepers;
        conf = serverContext.getConfiguration();
        context = serverContext;
    }

    /**
     * Replicates the given file to the peer described by {@code replicationTarget}.
     *
     * <p>When SASL is disabled the peer is contacted with a password token. When SASL is enabled
     * the configured keytab is used to log in as the replication principal and the work runs
     * inside that login; the user that was current before the login is handed to
     * {@code _replicate}.
     *
     * @param path the file to replicate
     * @param status the replication status of {@code path} before this call
     * @param replicationTarget the peer to replicate to
     * @param replicaSystemHelper helper passed through to {@code _replicate}
     * @return the status produced by {@code _replicate}, or {@code status} unchanged when the
     *         keytab is not a regular file, the local login fails, or the Kerberos token cannot
     *         be created
     */
    @SuppressFBWarnings(value = {"PATH_TRAVERSAL_IN"}, justification = "path provided by admin")
    public Replication.Status replicate(Path path, Replication.Status status, ReplicationTarget replicationTarget, ReplicaSystemHelper replicaSystemHelper) {
        final AccumuloConfiguration localConf = this.conf;
        log.debug("Replication RPC timeout is {}", localConf.get(Property.REPLICATION_RPC_TIMEOUT));
        final String principal = getPrincipal(localConf, replicationTarget);

        if (!localConf.getBoolean(Property.INSTANCE_RPC_SASL_ENABLED)) {
            // Password authentication: no login is needed and no local user is passed on.
            final String password = getPassword(localConf, replicationTarget);
            final ClientContext peerContext = getContextForPeer(localConf, replicationTarget, principal, new PasswordToken(password));
            return _replicate(path, status, replicationTarget, replicaSystemHelper, localConf, peerContext, null);
        }

        final String keytab = getKeytab(localConf, replicationTarget);
        final File keytabFile = new File(keytab);
        // isFile() is already false for a path that does not exist.
        if (!keytabFile.isFile()) {
            log.error("{} is not a regular file. Cannot login to replicate", keytab);
            return status;
        }

        try {
            // Capture the current user before logging in as the replication principal.
            final UserGroupInformation accumuloUgi = UserGroupInformation.getCurrentUser();
            final UserGroupInformation peerUgi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytabFile.getAbsolutePath());
            // The explicit target type selects the PrivilegedAction overload of doAs; a bare
            // lambda is ambiguous with the PrivilegedExceptionAction overload.
            return peerUgi.doAs((java.security.PrivilegedAction<Replication.Status>) () -> {
                try {
                    final ClientContext peerContext = getContextForPeer(localConf, replicationTarget, principal, new KerberosToken(principal, keytabFile));
                    return _replicate(path, status, replicationTarget, replicaSystemHelper, localConf, peerContext, accumuloUgi);
                } catch (IOException e) {
                    log.error("Failed to create KerberosToken", e);
                    return status;
                }
            });
        } catch (IOException e) {
            log.error("Failed to perform local login", e);
            return status;
        }
    }

    /**
     * Attempts to replicate {@code path} to the peer, retrying up to
     * {@code Property.REPLICATION_WORK_ATTEMPTS} times.
     *
     * <p>Each attempt asks the peer's replication coordinator for a servicer address and then
     * sends the file there: files whose name ends in {@code RFILE_SUFFIX} go through
     * {@code replicateRFiles}, everything else through {@code replicateLogs}. An attempt that
     * cannot reach the coordinator, receives no address, or fails to talk to the servicer makes
     * no progress and the loop moves on to the next attempt.
     *
     * @param path the file to replicate
     * @param status the replication status of {@code path} before this call
     * @param replicationTarget the peer to replicate to
     * @param replicaSystemHelper helper forwarded to {@code replicateLogs}
     * @param accumuloConfiguration source of the attempt count and RPC timeout
     * @param clientContext client context used for all calls to the peer
     * @param userGroupInformation user forwarded to {@code replicateLogs}; may be null
     * @return the status after a successful attempt, or {@code status} unchanged when every
     *         attempt failed
     */
    private Replication.Status _replicate(Path path, Replication.Status status, ReplicationTarget replicationTarget, ReplicaSystemHelper replicaSystemHelper, AccumuloConfiguration accumuloConfiguration, ClientContext clientContext, UserGroupInformation userGroupInformation) {
        final Span span = TraceUtil.startSpan(getClass(), "_replicate");
        try (Scope scope = span.makeCurrent()) {
            final String remoteIdentifier = replicationTarget.getRemoteIdentifier();
            final int maxAttempts = accumuloConfiguration.getCount(Property.REPLICATION_WORK_ATTEMPTS);

            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                log.debug("Attempt {}", attempt);
                log.debug("Fetching peer tserver address");

                // Each attempt gets its own span so the enclosing "_replicate" span stays intact.
                String peerAddress;
                final Span fetchSpan = TraceUtil.startSpan(getClass(), "_replicate::Fetch peer tserver");
                try (Scope fetchScope = fetchSpan.makeCurrent()) {
                    peerAddress = (String) ThriftClientTypes.REPLICATION_COORDINATOR.execute(clientContext, client -> client.getServicerAddress(remoteIdentifier, clientContext.rpcCreds()));
                } catch (AccumuloException | AccumuloSecurityException e) {
                    log.error("Could not connect to manager at {}, cannot proceed with replication. Will retry", replicationTarget, e);
                    TraceUtil.setException(fetchSpan, e, false);
                    continue;
                } catch (RuntimeException e) {
                    TraceUtil.setException(fetchSpan, e, true);
                    throw e;
                } finally {
                    fetchSpan.end();
                }

                if (peerAddress == null) {
                    log.warn("Did not receive tserver from manager at {}, cannot proceed with replication. Will retry.", replicationTarget);
                    continue;
                }

                final HostAndPort peerTserver = HostAndPort.fromString(peerAddress);
                final long timeout = accumuloConfiguration.getTimeInMillis(Property.REPLICATION_RPC_TIMEOUT);
                // The size limit is read from the instance field rather than the parameter,
                // exactly as before.
                final long sizeLimit = this.conf.getAsBytes(Property.REPLICATION_MAX_UNIT_SIZE);

                try {
                    final Replication.Status finalStatus;
                    if (path.getName().endsWith(RFILE_SUFFIX)) {
                        final Span rfileSpan = TraceUtil.startSpan(getClass(), "_replicate::RFile replication");
                        try (Scope rfileScope = rfileSpan.makeCurrent()) {
                            finalStatus = replicateRFiles(clientContext, peerTserver, replicationTarget, path, status, timeout);
                        } catch (RuntimeException e) {
                            TraceUtil.setException(rfileSpan, e, true);
                            throw e;
                        } finally {
                            rfileSpan.end();
                        }
                    } else {
                        final Span walSpan = TraceUtil.startSpan(getClass(), "_replicate::WAL replication");
                        try (Scope walScope = walSpan.makeCurrent()) {
                            finalStatus = replicateLogs(clientContext, peerTserver, replicationTarget, path, status, sizeLimit, remoteIdentifier, clientContext.rpcCreds(), replicaSystemHelper, userGroupInformation, timeout);
                        } catch (RuntimeException e) {
                            TraceUtil.setException(walSpan, e, true);
                            throw e;
                        } finally {
                            walSpan.end();
                        }
                    }

                    log.debug("New status for {} after replicating to {} is {}", path, clientContext.getInstanceName(), ProtobufUtil.toString(finalStatus));
                    return finalStatus;
                } catch (TTransportException | AccumuloException | AccumuloSecurityException e) {
                    log.warn("Could not connect to remote server {}, will retry", peerAddress, e);
                    // Recorded on the enclosing span: the fetch span has already ended.
                    TraceUtil.setException(span, e, false);
                    UtilWaitThread.sleepUninterruptibly(1L, TimeUnit.SECONDS);
                }
            }

            log.info("No progress was made after {} attempts to replicate {}, returning so file can be re-queued", maxAttempts, path);
            return status;
        } catch (RuntimeException e) {
            TraceUtil.setException(span, e, true);
            throw e;
        } finally {
            span.end();
        }
    }

    /**
     * Repeatedly runs an {@code RFileClientExecReturn} against the peer servicer, advancing the
     * status' {@code begin} offset by the number of entries each call reports as consumed.
     *
     * @param clientContext client context used for the call to the peer
     * @param hostAndPort address of the peer servicer
     * @param replicationTarget the peer being replicated to (used for logging)
     * @param path the file being replicated (used for logging)
     * @param status the replication status before this call
     * @param j value forwarded as the last argument of {@code executeServicerWithReturn}
     * @return {@code status} unchanged as soon as a batch leaves the status unmodified;
     *         otherwise the first advanced status for which no further work is required
     */
    protected Replication.Status replicateRFiles(ClientContext clientContext, HostAndPort hostAndPort, ReplicationTarget replicationTarget, Path path, Replication.Status status, long j) throws TTransportException, AccumuloException, AccumuloSecurityException {
        Replication.Status previous = status;
        Replication.Status current = status;
        for (;;) {
            final ReplicationStats stats = (ReplicationStats) executeServicerWithReturn(clientContext, hostAndPort, new RFileClientExecReturn(), j);

            // A negative sum means the addition overflowed; clamp to the largest offset.
            final long advanced = current.getBegin() + stats.entriesConsumed;
            final long newBegin = advanced < 0 ? Long.MAX_VALUE : advanced;
            current = Replication.Status.newBuilder(current).setBegin(newBegin).build();
            log.debug("Sent batch for replication of {} to {}, with new Status {}", path, replicationTarget, ProtobufUtil.toString(current));

            if (current.equals(previous)) {
                // Nothing moved in this round, so hand back the status we were given.
                log.debug("Did not replicate any new data for {} to {}, (state was {})", path, replicationTarget, ProtobufUtil.toString(previous));
                return status;
            }
            if (StatusUtil.isWorkRequired(current)) {
                previous = current;
            } else {
                return current;
            }
        }
    }

    /**
     * Replicates a write-ahead log to the peer servicer in batches.
     *
     * <p>The WAL is opened, {@code consumeWalPrefix} is run over it, and then batches are sent
     * through {@code WalClientExecReturn} until either a batch leaves the status unchanged or
     * {@code StatusUtil.isWorkRequired} reports that nothing is left. After every batch that
     * advances the status, the new status is written via {@code replicaSystemHelper}; when
     * {@code userGroupInformation} is non-null that write runs inside its {@code doAs}.
     *
     * <p>If the WAL header cannot be read, the file is treated as containing no data: the status
     * is moved to its end ({@code Long.MAX_VALUE} for an infinite end), recorded and returned.
     *
     * @param clientContext client context used for the calls to the peer
     * @param hostAndPort address of the peer servicer
     * @param replicationTarget the peer being replicated to
     * @param path the WAL file to replicate
     * @param status the replication status before this call
     * @param j batch size limit in bytes (traced as "Batch size (bytes)")
     * @param str remote table id (traced as "Remote table ID")
     * @param tCredentials credentials forwarded to {@code WalClientExecReturn}
     * @param replicaSystemHelper used to record each new status
     * @param userGroupInformation user to record status updates as; may be null
     * @param j2 value forwarded as the last argument of {@code executeServicerWithReturn}
     * @return the final advanced status when no work remains; {@code status} unchanged when a
     *         batch made no progress or the WAL could not be opened or read
     * @throws RuntimeException if the replication table does not exist when recording a status,
     *         or if recording fails with an unexpected exception
     */
    protected Replication.Status replicateLogs(ClientContext clientContext, HostAndPort hostAndPort, ReplicationTarget replicationTarget, Path path, Replication.Status status, long j, String str, TCredentials tCredentials, ReplicaSystemHelper replicaSystemHelper, UserGroupInformation userGroupInformation, long j2) throws TTransportException, AccumuloException, AccumuloSecurityException {
        log.debug("Replication WAL to peer tserver");
        try (FSDataInputStream fsInput = this.context.getVolumeManager().open(path);
                DataInputStream walStream = getWalStream(path, fsInput)) {
            log.debug("Skipping unwanted data in WAL");
            Set<Integer> tids;
            final Span prefixSpan = TraceUtil.startSpan(getClass(), "replicateLogs::Consume WAL prefix");
            try (Scope prefixScope = prefixSpan.makeCurrent()) {
                prefixSpan.setAttribute("file", path.toString());
                tids = consumeWalPrefix(replicationTarget, walStream, status);
            } catch (IOException e) {
                log.warn("Unexpected error consuming file.", e);
                TraceUtil.setException(prefixSpan, e, false);
                return status;
            } catch (RuntimeException e) {
                TraceUtil.setException(prefixSpan, e, true);
                throw e;
            } finally {
                prefixSpan.end();
            }

            log.debug("Sending batches of data to peer tserver");
            Replication.Status lastStatus = status;
            Replication.Status currentStatus = status;
            // Carries a failure out of the doAs action below, which cannot throw checked exceptions.
            final AtomicReference<Exception> recordFailure = new AtomicReference<>();
            while (true) {
                ReplicationStats stats;
                final Span batchSpan = TraceUtil.startSpan(getClass(), "replicateLogs::Replicate WAL batch");
                try (Scope batchScope = batchSpan.makeCurrent()) {
                    batchSpan.setAttribute("Batch size (bytes)", Long.toString(j));
                    batchSpan.setAttribute("File", path.toString());
                    batchSpan.setAttribute("Peer instance name", clientContext.getInstanceName());
                    batchSpan.setAttribute("Peer tserver", hostAndPort.toString());
                    batchSpan.setAttribute("Remote table ID", str);
                    stats = (ReplicationStats) executeServicerWithReturn(clientContext, hostAndPort, new WalClientExecReturn(this, replicationTarget, walStream, path, currentStatus, j, str, tCredentials, tids), j2);
                } catch (RuntimeException e) {
                    log.error("Caught exception replicating data to {} at {}", clientContext.getInstanceName(), hostAndPort, e);
                    TraceUtil.setException(batchSpan, e, true);
                    throw e;
                } finally {
                    batchSpan.end();
                }

                // A negative sum means the addition overflowed; clamp to the largest offset.
                long newBegin = currentStatus.getBegin() + stats.entriesConsumed;
                if (newBegin < 0) {
                    newBegin = Long.MAX_VALUE;
                }
                currentStatus = Replication.Status.newBuilder(currentStatus).setBegin(newBegin).build();
                log.debug("Sent batch for replication of {} to {}, with new Status {}", path, replicationTarget, ProtobufUtil.toString(currentStatus));

                if (currentStatus.equals(lastStatus)) {
                    // No progress in this round: report the status we were given, unrecorded.
                    log.debug("Did not replicate any new data for {} to {}, (state was {})", path, replicationTarget, ProtobufUtil.toString(lastStatus));
                    return status;
                }

                // The lambda needs an effectively final copy of the loop-carried status.
                final Replication.Status statusToRecord = currentStatus;
                final Span updateSpan = TraceUtil.startSpan(getClass(), "replicateLogs::Update replication table");
                try (Scope updateScope = updateSpan.makeCurrent()) {
                    if (userGroupInformation != null) {
                        // The explicit target type selects the PrivilegedAction overload of doAs.
                        userGroupInformation.doAs((java.security.PrivilegedAction<Void>) () -> {
                            try {
                                replicaSystemHelper.recordNewStatus(path, statusToRecord, replicationTarget);
                            } catch (TableNotFoundException | AccumuloException | RuntimeException e) {
                                recordFailure.set(e);
                            }
                            return null;
                        });
                        final Exception failure = recordFailure.get();
                        if (failure != null) {
                            if (failure instanceof TableNotFoundException) {
                                throw (TableNotFoundException) failure;
                            }
                            if (failure instanceof AccumuloSecurityException) {
                                throw (AccumuloSecurityException) failure;
                            }
                            if (failure instanceof AccumuloException) {
                                throw (AccumuloException) failure;
                            }
                            throw new RuntimeException("Received unexpected exception", failure);
                        }
                    } else {
                        replicaSystemHelper.recordNewStatus(path, statusToRecord, replicationTarget);
                    }
                } catch (TableNotFoundException e) {
                    log.error("Tried to update status in replication table for {} as {}, but the table did not exist", path, ProtobufUtil.toString(statusToRecord), e);
                    TraceUtil.setException(updateSpan, e, true);
                    throw new RuntimeException("Replication table did not exist, will retry", e);
                } catch (RuntimeException e) {
                    TraceUtil.setException(updateSpan, e, true);
                    throw e;
                } finally {
                    updateSpan.end();
                }

                log.debug("Recorded updated status for {}: {}", path, ProtobufUtil.toString(currentStatus));
                if (!StatusUtil.isWorkRequired(currentStatus)) {
                    return currentStatus;
                }
                lastStatus = currentStatus;
            }
        } catch (DfsLogger.LogHeaderIncompleteException headerIncomplete) {
            log.warn("Could not read header from {}, assuming that there is no data present in the WAL, therefore replication is complete", path);
            final Replication.Status completed = status.getInfiniteEnd()
                    ? Replication.Status.newBuilder(status).setBegin(Long.MAX_VALUE).build()
                    : Replication.Status.newBuilder(status).setBegin(status.getEnd()).build();
            final Span updateSpan = TraceUtil.startSpan(getClass(), "replicateLogs::Update replication table");
            try (Scope updateScope = updateSpan.makeCurrent()) {
                replicaSystemHelper.recordNewStatus(path, completed, replicationTarget);
                return completed;
            } catch (TableNotFoundException tableMissing) {
                // Report the failure that actually happened here, not the header exception
                // that led into this handler.
                log.error("Tried to update status in replication table for {} as {}, but the table did not exist", path, ProtobufUtil.toString(completed), tableMissing);
                TraceUtil.setException(updateSpan, tableMissing, true);
                throw new RuntimeException("Replication table did not exist, will retry", tableMissing);
            } catch (RuntimeException unexpected) {
                TraceUtil.setException(updateSpan, unexpected, true);
                throw unexpected;
            } finally {
                updateSpan.end();
            }
        } catch (IOException e) {
            log.error("Could not create stream for WAL", e);
            return status;
        }
    }

    /**
     * Looks up the password configured for the peer named by {@code replicationTarget}.
     *
     * <p>The property key is {@code Property.REPLICATION_PEER_PASSWORD.getKey()} followed by the
     * target's peer name, resolved against the properties that share that prefix.
     *
     * @param accumuloConfiguration configuration to read the peer properties from; must not be null
     * @param replicationTarget target whose peer name selects the property; must not be null
     * @return the configured password, never null
     * @throws NullPointerException if either argument is null
     * @throws IllegalArgumentException if no value exists under the computed key
     */
    protected String getPassword(AccumuloConfiguration accumuloConfiguration, ReplicationTarget replicationTarget) {
        Objects.requireNonNull(accumuloConfiguration);
        Objects.requireNonNull(replicationTarget);

        final String peerName = replicationTarget.getPeerName();
        final String propertyKey = Property.REPLICATION_PEER_PASSWORD.getKey() + peerName;
        final String password =
            accumuloConfiguration.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_PASSWORD).get(propertyKey);
        if (password != null) {
            return password;
        }
        throw new IllegalArgumentException("Cannot get password for " + peerName);
    }

    /**
     * Looks up the keytab configured for the peer named by {@code replicationTarget}.
     *
     * <p>The property key is {@code Property.REPLICATION_PEER_KEYTAB.getKey()} followed by the
     * target's peer name, resolved against the properties that share that prefix.
     *
     * @param accumuloConfiguration configuration to read the peer properties from; must not be null
     * @param replicationTarget target whose peer name selects the property; must not be null
     * @return the configured keytab value, never null
     * @throws NullPointerException if either argument is null
     * @throws IllegalArgumentException if no value exists under the computed key
     */
    protected String getKeytab(AccumuloConfiguration accumuloConfiguration, ReplicationTarget replicationTarget) {
        Objects.requireNonNull(accumuloConfiguration);
        Objects.requireNonNull(replicationTarget);

        final String peerName = replicationTarget.getPeerName();
        final String propertyKey = Property.REPLICATION_PEER_KEYTAB.getKey() + peerName;
        final String keytab =
            accumuloConfiguration.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_KEYTAB).get(propertyKey);
        if (keytab != null) {
            return keytab;
        }
        throw new IllegalArgumentException("Cannot get keytab for " + peerName);
    }

    /**
     * Looks up the user (principal) configured for the peer named by {@code replicationTarget}.
     *
     * <p>The property key is {@code Property.REPLICATION_PEER_USER.getKey()} followed by the
     * target's peer name, resolved against the properties that share that prefix.
     *
     * @param accumuloConfiguration configuration to read the peer properties from; must not be null
     * @param replicationTarget target whose peer name selects the property; must not be null
     * @return the configured user, never null
     * @throws NullPointerException if either argument is null
     * @throws IllegalArgumentException if no value exists under the computed key
     */
    protected String getPrincipal(AccumuloConfiguration accumuloConfiguration, ReplicationTarget replicationTarget) {
        Objects.requireNonNull(accumuloConfiguration);
        Objects.requireNonNull(replicationTarget);

        final String peerName = replicationTarget.getPeerName();
        final String propertyKey = Property.REPLICATION_PEER_USER.getKey() + peerName;
        final String principal =
            accumuloConfiguration.getAllPropertiesWithPrefix(Property.REPLICATION_PEER_USER).get(propertyKey);
        if (principal != null) {
            return principal;
        }
        throw new IllegalArgumentException("Cannot get user for " + peerName);
    }

    /**
     * Builds a {@link ClientContext} from this instance's {@code instanceName} and
     * {@code zookeepers} fields plus the supplied principal and token.
     *
     * @param accumuloConfiguration configuration handed to the new context; must not be null
     * @param replicationTarget only null-checked here; not otherwise read by this method
     * @param str the principal stored under {@code ClientProperty.AUTH_PRINCIPAL}; must not be null
     * @param authenticationToken token stored in the client properties and passed to
     *        {@code ClientInfo.from}; must not be null
     * @return a newly constructed client context
     * @throws NullPointerException if any argument is null
     */
    protected ClientContext getContextForPeer(AccumuloConfiguration accumuloConfiguration, ReplicationTarget replicationTarget, String str, AuthenticationToken authenticationToken) {
        Objects.requireNonNull(accumuloConfiguration);
        Objects.requireNonNull(replicationTarget);
        Objects.requireNonNull(str);
        Objects.requireNonNull(authenticationToken);

        // Assemble the client properties for the connection.
        final Properties peerProperties = new Properties();
        final String instanceNameKey = ClientProperty.INSTANCE_NAME.getKey();
        peerProperties.setProperty(instanceNameKey, this.instanceName);
        final String zookeepersKey = ClientProperty.INSTANCE_ZOOKEEPERS.getKey();
        peerProperties.setProperty(zookeepersKey, this.zookeepers);
        final String principalKey = ClientProperty.AUTH_PRINCIPAL.getKey();
        peerProperties.setProperty(principalKey, str);
        ClientProperty.setAuthenticationToken(peerProperties, authenticationToken);

        return new ClientContext(
            SingletonReservation.noop(),
            ClientInfo.from(peerProperties, authenticationToken),
            accumuloConfiguration,
            Threads.UEH);
    }

    /**
     * Reads and discards the first {@code status.getBegin()} key/value entries of the WAL stream,
     * collecting the tablet ids defined for the target's source table along the way.
     *
     * <p>For every {@code DEFINE_TABLET} entry whose tablet's table id equals
     * {@code replicationTarget.getSourceTableId()}, the entry's {@code tabletId} is added to the
     * returned set. All other entries are skipped.
     *
     * @param replicationTarget target whose source table id selects the tablets of interest
     * @param dataInputStream stream to read the entries from
     * @param status replication status; {@code getBegin()} is the number of entries to consume
     * @return the tablet ids collected from the consumed prefix (possibly empty)
     * @throws IOException if reading a key or value from the stream fails
     */
    protected Set<Integer> consumeWalPrefix(ReplicationTarget replicationTarget, DataInputStream dataInputStream, Replication.Status status) throws IOException {
        final LogFileKey key = new LogFileKey();
        final LogFileValue value = new LogFileValue();
        final Set<Integer> desiredTabletIds = new HashSet<>();

        for (long entriesRead = 0; entriesRead < status.getBegin(); entriesRead++) {
            key.readFields(dataInputStream);
            value.readFields(dataInputStream);

            switch (key.event) {
                case DEFINE_TABLET:
                    if (replicationTarget.getSourceTableId().equals(key.tablet.tableId())) {
                        desiredTabletIds.add(Integer.valueOf(key.tabletId));
                    }
                    break;
                default:
                    // Only tablet definitions matter while skipping the prefix.
                    break;
            }
        }
        return desiredTabletIds;
    }

    /**
     * Wraps {@code fSDataInputStream} in a decrypting stream obtained from
     * {@code DfsLogger.getDecryptingStream}, running inside a trace span named
     * {@code "getWalStream::Read WAL header"}.
     *
     * <p>The span records {@code path} under the {@code "file"} attribute. A
     * {@link RuntimeException} is recorded on the span and rethrown; the span is always ended.
     *
     * @param path location of the WAL, used only for the span attribute
     * @param fSDataInputStream raw input stream for the WAL
     * @return the stream returned by {@code DfsLogger.getDecryptingStream}
     * @throws DfsLogger.LogHeaderIncompleteException declared by this method; propagated unchanged
     * @throws IOException declared by this method; propagated unchanged
     */
    public DataInputStream getWalStream(Path path, FSDataInputStream fSDataInputStream) throws DfsLogger.LogHeaderIncompleteException, IOException {
        final Span headerSpan = TraceUtil.startSpan(getClass(), "getWalStream::Read WAL header");
        // The scope is closed before the catch/finally clauses run, matching the original ordering.
        try (Scope ignoredScope = headerSpan.makeCurrent()) {
            headerSpan.setAttribute("file", path.toString());
            return DfsLogger.getDecryptingStream(
                fSDataInputStream,
                this.context.getCryptoFactory().getService(
                    new CryptoEnvironmentImpl(CryptoEnvironment.Scope.WAL),
                    this.conf.getAllCryptoProperties()));
        } catch (RuntimeException unchecked) {
            TraceUtil.setException(headerSpan, unchecked, true);
            throw unchecked;
        } finally {
            headerSpan.end();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Reads WAL entries from {@code dataInputStream} and collects the serialized mutation entries
     * that belong to tablets of interest, until at least {@code j} bytes have been collected or the
     * stream is exhausted.
     *
     * <p>Per entry:
     * <ul>
     * <li>{@code DEFINE_TABLET}: if the tablet's table id equals the target's source table id, the
     * entry's tablet id is added to {@code set}.</li>
     * <li>Mutation entries: if the entry's tablet id is in {@code set}, the key plus the mutations
     * written by {@link #writeValueAvoidingReplicationCycles} are serialized and appended to the
     * returned edits.</li>
     * <li>Anything else is logged at trace level and skipped.</li>
     * </ul>
     *
     * <p>On {@link EOFException} reading stops. If {@code status} reports both an infinite end and
     * closed, the consumed-entry count is set to {@code Long.MAX_VALUE}.
     *
     * @param replicationTarget target whose source table id and peer name drive the filtering
     * @param dataInputStream stream positioned at the next WAL entry to read
     * @param path WAL location, used only in log messages
     * @param status replication status consulted when the end of the stream is reached
     * @param j size limit in bytes; reading continues while the collected byte count is below it
     * @param set tablet ids of interest; this method adds to it
     * @return the collected edits with the byte size, entries consumed and number of updates
     * @throws UncheckedIOException if reading an entry or serializing it fails with a
     *         non-EOF {@link IOException}
     */
    public WalReplication getWalEdits(ReplicationTarget replicationTarget, DataInputStream dataInputStream, Path path, Replication.Status status, long j, Set<Integer> set) {
        final WalEdits walEdits = new WalEdits();
        walEdits.edits = new ArrayList<>();
        long sizeInBytes = 0;
        long entriesConsumed = 0;
        long numUpdates = 0;
        final LogFileKey key = new LogFileKey();
        final LogFileValue value = new LogFileValue();

        while (sizeInBytes < j) {
            try {
                key.readFields(dataInputStream);
                value.readFields(dataInputStream);
            } catch (EOFException e) {
                log.debug("Caught EOFException reading {}", path, e);
                if (status.getInfiniteEnd() && status.getClosed()) {
                    log.debug("{} is closed and has unknown length, assuming entire file has been consumed", path);
                    entriesConsumed = Long.MAX_VALUE;
                }
                // Nothing more can be read; without leaving the loop the size condition never
                // changes and every further read hits end-of-stream again.
                break;
            } catch (IOException e) {
                log.debug("Unexpected IOException reading {}", path, e);
                throw new UncheckedIOException(e);
            }

            entriesConsumed++;

            // NOTE(review): the decompiled code dispatched on a compiler-generated switch map with
            // cases 1, 2 and 3. Case 1 carries the DEFINE_TABLET handling; cases 2 and 3 are taken
            // to be MUTATION and MANY_MUTATIONS -- confirm against the LogEvents enum.
            switch (key.event) {
                case DEFINE_TABLET:
                    if (replicationTarget.getSourceTableId().equals(key.tablet.tableId())) {
                        set.add(Integer.valueOf(key.tabletId));
                    }
                    break;
                case MUTATION:
                case MANY_MUTATIONS:
                    if (!set.contains(Integer.valueOf(key.tabletId))) {
                        break;
                    }
                    try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                        DataOutputStream out = new DataOutputStream(buffer)) {
                        key.write(out);
                        numUpdates += writeValueAvoidingReplicationCycles(out, value, replicationTarget);
                        out.flush();
                        final byte[] serialized = buffer.toByteArray();
                        sizeInBytes += serialized.length;
                        walEdits.addToEdits(ByteBuffer.wrap(serialized));
                    } catch (IOException e) {
                        log.debug("Unexpected IOException writing to a byte array output stream", e);
                        throw new UncheckedIOException(e);
                    }
                    break;
                default:
                    log.trace("Ignoring WAL entry which doesn't contain mutations, should not have received such entries");
                    break;
            }
        }
        return new WalReplication(walEdits, sizeInBytes, entriesConsumed, numUpdates);
    }

    /**
     * Writes to {@code dataOutputStream} the mutations of {@code logFileValue} whose replication
     * sources do not already contain the target's peer name, preceded by their count.
     *
     * <p>Each written mutation first has the local replication name (the value of
     * {@code Property.REPLICATION_NAME}) added as a replication source.
     *
     * @param dataOutputStream destination for the count and the serialized mutations
     * @param logFileValue WAL value holding the candidate mutations
     * @param replicationTarget target whose peer name is used to filter mutations
     * @return the number of mutations written
     * @throws IOException if writing to {@code dataOutputStream} fails
     * @throws IllegalArgumentException if the local replication name is blank
     */
    protected long writeValueAvoidingReplicationCycles(DataOutputStream dataOutputStream, LogFileValue logFileValue, ReplicationTarget replicationTarget) throws IOException {
        // First pass: count what will be written, since the count precedes the payload.
        int mutationsToSend = 0;
        for (Mutation candidate : logFileValue.mutations) {
            if (!candidate.getReplicationSources().contains(replicationTarget.getPeerName())) {
                mutationsToSend++;
            }
        }

        final int mutationsRemoved = logFileValue.mutations.size() - mutationsToSend;
        if (mutationsRemoved > 0) {
            log.debug("Removing {} mutations from WAL entry as they have already been replicated to {}", Integer.valueOf(mutationsRemoved), replicationTarget.getPeerName());
        }

        final String localName = this.conf.get(Property.REPLICATION_NAME);
        if (localName.isBlank()) {
            throw new IllegalArgumentException("Local system has no replication name configured");
        }

        // Second pass: emit the count, then each surviving mutation tagged with the local name.
        dataOutputStream.writeInt(mutationsToSend);
        for (Mutation candidate : logFileValue.mutations) {
            if (candidate.getReplicationSources().contains(replicationTarget.getPeerName())) {
                continue;
            }
            candidate.addReplicationSource(localName);
            candidate.write(dataOutputStream);
        }
        return mutationsToSend;
    }

    /**
     * Obtains a {@code ReplicationServicer.Client} for {@code hostAndPort}, runs {@code exec}
     * against it and returns the result, closing the client afterwards.
     *
     * @param clientContext context used to obtain and to close the client
     * @param hostAndPort address of the server to contact
     * @param exec operation to run with the client
     * @param j value passed through to {@code ThriftUtil.getClient} as its last argument
     * @param <T> result type of {@code exec}
     * @return whatever {@code exec} returns
     * @throws AccumuloSecurityException wrapping a {@link ThriftSecurityException}
     * @throws AccumuloException wrapping any other {@code TException}
     */
    private static <T> T executeServicerWithReturn(ClientContext clientContext, HostAndPort hostAndPort, ThriftClientTypes.Exec<T, ReplicationServicer.Client> exec, long j) throws AccumuloException, AccumuloSecurityException {
        // Typed as ReplicationServicer.Client so it matches the client type exec is declared with.
        ReplicationServicer.Client client = null;
        try {
            client = (ReplicationServicer.Client) ThriftUtil.getClient(ThriftClientTypes.REPLICATION_SERVICER, hostAndPort, clientContext, j);
            return exec.execute(client);
        } catch (ThriftSecurityException e) {
            throw new AccumuloSecurityException(e.user, e.code, e);
        } catch (TException e) {
            throw new AccumuloException(e);
        } finally {
            // Single close point: runs exactly once on both the success and the failure path.
            if (client != null) {
                ThriftUtil.close(client, clientContext);
            }
        }
    }
}
