flatMap in Future closes over too much #7493

Closed
scabug opened this issue May 17, 2013 · 5 comments

scabug commented May 17, 2013

Future's flatMap implementation holds on to a little too much inside its inner closure. This can be seen when just using a future, but it is really easy to show when using Akka's dataflow and CPS:
https://gist.github.com/amirshim/5602245

def mainLoop() : Unit@cps[Future[Any]] = {
  while (true) {
    val fut = Promise.successful(new Array[Byte](1000000*100)).future // 100MB
    
    // This shift is the same as Akka DataFlow's DataflowFuture.apply()
    val result = shift { onComplete: (Array[Byte] => Future[Any]) =>
      fut flatMap onComplete
    }
    
    println(result)
  }
} 

When this code runs, it leaks memory: the continuation (at https://github.com/scala/scala/blob/1f4a52b4ed9457863e00fe16d18705b6c6cd5db9/src/library/scala/concurrent/Future.scala#L280) is waiting for the loop to finish and holds on to all the previous closures.
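
To picture the retention pattern described here, the following is a hand-written model, not the code the compiler actually generates. It assumes the inner callback reaches the promise through the enclosing callback, which also holds the continuation `f`. All class names are hypothetical.

```scala
import scala.concurrent.Promise
import scala.util.Try

object CaptureModel {
  // Hypothetical stand-in for the outer onComplete callback: it holds the
  // user function f (the continuation) and the result promise p.
  final class OuterCallback[T, S](val f: T => Any, val p: Promise[S])

  // Inner callback that reaches p through the outer callback. As long as this
  // object is reachable, the outer callback and therefore f are reachable too.
  final class InnerViaOuter[T, S](outer: OuterCallback[T, S]) extends (Try[S] => Unit) {
    def apply(r: Try[S]): Unit = { outer.p.complete(r); () }
  }

  // Inner callback that holds only the promise; f is not reachable from it.
  final class InnerDirect[S](p: Promise[S]) extends (Try[S] => Unit) {
    def apply(r: Try[S]): Unit = { p.complete(r); () }
  }
}
```

Both inner callbacks complete the promise in the same way; they differ only in what stays reachable from them while they are registered on another future.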

Although an infinite loop like this is a real memory leak, it doesn't need to be this bad, especially when we have long iterators that allocate lots of memory.
If we limit what the inner closure holds on to by introducing tempP (see below), we can avoid most of the memory use in this case:
https://gist.github.com/amirshim/5602077

def flatMap[S](f: T => Future[S])(implicit executor: ExecutionContext): Future[S] = {
  val p = Promise[S]()

  onComplete {
    case f: Failure[_] => p complete f.asInstanceOf[Failure[S]]
    case Success(v) => {
      // Proposed change: a local alias, so the inner closure refers to tempP instead of p.
      val tempP = p
      try {
        f(v).onComplete({
          case f: Failure[_] => tempP complete f.asInstanceOf[Failure[S]]
          case Success(v) => tempP success v
        })(internalExecutor)
      } catch {
        case NonFatal(t) => tempP failure t
      }
    }
  }(executor)

  p.future
}
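
For anyone who wants to try the idea outside the standard library, here is a minimal standalone sketch that uses only the public Future/Promise API. `FlatMapSketch`, `flatMapLean` and `loop` are hypothetical names, not part of scala.concurrent, and the sketch uses the caller's ExecutionContext where the library code uses its private `internalExecutor`. Whether the local alias changes what is retained depends on how the compiler encodes closures, so treat this as an experiment rather than a confirmed fix.

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object FlatMapSketch {
  // Hypothetical helper mirroring the proposed flatMap as a free function.
  def flatMapLean[T, S](fut: Future[T])(f: T => Future[S])(implicit ec: ExecutionContext): Future[S] = {
    val p = Promise[S]()
    fut.onComplete {
      case Failure(t) => p.failure(t)
      case Success(v) =>
        // Local alias, as in the proposal: the inner callback refers to tempP only.
        val tempP = p
        try {
          f(v).onComplete {
            case Failure(t) => tempP.failure(t)
            case Success(s) => tempP.success(s)
          }
        } catch {
          // f itself threw before producing a future.
          case NonFatal(t) => tempP.failure(t)
        }
    }
    p.future
  }

  // Usage sketch: a plain-Future loop that allocates a buffer on every step
  // and chains the next step through flatMapLean.
  def loop(n: Int, bytes: Int)(implicit ec: ExecutionContext): Future[Int] =
    if (n == 0) Future.successful(0)
    else flatMapLean(Future.successful(new Array[Byte](bytes)))(_ => loop(n - 1, bytes))
}
```

Running `loop` with a large step count and buffer size while watching heap usage is one way to compare this variant against the built-in `flatMap` on a given Scala version.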

scabug commented May 17, 2013

Imported From: https://issues.scala-lang.org/browse/SI-7493?orig=1
Reporter: Amir Shimoni (amirshim)
Affected Versions: 2.10.1
See #6932

scabug commented May 18, 2013

@retronym said:
See also scala/scala#1941 / #6932.

scabug commented May 18, 2013

@retronym said:
Philipp, could you please take a look at this one?

scabug commented Jul 12, 2013

@adriaanm said:
Assuming this is fixed by scala/scala#2674

scabug closed this as completed Jul 12, 2013

scabug commented Jul 12, 2013

Amir Shimoni (amirshim) said:
scala/scala#2674 looks promising. I haven't tested it rigorously, but will report if I see any more issues.
