package org.apache.accumulo.test.replication;

import com.google.common.collect.Iterables;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.client.BatchWriter;
import org.apache.accumulo.core.client.BatchWriterConfig;
import org.apache.accumulo.core.client.Connector;
import org.apache.accumulo.core.client.IteratorSetting;
import org.apache.accumulo.core.client.admin.NewTableConfiguration;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.LongCombiner;
import org.apache.accumulo.core.iterators.user.SummingCombiner;
import org.apache.accumulo.core.security.Authorizations;
import org.apache.accumulo.core.security.TablePermission;
import org.apache.accumulo.minicluster.ServerType;
import org.apache.accumulo.minicluster.impl.MiniAccumuloClusterImpl;
import org.apache.accumulo.minicluster.impl.MiniAccumuloConfigImpl;
import org.apache.accumulo.minicluster.impl.ProcessReference;
import org.apache.accumulo.minicluster.impl.ZooKeeperBindException;
import org.apache.accumulo.server.replication.ReplicaSystemFactory;
import org.apache.accumulo.test.categories.MiniClusterOnlyTests;
import org.apache.accumulo.test.functional.ConfigurableMacBase;
import org.apache.accumulo.tserver.TabletServer;
import org.apache.accumulo.tserver.replication.AccumuloReplicaSystem;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category({MiniClusterOnlyTests.class})
/* loaded from: input_file:org/apache/accumulo/test/replication/CyclicReplicationIT.class */
public class CyclicReplicationIT {
    private static final Logger log = LoggerFactory.getLogger(CyclicReplicationIT.class);

    @Rule
    public TestName testName = new TestName();

    @Rule
    public Timeout getTimeout() {
        int i = 1;
        try {
            i = Integer.parseInt(System.getProperty("timeout.factor"));
        } catch (NumberFormatException e) {
            log.warn("Could not parse timeout.factor, not scaling timeout");
        }
        return new Timeout(i * 10, TimeUnit.MINUTES);
    }

    private File createTestDir(String str) {
        File file = new File(System.getProperty("user.dir") + "/target/mini-tests");
        Assert.assertTrue(file.mkdirs() || file.isDirectory());
        File file2 = new File(file, getClass().getName() + "_" + this.testName.getMethodName() + "_" + str);
        FileUtils.deleteQuietly(file2);
        Assert.assertTrue(file2.mkdir());
        return file2;
    }

    private void setCoreSite(MiniAccumuloClusterImpl miniAccumuloClusterImpl) throws Exception {
        File file = new File(miniAccumuloClusterImpl.getConfig().getConfDir(), "core-site.xml");
        if (file.exists()) {
            throw new RuntimeException(file + " already exist");
        }
        Configuration configuration = new Configuration(false);
        configuration.set("fs.file.impl", RawLocalFileSystem.class.getName());
        BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(new File(miniAccumuloClusterImpl.getConfig().getConfDir(), "core-site.xml")));
        configuration.writeXml(bufferedOutputStream);
        bufferedOutputStream.close();
    }

    private void updatePeerConfigFromPrimary(MiniAccumuloConfigImpl miniAccumuloConfigImpl, MiniAccumuloConfigImpl miniAccumuloConfigImpl2) {
        Map siteConfig = miniAccumuloConfigImpl.getSiteConfig();
        if ("true".equals(siteConfig.get(Property.INSTANCE_RPC_SSL_ENABLED.getKey()))) {
            HashMap hashMap = new HashMap();
            hashMap.put(Property.INSTANCE_RPC_SSL_ENABLED.getKey(), "true");
            String str = (String) siteConfig.get(Property.RPC_SSL_KEYSTORE_PATH.getKey());
            Assert.assertNotNull("Keystore Path was null", str);
            hashMap.put(Property.RPC_SSL_KEYSTORE_PATH.getKey(), str);
            String str2 = (String) siteConfig.get(Property.RPC_SSL_TRUSTSTORE_PATH.getKey());
            Assert.assertNotNull("Truststore Path was null", str2);
            hashMap.put(Property.RPC_SSL_TRUSTSTORE_PATH.getKey(), str2);
            String str3 = (String) siteConfig.get(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey());
            if (null != str3) {
                hashMap.put(Property.RPC_SSL_KEYSTORE_PASSWORD.getKey(), str3);
            }
            String str4 = (String) siteConfig.get(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey());
            if (null != str4) {
                hashMap.put(Property.RPC_SSL_TRUSTSTORE_PASSWORD.getKey(), str4);
            }
            System.out.println("Setting site configuration for peer " + hashMap);
            miniAccumuloConfigImpl2.setSiteConfig(hashMap);
        }
        String str5 = (String) siteConfig.get(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey());
        if (null != str5) {
            Map siteConfig2 = miniAccumuloConfigImpl2.getSiteConfig();
            siteConfig2.put(Property.GENERAL_SECURITY_CREDENTIAL_PROVIDER_PATHS.getKey(), str5);
            miniAccumuloConfigImpl2.setSiteConfig(siteConfig2);
        }
    }

    @Test
    public void dataIsNotOverReplicated() throws Exception {
        MiniAccumuloConfigImpl miniAccumuloConfigImpl;
        MiniAccumuloClusterImpl miniAccumuloClusterImpl;
        MiniAccumuloClusterImpl miniAccumuloClusterImpl2;
        File createTestDir = createTestDir("master1");
        File createTestDir2 = createTestDir("master2");
        while (true) {
            miniAccumuloConfigImpl = new MiniAccumuloConfigImpl(createTestDir, "password");
            miniAccumuloConfigImpl.setNumTservers(1);
            miniAccumuloConfigImpl.setInstanceName("master1");
            ConfigurableMacBase.configureForEnvironment(miniAccumuloConfigImpl, getClass(), ConfigurableMacBase.getSslDir(createTestDir));
            miniAccumuloConfigImpl.setProperty(Property.REPLICATION_NAME, miniAccumuloConfigImpl.getInstanceName());
            miniAccumuloConfigImpl.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
            miniAccumuloConfigImpl.setProperty(Property.REPLICATION_THREADCHECK, "5m");
            miniAccumuloConfigImpl.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
            miniAccumuloConfigImpl.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
            miniAccumuloClusterImpl = new MiniAccumuloClusterImpl(miniAccumuloConfigImpl);
            setCoreSite(miniAccumuloClusterImpl);
            try {
                miniAccumuloClusterImpl.start();
                break;
            } catch (ZooKeeperBindException e) {
                log.warn("Failed to start ZooKeeper on " + miniAccumuloConfigImpl.getZooKeeperPort() + ", will retry");
            }
        }
        while (true) {
            MiniAccumuloConfigImpl miniAccumuloConfigImpl2 = new MiniAccumuloConfigImpl(createTestDir2, "password");
            miniAccumuloConfigImpl2.setNumTservers(1);
            miniAccumuloConfigImpl2.setInstanceName("master2");
            updatePeerConfigFromPrimary(miniAccumuloConfigImpl, miniAccumuloConfigImpl2);
            miniAccumuloConfigImpl2.setProperty(Property.REPLICATION_NAME, miniAccumuloConfigImpl2.getInstanceName());
            miniAccumuloConfigImpl2.setProperty(Property.TSERV_WALOG_MAX_SIZE, "5M");
            miniAccumuloConfigImpl2.setProperty(Property.REPLICATION_THREADCHECK, "5m");
            miniAccumuloConfigImpl2.setProperty(Property.REPLICATION_WORK_ASSIGNMENT_SLEEP, "1s");
            miniAccumuloConfigImpl2.setProperty(Property.MASTER_REPLICATION_SCAN_INTERVAL, "1s");
            miniAccumuloClusterImpl2 = new MiniAccumuloClusterImpl(miniAccumuloConfigImpl2);
            setCoreSite(miniAccumuloClusterImpl2);
            try {
                miniAccumuloClusterImpl2.start();
                try {
                    break;
                } catch (Throwable th) {
                    miniAccumuloClusterImpl.stop();
                    miniAccumuloClusterImpl2.stop();
                    throw th;
                }
            } catch (ZooKeeperBindException e2) {
                log.warn("Failed to start ZooKeeper on " + miniAccumuloConfigImpl2.getZooKeeperPort() + ", will retry");
            }
        }
        Connector connector = miniAccumuloClusterImpl.getConnector("root", new PasswordToken("password"));
        Connector connector2 = miniAccumuloClusterImpl2.getConnector("root", new PasswordToken("password"));
        String instanceName = miniAccumuloClusterImpl.getInstanceName();
        String instanceName2 = miniAccumuloClusterImpl2.getInstanceName();
        connector.securityOperations().createLocalUser("master1", new PasswordToken("foo"));
        connector2.securityOperations().createLocalUser("master2", new PasswordToken("bar"));
        connector.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + miniAccumuloClusterImpl2.getInstanceName(), "master2");
        connector.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + miniAccumuloClusterImpl2.getInstanceName(), "bar");
        connector2.instanceOperations().setProperty(Property.REPLICATION_PEER_USER.getKey() + miniAccumuloClusterImpl.getInstanceName(), "master1");
        connector2.instanceOperations().setProperty(Property.REPLICATION_PEER_PASSWORD.getKey() + miniAccumuloClusterImpl.getInstanceName(), "foo");
        connector.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + miniAccumuloClusterImpl2.getInstanceName(), ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(miniAccumuloClusterImpl2.getInstanceName(), miniAccumuloClusterImpl2.getZooKeepers())));
        connector2.instanceOperations().setProperty(Property.REPLICATION_PEERS.getKey() + miniAccumuloClusterImpl.getInstanceName(), ReplicaSystemFactory.getPeerConfigurationValue(AccumuloReplicaSystem.class, AccumuloReplicaSystem.buildConfiguration(miniAccumuloClusterImpl.getInstanceName(), miniAccumuloClusterImpl.getZooKeepers())));
        connector.tableOperations().create(instanceName, new NewTableConfiguration().withoutDefaultIterators());
        String str = (String) connector.tableOperations().tableIdMap().get(instanceName);
        Assert.assertNotNull(str);
        connector2.tableOperations().create(instanceName2, new NewTableConfiguration().withoutDefaultIterators());
        String str2 = (String) connector2.tableOperations().tableIdMap().get(instanceName2);
        Assert.assertNotNull(str2);
        connector.tableOperations().setProperty(instanceName, Property.TABLE_REPLICATION.getKey(), "true");
        connector.tableOperations().setProperty(instanceName, Property.TABLE_REPLICATION_TARGET.getKey() + miniAccumuloClusterImpl2.getInstanceName(), str2);
        connector2.tableOperations().setProperty(instanceName2, Property.TABLE_REPLICATION.getKey(), "true");
        connector2.tableOperations().setProperty(instanceName2, Property.TABLE_REPLICATION_TARGET.getKey() + miniAccumuloClusterImpl.getInstanceName(), str);
        connector.securityOperations().grantTablePermission("master1", instanceName, TablePermission.WRITE);
        connector2.securityOperations().grantTablePermission("master2", instanceName2, TablePermission.WRITE);
        IteratorSetting iteratorSetting = new IteratorSetting(50, SummingCombiner.class);
        SummingCombiner.setEncodingType(iteratorSetting, LongCombiner.Type.STRING);
        SummingCombiner.setCombineAllColumns(iteratorSetting, true);
        connector.tableOperations().attachIterator(instanceName, iteratorSetting);
        connector2.tableOperations().attachIterator(instanceName2, iteratorSetting);
        BatchWriter createBatchWriter = connector.createBatchWriter(instanceName, new BatchWriterConfig());
        Mutation mutation = new Mutation("row");
        mutation.put("count", "", "1");
        createBatchWriter.addMutation(mutation);
        createBatchWriter.close();
        Set referencedFiles = connector.replicationOperations().referencedFiles(instanceName);
        log.info("Found {} that need replication from master1", referencedFiles);
        Iterator it = ((Collection) miniAccumuloClusterImpl.getProcesses().get(ServerType.TABLET_SERVER)).iterator();
        while (it.hasNext()) {
            miniAccumuloClusterImpl.killProcess(ServerType.TABLET_SERVER, (ProcessReference) it.next());
        }
        miniAccumuloClusterImpl.exec(TabletServer.class, new String[0]);
        log.info("Restarted tserver on master1");
        Thread.sleep(1000L);
        Assert.assertEquals("1", ((Value) ((Map.Entry) Iterables.getOnlyElement(connector.createScanner(instanceName, Authorizations.EMPTY))).getValue()).toString());
        connector.replicationOperations().drain(instanceName, referencedFiles);
        Thread.sleep(5000L);
        Assert.assertEquals("1", ((Value) ((Map.Entry) Iterables.getOnlyElement(connector2.createScanner(instanceName2, Authorizations.EMPTY))).getValue()).toString());
        Set referencedFiles2 = connector2.replicationOperations().referencedFiles(instanceName2);
        Iterator it2 = ((Collection) miniAccumuloClusterImpl2.getProcesses().get(ServerType.TABLET_SERVER)).iterator();
        while (it2.hasNext()) {
            miniAccumuloClusterImpl2.killProcess(ServerType.TABLET_SERVER, (ProcessReference) it2.next());
        }
        miniAccumuloClusterImpl2.exec(TabletServer.class, new String[0]);
        Thread.sleep(1000L);
        Assert.assertEquals("1", ((Value) ((Map.Entry) Iterables.getOnlyElement(connector2.createScanner(instanceName2, Authorizations.EMPTY))).getValue()).toString());
        connector2.replicationOperations().drain(instanceName2, referencedFiles2);
        Thread.sleep(5000L);
        Assert.assertEquals("1", ((Value) ((Map.Entry) Iterables.getOnlyElement(connector.createScanner(instanceName, Authorizations.EMPTY))).getValue()).toString());
        miniAccumuloClusterImpl.stop();
        miniAccumuloClusterImpl2.stop();
    }
}
