package org.adamalang.common.gossip;

import io.netty.buffer.ByteBuf;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.adamalang.common.NamedRunnable;
import org.adamalang.common.ProtectedUUID;
import org.adamalang.common.SimpleExecutor;
import org.adamalang.common.TimeSource;
import org.adamalang.common.gossip.codec.GossipProtocol;
import org.adamalang.common.gossip.codec.GossipProtocolCodec;
import org.adamalang.common.net.ByteStream;
import org.adamalang.common.net.ChannelClient;

/* loaded from: input_file:org/adamalang/common/gossip/Engine.class */
public class Engine {
    /** IP address stamped onto endpoints created by {@link #createLocalApplicationHeartbeat}. */
    private final String ip;
    /** Counters and gauges updated as gossip messages are sent and received. */
    private final GossipMetrics metrics;
    /** Chain of instance sets; within this class it is only used from tasks submitted to {@link #executor}. */
    private final InstanceSetChain chain;
    /** Executor named "gossip" on which every protocol step and every subscriber callback in this class runs. */
    private final SimpleExecutor executor = SimpleExecutor.create("gossip");
    /** Subscribers keyed by the name passed to {@link #subscribe}; read and written from executor tasks. */
    private final HashMap<String, ArrayList<Consumer<Collection<String>>>> subscribersByApp = new HashMap<>();
    /** Registered clients by generated id; also serves as the lock guarding {@link #clientId} and writes to {@link #flatClients}. */
    private final HashMap<Integer, ChannelClient> clients = new HashMap<>();
    /** Next id handed out by {@link #registerClient}; guarded by the {@code clients} lock. */
    private int clientId = 1;
    /**
     * Array copy of {@code clients.values()}. It is replaced while holding the {@code clients} lock but read
     * without that lock by the periodic task in {@link #kickoff}, so it is volatile to guarantee the reader
     * sees the most recently published array.
     */
    private volatile ChannelClient[] flatClients = new ChannelClient[0];

    /* renamed from: org.adamalang.common.gossip.Engine$7, reason: invalid class name */
    /* loaded from: input_file:org/adamalang/common/gossip/Engine$7.class */
    /**
     * Handler for gossip messages arriving from a remote client on one stream; instances are created by
     * {@link Engine#server(ByteStream)}. Every message is processed on the engine's executor, and replies
     * are written to the upstream {@link ByteStream} captured at construction.
     */
    class AnonymousClass7 extends GossipProtocolCodec.StreamChatterFromClient {
        /** Instance set chosen while handling BeginGossip; stays null until that message has been processed. */
        InstanceSet set;
        /** Stream on which replies to the remote side are written. */
        final /* synthetic */ ByteStream val$upstream;

        AnonymousClass7(ByteStream byteStream) {
            this.val$upstream = byteStream;
        }

        /**
         * First message of a conversation: ingests the peer's recent changes, looks up the peer's hash and
         * answers with either a hash-found or a hash-not-found reply.
         */
        @Override
        public void handle(final GossipProtocol.BeginGossip beginGossip) {
            Engine.this.executor.execute(new NamedRunnable("gossip-begin", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_begin_gossip.run();
                    final boolean changed = Engine.this.chain.ingest(beginGossip.recent_endpoints, beginGossip.recent_deletes, false);
                    AnonymousClass7.this.set = Engine.this.chain.find(beginGossip.hash);
                    final ByteStream upstream = AnonymousClass7.this.val$upstream;
                    final ByteBuf buffer = upstream.create(1024);
                    if (AnonymousClass7.this.set == null) {
                        // The peer's hash is unknown here: fall back to the current set and reverse the conversation.
                        Engine.this.metrics.gossip_send_hash_not_found.run();
                        AnonymousClass7.this.set = Engine.this.chain.current();
                        final GossipProtocol.HashNotFoundReverseConversation notFound = new GossipProtocol.HashNotFoundReverseConversation();
                        notFound.hash = AnonymousClass7.this.set.hash;
                        notFound.recent_endpoints = Engine.this.chain.recent();
                        notFound.recent_deletes = Engine.this.chain.deletes();
                        GossipProtocolCodec.write(buffer, notFound);
                    } else {
                        Engine.this.metrics.gossip_send_hash_found.run();
                        final GossipProtocol.HashFoundRequestForwardQuickGossip found = new GossipProtocol.HashFoundRequestForwardQuickGossip();
                        found.counters = AnonymousClass7.this.set.counters();
                        found.recent_deletes = Engine.this.chain.deletes();
                        found.recent_endpoints = Engine.this.chain.missing(AnonymousClass7.this.set);
                        GossipProtocolCodec.write(buffer, found);
                    }
                    upstream.next(buffer);
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** The peer located our hash: merge its counters and endpoints, reply with a quick gossip and finish. */
        @Override
        public void handle(final GossipProtocol.ReverseHashFound reverseHashFound) {
            Engine.this.executor.execute(new NamedRunnable("gossip-reverse-hash-found", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_reverse_hash_found.run();
                    final InstanceSet agreed = AnonymousClass7.this.set;
                    agreed.ingest(reverseHashFound.counters, Engine.this.chain.now());
                    final boolean changed = Engine.this.chain.ingest(reverseHashFound.missing_endpoints, reverseHashFound.recent_deletes, false);
                    final GossipProtocol.ReverseQuickGossip reply = new GossipProtocol.ReverseQuickGossip();
                    reply.counters = agreed.counters();
                    reply.missing_endpoints = Engine.this.chain.missing(agreed);
                    reply.recent_deletes = Engine.this.chain.deletes();
                    final ByteStream upstream = AnonymousClass7.this.val$upstream;
                    final ByteBuf buffer = upstream.create(1024);
                    GossipProtocolCodec.write(buffer, reply);
                    upstream.next(buffer);
                    upstream.completed();
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** Final message of the quick path: merge the peer's counters and recent changes, then finish the stream. */
        @Override
        public void handle(final GossipProtocol.ForwardQuickGossip forwardQuickGossip) {
            Engine.this.executor.execute(new NamedRunnable("gossip-forward-quick", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_forward_quick_gossip.run();
                    AnonymousClass7.this.set.ingest(forwardQuickGossip.counters, Engine.this.chain.now());
                    final boolean changed = Engine.this.chain.ingest(forwardQuickGossip.recent_endpoints, forwardQuickGossip.recent_deletes, false);
                    AnonymousClass7.this.val$upstream.completed();
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** Slow path: the peer sent all of its endpoints; ingest them and answer with all of ours. */
        @Override
        public void handle(final GossipProtocol.ForwardSlowGossip forwardSlowGossip) {
            Engine.this.executor.execute(new NamedRunnable("gossip-forward-slow", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_forward_slow_gossip.run();
                    final boolean changed = Engine.this.chain.ingest(forwardSlowGossip.all_endpoints, forwardSlowGossip.recent_deletes, false);
                    final GossipProtocol.ReverseSlowGossip reply = new GossipProtocol.ReverseSlowGossip();
                    reply.all_endpoints = Engine.this.chain.all();
                    reply.recent_deletes = Engine.this.chain.deletes();
                    final ByteStream upstream = AnonymousClass7.this.val$upstream;
                    final ByteBuf buffer = upstream.create(1024);
                    GossipProtocolCodec.write(buffer, reply);
                    upstream.next(buffer);
                    upstream.completed();
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** Marks the stream as no longer in flight by lowering the gauge raised in {@link Engine#server(ByteStream)}. */
        @Override
        public void completed() {
            Engine.this.metrics.gossip_inflight.down();
        }

        /** A failed stream is accounted for exactly like a completed one; the error code is not inspected. */
        @Override
        public void error(int i) {
            completed();
        }
    }

    /* loaded from: input_file:org/adamalang/common/gossip/Engine$Exchange.class */
    /**
     * Client side of one gossip conversation. {@link #start(ByteStream)} sends the opening BeginGossip and
     * the {@code handle} overloads react to the server's replies. All work is performed on the engine's
     * executor.
     */
    public class Exchange extends GossipProtocolCodec.StreamChatterFromServer {
        /** Instance set this conversation is reconciling against; may become null after a failed hash lookup. */
        private InstanceSet current;
        /** Stream to the remote server; assigned by {@link #start(ByteStream)}. */
        private ByteStream remote;

        public Exchange() {
        }

        /**
         * Remembers the remote stream and schedules the opening BeginGossip message.
         *
         * @param byteStream stream on which messages to the server are written
         */
        public void start(final ByteStream byteStream) {
            this.remote = byteStream;
            Engine.this.executor.execute(new NamedRunnable("gossip-start", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_send_begin.run();
                    Exchange.this.current = Engine.this.chain.current();
                    final ByteBuf buffer = byteStream.create(1024);
                    final GossipProtocol.BeginGossip opening = new GossipProtocol.BeginGossip();
                    opening.hash = Exchange.this.current.hash();
                    opening.recent_deletes = Engine.this.chain.deletes();
                    opening.recent_endpoints = Engine.this.chain.recent();
                    GossipProtocolCodec.write(buffer, opening);
                    byteStream.next(buffer);
                }
            });
        }

        /** The server knew our hash: merge its counters and changes, send our quick gossip and finish. */
        @Override
        public void handle(final GossipProtocol.HashFoundRequestForwardQuickGossip hashFoundRequestForwardQuickGossip) {
            Engine.this.executor.execute(new NamedRunnable("gossip-hash-found-forward", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_hash_found_forward_quick_gossip.run();
                    Exchange.this.current.ingest(hashFoundRequestForwardQuickGossip.counters, Engine.this.chain.now());
                    final boolean changed = Engine.this.chain.ingest(hashFoundRequestForwardQuickGossip.recent_endpoints, hashFoundRequestForwardQuickGossip.recent_deletes, false);
                    final GossipProtocol.ForwardQuickGossip reply = new GossipProtocol.ForwardQuickGossip();
                    reply.counters = Exchange.this.current.counters();
                    reply.recent_endpoints = Engine.this.chain.recent();
                    reply.recent_deletes = Engine.this.chain.deletes();
                    final ByteStream stream = Exchange.this.remote;
                    final ByteBuf buffer = stream.create(1024);
                    GossipProtocolCodec.write(buffer, reply);
                    stream.next(buffer);
                    stream.completed();
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /**
         * The server did not know our hash and sent its own: look that hash up locally and answer with
         * ReverseHashFound when it is known, or with a full ForwardSlowGossip when it is not.
         */
        @Override
        public void handle(final GossipProtocol.HashNotFoundReverseConversation hashNotFoundReverseConversation) {
            Engine.this.executor.execute(new NamedRunnable("gossip-hash-not-found-do-reverse", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_hash_not_found.run();
                    final boolean changed = Engine.this.chain.ingest(hashNotFoundReverseConversation.recent_endpoints, hashNotFoundReverseConversation.recent_deletes, false);
                    Exchange.this.current = Engine.this.chain.find(hashNotFoundReverseConversation.hash);
                    final ByteStream stream = Exchange.this.remote;
                    final ByteBuf buffer = stream.create(1024);
                    if (Exchange.this.current == null) {
                        // Neither side recognises the other's hash: send every endpoint we have.
                        Engine.this.metrics.gossip_send_forward_slow_gossip.run();
                        final GossipProtocol.ForwardSlowGossip everything = new GossipProtocol.ForwardSlowGossip();
                        everything.all_endpoints = Engine.this.chain.all();
                        everything.recent_deletes = Engine.this.chain.deletes();
                        GossipProtocolCodec.write(buffer, everything);
                    } else {
                        Engine.this.metrics.gossip_send_reverse_hash_found.run();
                        final GossipProtocol.ReverseHashFound found = new GossipProtocol.ReverseHashFound();
                        found.missing_endpoints = Engine.this.chain.missing(Exchange.this.current);
                        found.recent_deletes = Engine.this.chain.deletes();
                        found.counters = Exchange.this.current.counters();
                        GossipProtocolCodec.write(buffer, found);
                    }
                    stream.next(buffer);
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** Closing message of the reversed quick path: merge counters and endpoints, then finish the stream. */
        @Override
        public void handle(final GossipProtocol.ReverseQuickGossip reverseQuickGossip) {
            Engine.this.executor.execute(new NamedRunnable("gossip-reverse-quick", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_reverse_quick_gossip.run();
                    Exchange.this.current.ingest(reverseQuickGossip.counters, Engine.this.chain.now());
                    Engine.this.chain.ingest(reverseQuickGossip.missing_endpoints, reverseQuickGossip.recent_deletes, false);
                    Exchange.this.remote.completed();
                    // Unlike the other handlers, the broadcast decision is based on the payload carrying
                    // anything at all rather than on the result of the ingest call above.
                    final boolean payloadEmpty = reverseQuickGossip.missing_endpoints.length == 0 && reverseQuickGossip.recent_deletes.length == 0;
                    if (!payloadEmpty) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** Closing message of the slow path: ingest every endpoint the server sent, then finish the stream. */
        @Override
        public void handle(final GossipProtocol.ReverseSlowGossip reverseSlowGossip) {
            Engine.this.executor.execute(new NamedRunnable("gossip-reverse-slow", new String[0]) {
                @Override
                public void execute() throws Exception {
                    Engine.this.metrics.gossip_read_reverse_slow_gossip.run();
                    final boolean changed = Engine.this.chain.ingest(reverseSlowGossip.all_endpoints, reverseSlowGossip.recent_deletes, false);
                    Exchange.this.remote.completed();
                    if (changed) {
                        Engine.this.broadcastChangesWhileInExecutor();
                    }
                }
            });
        }

        /** Intentionally empty: nothing is done when the server completes the stream. */
        @Override
        public void completed() {
        }

        /** Intentionally empty: stream errors are ignored on the client side. */
        @Override
        public void error(int i) {
        }
    }

    /**
     * Creates an engine with an empty instance-set chain.
     *
     * @param str           IP address used for endpoints created by this engine
     * @param gossipMetrics metrics sink updated as gossip is exchanged
     * @param timeSource    clock handed to the underlying {@link InstanceSetChain}
     */
    public Engine(String str, GossipMetrics gossipMetrics, TimeSource timeSource) {
        this.chain = new InstanceSetChain(timeSource);
        this.metrics = gossipMetrics;
        this.ip = str;
    }

    /**
     * Starts the periodic gossip task. The first run happens after 500 ms; each run scans and
     * garbage-collects the chain, asks one randomly chosen registered client to gossip, and then
     * reschedules itself after a randomised delay for as long as the flag remains true.
     *
     * @param atomicBoolean keep-running flag; once it reads false the task stops rescheduling itself
     */
    public void kickoff(final AtomicBoolean atomicBoolean) {
        final Random random = new Random();
        this.executor.schedule(new NamedRunnable("gossip", new String[0]) {
            @Override
            public void execute() throws Exception {
                Engine.this.chain.scan();
                Engine.this.chain.gc();
                Engine.this.metrics.gossip_wake.run();
                // registerClient() can replace flatClients from another thread at any moment, so read the
                // field exactly once; re-reading it between the length check and the index could go out of
                // bounds, and an exception here would skip the reschedule below.
                final ChannelClient[] snapshot = Engine.this.flatClients;
                Engine.this.metrics.gossip_active_clients.set(snapshot.length);
                if (snapshot.length > 0) {
                    final ChannelClient picked = snapshot[random.nextInt(snapshot.length)];
                    // Guard against observing a slot that has not been populated yet.
                    if (picked != null) {
                        picked.gossip();
                    }
                }
                if (atomicBoolean.get()) {
                    // Jittered delay between 250 ms and 1000 ms.
                    Engine.this.executor.schedule(this, (int) (250.0d + (250.0d * Math.random()) + (500.0d * Math.random())));
                }
            }
        }, 500L);
    }

    /**
     * Installs a watcher on the instance-set chain; the installation itself runs on the executor.
     *
     * @param consumer callback handed to {@code InstanceSetChain.setWatcher}
     */
    public void setWatcher(final Consumer<GossipProtocol.Endpoint[]> consumer) {
        final NamedRunnable install = new NamedRunnable("set-watcher", new String[0]) {
            @Override
            public void execute() throws Exception {
                Engine.this.chain.setWatcher(consumer);
            }
        };
        this.executor.execute(install);
    }

    /** Shuts down the executor that runs all gossip work for this engine. */
    public void shutdown() {
        final SimpleExecutor gossipExecutor = this.executor;
        gossipExecutor.shutdown();
    }

    /**
     * Registers a new local endpoint with the chain and hands the caller the object returned by
     * {@code chain.pick(id)} for that endpoint. The work runs on the executor.
     *
     * @param str      role of the local endpoint
     * @param i        port of the local endpoint
     * @param i2       monitoring port of the local endpoint
     * @param consumer receives the result of {@code chain.pick} for the freshly generated endpoint id
     */
    public void createLocalApplicationHeartbeat(final String str, final int i, final int i2, final Consumer<Runnable> consumer) {
        this.executor.execute(new NamedRunnable("heartbeat-local-app", new String[0]) {
            @Override
            public void execute() throws Exception {
                final String id = ProtectedUUID.generate();
                final GossipProtocol.Endpoint self = new GossipProtocol.Endpoint();
                self.created = System.currentTimeMillis();
                self.counter = 0;
                self.monitoringPort = i2;
                self.port = i;
                self.role = str;
                self.ip = Engine.this.ip;
                self.id = id;
                final GossipProtocol.Endpoint[] additions = {self};
                final String[] noDeletes = new String[0];
                // The trailing 'true' is the only ingest call in this class that does not pass 'false'.
                Engine.this.chain.ingest(additions, noDeletes, true);
                final Runnable picked = Engine.this.chain.pick(id);
                consumer.accept(picked);
            }
        });
    }

    /**
     * Adds a subscriber for the given name and immediately delivers the current targets to it.
     * Registration and the initial delivery both happen on the executor.
     *
     * @param str      key under which the subscriber is stored and for which targets are looked up
     * @param consumer callback receiving the target collection, now and on every later broadcast
     */
    public void subscribe(final String str, final Consumer<Collection<String>> consumer) {
        this.executor.execute(new NamedRunnable("subscribe-app", new String[]{str}) {
            @Override
            public void execute() throws Exception {
                final ArrayList<Consumer<Collection<String>>> listeners =
                        Engine.this.subscribersByApp.computeIfAbsent(str, key -> new ArrayList<>());
                listeners.add(consumer);
                consumer.accept(Engine.this.chain.current().targetsFor(str));
            }
        });
    }

    /**
     * Renders the current instance set as an HTML table and passes the document to the consumer.
     * Rendering runs on the executor. Rows are ordered by {@code Instance::humanizeCompare}.
     *
     * @param consumer receives the complete HTML document as a string
     */
    public void summarizeHtml(final Consumer<String> consumer) {
        this.executor.execute(new NamedRunnable("summarizing-html", new String[0]) {
            @Override
            public void execute() throws Exception {
                final StringBuilder html = new StringBuilder();
                html.append("<html><head><title>Gossip Summary</title></head><body><table>");
                html.append("<tr><th>ID</th><th>Witness (ms ago)</th><th>IP</th><th>Port</th><th>Role</th><th>Counter</th><th>Age (ms)</th></tr>");
                // Copy before sorting so the chain's own collection is left untouched.
                final ArrayList<Instance> sorted = new ArrayList<>(Engine.this.chain.current().instances);
                sorted.sort(Instance::humanizeCompare);
                // Read the clock once so every row is measured against the same instant.
                final long now = System.currentTimeMillis();
                for (final Instance instance : sorted) {
                    html.append("<tr>");
                    // id, ip and role may originate from remote peers, so they are escaped before
                    // being embedded in markup.
                    html.append("<td>").append(Engine.escapeHtml(instance.id)).append("</td>");
                    html.append("<td>").append(now - instance.witnessed()).append(" ms</td>");
                    html.append("<td>").append(Engine.escapeHtml(instance.ip)).append("</td>");
                    html.append("<td>").append(instance.port).append("</td>");
                    html.append("<td>").append(Engine.escapeHtml(instance.role)).append("</td>");
                    html.append("<td>").append(instance.counter()).append("</td>");
                    html.append("<td>").append(now - instance.created).append("</td>");
                    html.append("</tr>");
                }
                html.append("</table></body></html>");
                consumer.accept(html.toString());
            }
        });
    }

    /** Converts a value to text (null becomes "null") and replaces the characters that are significant in HTML. */
    private static String escapeHtml(final Object value) {
        final String raw = String.valueOf(value);
        final StringBuilder escaped = new StringBuilder(raw.length() + 16);
        for (int k = 0; k < raw.length(); k++) {
            final char ch = raw.charAt(k);
            switch (ch) {
                case '&':
                    escaped.append("&amp;");
                    break;
                case '<':
                    escaped.append("&lt;");
                    break;
                case '>':
                    escaped.append("&gt;");
                    break;
                case '"':
                    escaped.append("&quot;");
                    break;
                case '\'':
                    escaped.append("&#39;");
                    break;
                default:
                    escaped.append(ch);
                    break;
            }
        }
        return escaped.toString();
    }

    /**
     * Adds a client to the pool that the periodic gossip task picks from.
     *
     * @param channelClient client to register
     * @return a runnable that removes this registration again when run
     */
    public Runnable registerClient(ChannelClient channelClient) {
        final HashMap<Integer, ChannelClient> registry = this.clients;
        final Integer key;
        synchronized (registry) {
            // Ids are handed out sequentially under the lock.
            key = this.clientId++;
            registry.put(key, channelClient);
            flatten();
        }
        return () -> {
            synchronized (registry) {
                registry.remove(key);
                flatten();
            }
        };
    }

    /**
     * Rebuilds {@code flatClients} from the current contents of {@code clients}. Both call sites in this
     * class invoke it while holding the {@code clients} lock.
     */
    private void flatten() {
        // Build the array completely before assigning the field: the periodic gossip task reads
        // flatClients without taking the lock, so it must never be handed an array whose slots are
        // still being filled in.
        this.flatClients = this.clients.values().toArray(new ChannelClient[0]);
    }

    /**
     * Runs the callback on the executor, i.e. after every task that was queued ahead of it has been taken
     * by the executor.
     *
     * @param runnable callback to invoke from the executor
     */
    public void ready(final Runnable runnable) {
        final NamedRunnable check = new NamedRunnable("ready-check", new String[0]) {
            @Override
            public void execute() throws Exception {
                runnable.run();
            }
        };
        this.executor.execute(check);
    }

    /**
     * Creates the client half of a gossip conversation bound to this engine.
     *
     * @return a new, not yet started {@link Exchange}
     */
    public Exchange client() {
        final Exchange exchange = new Exchange();
        return exchange;
    }

    /**
     * Creates the server half of a gossip conversation for an inbound stream and raises the in-flight
     * gauge; the returned handler lowers it again when it completes or errors.
     *
     * @param byteStream stream on which replies to the remote client are written
     * @return handler that consumes the client's gossip messages
     */
    public ByteStream server(ByteStream byteStream) {
        this.metrics.gossip_inflight.up();
        final ByteStream handler = new AnonymousClass7(byteStream);
        return handler;
    }

    /**
     * Pushes the current targets for every subscribed name to all of that name's subscribers.
     * As the name says, this is meant to be called from a task already running on the executor.
     */
    public void broadcastChangesWhileInExecutor() {
        for (final Map.Entry<String, ArrayList<Consumer<Collection<String>>>> subscription : this.subscribersByApp.entrySet()) {
            // Targets are resolved once per name and shared by all of its listeners.
            final Collection<String> targets = this.chain.current().targetsFor(subscription.getKey());
            for (final Consumer<Collection<String>> listener : subscription.getValue()) {
                listener.accept(targets);
            }
        }
    }
}
