package org.apache.nifi.toolkit.zkmigrator;

import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.JsonParser;
import com.google.gson.stream.JsonReader;
import com.google.gson.stream.JsonWriter;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Locale;
import java.util.Spliterators;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Copies the contents of a ZooKeeper endpoint to and from a JSON document.
 *
 * <p>{@link #readZooKeeper} walks the node tree of the configured endpoint and writes a JSON array
 * whose first element is the endpoint's {@link ZooKeeperEndpointConfig} and whose remaining elements
 * are the serialized {@link DataStatAclNode}s. {@link #writeZooKeeper} consumes a document of the same
 * layout and recreates the nodes (data and ACLs) in the configured endpoint.
 *
 * <p>Node retrieval and transmission are dispatched through {@link CompletableFuture#supplyAsync};
 * both public operations block until every dispatched task has finished.
 */
class ZooKeeperMigrator {
    private static final Logger LOGGER = LoggerFactory.getLogger(ZooKeeperMigrator.class);

    /**
     * Scheme name passed to {@link ZooKeeper#addAuthInfo}. Lower-cased with {@link Locale#ROOT} so the
     * value is always {@code "digest"}; the default-locale variant yields a dotless i under Turkish locales.
     */
    private static final String SCHEME_DIGEST = AuthMode.DIGEST.name().toLowerCase(Locale.ROOT);

    /** Session timeout handed to the ZooKeeper client, in milliseconds. */
    private static final int SESSION_TIMEOUT_MILLIS = 3000;

    /** How long to wait for the client to report {@code SyncConnected} before giving up, in seconds. */
    private static final long CONNECT_TIMEOUT_SECONDS = 5L;

    private final ZooKeeperEndpointConfig zooKeeperEndpointConfig;

    /** Authentication modes accepted by the migrator. */
    public enum AuthMode {
        OPEN,
        DIGEST,
        SASL
    }

    /**
     * Creates a migrator bound to one ZooKeeper endpoint.
     *
     * @param zooKeeperConnectString connect string used to build the {@link ZooKeeperEndpointConfig}
     * @throws IllegalArgumentException if the connect string is {@code null} or empty
     */
    public ZooKeeperMigrator(String zooKeeperConnectString) {
        LOGGER.debug("ZooKeeper connect string parameter: {}", zooKeeperConnectString);
        Preconditions.checkArgument(!Strings.isNullOrEmpty(zooKeeperConnectString), "ZooKeeper connect string must not be null");
        this.zooKeeperEndpointConfig = new ZooKeeperEndpointConfig(zooKeeperConnectString);
    }

    /**
     * Reads every node reachable from {@code "/"} in the configured endpoint and writes them as JSON.
     *
     * <p>The document is written as UTF-8. The writer (and therefore {@code zkData}) is closed when this
     * method returns, whether it succeeds or fails, and the ZooKeeper client is always closed.
     *
     * @param zkData   stream receiving the JSON document
     * @param authMode authentication mode for the ZooKeeper connection; must not be {@code null}
     * @param authData credentials passed to ZooKeeper when {@code authMode} is {@link AuthMode#DIGEST}
     * @throws IOException          if the connection cannot be established or the JSON cannot be written
     * @throws KeeperException      if discovering the node tree fails
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if retrieving or serializing any node fails
     */
    public void readZooKeeper(OutputStream zkData, AuthMode authMode, byte[] authData)
            throws IOException, KeeperException, InterruptedException, ExecutionException {
        final ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
        try {
            final int readCount;
            try (JsonWriter jsonWriter = new JsonWriter(new BufferedWriter(new OutputStreamWriter(zkData, StandardCharsets.UTF_8)))) {
                jsonWriter.setIndent("  ");
                final JsonParser jsonParser = new JsonParser();
                final Gson gson = new GsonBuilder().create();

                jsonWriter.beginArray();
                // The first array element records where the data came from.
                writeJsonObject(gson, jsonParser, jsonWriter, zooKeeperEndpointConfig);

                LOGGER.info("Retrieving data from source ZooKeeper: {}", zooKeeperEndpointConfig);
                final List<CompletableFuture<Void>> readFutures = streamPaths(getNode(zooKeeper, "/"))
                        .parallel()
                        .map(path -> CompletableFuture
                                .supplyAsync(() -> {
                                    final DataStatAclNode node = retrieveNode(zooKeeper, path);
                                    LOGGER.debug("retrieved node {} from {}", node, zooKeeperEndpointConfig);
                                    return node;
                                })
                                .thenAccept(node -> {
                                    // Nodes complete on different threads; serialize access to the shared writer.
                                    synchronized (jsonWriter) {
                                        writeJsonObject(gson, jsonParser, jsonWriter, node);
                                    }
                                }))
                        .collect(Collectors.toList());
                readCount = awaitAll(readFutures);
                jsonWriter.endArray();
            }
            LOGGER.info("{} {} read from {}", readCount, readCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
        } finally {
            closeZooKeeper(zooKeeper);
        }
    }

    /**
     * Reads a JSON document produced by {@link #readZooKeeper} and recreates its nodes in the configured endpoint.
     *
     * <p>The document is read as UTF-8. The reader (and therefore {@code zkData}) and both ZooKeeper
     * clients opened by this method are closed when it returns, whether it succeeds or fails.
     *
     * @param zkData         stream supplying the JSON document
     * @param authMode       authentication mode for the ZooKeeper connection; must not be {@code null}
     * @param authData       credentials passed to ZooKeeper when {@code authMode} is {@link AuthMode#DIGEST}
     * @param ignoreSource   when {@code true}, skips the check that rejects data whose source shares a
     *                       server and path with the destination
     * @param useExistingACL when {@code true}, nodes keep the ACLs recorded in the document; otherwise
     *                       ACLs are derived from {@code authMode}
     * @throws IOException              if the connection cannot be established or the JSON cannot be read
     * @throws IllegalArgumentException if the source configuration in the document is missing, incomplete,
     *                                  or (unless {@code ignoreSource}) overlaps the destination
     * @throws ExecutionException       if creating or updating any node fails
     * @throws InterruptedException     if the calling thread is interrupted while waiting
     */
    public void writeZooKeeper(InputStream zkData, AuthMode authMode, byte[] authData, boolean ignoreSource, boolean useExistingACL)
            throws IOException, ExecutionException, InterruptedException {
        // Connect with the bare server list first and make sure the configured path exists before
        // connecting with the full connect string (presumably that string is rooted at this path).
        final ZooKeeper serversOnlyClient = getZooKeeper(Joiner.on(',').join(zooKeeperEndpointConfig.getServers()), authMode, authData);
        try {
            ensureNodeExists(serversOnlyClient, zooKeeperEndpointConfig.getPath(), CreateMode.PERSISTENT);
        } finally {
            closeZooKeeper(serversOnlyClient);
        }

        final ZooKeeper zooKeeper = getZooKeeper(zooKeeperEndpointConfig.getConnectString(), authMode, authData);
        try (JsonReader jsonReader = new JsonReader(new BufferedReader(new InputStreamReader(zkData, StandardCharsets.UTF_8)))) {
            final Gson gson = new GsonBuilder().create();
            jsonReader.beginArray();

            // The first array element describes the endpoint the data was exported from.
            final ZooKeeperEndpointConfig sourceConfig = gson.fromJson(jsonReader, ZooKeeperEndpointConfig.class);
            LOGGER.info("Source data was obtained from ZooKeeper: {}", sourceConfig);
            Preconditions.checkArgument(isValidSourceConfig(sourceConfig),
                    "Source ZooKeeper %s from %s is invalid", sourceConfig, zkData);
            Preconditions.checkArgument(
                    Collections.disjoint(zooKeeperEndpointConfig.getServers(), sourceConfig.getServers())
                            || !zooKeeperEndpointConfig.getPath().equals(sourceConfig.getPath())
                            || ignoreSource,
                    "Source ZooKeeper config %s for the data provided can not contain the same server and path as the configured destination ZooKeeper config %s",
                    sourceConfig, zooKeeperEndpointConfig);

            final List<CompletableFuture<Stat>> writeFutures = streamNodes(jsonReader, gson)
                    .parallel()
                    .map(node -> migrateNode(zooKeeper, node, authMode, useExistingACL))
                    .collect(Collectors.toList());
            final int writeCount = awaitAll(writeFutures);
            LOGGER.info("{} {} transferred to {}", writeCount, writeCount == 1 ? "node" : "nodes", zooKeeperEndpointConfig);
        } finally {
            closeZooKeeper(zooKeeper);
        }
    }

    /** Returns whether the source configuration read from the JSON document is present and fully populated. */
    private static boolean isValidSourceConfig(ZooKeeperEndpointConfig sourceConfig) {
        return sourceConfig != null
                && !Strings.isNullOrEmpty(sourceConfig.getConnectString())
                && !Strings.isNullOrEmpty(sourceConfig.getPath())
                && sourceConfig.getServers() != null
                && sourceConfig.getServers().size() > 0;
    }

    /** Serializes {@code value} with Gson, re-parses it as a JSON object and appends that object to {@code jsonWriter}. */
    private static void writeJsonObject(Gson gson, JsonParser jsonParser, JsonWriter jsonWriter, Object value) {
        gson.toJson(jsonParser.parse(gson.toJson(value)).getAsJsonObject(), jsonWriter);
    }

    /**
     * Blocks until every future has completed and returns how many there were.
     *
     * @throws ExecutionException if any of the futures completed exceptionally
     */
    private static int awaitAll(List<? extends CompletableFuture<?>> futures) throws InterruptedException, ExecutionException {
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).get();
        return futures.size();
    }

    /** Exposes the remaining elements of the JSON array being read as a stream of nodes. */
    private static Stream<DataStatAclNode> streamNodes(JsonReader jsonReader, Gson gson) {
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<DataStatAclNode>(0L, 0) {
            @Override
            public boolean tryAdvance(Consumer<? super DataStatAclNode> action) {
                try {
                    // The reader is stateful; only one thread may pull an element at a time.
                    synchronized (jsonReader) {
                        if (!jsonReader.hasNext()) {
                            return false;
                        }
                        final DataStatAclNode node = gson.fromJson(jsonReader, DataStatAclNode.class);
                        action.accept(node);
                        return true;
                    }
                } catch (IOException e) {
                    throw new RuntimeException("unable to read nodes from json", e);
                }
            }
        }, false);
    }

    /**
     * Builds the asynchronous pipeline that writes one node to the destination: choose its ACLs,
     * build the node to transmit, make sure its path exists, then set its data and ACLs.
     */
    private CompletableFuture<Stat> migrateNode(ZooKeeper zooKeeper, DataStatAclNode sourceNode, AuthMode authMode, boolean useExistingACL) {
        final CompletableFuture<DataStatAclNode> transformed = CompletableFuture
                .supplyAsync(() -> determineACLs(sourceNode, authMode, useExistingACL))
                .thenCompose(acls -> CompletableFuture.supplyAsync(() -> transformNode(sourceNode, acls)));
        return transformed
                .thenCompose(node -> CompletableFuture.supplyAsync(() -> ensureNodeExists(
                        zooKeeper,
                        node.getPath(),
                        node.getEphemeralOwner() == 0 ? CreateMode.PERSISTENT : CreateMode.EPHEMERAL)))
                // The created path is not needed further; carry the transformed node forward instead.
                .thenCombine(transformed, (createdPath, node) -> node)
                .thenCompose(node -> CompletableFuture.supplyAsync(() -> transmitNode(zooKeeper, node)));
    }

    /** Flattens a node tree into the paths of the node itself followed by those of all its descendants. */
    private Stream<String> streamPaths(ZooKeeperNode node) {
        return Stream.concat(Stream.of(node.getPath()), node.getChildren().stream().flatMap(this::streamPaths));
    }

    /**
     * Recursively discovers the node at {@code path} and all of its descendants.
     *
     * @throws RuntimeException if any descendant cannot be listed (the cause carries the original failure)
     */
    private ZooKeeperNode getNode(ZooKeeper zooKeeper, String path) throws KeeperException, InterruptedException {
        LOGGER.debug("retrieving node and children at {}", path);
        final List<String> childNames = zooKeeper.getChildren(path, false);
        // Children of the root must not end up with a doubled leading slash.
        final String parentPrefix = "/".equals(path) ? "" : path;
        final List<ZooKeeperNode> children = childNames.stream().map(childName -> {
            final String childPath = Joiner.on('/').skipNulls().join(parentPrefix, childName);
            try {
                return getNode(zooKeeper, childPath);
            } catch (InterruptedException | KeeperException e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                throw new RuntimeException(String.format("unable to discover sub-tree from %s", childPath), e);
            }
        }).collect(Collectors.toList());
        return new ZooKeeperNode(path, children);
    }

    /**
     * Fetches the data, stat and ACLs of the node at {@code path}.
     *
     * @throws RuntimeException if ZooKeeper reports an error or the thread is interrupted
     */
    private DataStatAclNode retrieveNode(ZooKeeper zooKeeper, String path) {
        Preconditions.checkNotNull(zooKeeper, "ZooKeeper client must not be null");
        Preconditions.checkNotNull(path, "path must not be null");
        final Stat stat = new Stat();
        try {
            return new DataStatAclNode(path, zooKeeper.getData(path, false, stat), stat, zooKeeper.getACL(path, stat), stat.getEphemeralOwner());
        } catch (InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(String.format("unable to get data, ACLs, and stats from %s for node at path %s", zooKeeper, path), e);
        }
    }

    /**
     * Creates an empty node at {@code path} with an open ACL, creating missing ancestors as persistent nodes.
     *
     * @return the path reported by ZooKeeper for the created node, or {@code path} if it already existed
     * @throws RuntimeException if the node cannot be created or the thread is interrupted
     */
    private String ensureNodeExists(ZooKeeper zooKeeper, String path, CreateMode createMode) {
        try {
            LOGGER.debug("attempting to create node at {}", path);
            final List<ACL> acls = ZooDefs.Ids.OPEN_ACL_UNSAFE;
            final String createdPath = zooKeeper.create(path, new byte[0], acls, createMode);
            LOGGER.info("created node at {}, acls: {}, createMode: {}", createdPath, acls, createMode);
            return createdPath;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(String.format("unable to create node at path %s", path), e);
        } catch (KeeperException e) {
            final KeeperException.Code code = e.code();
            if (code == KeeperException.Code.NODEEXISTS) {
                return path;
            }
            final List<String> segments = Splitter.on('/').omitEmptyStrings().trimResults().splitToList(path);
            // Anything other than a missing parent is fatal, as is a "missing parent" for a path that has none.
            if (code != KeeperException.Code.NONODE || segments.isEmpty()) {
                throw new RuntimeException(String.format("unable to create node at path %s, ZooKeeper returned %s", path, code), e);
            }
            final String parentPath = "/" + Joiner.on('/').skipNulls().join(segments.subList(0, segments.size() - 1));
            LOGGER.debug("node doesn't exist, recursively attempting to create node at {}", parentPath);
            ensureNodeExists(zooKeeper, parentPath, CreateMode.PERSISTENT);
            LOGGER.debug("recursively created node at {}", parentPath);
            LOGGER.debug("retrying attempt to create node at {}", path);
            return ensureNodeExists(zooKeeper, path, createMode);
        }
    }

    /**
     * Chooses the ACLs to apply to a migrated node: the node's own ACLs when {@code useExistingACL} is set,
     * otherwise the open ACL for {@link AuthMode#OPEN} and the creator-all ACL for every other mode.
     */
    private List<ACL> determineACLs(DataStatAclNode node, AuthMode authMode, boolean useExistingACL) {
        if (useExistingACL) {
            return node.getAcls();
        }
        return authMode == AuthMode.OPEN ? ZooDefs.Ids.OPEN_ACL_UNSAFE : ZooDefs.Ids.CREATOR_ALL_ACL;
    }

    /** Returns a copy of {@code node} that carries {@code acls} in place of its original ACLs. */
    private DataStatAclNode transformNode(DataStatAclNode node, List<ACL> acls) {
        final DataStatAclNode migratedNode = new DataStatAclNode(node.getPath(), node.getData(), node.getStat(), acls, node.getEphemeralOwner());
        LOGGER.info("transformed original node {} to {}", node, migratedNode);
        return migratedNode;
    }

    /**
     * Sets the data and then the ACLs of the (already existing) destination node, ignoring versions.
     *
     * @return the stat carried by {@code node} (not the stat returned by the destination)
     * @throws RuntimeException if ZooKeeper reports an error or the thread is interrupted
     */
    private Stat transmitNode(ZooKeeper zooKeeper, DataStatAclNode node) {
        Preconditions.checkNotNull(zooKeeper, "zooKeeper must not be null");
        Preconditions.checkNotNull(node, "node must not be null");
        try {
            LOGGER.debug("attempting to transfer node to {} with ACL {}: {}", zooKeeperEndpointConfig, node.getAcls(), node);
            // -1 matches any version of the destination node.
            zooKeeper.setData(node.getPath(), node.getData(), -1);
            zooKeeper.setACL(node.getPath(), node.getAcls(), -1);
            LOGGER.info("transferred node {} in {}", node, zooKeeperEndpointConfig);
            return node.getStat();
        } catch (InterruptedException | KeeperException e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            throw new RuntimeException(String.format("unable to transmit data to %s for path %s", zooKeeper, node.getPath()), e);
        }
    }

    /**
     * Opens a ZooKeeper client and waits for it to report {@code SyncConnected}.
     *
     * @param connectString connect string handed to the ZooKeeper client
     * @param authMode      authentication mode; digest credentials are registered for {@link AuthMode#DIGEST}
     * @param authData      credentials registered when {@code authMode} is {@link AuthMode#DIGEST}
     * @return a connected client; the caller is responsible for closing it
     * @throws IOException          if the connection is not established in time or the wait is interrupted
     * @throws NullPointerException if {@code authMode} is {@code null}
     */
    private ZooKeeper getZooKeeper(String connectString, AuthMode authMode, byte[] authData) throws IOException {
        // Validate before opening a connection so a bad argument cannot leak a client.
        Preconditions.checkNotNull(authMode, "authMode must not be null");
        final CountDownLatch connectionLatch = new CountDownLatch(1);
        final ZooKeeper zooKeeper = new ZooKeeper(connectString, SESSION_TIMEOUT_MILLIS, watchedEvent -> {
            LOGGER.debug("ZooKeeper server state changed to {} in {}", watchedEvent.getState(), connectString);
            if (watchedEvent.getType() == Watcher.Event.EventType.None
                    && watchedEvent.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connectionLatch.countDown();
            }
        });
        try {
            if (!connectionLatch.await(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                closeZooKeeper(zooKeeper);
                throw new IOException(String.format("unable to connect to %s", connectString));
            }
            if (authMode == AuthMode.DIGEST) {
                zooKeeper.addAuthInfo(SCHEME_DIGEST, authData);
            }
            return zooKeeper;
        } catch (InterruptedException e) {
            closeZooKeeper(zooKeeper);
            Thread.currentThread().interrupt();
            throw new IOException(String.format("interrupted while waiting for ZooKeeper connection to %s", connectString), e);
        }
    }

    /** Closes the client; an interrupt during close is logged and the interrupt flag restored. */
    private void closeZooKeeper(ZooKeeper zooKeeper) {
        try {
            zooKeeper.close();
        } catch (InterruptedException e) {
            LOGGER.warn("could not close ZooKeeper client due to interrupt", e);
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the endpoint configuration this migrator was constructed with. */
    ZooKeeperEndpointConfig getZooKeeperEndpointConfig() {
        return this.zooKeeperEndpointConfig;
    }
}
