Saturday, June 25, 2016
Usage of Ask pattern causing the creation of default-akka.actor.default-dispatcher threads for each request
Note: You might want to skip directly to Edit 4 after the introduction.
Recently I wrote a simple Scala server app. The app mostly wrote incoming data to the database or retrieved it. This is my first Scala/Akka application.
When I deployed the app to my server, it failed after about a day. The simple statistics provided by DigitalOcean showed that CPU usage rose linearly whenever I requested data from the server. When I didn't request anything from the server, CPU usage stayed roughly constant, but it never fell back from its previous level.
I connected the app to VisualVM and saw that the number of threads either stays constant when I don't do anything with the app (I), or grows in a sawtooth-like fashion when I send requests to the server (II).
There is an obvious correlation here between the number of threads and CPU usage, which makes sense.
When I checked the Threads tab, I saw that most threads are default-akka.actor.default-dispatcher threads.
They also don't seem to be doing much.
What could cause this sort of problem? How do I solve it?
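To repeat the Threads-tab check without VisualVM, live threads can be counted by name from inside the JVM. The sketch below is a hypothetical helper (`ThreadCensus` is not part of Akka) built only on `Thread.getAllStackTraces`. It assumes the dispatcher threads keep names like default-akka.actor.default-dispatcher-7, as seen in the Threads tab.

```scala
import scala.collection.JavaConverters._

// Hypothetical diagnostic helper; uses only the JDK thread API.
object ThreadCensus {
  // Count live JVM threads whose name contains `fragment`,
  // e.g. "default-dispatcher" for Akka's default dispatcher threads.
  def countThreads(fragment: String): Int =
    Thread.getAllStackTraces.keySet.asScala.count(_.getName.contains(fragment))

  // Group live thread names by everything before a trailing "-<number>",
  // so "default-akka.actor.default-dispatcher-7" is counted under
  // "default-akka.actor.default-dispatcher".
  def countsByPrefix(): Map[String, Int] =
    Thread.getAllStackTraces.keySet.asScala.toSeq
      .map(_.getName.replaceAll("-\\d+$", ""))
      .groupBy(identity)
      .map { case (prefix, names) => prefix -> names.size }
}
```

Logging `ThreadCensus.countThreads("default-dispatcher")` before and after each request should give the same per-request deltas that the VisualVM graphs show.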
Regarding Edit 4: I think I found the source of the problem, but I still don't understand why it happens or how I should solve it.
PS: I must admit that the screenshots are not from the application that failed; I don't have any from the original failure. However, the only difference between this program and the one that failed is that I added the following to application.conf:
actor {
  default-dispatcher {
    fork-join-executor {
      # Setting this to 1 instead of 3 seems to improve performance.
      parallelism-factor = 2.0
      parallelism-max = 24
      task-peeking-mode = FIFO
    }
  }
}
This seems to have slowed the rate at which the number of threads rises, but it didn't solve the problem.
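For context on the settings above, Akka's reference.conf describes the fork-join pool size as ceil(available processors × parallelism-factor), bounded by parallelism-min and parallelism-max. In Akka 2.4 the defaults are 8, 3.0 and 64. The sketch below reproduces only that arithmetic. The core count is passed in so the result can be checked for different machines, and `targetParallelism` is a hypothetical name, not an Akka API.

```scala
// Hypothetical reproduction of the documented sizing rule:
// ceil(cores * factor), then clamped to [min, max].
object ForkJoinParallelism {
  def targetParallelism(cores: Int, factor: Double, min: Int, max: Int): Int =
    math.min(math.max(math.ceil(cores * factor).toInt, min), max)
}
```

With parallelism-min left at 8, a droplet with one to four cores and a factor of 2.0 gets a target of 8 either way. The number is the target parallelism handed to the ForkJoinPool, not a hard cap on thread count. A ForkJoinPool can start compensating threads when workers block through its managed-blocking support, which is what `scala.concurrent.blocking` uses.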
Edit: Fragment of WriterActor usage (RestApi)
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.language.postfixOps
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.pattern.ask
import akka.stream.ActorMaterializer
import akka.util.Timeout
// The JSON library behind Json.obj and its marshallers are not shown in this fragment.

trait RestApi extends CassandraCluster {
  import models._
  import cassandraDB.{WriterActor, ReaderActor}

  implicit def system: ActorSystem
  implicit def materializer: ActorMaterializer
  implicit def ec: ExecutionContext
  // Default timeout for asks that don't pass one explicitly.
  implicit val timeout: Timeout = Timeout(20 seconds)

  val cassandraWriterWorker = system.actorOf(Props(new WriterActor(cluster)), "cassandra-writer-actor")
  val cassandraReaderWorker = system.actorOf(Props(new ReaderActor(cluster)), "cassandra-reader-actor")
  ...
  def cassandraReaderCall(message: Any): ToResponseMarshallable = message match {
    //...
    // Ask with an explicit 2-second timeout instead of the implicit 20 seconds.
    case message: GetActiveContactsByPhoneNumber =>
      (cassandraReaderWorker ? message)(2 seconds)
        .mapTo[Vector[String]]
        .map(result => Json.obj("active_contacts" -> result))
    case _ => StatusCodes.BadRequest
  }

  def confirmedWriterCall(message: Any) =
    (cassandraWriterWorker ? message).mapTo[Boolean]

  val apiKeyStringV1 = "test123"
  val route =
    ...
          path("contacts") {
            parameter('apikey ! apiKeyStringV1) {
              post {
                entity(as[Contacts]) { contact: Contacts =>
                  cassandraWriterWorker ! contact
                  complete(StatusCodes.OK)
                }
              } ~
              get {
                parameter('phonenumber) { phoneNumber: String =>
                  complete(cassandraReaderCall(GetActiveContactsByPhoneNumber(phoneNumber)))
                }
              }
            }
          } ~
        } ~
        path("log" / "gps") {
          parameter('apikey ! apiKeyStringV1) {
            (post & entity(as[GpsLog])) { gpsLog =>
              cassandraWriterWorker ! gpsLog
              complete(StatusCodes.OK)
            }
          }
        }
      }
    }
}
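The read path uses the ask pattern (`?`), while the writes use tell (`!`). Conceptually, ask gives the target a one-shot reply address and schedules a timeout on the actor system's scheduler. Here `(cassandraReaderWorker ? message)(2 seconds)` overrides the implicit 20-second timeout with 2 seconds. The sketch below models only that idea with the standard library. `AskModel` and its `deliver` callback are hypothetical, and this is not Akka's implementation.

```scala
import java.util.concurrent.{Executor, Executors, ScheduledExecutorService, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical, simplified model of request/response with a timeout.
object AskModel {
  // Runs callbacks on the completing thread; used only to cancel the timer.
  private val sameThread: ExecutionContext =
    ExecutionContext.fromExecutor(new Executor { def execute(r: Runnable): Unit = r.run() })

  // A one-shot reply slot plus a scheduled timeout. `deliver` stands in for
  // sending the message together with a way to reply to it.
  def ask[A](deliver: Promise[A] => Unit, timeout: FiniteDuration,
             scheduler: ScheduledExecutorService): Future[A] = {
    val reply = Promise[A]()
    val timer = scheduler.schedule(new Runnable {
      def run(): Unit = reply.tryFailure(new TimeoutException(s"no reply within $timeout"))
    }, timeout.toMillis, TimeUnit.MILLISECONDS)
    // Once a reply (or the timeout) completes the promise, drop the pending timer.
    reply.future.onComplete(_ => timer.cancel(false))(sameThread)
    deliver(reply)
    reply.future
  }
}
```

In this model the timeout is a task on one shared scheduler, and the timer is cancelled as soon as a reply arrives. If the responder never replies, the caller gets a `TimeoutException` once the timeout fires.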
Edit 2: Relevant WriterActor code.
I didn't post all the methods, since they are all basically the same, but here you can find the whole file.
import java.util.UUID
import akka.actor.Actor
import com.datastax.driver.core.Cluster
import models._

class WriterActor(cluster: Cluster) extends Actor {
  import scala.collection.JavaConversions._

  val session = cluster.connect(Keyspaces.akkaCassandra)

  // ... other inserts
  val insertGpsLogStatement = session.prepare("INSERT INTO gps_logs(id, phone_number, lat, long, time) VALUES (?,?,?,?,?);")
  // ...

  // Fire-and-forget: the ResultSetFuture returned by executeAsync is discarded.
  def insertGpsLog(phoneNumber: String, locWithTime: LocationWithTime): Unit =
    session.executeAsync(insertGpsLogStatement.bind(
      UUID.randomUUID().toString, phoneNumber,
      new java.lang.Double(locWithTime.location.lat),
      new java.lang.Double(locWithTime.location.long),
      new java.lang.Long(locWithTime.time)))

  def receive: Receive = {
    // ...
    // One asynchronous insert per location entry; no reply is sent back.
    case gpsLog: GpsLog =>
      gpsLog.locationWithTimeLog.foreach(locWithTime => insertGpsLog(gpsLog.phoneNumber, locWithTime))
  }
}
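`session.executeAsync` returns the driver's `ResultSetFuture`, a Guava `ListenableFuture`. `insertGpsLog` simply discards it. The ReaderActor below maps over the same kind of future through its `cassandra.resultset._` import, whose code is not shown. A common way to turn a callback-based Java future into a Scala `Future`, without parking a thread in `get()`, is to complete a `Promise` from a completion callback. The sketch assumes the JDK's `CompletableFuture` as a stand-in for the Guava type; with Guava, `Futures.addCallback` plays the same role. `FutureBridge` is a hypothetical name.

```scala
import java.util.concurrent.CompletableFuture
import java.util.function.BiConsumer
import scala.concurrent.{Future, Promise}

// Hypothetical bridge from a callback-based Java future to a Scala Future.
object FutureBridge {
  // Complete a Scala Promise from the Java future's completion callback,
  // so no thread sits blocked in get() while the query is in flight.
  def toScala[A](jf: CompletableFuture[A]): Future[A] = {
    val p = Promise[A]()
    jf.whenComplete(new BiConsumer[A, Throwable] {
      def accept(value: A, error: Throwable): Unit =
        if (error ne null) p.failure(error) else p.success(value)
    })
    p.future
  }
}
```

By contrast, a bridge that wrapped `get()` in a `Future { ... }` would keep one pool thread busy for every query in flight.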
Edit 3: Misdiagnosis of excessive thread use.
I'm afraid I misdiagnosed the origin of the problem. Later on I had added a request for data after the write and then forgot about it. When I removed it, the number of threads stopped growing, so this is the most likely place where the mistake was made. I updated the trait where the ReaderActor is used and also added the relevant code of the ReaderActor below.
import akka.actor.Actor
import com.datastax.driver.core.{Cluster, Row}
import com.datastax.driver.core.querybuilder.QueryBuilder

object ReaderActor {
  // ...
  case class GetActiveContactsByPhoneNumber(phoneNumber: String)
}

class ReaderActor(cluster: Cluster) extends Actor {
  import models._
  import ReaderActor._
  import akka.pattern.pipe
  import scala.collection.JavaConversions._
  import cassandra.resultset._
  import context.dispatcher

  val session = cluster.connect(Keyspaces.akkaCassandra)

  def buildActiveContactByPhoneNumberResponse(r: Row): String =
    r.getString(ContactsKeys.phoneNumber)

  def buildSubSelectContactsList(r: Row): java.util.List[String] = {
    val contacts = r.getSet(ContactsKeys.contacts, classOf[String])
    contacts.toList
  }

  def receive: Receive = {
    //...
    case GetActiveContactsByPhoneNumber(phoneNumber: String) =>
      // Sub-query: the contact set stored for this phone number.
      val subQuery = QueryBuilder.select(ContactsKeys.contacts)
        .from(Keyspaces.akkaCassandra, ColumnFamilies.contact)
        .where(QueryBuilder.eq(ContactsKeys.phoneNumber, phoneNumber))

      // Which of the given phone numbers have a row of their own.
      def queryActiveUsers(phoneNumbers: java.util.List[String]) =
        QueryBuilder.select(ContactsKeys.phoneNumber)
          .from(Keyspaces.akkaCassandra, ColumnFamilies.contact)
          .where(QueryBuilder.in(ContactsKeys.phoneNumber, phoneNumbers))

      // session.execute blocks; then, for each returned row, an async query is
      // started and its result is piped to the sender (one reply per row).
      session.execute(subQuery) foreach { (row: Row) =>
        session.executeAsync(queryActiveUsers(buildSubSelectContactsList(row)))
          .map(_.all().map(buildActiveContactByPhoneNumberResponse).toVector)
          .pipeTo(sender())
      }
    //...
  }
}
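In `GetActiveContactsByPhoneNumber`, `pipeTo sender` runs once per row of the synchronous sub-query. If the phone number has no row, no reply is sent and the 2-second ask in `cassandraReaderCall` times out. If there were several rows, only the first reply would complete the ask. One standard-library way to reply exactly once is to combine the per-row lookups into a single future, as sketched below. `lookupActive` is a hypothetical stand-in for `session.executeAsync(queryActiveUsers(...))`, and the `active` set stands in for the database.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical sketch: one combined result per request, however many rows.
object SingleReply {
  // Stand-in for one async "which of these numbers are active" query.
  def lookupActive(candidates: List[String], active: Set[String])
                  (implicit ec: ExecutionContext): Future[Vector[String]] =
    Future(candidates.filter(active).toVector)

  // Combine the lookups for every sub-query row into one future, so the
  // caller gets exactly one reply: an empty Vector when there were no rows.
  def activeContacts(rows: Seq[List[String]], active: Set[String])
                    (implicit ec: ExecutionContext): Future[Vector[String]] =
    Future.sequence(rows.map(row => lookupActive(row, active))).map(_.flatten.toVector)
}
```

Inside the actor, an equivalent combined future would then be piped to `sender()` a single time, with `sender()` read while the message is still being handled.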
Edit 4
I ran the code locally, controlling all the requests. When there are no requests, the number of running threads fluctuates around a certain value but doesn't tend to go up or down.
I made a variety of requests to see what would change.
The image pasted below shows several states.
I - no requests yet: 44-45 threads.
II - after a request to the ReaderActor: 46-47 threads.
III - after a request to the ReaderActor: 48-49 threads.
IV - after a request to the ReaderActor: 50-51 threads.
V - after a request to the WriterActor: 51-52 threads (but no problem; notice that a daemon thread was started).
VI - after a request to the WriterActor: 51-52 threads (constant).
VII - after a request to the ReaderActor (but to a different resource than the first three): 53-54 threads.
So every time we read from the database (regardless of how many executeAsync calls are used), 2 extra threads are created. The only difference between the read and the write calls is that one uses the ask pattern and the other doesn't. I checked this by changing the route from:
get {
  parameter('phonenumber) { phoneNumber: String =>
    complete(cassandraReaderCall(GetActiveContactsByPhoneNumber(phoneNumber)))
  }
}
to:
get {
  parameter('phonenumber) { phoneNumber: String =>
    // Tell instead of ask: no reply is awaited.
    cassandraReaderWorker ! GetActiveContactsByPhoneNumber(phoneNumber)
    complete(StatusCodes.OK)
  }
}
Obviously I'm not getting any results now, but the extra threads are no longer being spawned either.
So the answer seems to lie in the ask pattern.
I hope somebody can explain why this happens and how to solve it.