Saturday, June 25, 2016

Usage of Ask pattern causing the creation of default-akka.actor.default-dispatcher threads for each request

Note: you may want to skip directly to Edit 4 after the introduction.

Recently I wrote a simple Scala server app. It mostly writes incoming data to the database or retrieves it. This is my first Scala/Akka application. When I deployed the app to my server, it failed after about a day. The simple statistics provided by DigitalOcean showed that CPU usage rose linearly whenever I requested data from the server. When I didn't request anything, CPU usage stayed roughly constant, but it didn't fall from its previous level.

I connected the app to VisualVM and saw that the number of threads either stays constant when I don't do anything with the app (I), or grows in a sawtooth fashion when I send requests to the server (II). There is an obvious correlation between the number of threads and CPU usage, which makes sense. The Threads tab showed that most threads are default-akka.actor.default-dispatcher threads, and they don't seem to be doing much.

What could cause this sort of problem? How do I solve it?

Re Edit 4: I think I found the source of the problem, but I still don't understand why it happens or how I should solve it.

PS: I must admit that the screenshots are not from the application that failed; I don't have any from the original failure. However, the only difference between this program and the one that failed is that I added the following to application.conf:

    actor {
      default-dispatcher {
        fork-join-executor {
          # Setting this to 1 instead of 3 seems to improve performance.
          parallelism-factor = 2.0
          parallelism-max = 24
          task-peeking-mode = FIFO
        }
      }
    }

This seems to have slowed the rate at which the number of threads rises, but it didn't solve the problem.
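The fork-join settings above control the dispatcher's target parallelism. As far as I understand Akka's configurator, it computes ceil(availableProcessors × parallelism-factor) and clamps the result between parallelism-min and parallelism-max. The sketch below reproduces only that arithmetic in plain Scala; `DispatcherSizing` and `parallelism` are hypothetical names, and parallelism-min = 8 is an assumption taken from Akka's default reference.conf, since the post does not set it.

```scala
object DispatcherSizing {
  // Target parallelism of a fork-join dispatcher, assuming Akka's scaling rule:
  // ceil(cores * parallelism-factor), clamped to [parallelism-min, parallelism-max].
  def parallelism(cores: Int, min: Int, factor: Double, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)

  def main(args: Array[String]): Unit = {
    val cores = Runtime.getRuntime.availableProcessors
    // min = 8 is assumed (Akka default); factor 2.0 and max 24 come from the post's config
    val target = parallelism(cores, min = 8, factor = 2.0, max = 24)
    println(s"cores=$cores target parallelism=$target")

    // Worked values for the post's settings:
    println(parallelism(1, 8, 2.0, 24))  // 8  (raised to parallelism-min)
    println(parallelism(8, 8, 2.0, 24))  // 16
    println(parallelism(16, 8, 2.0, 24)) // 24 (capped by parallelism-max)
  }
}
```

With 1 or 2 cores, both the post's factor of 2.0 and the default factor of 3.0 give a target of 8, because parallelism-min dominates; the factor only starts to change the result from 3 cores upward.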
Edit: Fragment of WriterActor usage (RestApi):

    trait RestApi extends CassandraCluster {
      import models._
      import cassandraDB.{WriterActor, ReaderActor}

      implicit def system: ActorSystem
      implicit def materializer: ActorMaterializer
      implicit def ec: ExecutionContext

      implicit val timeout = Timeout(20 seconds)

      val cassandraWriterWorker = system.actorOf(Props(new WriterActor(cluster)), "cassandra-writer-actor")
      val cassandraReaderWorker = system.actorOf(Props(new ReaderActor(cluster)), "cassandra-reader-actor")

      ...

      def cassandraReaderCall(message: Any): ToResponseMarshallable = message match {
        //...
        case message: GetActiveContactsByPhoneNumber =>
          (cassandraReaderWorker ? message)(2 seconds).mapTo[Vector[String]].map(result => Json.obj("active_contacts" -> result))
        case _ => StatusCodes.BadRequest
      }

      def confirmedWriterCall(message: Any) = {
        (cassandraWriterWorker ? message).mapTo[Boolean].map(result => result)
      }

      val apiKeyStringV1 = "test123"

      val route =
        ...
            path("contacts") {
              parameter('apikey ! apiKeyStringV1) {
                post {
                  entity(as[Contacts]) { contact: Contacts =>
                    cassandraWriterWorker ! contact
                    complete(StatusCodes.OK)
                  }
                } ~
                get {
                  parameter('phonenumber) { phoneNumber: String =>
                    complete(cassandraReaderCall(GetActiveContactsByPhoneNumber(phoneNumber)))
                  }
                }
              }
            } ~
          } ~
          path("log" / "gps") {
            parameter('apikey ! apiKeyStringV1) {
              (post & entity(as[GpsLog])) { gpsLog =>
                cassandraWriterWorker ! gpsLog
                complete(StatusCodes.OK)
              }
            }
          }
        }
      }
    }

Edit 2: Relevant code of the writer worker. I didn't post all the methods since they are all basically the same, but here you can find the whole file:

    import java.util.UUID
    import akka.actor.Actor
    import com.datastax.driver.core.Cluster
    import models._

    class WriterActor(cluster: Cluster) extends Actor {
      import scala.collection.JavaConversions._

      val session = cluster.connect(Keyspaces.akkaCassandra)

      // ... other inserts
      val insertGpsLog = session.prepare("INSERT INTO gps_logs(id, phone_number, lat, long, time) VALUES (?,?,?,?,?);")
      // ...
      // Binds one GPS sample to the prepared insertGpsLog statement and fires the insert
      // without waiting for its result.
      def writeGpsLog(phoneNumber: String, locWithTime: LocationWithTime): Unit =
        session.executeAsync(insertGpsLog.bind(
          UUID.randomUUID().toString,
          phoneNumber,
          new java.lang.Double(locWithTime.location.lat),
          new java.lang.Double(locWithTime.location.long),
          new java.lang.Long(locWithTime.time)))

      def receive: Receive = {
        // ...
        case gpsLog: GpsLog =>
          gpsLog.locationWithTimeLog.foreach(locWithTime => writeGpsLog(gpsLog.phoneNumber, locWithTime))
      }
    }

Edit 3: Misdiagnosis of the excessive thread use. I'm afraid I misdiagnosed the origin of the problem. Later on I had added a request for data after the write and forgot about it. When I removed it, the number of threads stopped growing, so this is the most likely place where the mistake was made. I updated the trait where the ReaderActor is used, and here is the relevant code of the ReaderActor:

    import akka.actor.Actor
    import com.datastax.driver.core.{Cluster, Row}
    import com.datastax.driver.core.querybuilder.QueryBuilder

    object ReaderActor {
      // ...
      case class GetActiveContactsByPhoneNumber(phoneNumber: String)
    }

    class ReaderActor(cluster: Cluster) extends Actor {
      import models._
      import ReaderActor._
      import akka.pattern.pipe
      import scala.collection.JavaConversions._
      import cassandra.resultset._
      import context.dispatcher

      val session = cluster.connect(Keyspaces.akkaCassandra)

      def buildActiveContactByPhoneNumberResponse(r: Row): String =
        r.getString(ContactsKeys.phoneNumber)

      def buildSubSelectContactsList(r: Row): java.util.List[String] =
        r.getSet(ContactsKeys.contacts, classOf[String]).toList

      def receive: Receive = {
        //...
        case GetActiveContactsByPhoneNumber(phoneNumber: String) =>
          val subQuery = QueryBuilder.select(ContactsKeys.contacts).
            from(Keyspaces.akkaCassandra, ColumnFamilies.contact).
            where(QueryBuilder.eq(ContactsKeys.phoneNumber, phoneNumber))

          def queryActiveUsers(phoneNumbers: java.util.List[String]) =
            QueryBuilder.select(ContactsKeys.phoneNumber).
              from(Keyspaces.akkaCassandra, ColumnFamilies.contact).
              where(QueryBuilder.in(ContactsKeys.phoneNumber, phoneNumbers))

          // Runs the sub-query synchronously, then for each row runs the second query
          // asynchronously (presumably turned into a Scala Future by cassandra.resultset._)
          // and pipes the resulting Vector[String] back to the sender.
          session.execute(subQuery) map ((row: Row) =>
            session.executeAsync(queryActiveUsers(buildSubSelectContactsList(row))) map (_.all().map(buildActiveContactByPhoneNumberResponse).toVector) pipeTo sender)
        //...
      }
    }

Edit 4: I ran the code locally, controlling all the requests. When there are no requests, the number of running threads fluctuates around a certain number but doesn't tend to go up or down. I made a variety of requests to see what would change. The image pasted below shows several states:

I - no requests yet: 44-45 threads
II - after a request to the ReaderActor: 46-47 threads
III - after a request to the ReaderActor: 48-49 threads
IV - after a request to the ReaderActor: 50-51 threads
V - after a request to the WriterActor: 51-52 threads (no problem here; note that a daemon thread was started)
VI - after a request to the WriterActor: 51-52 threads (constant)
VII - after a request to the ReaderActor (but a different resource than the first three): 53-54 threads

So every time we read from the database (regardless of how many executeAsync calls are used), 2 extra threads are created. The only difference between the read and the write calls is that one uses the ask pattern and the other doesn't. I checked this by changing the route from:

    get {
      parameter('phonenumber) { phoneNumber: String =>
        complete(cassandraReaderCall(GetActiveContactsByPhoneNumber(phoneNumber)))
      }
    }

to:

    get {
      parameter('phonenumber) { phoneNumber: String =>
        cassandraReaderWorker ! GetActiveContactsByPhoneNumber(phoneNumber)
        complete(StatusCodes.OK)
      }
    }

Obviously I no longer get any results, but the extra threads are no longer spawned either. So the answer seems to lie in the ask pattern. I hope somebody can explain why this happens and how to solve it.
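Edit 4 narrows the thread growth down to the ask pattern. Roughly speaking, Akka's `ask` creates a temporary reply target backed by a Promise, sends the message, and schedules a timeout that fails the Promise if no reply arrives; `pipeTo` then completes the reply from a Future. The following is a simplified, standard-library-only model of that request/reply bookkeeping, not Akka's implementation; everything in it (`AskSketch`, `askLike`, `replying`, `silent`, the `sketch-` thread names) is hypothetical. It runs on deliberately fixed-size pools and counts threads by name prefix, which gives a way to repeat the Edit 4 measurement outside the application.

```scala
import java.util.concurrent.{Executors, ScheduledThreadPoolExecutor, ThreadFactory, TimeUnit, TimeoutException}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.Try

object AskSketch {
  // Names every thread "<prefix>-N", similar to "default-akka.actor.default-dispatcher-N".
  def namedFactory(prefix: String): ThreadFactory = new ThreadFactory {
    private val counter = new AtomicInteger(0)
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, s"$prefix-${counter.incrementAndGet()}")
      t.setDaemon(true) // daemon threads do not keep the JVM alive
      t
    }
  }

  // Counts live threads whose name starts with `prefix`, like filtering VisualVM's Threads tab.
  def countThreads(prefix: String): Int = {
    val it = Thread.getAllStackTraces.keySet.iterator
    var n = 0
    while (it.hasNext) if (it.next().getName.startsWith(prefix)) n += 1
    n
  }

  // Both pools are fixed-size: at most 4 worker threads plus 1 timer thread.
  implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4, namedFactory("sketch-worker")))
  private val timers = new ScheduledThreadPoolExecutor(1, namedFactory("sketch-timer"))
  timers.setRemoveOnCancelPolicy(true) // cancelled timeouts are removed from the queue

  // Hypothetical ask-like call: the reply target is a Promise, and a scheduled task
  // fails it with a TimeoutException if the responder has not completed it in time.
  def askLike[T](responder: Promise[T] => Unit, timeout: FiniteDuration): Future[T] = {
    val reply = Promise[T]()
    val timer = timers.schedule(new Runnable {
      def run(): Unit = reply.tryFailure(new TimeoutException(s"no reply within $timeout"))
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    responder(reply)                                  // "send" the request
    reply.future.onComplete(_ => timer.cancel(false)) // cancel the timeout once answered
    reply.future
  }

  // Hypothetical responder that answers asynchronously, loosely like `future pipeTo sender`.
  def replying(value: Int)(reply: Promise[Int]): Unit =
    Future(value * 2).foreach(v => reply.trySuccess(v))

  // Hypothetical responder that never answers, so only the timeout completes the reply.
  def silent(reply: Promise[Int]): Unit = ()

  def main(args: Array[String]): Unit = {
    println(Await.result(askLike[Int](replying(21), 2.seconds), 3.seconds))           // 42
    println(Try(Await.result(askLike[Int](silent, 100.millis), 3.seconds)).isFailure) // true
    (1 to 200).foreach(i => Await.result(askLike[Int](replying(i), 2.seconds), 3.seconds))
    println(s"sketch threads after 202 requests: ${countThreads("sketch-")}")
  }
}
```

In this model the thread count stays at or below 5 however many requests are made, because no code path creates a thread per request; it is a baseline to compare the real application against, not an explanation of the growth measured above.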
