In past years the main reason to use a load balancer was the growing interest in web sites. It caused huge traffic spikes which couldn't be handled by a single node. Nowadays an application's clients are not only human beings. Each of us has a number of gadgets (mobile phones, tablets, smart watches), which multiplies the number of potential clients. What's more, while we're living surrounded by the cloud, applications can communicate with each other to exchange data, e.g. a daily planner on our phone can fetch data from a forecast service.
I would not be surprised if shortly we're surrounded by a new generation of intelligent devices that are first-class internet citizens. Signs of the change are Beacons and the Raspberry Pi. Believe it or not, these devices could generate a much bigger load on your system than human beings.
There is no doubt that traditional scaling solutions will not be able to provide both scalability and high availability in such a rapidly changing environment. Having only a simple HTTP load balancer in front of the cluster is definitely not enough.
The Reactive Manifesto reflects all these dramatically changing requirements by giving us a high-level roadmap: an application needs to be responsive, resilient, event-driven and scalable. But what does it really mean? Could a message-driven system using a JMS engine be reactive? You might have a hunch that it needs to be something much more revolutionary.
In a nutshell, the old-fashioned monolithic approach to building applications has become completely devalued. To be resilient and scalable, applications need to be constructed from loosely coupled micro components/services spread across multiple JVMs/nodes. This lets us react easily and quickly to a growing load.
Let's say that at the end of the month all customers generate sales reports. It means that our reporting module may be overloaded. We decide to put additional nodes in the cluster and balance the load. If your application is tightly coupled, you will need to put all of the application's modules on these new nodes, which is not exactly what we need at the moment. We want to use the additional computing power only to generate reports, so only the reporting module should be placed on the new nodes.
Ideally, we could put any mix of modules on any node.
Does this architecture have only advantages? Definitely not. Complexity grows. We need to take care of deployment and possible faults. Components will also require adaptive communication mechanisms, and a REST service will not always be the best choice, especially if the communication logic is more tricky.
Well, it sounds a little bit scary, but do not worry, we have a pretty good stock of tools which will help us with this. I will try to devote a few articles to explaining how to build this kind of reactive application. First of all, let me introduce our main player.
Akka
To introduce Akka properly I will need to write at least a few articles. Nevertheless, Akka is a toolkit and runtime for building highly concurrent applications, employing ideas that have been around for some time: the actor system and asynchronous processing. But is that enough to make a revolution in distributed systems? Definitely not. What makes Akka revolutionary in this area is its set of clustering features, especially load balancing, location transparency, self maintenance and fault tolerance. All this stuff will allow us to quickly build highly scalable and highly available clustered applications.
Load balancing
The load balancing functionality implemented in Akka is similar to typical HTTP (TCP/UDP) load balancers such as haproxy or keepalived. The main goal is the same: we want to reduce the load on an individual computing unit by redistributing the work in a way which is transparent to the application's user.
The starting point is a single node with multiple workers which handle incoming internal messages. Truth be told, this model is not so far from the model of an HTTP server, where we have m HTTP requests and n worker threads. But in this case we are dealing with internal messages that may or may not be distributed around the cluster.
In the above diagram, we have an example showing internal messages to the system which are being handled by the workers. In our examples, Workers (A-n) are examples of Akka's Actors.
Of course in a typical system, messages and message routing might be a little bit more complex, but in general the idea is the same.
When the system load is increased, computing power needs to be increased. An additional new node in the cluster should handle some of the messages, and reduce overall load of the system. This will however require an entity (a Router/Balancer) to process each message and distribute it within the cluster.
This does represent a single point of failure, but thanks to Akka's supervision capabilities the state of an Actor can be monitored, and if it fails it can be recreated automatically on another node in the cluster. This architecture also represents a possible bottleneck within the cluster, so care must be taken that the Router/Balancer is as lightweight as possible.
At this point it is legitimate to ask: how much implementation/configuration effort will be required to set up such a system?
Router
The above architecture (with a Router/Balancer) is a typical Akka deployment, so the framework provides several ready-made routing strategies, called routing logic, which can be found inside the akka.routing package:
RoundRobinRoutingLogic - selects routees in round-robin order
RandomRoutingLogic - randomly selects one of the target routees
SmallestMailboxRoutingLogic - selects the routee with the smallest mailbox; for remote routees it picks any of them, since their mailbox size is unknown
BroadcastRoutingLogic - selects all routees as targets
ScatterGatherFirstCompletedRoutingLogic - broadcasts the message to all routees and replies with the first response
ConsistentHashingRoutingLogic - selects a consistent routee based on the provided key
and inside the akka.cluster.routing package, an even more sophisticated adaptive load balancing mechanism dedicated to cluster usage:
AdaptiveLoadBalancingRoutingLogic - takes advantage of cluster metrics, which give feedback about the real load of the system.
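To make the differences between these strategies a bit more concrete, here is a minimal sketch that models four of them in plain Scala, without any Akka dependency. Worker, SelectionModels and the mailboxSize field are hypothetical names made up for this illustration; the real implementations live in akka.routing and operate on Routee instances.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.util.Random

// Hypothetical stand-in for a routee: just a name and a queue length.
final case class Worker(name: String, mailboxSize: Int)

object SelectionModels {
  // Shared counter used by the round-robin model.
  private val next = new AtomicLong(0)

  // Round-robin: walk through the workers in order, wrapping around.
  def roundRobin(workers: Vector[Worker]): Option[Worker] =
    if (workers.isEmpty) None
    else Some(workers((next.getAndIncrement() % workers.size).toInt))

  // Random: any worker, chosen with equal probability.
  def random(workers: Vector[Worker], rnd: Random): Option[Worker] =
    if (workers.isEmpty) None
    else Some(workers(rnd.nextInt(workers.size)))

  // Smallest mailbox: the worker with the fewest queued messages.
  def smallestMailbox(workers: Vector[Worker]): Option[Worker] =
    if (workers.isEmpty) None
    else Some(workers.minBy(_.mailboxSize))

  // Broadcast: every worker receives the message.
  def broadcast(workers: Vector[Worker]): Vector[Worker] = workers
}
```

Each function returns None for an empty worker list, which plays the role that NoRoutee plays in Akka.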
Custom logic can be created and plugged into a Router, and a Router can be used inside or outside of an Actor. A Router instance used outside an Actor doesn't fit the overall architecture so well and limits potential extensions, like supervising worker termination.
The contract between a Router and custom routing logic is defined by the RoutingLogic interface.
final case class Router(val logic: RoutingLogic,
val routees: immutable.IndexedSeq[Routee] = Vector.empty)
Inside RoutingLogic there is a select method which is responsible for picking one of the provided routees. The akka.routing.RoundRobinRoutingLogic implementation looks as follows:
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
if (routees.isEmpty) NoRoutee
else routees((next.getAndIncrement % routees.size).asInstanceOf[Int])
Thanks to this, the algorithm used inside select can easily be extended or replaced with your own custom routee calculation.
Router usage is very clean and simple. The only thing worth mentioning is the Broadcast envelope, which forces the router to deliver the message to all routees.
def route(message: Any, sender: ActorRef): Unit =
  message match {
    case akka.routing.Broadcast(msg) => SeveralRoutees(routees).send(msg, sender)
    case msg => send(logic.select(msg, routees), message, sender)
  }
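To show what using a Router inside an Actor can look like, here is a minimal sketch modelled on the pattern from Akka's routing documentation. The Master and PrintingWorker names are hypothetical, and the classic Akka 2.3-style API is assumed, so treat it as an illustration rather than a reference implementation.

```scala
import akka.actor.{Actor, ActorLogging, Props, Terminated}
import akka.routing.{ActorRefRoutee, RoundRobinRoutingLogic, Router}

// Hypothetical worker used only for this sketch.
class PrintingWorker extends Actor with ActorLogging {
  def receive: Receive = {
    case message => log.info("Worker {} got {}", self.path.name, message)
  }
}

// Hypothetical parent actor that owns a Router and watches its routees.
class Master extends Actor {
  // Create five children, watch them, and wrap each one as a routee.
  private var router: Router = {
    val routees = Vector.fill(5) {
      val worker = context.actorOf(Props[PrintingWorker])
      context.watch(worker)
      ActorRefRoutee(worker)
    }
    Router(RoundRobinRoutingLogic(), routees)
  }

  def receive: Receive = {
    // A stopped worker is replaced so the router keeps five routees.
    case Terminated(worker) =>
      router = router.removeRoutee(worker)
      val replacement = context.actorOf(Props[PrintingWorker])
      context.watch(replacement)
      router = router.addRoutee(replacement)
    // Everything else goes to the routee picked by the routing logic.
    case message =>
      router.route(message, sender())
  }
}
```

Because the Router lives inside Master, the workers are ordinary children of that actor, so their termination can be observed and handled in one place.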
Router actor - pool & group
A self-contained Actor is another way to create a Router, and speaking for myself this is much more consistent with the overall actor and message passing design.
Depending on how the Actor at the end of the route is created, we can take advantage of two predefined models:
Pool - the Router creates and deploys child Actors
Group - the Router uses externally created Actors
A child Actor created by a Pool will be the Router's child, which means that it will be supervised by the router.
Just as with *RoutingLogic, we have a similar list of ready-to-use *Pool and *Group implementations, e.g.
RoundRobinPool / RoundRobinGroup
RandomPool / RandomGroup
SmallestMailboxPool
ScatterGatherFirstCompletedPool / ScatterGatherFirstCompletedGroup
ConsistentHashingPool / ConsistentHashingGroup
AdaptiveLoadBalancingPool / AdaptiveLoadBalancingGroup
The exceptions are the missing BalancingGroup and SmallestMailboxGroup, mostly due to the fact that the internal mailbox of externally created actors is not available.
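The difference between the two models is easier to see side by side. The sketch below assumes the classic Akka 2.3-style API; LoggingWorker, PoolVersusGroup and the w1/w2 actor names are hypothetical and exist only for this illustration.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import akka.routing.{Broadcast, RoundRobinGroup, RoundRobinPool}

// Hypothetical worker used only for this sketch.
class LoggingWorker extends Actor with ActorLogging {
  def receive: Receive = {
    case message => log.info("{} received {}", self.path, message)
  }
}

object PoolVersusGroup {
  // Pool: the router creates three children itself and supervises them.
  val poolProps: Props = RoundRobinPool(3).props(Props[LoggingWorker])

  // Group: the router only gets the paths of actors created elsewhere.
  val workerPaths = List("/user/w1", "/user/w2")
  val groupProps: Props = RoundRobinGroup(workerPaths).props()

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("PoolVersusGroup")
    val pool = system.actorOf(poolProps, "pool")

    // The group's routees must exist under the configured paths.
    system.actorOf(Props[LoggingWorker], "w1")
    system.actorOf(Props[LoggingWorker], "w2")
    val group = system.actorOf(groupProps, "group")

    pool ! "to one pool routee"
    group ! "to one group routee"
    // The Broadcast envelope delivers the payload to every routee.
    pool ! Broadcast("to all pool routees")
    // The actor system keeps running until it is shut down explicitly.
  }
}
```

With the pool, the routees show up as children of /user/pool; with the group, they stay where they were created (/user/w1 and /user/w2) and the router merely looks them up.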
Of course we shouldn't be surprised by the Pool/Group internals: the good old DRY principle is in use.
override def createRouter(system: ActorSystem): Router = new Router(RandomRoutingLogic())
This is a good moment to start writing some code. A Router can be configured programmatically or with a configuration file, and we will try both ways.
First of all, we need to introduce an EchoActor, which will be the receiver of a simple message.
class EchoActor extends Actor with ActorLogging {
def receive: Receive = {
case message =>
log.info("Received Message {} in Actor {}", message, self.path.name)
}
}
Now we can instantiate a ready-made Router and use it to send messages.
val _system = ActorSystem("Router")
val randomRouter = _system.actorOf(Props[EchoActor]
.withRouter(RandomPool(10)), name = "RandomPoolActor")
1 to 10 foreach {
i => randomRouter ! i
}
The output contains 10 messages randomly delivered to unnamed instances of EchoActor (in this run seven of the ten routees received at least one message). The work is done by threads from the Router-akka.actor.default-dispatcher-* executor.
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$h] Received Message 5 in Actor $h
[Router-akka.actor.default-dispatcher-8] [akka://Router/user/RandomPoolActor/$d] Received Message 3 in Actor $d
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$g] Received Message 2 in Actor $g
[Router-akka.actor.default-dispatcher-3] [akka://Router/user/RandomPoolActor/$b] Received Message 1 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$j] Received Message 6 in Actor $j
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$a] Received Message 7 in Actor $a
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$b] Received Message 10 in Actor $b
[Router-akka.actor.default-dispatcher-8] [akka://Router/user/RandomPoolActor/$d] Received Message 8 in Actor $d
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 4 in Actor $e
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 9 in Actor $e
Execution order is more or less random and depends on thread assignment, which is controlled internally by the Akka runtime.
Let's see what happens if we decrease the number of workers to 1.
val _system = ActorSystem("Router")
val randomRouter = _system.actorOf(Props[EchoActor].withRouter(RandomPool(1)), name = "RandomPoolActor")
1 to 10 foreach {
i => randomRouter ! i
}
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 1 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 2 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 3 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 4 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 5 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 6 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 7 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 8 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 9 in Actor $a
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 10 in Actor $a
All the messages are now handled sequentially, which is consistent with the Actor's Mailbox processing architecture. What's worth mentioning: the same thread is used for processing all events, so there is no context switching. Nice.
A similar setup initialized from a configuration file requires writing a few lines in application.conf
RouterExample {
akka.actor.deployment {
/RandomPoolActor {
router = random-pool
nr-of-instances = 5
}
}
}
Plus, initialization now needs to be preceded by loading the configuration.
val system = ActorSystem.create("Router", ConfigFactory.load().getConfig("RouterExample"))
val randomRouter = system.actorOf(Props[EchoActor].withRouter(FromConfig()), name = "RandomPoolActor")
1 to 10 foreach {
i => randomRouter ! i
}
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 5 in Actor $e
[Router-akka.actor.default-dispatcher-2] [akka://Router/user/RandomPoolActor/$a] Received Message 1 in Actor $a
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$b] Received Message 3 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$c] Received Message 2 in Actor $c
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$d] Received Message 9 in Actor $d
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$b] Received Message 7 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$c] Received Message 4 in Actor $c
[Router-akka.actor.default-dispatcher-6] [akka://Router/user/RandomPoolActor/$b] Received Message 8 in Actor $b
[Router-akka.actor.default-dispatcher-4] [akka://Router/user/RandomPoolActor/$c] Received Message 6 in Actor $c
[Router-akka.actor.default-dispatcher-7] [akka://Router/user/RandomPoolActor/$e] Received Message 10 in Actor $e
No surprises, messages are handled as before.
Custom pool experiments
A brilliant side of Akka is its almost unlimited customization. Akka's reference configuration contains the list of all available router types.
router.type-mapping {
from-code = "akka.routing.NoRouter"
round-robin-pool = "akka.routing.RoundRobinPool"
round-robin-group = "akka.routing.RoundRobinGroup"
random-pool = "akka.routing.RandomPool"
random-group = "akka.routing.RandomGroup"
balancing-pool = "akka.routing.BalancingPool"
smallest-mailbox-pool = "akka.routing.SmallestMailboxPool"
broadcast-pool = "akka.routing.BroadcastPool"
broadcast-group = "akka.routing.BroadcastGroup"
scatter-gather-pool = "akka.routing.ScatterGatherFirstCompletedPool"
scatter-gather-group = "akka.routing.ScatterGatherFirstCompletedGroup"
consistent-hashing-pool = "akka.routing.ConsistentHashingPool"
consistent-hashing-group = "akka.routing.ConsistentHashingGroup"
}
But what if we would like to introduce our own type, which could be used inside the configuration file?
First of all let's create a custom Pool. It doesn't make any practical sense, but let's assume that we always want to choose the last routee in the pool. As I mentioned before, the core of the routing algorithm lives inside RoutingLogic, and this will be the first element of our custom implementation, created in a dummy package akka.routing2
object LastRouteRoutingLogic {
def apply(): LastRouteRoutingLogic = new LastRouteRoutingLogic
}
@SerialVersionUID(1L)
final class LastRouteRoutingLogic extends RoutingLogic {
override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
if (routees.isEmpty) NoRoutee
else routees.last
}
Our new LastRoutePool will need to implement a few methods, but at this point the crucial part is the injection of LastRouteRoutingLogic
@SerialVersionUID(1L)
final case class LastRoutePool(
val nrOfInstances: Int, val resizer: Option[Resizer] = None,
val supervisorStrategy: SupervisorStrategy = Pool.defaultSupervisorStrategy,
val routerDispatcher: String = Dispatchers.DefaultDispatcherId,
override val usePoolDispatcher: Boolean = false)
extends Pool with PoolOverrideUnsetConfig[LastRoutePool] {
def this(config: Config) =
this(nrOfInstances = config.getInt("nr-of-instances"))
def this(nr: Int) = this(nrOfInstances = nr)
override def createRouter(system: ActorSystem): Router = new Router(LastRouteRoutingLogic())
def withSupervisorStrategy(strategy: SupervisorStrategy): LastRoutePool = copy(supervisorStrategy = strategy)
def withResizer(resizer: Resizer): LastRoutePool = copy(resizer = Some(resizer))
def withDispatcher(dispatcherId: String): LastRoutePool = copy(routerDispatcher = dispatcherId)
override def withFallback(other: RouterConfig): RouterConfig = this.overrideUnsetConfig(other)
}
The last thing before we are able to define router = last-route-pool is the customization of akka.actor.router.type-mapping.
RouterExample {
akka {
actor {
router {
type-mapping {
last-route-pool = "akka.routing2.LastRoutePool"
}
},
deployment {
/RandomPoolActor {
router = last-route-pool
nr-of-instances = 5
}
}
}
}
}
Now is the time to enjoy the results.
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 1 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 2 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 3 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 4 in Actor $e
[Router-akka.actor.default-dispatcher-5] [akka://Router/user/RandomPoolActor/$e] Received Message 5 in Actor $e
The important fact is that we're not overriding the default types and we can still use them; the following configuration is still correct.
RouterExample {
akka {
actor {
router {
type-mapping {
last-route-pool = "akka.routing2.LastRoutePool"
}
},
deployment {
/RandomPoolActor {
router = random-pool
nr-of-instances = 5
}
}
}
}
}
Cluster aware router
So far we have used routing logic only within one node, but this is not what we want to achieve: we would like to load balance messages across the existing nodes in the cluster. Honestly, once again Akka rocks: to take the next step, besides the cluster boilerplate, we need to add only a few additional configuration lines.
First of all let's configure two simple actor systems. It will be done on one physical machine but using two separate JVMs.
ClusterAwareRouter{
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
}
}
The key part of the configuration is the line provider = "akka.cluster.ClusterActorRefProvider"
object ClusterAwareRouterApp {
def main(args: Array[String]): Unit = {
var port = 0
if (!args.isEmpty && (args(0).equals("node-1"))) port = 2551;
if (!args.isEmpty && (args(0).equals("node-2"))) port = 2552;
val system = ActorSystem.create("ClusterSystem", ConfigFactory.
parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
withFallback(ConfigFactory.load())
.getConfig("ClusterAwareRouter"))
}
}
Plus a small sbt trick to make our life easier, especially when we need to run more than two nodes.
addCommandAlias("node-1", "runMain sample.cluster.balancer.ClusterAwareRouterApp node-1")
addCommandAlias("node-2", "runMain sample.cluster.balancer.ClusterAwareRouterApp node-2")
Running sample.cluster.balancer.ClusterAwareRouterApp node-1
[main] [Remoting] Starting remoting
[main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2551]
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Starting up...
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Started up successfully
[ClusterSystem-akka.actor.default-dispatcher-15] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2551] is JOINING, roles []
[ClusterSystem-akka.actor.default-dispatcher-14] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2551] to [Up]
[ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Node [akka.tcp://ClusterSystem@127.0.0.1:2552] is JOINING, roles []
[ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2551] - Leader is moving node [akka.tcp://ClusterSystem@127.0.0.1:2552] to [Up]
Running sample.cluster.balancer.ClusterAwareRouterApp node-2
[main] [Remoting] Starting remoting
[main] [Remoting] Remoting started; listening on addresses :[akka.tcp://ClusterSystem@127.0.0.1:2552]
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Starting up...
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Registered cluster JMX MBean [akka:type=Cluster]
[main] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Started up successfully
[ClusterSystem-akka.actor.default-dispatcher-3] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Metrics collection has started successfully
[ClusterSystem-akka.actor.default-dispatcher-16] [Cluster(akka://ClusterSystem)] Cluster Node [akka.tcp://ClusterSystem@127.0.0.1:2552] - Welcome from [akka.tcp://ClusterSystem@127.0.0.1:2551]
Now is the time to bring back our EchoActor, RandomPool and the message generation mechanism used in the single-node configuration.
object ClusterAwareRouterApp {
def main(args: Array[String]): Unit = {
var port = 0
if (!args.isEmpty && (args(0).equals("node-1"))) port = 2551;
if (!args.isEmpty && (args(0).equals("node-2"))) port = 2552;
val system = ActorSystem.create("ClusterSystem", ConfigFactory.
parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
withFallback(ConfigFactory.load())
.getConfig("ClusterAwareRouter"))
if (!args.isEmpty && (args(0).equals("node-1"))) {
val randomRouter = system.actorOf(Props[EchoActor].
withRouter(FromConfig()), name = "ClusterAwareActor")
Thread.sleep(10000)
1 to 10 foreach {
i => randomRouter ! i
}
}
}
}
we have exactly the same lines as we had before ...
val randomRouter = system.actorOf(Props[EchoActor].withRouter(FromConfig()), name = "RandomPoolActor")
1 to 10 foreach {
i => randomRouter ! i
}
and now it's time to introduce the cluster-aware router with just a small change in the configuration under deployment
deployment {
/ClusterAwareActor {
router = random-pool
nr-of-instances = 6
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
}
}
}
enabled - switches cluster-aware routing on :)
max-nr-of-instances-per-node - maximum number of routees deployed per node
allow-local-routees - this flag allows routees to be deployed on the node on which the router is placed
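For comparison, the same three settings can also be expressed in code instead of configuration. This is only a sketch assuming the Akka 2.3-style API used throughout this article (newer releases replaced useRole with useRoles); ClusterEchoWorker and ClusterPoolSketch are hypothetical names.

```scala
import akka.actor.{Actor, ActorLogging, Props}
import akka.cluster.routing.{ClusterRouterPool, ClusterRouterPoolSettings}
import akka.routing.RandomPool

// Hypothetical worker used only for this sketch.
class ClusterEchoWorker extends Actor with ActorLogging {
  def receive: Receive = {
    case message => log.info("Received Message {} in Actor {}", message, self.path.name)
  }
}

object ClusterPoolSketch {
  // Mirrors the cluster { ... } block of the deployment section.
  val settings = ClusterRouterPoolSettings(
    totalInstances = 6,       // nr-of-instances
    maxInstancesPerNode = 3,  // max-nr-of-instances-per-node
    allowLocalRoutees = true, // allow-local-routees
    useRole = None)           // no restriction to a node role

  // RandomPool(0) follows the Akka documentation examples: the number of
  // routees is taken from the cluster settings, not from the local pool.
  val routerProps: Props =
    ClusterRouterPool(RandomPool(0), settings).props(Props[ClusterEchoWorker])
}
```

On a node started with the cluster provider, the router would then be created with system.actorOf(ClusterPoolSketch.routerProps, "ClusterAwareActor") instead of FromConfig().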
Finally, the configuration will look like this
ClusterAwareRouter{
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
deployment {
/ClusterAwareActor {
router = random-pool
nr-of-instances = 6
cluster {
enabled = on
max-nr-of-instances-per-node = 3
allow-local-routees = on
}
}
}
}
remote {
log-remote-lifecycle-events = off
netty.tcp {
hostname = "127.0.0.1"
}
}
cluster {
seed-nodes = [
"akka.tcp://ClusterSystem@127.0.0.1:2551",
"akka.tcp://ClusterSystem@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
}
}
First of all let's try to start only one node, the first one
sbt shell
node-1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 3 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 6 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 7 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 4 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 5 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 8 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 10 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 9 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 1 in Actor c3
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 2 in Actor c3
We defined 6 routees with nr-of-instances = 6, but this was capped by the maximum number of routees per node, max-nr-of-instances-per-node = 3. That's why we have only 3 routees, c1, c2 and c3, handling the messages.
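The interaction between the two settings boils down to simple arithmetic. The helper below is a hypothetical back-of-the-envelope model, not Akka code, and it assumes that every node in the cluster is allowed to host routees (allow-local-routees = on, no role restriction).

```scala
object RouteeCount {
  // Hypothetical helper: how many routees one cluster-aware pool can deploy.
  def effectiveRoutees(nrOfInstances: Int, maxPerNode: Int, nodes: Int): Int =
    math.min(nrOfInstances, maxPerNode * nodes)

  def main(args: Array[String]): Unit = {
    // One node: the per-node limit caps the pool.
    println(effectiveRoutees(nrOfInstances = 6, maxPerNode = 3, nodes = 1)) // prints 3
    // Two nodes: there is room for all requested routees.
    println(effectiveRoutees(nrOfInstances = 6, maxPerNode = 3, nodes = 2)) // prints 6
  }
}
```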
Now let's start two nodes
Running sample.cluster.balancer.ClusterAwareRouterApp node-1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c2] Received Message 8 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 5 in Actor c3
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 6 in Actor c3
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c3] Received Message 9 in Actor c3
Running sample.cluster.balancer.ClusterAwareRouterApp node-2
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c5] Received Message 1 in Actor c5
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c5] Received Message 2 in Actor c5
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c4] Received Message 3 in Actor c4
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c4] Received Message 4 in Actor c4
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c5] Received Message 7 in Actor c5
[akka.tcp://ClusterSystem@127.0.0.1:2552/remote/akka.tcp/ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c4] Received Message 10 in Actor c4
We crossed the barrier of one node and have cluster-wide load balancing!
But what if we would like to send messages from any node? To do so, we will comment out the if statement which controls where the router instance is initialized.
object ClusterAwareRouterApp {
def main(args: Array[String]): Unit = {
var port = 0
if (!args.isEmpty && (args(0).equals("node-1"))) port = 2551;
if (!args.isEmpty && (args(0).equals("node-2"))) port = 2552;
val system = ActorSystem.create("ClusterSystem", ConfigFactory.
parseString(s"ClusterAwareRouter.akka.remote.netty.tcp.port=${port}").
withFallback(ConfigFactory.load())
.getConfig("ClusterAwareRouter"))
// if (!args.isEmpty && (args(0).equals("node-1"))) {
val randomRouter = system.actorOf(Props[EchoActor]
.withRouter(FromConfig()), name = "ClusterAwareActor")
Thread.sleep(10000)
1 to 10 foreach {
i => randomRouter ! i
}
// }
}
}
Plus, to make the results more visible, we will modify the configuration a little bit
deployment {
/ClusterAwareActor {
router = random-pool
nr-of-instances = 2
cluster {
enabled = on
max-nr-of-instances-per-node = 1
allow-local-routees = on
}
}
}
and once again we'll start two nodes
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 1 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 5 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/user/ClusterAwareActor/c1] Received Message 9 in Actor c1
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 1 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 2 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 7 in Actor c2
[akka.tcp://ClusterSystem@127.0.0.1:2551/remote/akka.tcp/ClusterSystem@127.0.0.1:2552/user/ClusterAwareActor/c2] Received Message 10 in Actor c2
The output from the first node is enough to draw some conclusions: there is no magic :) We created two router instances, one on each node, and we got two independent routing systems. Instead of having only one routee per node, as we defined in the configuration with max-nr-of-instances-per-node = 1, we have c1 and c2.
To have a single instance of the router we will need to use more sophisticated Akka utilities, e.g. ClusterSingletonManager and ClusterSingletonProxy. This topic requires a little bit more time and I'll come back to it in the next article.
Summing Up
The routing architecture looks very promising and we're almost ready to make a deep dive into Akka. Stay tuned!