package org.apache.accumulo.gc.replication;

import com.google.common.base.Stopwatch;
import com.google.common.net.HostAndPort;
import com.google.protobuf.InvalidProtocolBufferException;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.accumulo.core.client.BatchScanner;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.MutationsRejectedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.master.thrift.MasterClientService;
import org.apache.accumulo.core.metadata.schema.MetadataSchema;
import org.apache.accumulo.core.replication.ReplicationTable;
import org.apache.accumulo.core.rpc.ThriftUtil;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.tabletserver.thrift.TabletClientService;
import org.apache.accumulo.core.trace.Span;
import org.apache.accumulo.core.trace.Trace;
import org.apache.accumulo.core.trace.thrift.TInfo;
import org.apache.accumulo.server.AccumuloServerContext;
import org.apache.accumulo.server.log.WalStateManager;
import org.apache.accumulo.server.replication.StatusUtil;
import org.apache.accumulo.server.replication.proto.Replication;
import org.apache.accumulo.server.zookeeper.ZooReaderWriter;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.thrift.TException;
import org.apache.thrift.transport.TTransportException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/accumulo/gc/replication/CloseWriteAheadLogReferences.class */
/**
 * Periodic task that finds write-ahead logs (WALs) whose state is {@code CLOSED} or
 * {@code UNREFERENCED} and marks the matching, still-open replication status entries in the
 * {@code accumulo.metadata} table as closed.
 *
 * <p>The work is skipped entirely when the replication table is not online.
 */
public class CloseWriteAheadLogReferences implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(CloseWriteAheadLogReferences.class);
    private static final String RFILE_SUFFIX = ".rf";
    private final AccumuloServerContext context;

    /**
     * @param accumuloServerContext server context used to obtain the connector, the instance and RPC
     *        credentials
     */
    public CloseWriteAheadLogReferences(AccumuloServerContext accumuloServerContext) {
        this.context = accumuloServerContext;
    }

    /**
     * Collects the closed WALs and then closes the corresponding replication references, timing and
     * tracing each of the two phases.
     *
     * @throws RuntimeException if the connector cannot be created, or if one of the phases fails
     */
    @Override // java.lang.Runnable
    public void run() {
        Stopwatch stopwatch = new Stopwatch();

        // Only a connector failure is reported as such; failures in later phases propagate unchanged
        // instead of being mislabelled and wrapped a second time.
        Connector connector;
        try {
            connector = this.context.getConnector();
        } catch (Exception e) {
            log.error("Could not create connector", e);
            throw new RuntimeException(e);
        }

        if (!ReplicationTable.isOnline(connector)) {
            log.debug("Replication table isn't online, not attempting to clean up wals");
            return;
        }

        Set<String> closedLogs;
        Span findWalsSpan = Trace.start("findReferencedWals");
        stopwatch.start();
        try {
            closedLogs = getClosedLogs(connector);
        } finally {
            // Always stop the timer and the span, even when the lookup throws.
            stopwatch.stop();
            findWalsSpan.stop();
        }
        log.info("Found " + closedLogs.size() + " WALs referenced in metadata in " + stopwatch.toString());
        stopwatch.reset();

        long recordsClosed;
        Span updateSpan = Trace.start("updateReplicationTable");
        stopwatch.start();
        try {
            recordsClosed = updateReplicationEntries(connector, closedLogs);
        } finally {
            stopwatch.stop();
            updateSpan.stop();
        }
        log.info("Closed " + recordsClosed + " WAL replication references in replication table in " + stopwatch.toString());
    }

    /**
     * Returns the paths of all WALs whose recorded state is {@code UNREFERENCED} or {@code CLOSED}.
     *
     * @param connector connector whose instance is used to build the {@link WalStateManager}
     * @return string form of every matching WAL path; never {@code null}
     * @throws RuntimeException wrapping a {@code WalMarkerException} raised while reading WAL state
     */
    protected HashSet<String> getClosedLogs(Connector connector) {
        WalStateManager walStateManager = new WalStateManager(connector.getInstance(), ZooReaderWriter.getInstance());
        HashSet<String> closedLogs = new HashSet<>();
        try {
            for (Map.Entry<Path, WalStateManager.WalState> entry : walStateManager.getAllState().entrySet()) {
                WalStateManager.WalState state = entry.getValue();
                if (state == WalStateManager.WalState.UNREFERENCED || state == WalStateManager.WalState.CLOSED) {
                    String path = entry.getKey().toString();
                    log.debug("Found closed WAL " + path);
                    closedLogs.add(path);
                }
            }
        } catch (WalStateManager.WalMarkerException e) {
            throw new RuntimeException(e);
        }
        return closedLogs;
    }

    /**
     * Scans the replication section of {@code accumulo.metadata} and writes a "closed" status for
     * every entry that is not yet closed, is not an rfile, and whose file is contained in
     * {@code set}.
     *
     * <p>Entries whose status cannot be parsed, or whose mutation is rejected, are logged and
     * skipped. A missing table is logged and yields the count accumulated so far.
     *
     * @param connector connector used to create the batch scanner and batch writer
     * @param set file names of WALs that are considered closed
     * @return number of entries for which a closing mutation was submitted
     */
    protected long updateReplicationEntries(Connector connector, Set<String> set) {
        BatchScanner batchScanner = null;
        BatchWriter batchWriter = null;
        long recordsClosed = 0;
        try {
            batchWriter = connector.createBatchWriter("accumulo.metadata", new BatchWriterConfig());
            batchScanner = connector.createBatchScanner("accumulo.metadata", Authorizations.EMPTY, 4);
            batchScanner.setRanges(Collections.singleton(Range.prefix(MetadataSchema.ReplicationSection.getRowPrefix())));
            batchScanner.fetchColumnFamily(MetadataSchema.ReplicationSection.COLF);

            // Reused buffer that receives the file name extracted from each key.
            Text fileBuffer = new Text();
            for (Map.Entry<Key, Value> entry : batchScanner) {
                Replication.Status status;
                try {
                    status = Replication.Status.parseFrom(entry.getValue().get());
                } catch (InvalidProtocolBufferException e) {
                    log.error("Could not parse Status protobuf for {}", entry.getKey(), e);
                    continue;
                }

                MetadataSchema.ReplicationSection.getFile(entry.getKey(), fileBuffer);
                String file = fileBuffer.toString();
                boolean walIsClosed = set.contains(file);

                // Only WALs (anything that is not an rfile) that are still marked open are updated.
                if (!status.getClosed() && !file.endsWith(RFILE_SUFFIX) && walIsClosed) {
                    try {
                        closeWal(batchWriter, entry.getKey());
                        recordsClosed++;
                    } catch (MutationsRejectedException e) {
                        // Keep the cause so the rejection reason is not lost.
                        log.error("Failed to submit delete mutation for {}", entry.getKey(), e);
                    }
                }
            }
        } catch (TableNotFoundException e) {
            log.error("Replication table was deleted", e);
        } finally {
            closeScannerAndWriter(batchScanner, batchWriter);
        }
        return recordsClosed;
    }

    /**
     * Closes the scanner and then the writer, tolerating {@code null} for either; a rejected flush
     * on writer close is logged rather than propagated.
     */
    private static void closeScannerAndWriter(BatchScanner batchScanner, BatchWriter batchWriter) {
        if (null != batchScanner) {
            batchScanner.close();
        }
        if (null != batchWriter) {
            try {
                batchWriter.close();
            } catch (MutationsRejectedException e) {
                log.error("Failed to write delete mutations for replication table", e);
            }
        }
    }

    /**
     * Queues a mutation that overwrites the status stored at {@code key} with the "file closed"
     * value.
     *
     * @param batchWriter writer that receives the mutation
     * @param key row, column family and column qualifier of the status entry to close
     * @throws MutationsRejectedException if the writer rejects the mutation
     */
    protected void closeWal(BatchWriter batchWriter, Key key) throws MutationsRejectedException {
        log.debug("Closing unreferenced WAL ({}) in metadata table", key.toStringNoTruncate());
        Mutation mutation = new Mutation(key.getRow());
        mutation.put(key.getColumnFamily(), key.getColumnQualifier(), StatusUtil.fileClosedValue());
        batchWriter.addMutation(mutation);
    }

    /**
     * Resolves the first advertised master location.
     *
     * @return the master address, or {@code null} when none is advertised or the lookup fails
     */
    private HostAndPort getMasterAddress() {
        try {
            List<String> masterLocations = this.context.getInstance().getMasterLocations();
            if (masterLocations.isEmpty()) {
                return null;
            }
            return HostAndPort.fromString(masterLocations.get(0));
        } catch (Exception e) {
            log.warn("Failed to obtain master host " + e);
            return null;
        }
    }

    /**
     * Opens a Thrift client to the master.
     *
     * @return a client, or {@code null} when the address is unknown or the connection fails
     */
    private MasterClientService.Client getMasterConnection() {
        HostAndPort masterAddress = getMasterAddress();
        try {
            if (masterAddress != null) {
                return ThriftUtil.getClient(new MasterClientService.Client.Factory(), masterAddress, this.context);
            }
            log.warn("Could not fetch Master address");
            return null;
        } catch (Exception e) {
            log.warn("Issue with masterConnection (" + masterAddress + ") " + e, e);
            return null;
        }
    }

    /**
     * Asks the master for the list of active tablet servers.
     *
     * @param tInfo trace information forwarded with the RPC
     * @return the active tablet servers, or {@code null} when no master connection could be made or
     *         the RPC fails
     */
    protected List<String> getActiveTservers(TInfo tInfo) {
        MasterClientService.Client client = null;
        List<String> tservers = null;
        try {
            client = getMasterConnection();
            if (null != client) {
                tservers = client.getActiveTservers(tInfo, this.context.rpcCreds());
            }
        } catch (TException e) {
            log.warn("Failed to fetch active tabletservers from the master", e);
            return null;
        } finally {
            // The client is handed back on every path, including when it is still null.
            ThriftUtil.returnClient(client);
        }
        return tservers;
    }

    /**
     * Asks one tablet server for the write-ahead logs it currently has active.
     *
     * @param tInfo trace information forwarded with the RPC
     * @param hostAndPort address of the tablet server to query
     * @return the active WALs reported by the server, or {@code null} when the RPC fails
     */
    protected List<String> getActiveWalsForServer(TInfo tInfo, HostAndPort hostAndPort) {
        TabletClientService.Client client = null;
        try {
            client = ThriftUtil.getClient(new TabletClientService.Client.Factory(), hostAndPort, this.context);
            return client.getActiveLogs(tInfo, this.context.rpcCreds());
        } catch (TException e) {
            // TTransportException is a TException, so one handler covers both failure modes.
            log.warn("Failed to fetch active write-ahead logs from " + hostAndPort, e);
            return null;
        } finally {
            ThriftUtil.returnClient(client);
        }
    }
}
