This post is over 6 months old. Some details, especially technical, may have changed.

Building a scalable and highly available reactive applications with Akka! Load balancing revisited.

In past years main reason for the use of the load balancer was growing interest in the web sites. This caused huge traffic spikes which couldn't be handleid by single node. Nowadays application' clients are not only human beings. Each of has number of gadgets (mobile phones, tablets, smart watches) which multiplies the number of potential clients. What's more while we're leaving surrounded by cloud, applications could communicate each other to exchange data eg. daily planer on our phone can fetch data from forecast service.

I would not be surprised if shortly we'll be surrounded by new generation or intelligent devices being first class internet citizens. Sign of change are Beacons or Raspberry Pi. Believe me or not, but this devices could generate much bigger load to your system than human beings.

There is no doubt that traditional scaling solutions will not be able to fulfill, both scalability and high availability is so rapidly changing environment. Having only simple http load balancer up front the cluster is definitely not enough.

Reactive manifesto reflects all this dramatically changing requirements by giving us high level roadmap, Application needs to be responsive, resilient, event-driven, scalable - but what does it really mean ? Does the message driven system using JMS engine could be reactive? You might have a hunch that it needs to be something much more revolutionary.

In a nutshell old fashion monolithic approach of building applications become completely devalued. Especially to be resilient and scalable applications need to be constructed from loosely coupled micro components/services in the multi jvm/node scale. It will guarantee that we'll be able to easily and quickly react on growing load.

Let say that on the end of month all customers are generating sales reports. It means that our reporting module may be overloaded. We're deciding to put additional nodes in the cluster and load balance the load. If your application is tightly coupled you will need to put all application' modules on this new nodes which is not what we exactly need at the moment. We want to use additional computing power only to generate reports so only reporting module should be placed on new nodes.

Ideally would be if we could put any mix of the modules on any node.

Micro components

Does this architecture has the only advantages? Definitely not. Complexity grows. We need to care about deployment and possible faults. Components also will require adaptive communication mechanisms and rest service will not be always the best choice especially if communication logic is more tricky.

Well, it sound a little bit scary, but do not worry, we have a pretty good stock of tools which will help us in this. I will try devote few articles to explain how to build such kind of reactive application. First of all let me introduce you our main game player.

Akka

To introduce Akka I will need to write at least few articles, nethertheless Akka is toolkit and runtime for building highly concurrent applications, which is employing ideas that have been around for some time - actor system or asynchronous processing. But it's enough to make revolution in distributed systems? Definitely not. What makes that Akka revolutionary in this area is set of clustering features, especially load balancing, location transparency, self maintenance and fault tolerance. All this stuff will allow us to quickly build, highly scalable and highly available clustered applications.

Load balancing

Load balancing functionality implemented in Akka is similar to typical http (tcp/udp) load balancers like (haproxy or keepalived) - the main goal is the same, we want to reduce the load on an individual computing unit by redistribution of the work in a way which is transparent for application's user.

Starting point is a single node with multiple workers which are handling incoming internal messages, truly saying this model is not so far away from the model of http server where we have m number of http requests and n number of worker threads. But in this case we are dealing with internal messages that maybe or may not be distributed around the cluster.

Pub Sub

In the above diagram, we have an example showing internal messages to the system which are being handled by the workers. In our examples, Workers (A-n) are example of Akka's Actors.

Of course in a typical system, messages and message routing might be a little bit more complex, but in general the idea is the same.

Pub Sub

When the system load is increased, computing power needs to be increased. An additional new node in the cluster should handle some of the messages, and reduce overall load of the system. This will however require an entity (a Router/Balancer) to process each message and distribute it within the cluster.

Pub Sub

This does represent a single point of failure but thanks to Akka's supervision capabilities the state of an Actor can be monitored and if it fails it can be recreated automatically in another node in the cluster. This architecture also represents a possible bottleneck within the cluster so care must be made that the Router/Balancer is as lightweight as possible.

At this point it is legitimate to ask the question - how big implementation/configuration effort will be required to setup following system?

Router

The above architecture (with a Router/Balancer) is a typical Akka deployment and so the framework provides several ready-made routing strategies called routing logic which could be found inside akka.routing package :

and inside akka.cluster.routing package, even more sophisticated adaptive load balancing mechanism dedicated to cluster usage,

Custom logic could be created and used within Router and used inside or outside of an Actor. A Router instance used outside an Actor doesn't fit so well overall architecture and limits potential extensions like supervision of the workers termination.

Contract between Router and custom router logic is defined by the RoutingLogic interface.

final case class Router(val logic: RoutingLogic, 
                        val routees: immutable.IndexedSeq[Routee] = Vector.empty) 

Inside RoutingLogic there is a select method which is responsible for piking one of provided routes. akka.routing.RoundRobinRoutingLogic implementation looks as follow:

override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else routees((next.getAndIncrement % routees.size).asInstanceOf[Int])

Thanks to this, algorithm used inside select could be easily extended or replaced using own custom routes calculation.

Router usage is very clean & simple - only thing worth mention is Broadcast envelope which forces router to deliver messages to all routes.

def route(message: Any, sender: ActorRef): Unit =
    message match {
      case akka.routing.Broadcast(msg) ? SeveralRoutees(routees).send(msg, sender)
      case msg                         ? send(logic.select(msg, routees), message, sender)
    }

Router actor - pool & group

A self contained Actor is another way to create Router, and speaking for myself this is much more consistent with overall actor and message passing design.

With regard to the way how the Actor on the end of the route is created, we can take advantage of two predefined models:

  • Pool - Router creates and deploy child Actors
  • Group - Router utilize externally created Actors

An child Actor created by a Pool will be Router's child and it means that it will be supervised by the router.

The same as it was for *RoutingLogic we have similar list of *Pool and *Group ready to use implementations eg.

the exception is lack of BalancingGroup and SmallestMailboxGroup, mostly due to fact that internal mailbox of externally created actors is not available.

Of course we shouldn't be surprised by Pool/Group internals - old good DRY principle in use.

override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic())

This is a good moment to start writing some code. Router can be configured programmatically or using configuration file, and we will try both ways.

First of all, we need to introduce ad EchoActor, which will be receiver of simple string message.

class EchoActor extends Actor with ActorLogging {
  def receive: Receive = {
    case message =>
      log.info("Received Message {} in Actor {}", message, self.path.name)
  }
}

Now we can instantiate ready-made Router and use it to sent messages.

    val _system = ActorSystem("Router")
    val randomRouter = _system.actorOf(Props[EchoActor]
                    .withRouter(RandomPool(10)), name = "RandomPoolActor")
    1 to 10 foreach {
      i => randomRouter ! i
    }

An output contains 10 messages randomly delivered to 5 unnamed instances of EchoActor. Work is done by the threads from Router-akka.actor.default-dispatcher-* executor.

[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$h] Received Message 5 in Actor $h
[Router-akka.actor.default-dispatcher-8] [akka://Router/user/RandomPoolActor/$d] Received Message 3 in Actor $d
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$g] Received Message 2 in Actor $g
[Router-akka.actor.default-dispatcher-3] [akka://Router/user/RandomPoolActor/$b] Received Message 1 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$j] Received Message 6 in Actor $j
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$a] Received Message 7 in Actor $a
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$b] Received Message 10 in Actor $b
[Router-akka.actor.default-dispatcher-8] [akka://Router/user/RandomPoolActor/$d] Received Message 8 in Actor $d
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 4 in Actor $e
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 9 in Actor $e

Execution is more or less random and dependents on thread assignment which is controlled internally by Akka runtime.

Lets see what will happen if we decrease number of workers to 1

val _system = ActorSystem("Router")
val randomRouter = _system.actorOf(Props[EchoActor].withRouter(RandomPool(1)), name = "RandomPoolActor")
1 to 10 foreach {
    i => randomRouter ! i
}
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 1 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 2 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 3 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 4 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 5 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 6 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 7 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 8 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 9 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 10 in Actor $a

All the messages are now handled sequentially what's consistent with Actor's Mailbox processing architecture. What's worth mention - the same thread is used for processing all events - no context switching - nice.

The same setup initialized from configuration file will require writing few lines in application.conf

RouterExample {
  akka.actor.deployment {
    /RandomPoolActor {
      router = random-pool
      nr-of-instances = 5
    }
  }
}

Plus now initialization needs to be preceded by the configuration load.

val system = ActorSystem.create("Router", ConfigFactory.load().getConfig("RouterExample"))
val randomRouter = system.actorOf(Props[EchoActor].withRouter(FromConfig()), name = "RandomPoolActor")
1 to 10 foreach {
    i => randomRouter ! i
}
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 5 in Actor $e
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 1 in Actor $a
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$b] Received Message 3 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$c] Received Message 2 in Actor $c
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$d] Received Message 9 in Actor $d
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$b] Received Message 7 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$c] Received Message 4 in Actor $c
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$b] Received Message 8 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$c] Received Message 6 in Actor $c
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 10 in Actor $e

No surprises, messages are handled as before.

Custom pool experiments

Brilliant side of Akka is almost unlimited customization. Akka's reference configuration contains list of all available router types.

router.type-mapping {
      from-code = "akka.routing.NoRouter"
      round-robin-pool = "akka.routing.RoundRobinPool"
      round-robin-group = "akka.routing.RoundRobinGroup"
      random-pool = "akka.routing.RandomPool"
      random-group = "akka.routing.RandomGroup"
      balancing-pool = "akka.routing.BalancingPool"
      smallest-mailbox-pool = "akka.routing.SmallestMailboxPool"
      broadcast-pool = "akka.routing.BroadcastPool"
      broadcast-group = "akka.routing.BroadcastGroup"
      scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool"
      scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup"
      consistent-hashing-pool = "akka.routing.ConsistentHashingPool"
      consistent-hashing-group = "akka.routing.ConsistentHashingGroup"
    }

But what if we would like to introduce our own type, which could be used inside configuration file?

First of all lets create custom Pool - it totally doesn't make sense but let assume that we want always choose the latest route in the pool. As I mentioned before core of the routing algorithm exists inside RoutingLogic and this will be first element of our custom implementation created in dummy package akka.routing2

object LastRouteRoutingLogic {
  def apply(): LastRouteRoutingLogic = new LastRouteRoutingLogic
}

@SerialVersionUID(1L)
final class LastRouteRoutingLogic extends RoutingLogic {
  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else routees.last
}

Our new LastRoutePool will need to implement few methods, but at this point the crucial part is injection of LastRouteRoutingLogic

@SerialVersionUID(1L)
final case class LastRoutePool(
      var nrOfInstances: Int, val resizer: Option[Resizer] = None,
      val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
      val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
      override val usePoolDispatcher: Boolean = false)
  extends Pool with PoolOverrideUnsetConfig[LastRoutePool] {

  def this(config: Config) =
    this(nrOfInstances = config.getInt("nr-of-instances"))

  def this(nr: Int) = this(nrOfInstances = nr)

  override def createRouter(system: ActorSystem): Router = new Router(LastRouteRoutingLogic())

  def withSupervisorStrategy(strategy: SupervisorStrategy): LastRoutePool = copy(supervisorStrategy = strategy)

  def withResizer(resizer: Resizer): LastRoutePool = copy(resizer = Some(resizer))

  def withDispatcher(dispatcherId: String): LastRoutePool = copy(routerDispatcher = dispatcherId)

  override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
}

Last thing before we will be able to define router = last-route-pool is customization of the akka.actor.router.type-mapping.

RouterExample {
  akka {
    actor {
      router {
        type-mapping {
          last-route-pool = "akka.routing2.LastRoutePool"
        }
      },
      deployment {
        /RandomPoolActor {
          router = last-route-pool
          nr-of-instances = 5
        }
      }
    }
  }
}

Now is the time to enjoy the results.

[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 1 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 2 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 3 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 4 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 5 in Actor $e

Important fact if that we're not overriding default types and we cans still use them, following configuration is still correct.

RouterExample {
  akka {
    actor {
      router {
        type-mapping {
          last-route-pool = "akka.routing2.LastRoutePool"
        }
      },
      deployment {
        /RandomPoolActor {
          router = random-pool
          nr-of-instances = 5
        }
      }
    }
  }
}

Cluster aware router

So far we were using routing logic only within one node but this is not what we want to achieve - we would like to load balance messages using existing nodes in the cluster. Truly saying once again Akka rocks, and to make next step beside cluster boilerplate we need to add only few additional configuration lines.

First of all let's configure two simple actor systems. It will be done one physical machine but using two separate JVMs.

ClusterAwareRouter{
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
      }
    }

    cluster {
      seed-nodes = [
        "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "akka.tcp://ClusterSystem@127.0.0.1:2552"]

      auto-down-unreachable-after = 10s
    }
  }
}

Clue of the configuration is this line provider = "akka.cluster.ClusterActorRefProvider"

object ClusterAwareRouterApp {

  def main(args: Array[String]): Unit = {

    var port = 0
    if (!args.isEmpty && (args(0).equals("node-1")))  port = 2551;
    if (!args.isEmpty && (args(0).equals("node-2")))  port = 2551;

    val system = ActorSystem.create("ClusterSystem", ConfigFactory.
      parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
      withFallback(ConfigFactory.load())
      .getConfig("ClusterAwareRouter"))
  }
}

Plus small sbt trick to make our life easier, especially when we need to run more than two nodes.

addCommandAlias("node-1", "runMain sample.cluster.balancer.ClusterAwareRouterApp node-1")

addCommandAlias("node-2", "runMain sample.cluster.balancer.ClusterAwareRouterApp node-2")

Running sample.cluster.balancer.ClusterAwareRouterApp node-1

[main] [Remoting] Starting remoting
[main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Starting up...
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Started up successfully
[ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]

Running sample.cluster.balancer.ClusterAwareRouterApp node-2

[main] [Remoting] Starting remoting
[main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
[ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]

Now is the time to bring back our EchoActor, RandomPool and message generation mechanisms used in single node configuration.

object ClusterAwareRouterApp {

  def main(args: Array[String]): Unit = {

    var port = 0
    if (!args.isEmpty && (args(0).equals("node-1")))  port = 2551;
    if (!args.isEmpty && (args(0).equals("node-2")))  port = 2552;

    val system = ActorSystem.create("ClusterSystem", ConfigFactory.
      parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
      withFallback(ConfigFactory.load())
      .getConfig("ClusterAwareRouter"))

    if (!args.isEmpty && (args(0).equals("node-1"))) {
      val randomRouter = system.actorOf(Props[EchoActor].
        withRouter(FromConfig()), name = "ClusterAwareActor")

      Thread.sleep(10000)

      1 to 10 foreach {
        i => randomRouter ! i
      }
    }
  }
}

we have exactly the same lines as we had before ...

val randomRouter = system.actorOf(Props[EchoActor].withRouter(FromConfig()), name = "RandomPoolActor")
1 to 10 foreach {
    i => randomRouter ! i
}

and now it's the time to introduce cluster aware router just by small change in configuration under deployment

deployment {
        /ClusterAwareActor {
          router = random-pool
          nr-of-instances = 6
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 3
            allow-local-routees = on
          }
        }
      }
  • enabled - :)
  • max-nr-of-instances-per-node - maximum number of routes deployed per node
  • allow-local-routees - this flag allows deployment of routes on node on which the router is placed

finally configuration will looks like this

ClusterAwareRouter{
  akka {
    actor {
      provider = "akka.cluster.ClusterActorRefProvider"
      deployment {
        /ClusterAwareActor {
          router = random-pool
          nr-of-instances = 6
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 3
            allow-local-routees = on
          }
        }
      }
    }
    remote {
      log-remote-lifecycle-events = off
      netty.tcp {
        hostname = "127.0.0.1"
      }
    }
    cluster {
      seed-nodes = [
        "akka.tcp://ClusterSystem@127.0.0.1:2551",
        "akka.tcp://ClusterSystem@127.0.0.1:2552"]

      auto-down-unreachable-after = 10s
    }
  }
}

First of all lets try to start only one - first node

sbt shell
node-1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 3 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 6 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 7 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 4 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 5 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 8 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 10 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 9 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 1 in Actor c3
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 2 in Actor c3

We had defined 6 routes nr-of-instances = 6, but this was overridden by the maximum number of routes per node max-nr-of-instances-per-node = 3. That's why we have only 3 routes : c1, c2 and c3 handling the messages.

Now lets start two nodes

Running sample.cluster.balancer.ClusterAwareRouterApp node-1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 8 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 5 in Actor c3
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 6 in Actor c3
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 9 in Actor c3
Running sample.cluster.balancer.ClusterAwareRouterApp node-2
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c5] Received Message 1 in Actor c5
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c5] Received Message 2 in Actor c5
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c4] Received Message 3 in Actor c4
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c4] Received Message 4 in Actor c4
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c5] Received Message 7 in Actor c5
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c4] Received Message 10 in Actor c4

We crossed a barrier of one node and have cluster wide load balancing!.

But what if we would like to sent messages from any node? To do so, we will comment out if statement which is controlling where the router instance is initialized.

object ClusterAwareRouterApp {

  def main(args: Array[String]): Unit = {

    var port = 0
    if (!args.isEmpty && (args(0).equals("node-1")))  port = 2551;
    if (!args.isEmpty && (args(0).equals("node-2")))  port = 2552;

    val system = ActorSystem.create("ClusterSystem", ConfigFactory.
      parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
      withFallback(ConfigFactory.load())
      .getConfig("ClusterAwareRouter"))

   // if (!args.isEmpty && (args(0).equals("node-1"))) {
      val randomRouter = system.actorOf(Props[EchoActor]
                 .withRouter(FromConfig()), name = "ClusterAwareActor")

      Thread.sleep(10000)

      1 to 10 foreach {
        i => randomRouter ! i
      }
   // }
  }
}

plus to make results more visible we will modify a little bit configuration

deployment {
        /ClusterAwareActor {
          router = random-pool
          nr-of-instances = 2
          cluster {
            enabled = on
            max-nr-of-instances-per-node = 1
            allow-local-routees = on
          }
        }
      }

and once again we'll start two nodes

[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 1 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 5 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 9 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 1 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 2 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 7 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 10 in Actor c2

Output from the first node is enough to make some conclusion .. there is no magic :) We asked for two router instances and we got two independent routing systems. Instead of having only one route per node, as we defined it in configuration max-nr-of-instances-per-node = 1, we have c1 and c2.

To have single instance of router we will need to use more sophisticated Akka's utils eg. ClusterSingletonManager and ClusterSingletonProxy. This topic requires a little bit more time and I'll come back to it in next article.

Summing Up

Routing architecture looks very promising and we're almost ready to make a deep dive into Akka. Stay tuned!

Published in Akka Scala Sbt Load Balancing on July 15, 2014