Понимание Akka грузоотправитель

голоса
-1

У меня есть несколько вопросов, основанных на ниже примере кода.

1) я указал

akka.cluster.use-dispatcher = cluster-dispatcher in my config. 

Когда я поставил точку останова на этой линии в Frontend.scala,

   _frontend = system.actorOf(Props[Frontend],
        name = frontend)

Я вижу, по умолчанию-диспетчер внутри «_frontend» объект. Как же это не получили кластерный-диспетчер из конфигурации?

2) Я хочу , чтобы имитировать сценарий блокировки этой документации говорит. https://doc.akka.io/docs/akka/2.5/dispatchers.html#problem-blocking-on-default-dispatcher Я пытался поставить по умолчанию-диспетчеру

default-dispatcher {
  fork-join-executor {
    parallelism-min = 1
    parallelism-max = 1
    throughput = 1
  }
}

И я думал , что один «получить» в интерфейсе будет обрабатывать в одно время. Прежде всего, я снова отлаживать в «_frontend» объект , и я не думаю , что он читает в моем умолчанию. Во- вторых, если у вас есть несколько актеров работают в различных удаленных процессов, что это значит на все актеры одни и те же диспетчеру по умолчанию , и что блокирование задач может привести к зависанию? Если актеры работают в разных процессах, не думаю , что каждый из них имеет свой собственный пул потоков? Суть заключается в том , если вы можете дать мне пример или изменить ниже, я могу производить сценарий зависанию, то я могу понять , что это говорит о лучше. Спасибо за вашу помощь.
Грейс

  akka {
      actor {
        provider = akka.cluster.ClusterActorRefProvider
    //    default-dispatcher {
    //      fork-join-executor {
    //        parallelism-min = 1
    //        parallelism-max = 1
    //        throughput = 1
    //      }
    //    }
      }
      remote {
        log-remote-lifecycle-events = off
        netty.tcp {
          hostname = 127.0.0.1
          port = 0
        }
      }

      akka.cluster.use-dispatcher = cluster-dispatcher

      cluster-dispatcher {
        type = Dispatcher
        executor = fork-join-executor
        fork-join-executor {
          parallelism-min = 1
          parallelism-max = 1
        }
      }


      cluster {
        seed-nodes = [
          akka.tcp://ClusterSystem@127.0.0.1:2551,
          akka.tcp://ClusterSystem@127.0.0.1:2552]

        auto-down-unreachable-after = 10s
      }
    }

    akka.cluster.min-nr-of-members = 3


    akka.cluster.role {
      frontend.min-nr-of-members = 1
      backend.min-nr-of-members = 2
    }

    akka.actor.deployment {
      /frontend/backendRouter {
        # Router type provided by metrics extension.
        router = adaptive-group
        # Router parameter specific for metrics extension.
        # metrics-selector = heap
        # metrics-selector = load
        # metrics-selector = cpu
        metrics-selector = mix
        #
        nr-of-instances = 100
        routees.paths = [/user/backend]
        cluster {
          enabled = on
          use-role = backend
          allow-local-routees = off
        }
      }
    }

============================

package com.packt.akka.loadBalancing

import com.packt.akka.commons.Add

object LoadBalancingApp extends App {

//initiate three nodes from backend
Backend.initiate(2551)

Backend.initiate(2552)

Backend.initiate(2561)

//initiate frontend node
Frontend.initiate()

Thread.sleep(10000)

Frontend.getFrontend ! Add(2, 4)

}

=============================

package com.packt.akka.loadBalancing

import akka.cluster._
import com.packt.akka.commons._
import com.typesafe.config.ConfigFactory
import akka.cluster.ClusterEvent.MemberUp
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, RootActorPath }

class Backend extends Actor {

  def receive = {
    case Add(num1, num2) =>
      println(sI'm a backend with path: ${self} and I received add operation.)
      Thread.sleep(60000)
      println(sI'm a backend with path: ${self} and I am done with add operation.)
  }

}

object Backend {
  def initiate(port: Int){
    val config = ConfigFactory.parseString(sakka.remote.netty.tcp.port=$port).
      withFallback(ConfigFactory.parseString(akka.cluster.roles = [backend])).
      withFallback(ConfigFactory.load(loadbalancer))

    val system = ActorSystem(ClusterSystem, config)

    val Backend = system.actorOf(Props[Backend], name = backend)

    Backend
  }
}

=====================

    package com.packt.akka.loadBalancing

    import com.packt.akka.commons._
    import scala.concurrent.duration._
    import com.typesafe.config.ConfigFactory
    import akka.actor.{ Actor, ActorRef, ActorSystem, Props } 
    import akka.cluster.Cluster
    import akka.routing.FromConfig
    import akka.actor.ReceiveTimeout
    import scala.util.Random


    class Frontend extends Actor {
      import context.dispatcher

      val backend = context.actorOf(FromConfig.props(), name = backendRouter)

      context.system.scheduler.schedule(3.seconds, 3.seconds, self,
        Add(Random.nextInt(100), Random.nextInt(100)))

      def receive = {
        case addOp: Add =>
          println(Frontend: I'll forward add operation to backend node to handle it.)
          backend forward addOp

      }

    }


object Frontend {

  private var _frontend: ActorRef = _ 

  val upToN = 200

  def initiate() = {
    val config = ConfigFactory.parseString(akka.cluster.roles = [frontend]).
      withFallback(ConfigFactory.load(loadbalancer))

    val system = ActorSystem(ClusterSystem, config)
    system.log.info(Frontend will start when 2 backend members in the cluster.)
    //#registerOnUp
    Cluster(system) registerOnMemberUp {
      _frontend = system.actorOf(Props[Frontend],
        name = frontend)
    }
    //#registerOnUp

  }

  def getFrontend = _frontend
}
Задан 20/10/2018 в 05:21
источник пользователем
На других языках...                            


1 ответов

голоса
1

1) Смотри документацию akka.cluster.use-dispatcher = cluster-dispatcher in my config.в reference.conf :

# The id of the dispatcher to use for cluster actors. If not specified
# default dispatcher is used.
# If specified you need to define the settings of the actual dispatcher.
use-dispatcher

Этот параметр позволяет настроить то, что диспетчер используются для «внутренних» кластера актеров, а не для собственных актеров.

2) parallelism-maxПараметр к ForkJoinPool не ограничивает число фактических потоков. Как поясняется в примечании в документации :

Обратите внимание , что параллелизм-макс не установлен верхней границей полного числа потоков , выделяемых ForkJoinPool. Это установка конкретно говорить о количестве горячих потоков бассейн продолжает работать для того , чтобы уменьшить задержку обработки нового входящего задания. Вы можете прочитать больше о параллельности в JDK в документации ForkJoinPool .

Вы правы, что когда актеры работают в различных процессах виртуальной машины Java, они имеют отдельные диспетчер.

Если вы хотите , чтобы сделать эксперимент и увидеть проблемы зависания в действии, самый простой подход заключается в создании актера , который использует блокирующие вызовы (например Thread.sleep) в его обработке сообщений. Теперь приступаем создать множество экземпляров этого актера, и отправлять им сообщения. Вы увидите , ваша программа делает очень медленный прогресс.

Для контраста, если вы пишете тот же актер , но достичь «замедленного вычисления» с планировщиком вместо с Thread.sleep, вы должны увидеть гораздо более высокую производительность.

Ответил 02/11/2018 в 12:55
источник пользователем

Cookies help us deliver our services. By using our services, you agree to our use of cookies. Learn more