/*
 * Decompiled with CFR 0.152.
 */
package org.jgroups.protocols.relay;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.JChannel;
import org.jgroups.Message;
import org.jgroups.ReceiverAdapter;
import org.jgroups.View;
import org.jgroups.logging.Log;
import org.jgroups.protocols.relay.RELAY2;
import org.jgroups.protocols.relay.SiteMaster;
import org.jgroups.protocols.relay.SiteUUID;
import org.jgroups.protocols.relay.config.RelayConfig;
import org.jgroups.stack.AddressGenerator;
import org.jgroups.util.UUID;
import org.jgroups.util.Util;

/**
 * Keeps a table of routes to other sites (indexed by site id) and the bridge channels used to
 * reach them. Each {@link Bridge} is a channel receiver that updates the route table when the
 * bridge cluster's view changes; each {@link Route} forwards, queues or rejects messages depending
 * on its current {@code RELAY2.RouteStatus}.
 */
public class Relayer {
    /** Route table indexed by site id; {@link #init(int)} fills every empty slot with a DOWN route. */
    protected Route[] routes;

    /** Bridges created by {@link #start}; closed and removed again by {@link #stop()}. */
    protected final Queue<Bridge> bridges = new ConcurrentLinkedQueue<Bridge>();

    protected final Log log;

    /** The RELAY2 instance that owns this relayer and supplies timer, ids and configuration. */
    protected final RELAY2 relay;

    /** Set to true by {@link #stop()}; while true, {@link #start} refuses to start any bridge. */
    protected volatile boolean done;

    /** Value of {@code relay.statsEnabled()} captured in the constructor. */
    protected boolean stats;

    /** Messages waiting per target site while the route to that site is UNKNOWN. */
    protected final ConcurrentMap<Short, BlockingQueue<Message>> fwd_queue = new ConcurrentHashMap<Short, BlockingQueue<Message>>();

    /** Scheduled tasks that will move an UNKNOWN route to DOWN, keyed by site id. */
    protected final ConcurrentMap<Short, Future<?>> down_tasks = new ConcurrentHashMap<Short, Future<?>>();

    /**
     * Creates a relayer whose route table has {@code num_routes} slots, all initially DOWN.
     *
     * @param relay      the owning RELAY2 protocol
     * @param log        the log to write to
     * @param num_routes the number of route slots to allocate
     */
    public Relayer(RELAY2 relay, Log log, int num_routes) {
        this.relay = relay;
        this.stats = relay.statsEnabled();
        this.log = log;
        this.init(num_routes);
    }

    /** @return true once {@link #stop()} has been called */
    public boolean done() {
        return done;
    }

    /**
     * Creates one bridge per configuration entry and then connects all of them. Does nothing if
     * {@link #stop()} has already been called. If anything fails, the relayer is stopped and the
     * failure is rethrown. If {@link #stop()} was called concurrently while the bridges were being
     * started, the relayer is stopped again so that no bridge created here stays open.
     *
     * @param bridge_configs the bridge configurations to create channels from
     * @param bridge_name    the logical name given to every bridge channel
     * @param my_site_id     the site id stamped into the addresses generated for the bridges
     * @throws Throwable whatever channel creation or connecting threw
     */
    public void start(List<RelayConfig.BridgeConfig> bridge_configs, String bridge_name, final short my_site_id) throws Throwable {
        if (done) {
            if (log.isTraceEnabled()) {
                log.trace(relay.getLocalAddress() + ": will not start the Relayer as stop() has been called");
            }
            return;
        }
        try {
            for (RelayConfig.BridgeConfig bridge_config : bridge_configs) {
                Bridge bridge = new Bridge(bridge_config.createChannel(), bridge_config.getClusterName(), bridge_name, new AddressGenerator() {
                    @Override
                    public Address generateAddress() {
                        UUID uuid = UUID.randomUUID();
                        return new SiteUUID(uuid, null, my_site_id);
                    }
                });
                bridges.add(bridge);
            }
            for (Bridge bridge : bridges) {
                bridge.start();
            }
        } catch (Throwable t) {
            // stop() cleans up everything created so far; no second stop() is needed on this path.
            stop();
            throw t;
        }
        // Success path only: stop() may have run on another thread before all bridges were added.
        if (done) {
            if (log.isTraceEnabled()) {
                log.trace(relay.getLocalAddress() + ": stop() was called while starting the relayer; stopping the relayer now");
            }
            stop();
        }
    }

    /**
     * Allocates the route table if it does not exist yet and fills every empty slot among the
     * first {@code num_routes} with a route in status DOWN.
     *
     * @param num_routes the number of slots to initialize
     */
    protected void init(int num_routes) {
        if (routes == null) {
            routes = new Route[num_routes];
        }
        for (int i = 0; i < num_routes; i++) {
            if (routes[i] == null) {
                routes[i] = new Route(null, null, RELAY2.RouteStatus.DOWN);
            }
        }
    }

    /**
     * Marks the relayer as done, cancels all pending site-down tasks, closes and forgets all
     * bridges, drops all queued messages and resets every route to DOWN.
     */
    public void stop() {
        done = true;
        // Copy first so that the map can be cleared before the (possibly slow) cancellation.
        List<Future<?>> tasks = new ArrayList<Future<?>>(down_tasks.values());
        down_tasks.clear();
        for (Future<?> task : tasks) {
            task.cancel(true);
        }
        for (Bridge bridge : bridges) {
            bridge.stop();
        }
        bridges.clear();
        fwd_queue.clear();
        for (Route route : routes) {
            route.reset();
        }
    }

    /**
     * @return one line per route of the form {@code <site> --> <route>}, where the site is its
     *         registered name or, if none is registered, its numeric id
     */
    public synchronized String printRoutes() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < routes.length; i++) {
            Route route = routes[i];
            if (route == null) {
                continue;
            }
            String name = SiteUUID.getSiteName((short) i);
            if (name == null) {
                name = String.valueOf(i);
            }
            sb.append(name).append(" --> ").append(route).append("\n");
        }
        return sb.toString();
    }

    /**
     * Overwrites bridge, site master and status of the route stored for {@code site}.
     * The site id is used as an array index without range checking.
     */
    protected synchronized void setRoute(short site, JChannel bridge, SiteMaster site_master, RELAY2.RouteStatus status) {
        Route existing_route = routes[site];
        existing_route.bridge(bridge);
        existing_route.siteMaster(site_master);
        existing_route.status(status);
    }

    /**
     * @param site the site id
     * @return the route for {@code site}, or null if the id lies outside the route table
     */
    protected synchronized Route getRoute(short site) {
        if (site >= 0 && site < routes.length) {
            return routes[site];
        }
        return null;
    }

    /**
     * @param excluded_sites site ids whose routes must not be returned; may be null or empty
     * @return all routes that are not DOWN and whose site master does not belong to an excluded site
     */
    protected synchronized List<Route> getRoutes(short ... excluded_sites) {
        List<Route> retval = new ArrayList<Route>(routes.length);
        for (Route candidate : routes) {
            if (candidate == null || candidate.status() == RELAY2.RouteStatus.DOWN) {
                continue;
            }
            if (!Relayer.isExcluded(candidate, excluded_sites)) {
                retval.add(candidate);
            }
        }
        return retval;
    }

    /**
     * @param cluster_name the name of a bridge cluster
     * @return the last view seen by the bridge connected to that cluster, or null if the name is
     *         null or no bridge uses it
     */
    protected View getBridgeView(String cluster_name) {
        if (cluster_name == null || bridges == null) {
            return null;
        }
        for (Bridge bridge : bridges) {
            if (bridge.cluster_name != null && bridge.cluster_name.equals(cluster_name)) {
                return bridge.view;
            }
        }
        return null;
    }

    /**
     * Tells whether the site of the route's site master is one of {@code excluded_sites}.
     * A route whose site master is null or not a {@link SiteUUID} has no site id to compare and
     * is therefore never reported as excluded.
     */
    protected static boolean isExcluded(Route route, short ... excluded_sites) {
        if (excluded_sites == null) {
            return false;
        }
        // Read the volatile field once so that check and cast see the same value.
        Address master = route.siteMaster();
        if (!(master instanceof SiteUUID)) {
            return false;
        }
        short site = ((SiteUUID) master).getSite();
        for (short excluded_site : excluded_sites) {
            if (site == excluded_site) {
                return true;
            }
        }
        return false;
    }

    /**
     * A channel connected to a bridge cluster. Acts as the channel's receiver: relay messages are
     * handed to {@code RELAY2.handleRelayMessage}, and view changes update the route table.
     */
    protected class Bridge
    extends ReceiverAdapter {
        protected JChannel channel;
        protected final String cluster_name;
        /** The most recent view passed to {@link #viewAccepted(View)}; null until the first view. */
        protected View view;

        /**
         * Configures (but does not connect) the channel.
         *
         * @param ch             the channel to use
         * @param cluster_name   the cluster that {@link #start()} connects to
         * @param channel_name   the logical name set on the channel
         * @param addr_generator the generator installed on the channel for creating its address
         */
        protected Bridge(JChannel ch, String cluster_name, String channel_name, AddressGenerator addr_generator) throws Exception {
            this.channel = ch;
            this.channel.setName(channel_name);
            this.channel.setReceiver(this);
            this.channel.setAddressGenerator(addr_generator);
            this.cluster_name = cluster_name;
        }

        /** Connects the channel to {@link #cluster_name}. */
        protected void start() throws Exception {
            this.channel.connect(this.cluster_name);
        }

        /** Closes the channel. */
        protected void stop() {
            Util.close((Channel)this.channel);
        }

        /** Passes a message carrying a relay header on to RELAY2; other messages are discarded with a warning. */
        @Override
        public void receive(Message msg) {
            RELAY2.Relay2Header hdr = (RELAY2.Relay2Header)msg.getHeader(Relayer.this.relay.getId());
            if (hdr == null) {
                Relayer.this.log.warn("received a message without a relay header; discarding it");
                return;
            }
            Relayer.this.relay.handleRelayMessage(hdr, msg);
        }

        /**
         * Updates the route table from a new bridge view: sites of members that left are set to
         * UNKNOWN first, then every site that has a member in the new view is set to UP. A site
         * that lost one member but still has another therefore ends up UP again.
         */
        @Override
        public void viewAccepted(View new_view) {
            List<Address> left_mbrs = this.view != null ? Util.determineLeftMembers(this.view.getMembers(), new_view.getMembers()) : null;
            this.view = new_view;
            if (Relayer.this.log.isTraceEnabled()) {
                Relayer.this.log.trace("[Relayer " + this.channel.getAddress() + "] view: " + new_view);
            }
            if (left_mbrs != null) {
                // A set, so that each affected site is handled once even if several members left.
                HashSet<Short> left_sites = new HashSet<Short>();
                for (Address addr : left_mbrs) {
                    if (addr instanceof SiteUUID) {
                        left_sites.add(((SiteUUID)addr).getSite());
                    }
                }
                for (short site : left_sites) {
                    this.changeStatusToUnknown(site);
                }
            }
            // One address per site; a later member of the same site replaces an earlier one.
            Map<Short, Address> present_sites = new HashMap<Short, Address>();
            for (Address addr : new_view.getMembers()) {
                if (addr instanceof SiteUUID) {
                    present_sites.put(((SiteUUID)addr).getSite(), addr);
                }
            }
            for (Map.Entry<Short, Address> entry : present_sites.entrySet()) {
                this.changeStatusToUp(entry.getKey(), this.channel, entry.getValue());
            }
        }

        /**
         * Sets the route of {@code site} to UNKNOWN and schedules a task that, after
         * {@code relay.siteDownTimeout()} milliseconds, moves it to DOWN if it is still UNKNOWN.
         * A previously scheduled task for the same site is cancelled.
         */
        protected void changeStatusToUnknown(final short site) {
            Route route = Relayer.this.routes[site];
            route.status(RELAY2.RouteStatus.UNKNOWN);
            Future<?> task = Relayer.this.relay.getTimer().schedule(new Runnable() {
                @Override
                public void run() {
                    Route current = Relayer.this.routes[site];
                    if (current.status() == RELAY2.RouteStatus.UNKNOWN) {
                        Bridge.this.changeStatusToDown(site);
                    }
                }
            }, Relayer.this.relay.siteDownTimeout(), TimeUnit.MILLISECONDS);
            if (task == null) {
                return;
            }
            Future<?> existing_task = Relayer.this.down_tasks.put(site, task);
            if (existing_task != null) {
                existing_task.cancel(true);
            }
        }

        /**
         * Moves the route of site {@code id} from UNKNOWN to DOWN, removes its forward queue and
         * sends a site-unreachable notification to every original sender that had a message
         * queued. Does nothing but log a warning if the route is not UNKNOWN.
         */
        protected void changeStatusToDown(short id) {
            Route route = Relayer.this.routes[id];
            RELAY2.RouteStatus current = route.status();
            if (current != RELAY2.RouteStatus.UNKNOWN) {
                // Report the actual status; this branch is also taken when the route is already DOWN.
                Relayer.this.log.warn(Relayer.this.relay.getLocalAddress() + ": didn't change status of " + SiteUUID.getSiteName(id) + " to DOWN as it is " + current);
                return;
            }
            route.status(RELAY2.RouteStatus.DOWN);
            BlockingQueue<Message> msgs = Relayer.this.fwd_queue.remove(id);
            if (msgs != null && !msgs.isEmpty()) {
                // Collect distinct senders so that each one is notified only once.
                HashSet<Address> targets = new HashSet<Address>();
                for (Message msg : msgs) {
                    RELAY2.Relay2Header hdr = (RELAY2.Relay2Header)msg.getHeader(Relayer.this.relay.getId());
                    targets.add(hdr.original_sender);
                }
                for (Address target : targets) {
                    // The route may have come back up in the meantime; then nobody needs to be told.
                    if (route.status() == RELAY2.RouteStatus.UP) {
                        continue;
                    }
                    Relayer.this.relay.sendSiteUnreachableTo(target, id);
                }
            }
        }

        /**
         * Sets the route of site {@code id} to UP, updating its bridge and site master if they
         * differ. If the route was DOWN or UNKNOWN, a pending site-down task is cancelled; if it
         * was UNKNOWN, the queued messages are flushed on the timer.
         */
        protected void changeStatusToUp(final short id, JChannel bridge, Address site_master) {
            final Route route = Relayer.this.routes[id];
            if (route.bridge() == null || !route.bridge().equals(bridge)) {
                route.bridge(bridge);
            }
            if (route.siteMaster() == null || !route.siteMaster().equals(site_master)) {
                route.siteMaster(site_master);
            }
            RELAY2.RouteStatus old_status = route.status();
            route.status(RELAY2.RouteStatus.UP);
            if (old_status == RELAY2.RouteStatus.DOWN || old_status == RELAY2.RouteStatus.UNKNOWN) {
                this.cancelTask(id);
                if (old_status == RELAY2.RouteStatus.UNKNOWN) {
                    Relayer.this.relay.getTimer().execute(new Runnable() {
                        @Override
                        public void run() {
                            Bridge.this.flushQueue(id, route);
                        }
                    });
                }
            }
        }

        /** Removes and cancels the pending site-down task of site {@code id}, if there is one. */
        protected void cancelTask(short id) {
            Future<?> task = Relayer.this.down_tasks.remove(id);
            if (task != null) {
                task.cancel(true);
            }
        }

        /**
         * Sends the messages queued for site {@code id} over the route's bridge for as long as the
         * route stays UP. If the queue was drained (or the route is still UP at the end), the
         * queue is removed; if the route stopped being UP, the remaining messages stay queued so
         * that a later flush can send them or a later DOWN transition can notify their senders.
         */
        protected void flushQueue(short id, Route route) {
            BlockingQueue<Message> msgs = Relayer.this.fwd_queue.get(id);
            if (msgs == null || msgs.isEmpty()) {
                return;
            }
            JChannel bridge = route.bridge();
            if (Relayer.this.log.isTraceEnabled()) {
                Relayer.this.log.trace(Relayer.this.relay.getLocalAddress() + ": forwarding " + msgs.size() + " queued messages");
            }
            // The status is checked before polling so that no message is taken off the queue
            // and then silently thrown away because the route is no longer UP.
            while (route.status() == RELAY2.RouteStatus.UP) {
                Message msg = msgs.poll();
                if (msg == null) {
                    break;
                }
                try {
                    msg.setDest(route.siteMaster());
                    bridge.send(msg);
                } catch (Throwable ex) {
                    Relayer.this.log.error("failed forwarding queued message to " + SiteUUID.getSiteName(id), ex);
                }
            }
            if (route.status() == RELAY2.RouteStatus.UP) {
                // Remove only this very queue instance, never one installed by somebody else.
                Relayer.this.fwd_queue.remove(id, msgs);
            }
        }
    }

    /**
     * The way to one remote site: the address of its site master, the bridge channel to send on
     * and the current status. Also records when the status was last set.
     */
    public class Route {
        private volatile Address site_master;
        private volatile JChannel bridge;
        private volatile RELAY2.RouteStatus status;
        /** {@code System.currentTimeMillis()} at construction or at the last call of {@link #status(RELAY2.RouteStatus)}. */
        private volatile long lastStatusChangeTime;

        /** Creates a route in status UP. */
        public Route(Address site_master, JChannel bridge) {
            this(site_master, bridge, RELAY2.RouteStatus.UP);
        }

        public Route(Address site_master, JChannel bridge, RELAY2.RouteStatus status) {
            this.site_master = site_master;
            this.bridge = bridge;
            this.status = status;
            this.lastStatusChangeTime = System.currentTimeMillis();
        }

        public JChannel bridge() {
            return this.bridge;
        }

        public Route bridge(JChannel new_bridge) {
            this.bridge = new_bridge;
            return this;
        }

        public Address siteMaster() {
            return this.site_master;
        }

        public Route siteMaster(Address new_site_master) {
            this.site_master = new_site_master;
            return this;
        }

        public RELAY2.RouteStatus status() {
            return this.status;
        }

        /** Sets the status and records the current time as the time of the last status change. */
        public Route status(RELAY2.RouteStatus new_status) {
            this.status = new_status;
            this.lastStatusChangeTime = System.currentTimeMillis();
            return this;
        }

        /** Clears bridge and site master and sets the status to DOWN. */
        public Route reset() {
            return this.bridge(null).siteMaster(null).status(RELAY2.RouteStatus.DOWN);
        }

        /**
         * Handles a message destined for {@code target_site} according to the route's status:
         * <ul>
         * <li>DOWN: the original sender is told that the site is unreachable.</li>
         * <li>UNKNOWN: a copy is placed on the forward queue of the site; if it cannot be queued
         *     within the wait time, the original sender is told that the site is unreachable.</li>
         * <li>otherwise: a copy is sent to the site master over the bridge; failures are logged.</li>
         * </ul>
         *
         * @param target_site       the id of the destination site
         * @param final_destination the address the message is ultimately meant for
         * @param original_sender   the address that originally sent the message
         * @param msg               the message to relay; it is copied, not modified
         */
        public void send(short target_site, Address final_destination, Address original_sender, Message msg) {
            switch (this.status) {
                case DOWN: {
                    Relayer.this.relay.sendSiteUnreachableTo(original_sender, target_site);
                    return;
                }
                case UNKNOWN: {
                    BlockingQueue<Message> queue = Relayer.this.fwd_queue.get(target_site);
                    if (queue == null) {
                        queue = new LinkedBlockingQueue<Message>(Relayer.this.relay.forwardQueueMaxSize());
                        // Another thread may have installed a queue in the meantime; use that one then.
                        BlockingQueue<Message> existing = Relayer.this.fwd_queue.putIfAbsent(target_site, queue);
                        if (existing != null) {
                            queue = existing;
                        }
                    }
                    try {
                        // NOTE(review): this waits for the time *elapsed* since the last status change;
                        // the time *remaining* until the site-down timeout may have been intended - confirm.
                        long wait = Math.max(System.currentTimeMillis() - this.lastStatusChangeTime, 0L);
                        if (!queue.offer(this.createMessage(new SiteMaster(target_site), final_destination, original_sender, msg), wait, TimeUnit.MILLISECONDS)) {
                            Relayer.this.relay.sendSiteUnreachableTo(original_sender, target_site);
                        }
                    } catch (InterruptedException e) {
                        // The message was not queued. Keep the interrupt visible to the caller
                        // instead of swallowing it.
                        Thread.currentThread().interrupt();
                    }
                    return;
                }
                default:
                    break;
            }
            if (Relayer.this.log.isTraceEnabled()) {
                Relayer.this.log.trace("routing message to " + final_destination + " via " + this.site_master);
            }
            long start = Relayer.this.stats ? System.nanoTime() : 0L;
            try {
                Message copy = this.createMessage(this.site_master, final_destination, original_sender, msg);
                this.bridge.send(copy);
                if (Relayer.this.stats) {
                    Relayer.this.relay.addToRelayedTime(System.nanoTime() - start);
                    Relayer.this.relay.incrementRelayed();
                }
            } catch (Exception e) {
                Relayer.this.log.error("failure relaying message", e);
            }
        }

        /** @return the site master (if set) followed by the status in square brackets */
        @Override
        public String toString() {
            Address master = this.site_master;
            return (master != null ? master + " " : "") + "[" + this.status + "]";
        }

        /**
         * Copies {@code msg}, addresses the copy to {@code target} with a null source and attaches
         * a relay header carrying the final destination and the original sender.
         */
        protected Message createMessage(Address target, Address final_destination, Address original_sender, Message msg) {
            Message copy = Relayer.this.relay.copy(msg).dest(target).src(null);
            // NOTE(review): 1 is presumably an inlined header-type constant (data message) - confirm
            // against Relay2Header. The cast keeps the call valid if the parameter is a byte.
            RELAY2.Relay2Header hdr = new RELAY2.Relay2Header((byte)1, final_destination, original_sender);
            copy.putHeader(Relayer.this.relay.getId(), hdr);
            return copy;
        }
    }
}

