Scala Programming Language / SI-8689

Fail to Recover Remaining Tasks in BatchExecutor

    Details

      Description

      This occurs in scala.concurrent.BatchingExecutor$Batch.run() in Scala 2.11.1.

      If `head.run()` at BatchingExecutor.scala(63) throws an exception, an attempt is made to execute the remaining tasks. However, the `require(_tasksLocal.get eq null)` at the beginning of `Batch.run()`, which is called recursively from `unbatchedExecute()`, will always fail because `_tasksLocal` has been set to `Nil` at that point.
      The resulting failure hides the original cause. `_tasksLocal` should be cleared with `remove()` instead, because the remaining tasks are passed as an argument.
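
The guard failure described above can be modelled in isolation. The following is a minimal sketch, not the actual `BatchingExecutor` source: `TasksLocalSketch`, `tasksLocal`, `canStartBatch`, `afterSettingNil` and `afterRemove` are hypothetical names chosen for illustration. It only assumes that the guard tests the thread-local for `null`, as the `require` quoted in the description does.

```scala
object TasksLocalSketch {
  // Hypothetical stand-in for the executor's per-thread task list.
  private val tasksLocal = new ThreadLocal[List[Runnable]]()

  private val noop: Runnable = new Runnable { def run(): Unit = () }

  // Mirrors the guard at the top of Batch.run(): a batch may only start
  // when nothing is registered for the current thread.
  def canStartBatch: Boolean = tasksLocal.get() eq null

  // Resetting the thread-local to Nil leaves a non-null value behind,
  // so a nested batch on the same thread would fail the guard.
  def afterSettingNil(): Boolean = {
    tasksLocal.set(List(noop))
    tasksLocal.set(Nil)
    try canStartBatch finally tasksLocal.remove() // clean up for later calls
  }

  // Removing the thread-local restores the initial null value,
  // so a nested batch on the same thread would pass the guard.
  def afterRemove(): Boolean = {
    tasksLocal.set(List(noop))
    tasksLocal.remove()
    canStartBatch
  }

  def main(args: Array[String]): Unit = {
    println(s"guard passes after set(Nil): ${afterSettingNil()}") // false
    println(s"guard passes after remove(): ${afterRemove()}")     // true
  }
}
```

`Nil` is a real object, so `eq null` is false after `set(Nil)`, whereas `remove()` makes the next `get()` return the default `null`.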

      java.lang.IllegalStateException: problem in scala.concurrent internal callback
      at scala.concurrent.Future$InternalCallbackExecutor$.reportFailure(Future.scala:601)
      at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
      at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
      at scala.concurrent.Promise$class.complete(Promise.scala:55)
      at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
      at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
      at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
      at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32)
      at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
      at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
      at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
      at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
      at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
      Caused by: java.lang.IllegalArgumentException: requirement failed
      at scala.Predef$.require(Predef.scala:207)
      at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:51)
      at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
      at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:72)
      at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
      at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
      at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
      at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
      at scala.concurrent.BatchingExecutor$Batch.run(BatchingExecutor.scala:54)
      at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
      at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
      at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
      ... 13 more

        Attachments

          Activity

          viktorklang Viktor Klang added a comment - - edited

          Hi,

          Isn't the only way to get that requirement failure to execute the new Batch synchronously in unbatchedExecute?

          Do you have a reproducer for this issue?

          satyagraha satyagraha added a comment -

          The attached program when run under 2.10.4 generates a stack trace on stderr similar to that reported.

          I think the expected program effects are correct in this case; it's more that the thread attempting to complete the previously completed promise gets broken.

          BTW it does something similar with the Akka execution context, though the traceback is slightly different.

          viktorklang Viktor Klang added a comment -

          Hi @satyagraha,

          may I ask you to minimize the reproducer (i.e., remove all code that isn't needed to reproduce the problem)?

          Thank you!

          satyagraha satyagraha added a comment -

          Now simplified to one future and one promise.

          viktorklang Viktor Klang added a comment - - edited

          Thanks @satyagraha,

          I managed to minimize it a bit further, and I can reproduce your problem locally.

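          package org.anomaly
          object AnomalyApp3 extends App {
            import scala.concurrent._
            import ExecutionContext.Implicits.global
            val source1 = Promise[Int]()
            val source2 = Promise[Int]()
            // Link source2 to source1, so that source1's result is forwarded to source2.
            source2.completeWith(source1.future).future.onComplete {
              case completion => print(s"source2 completed with: ${completion}")
            }
            // Complete source2 first, before the linked result from source1 arrives.
            source2.tryFailure(new TimeoutException)
            // Completing source1 now triggers the forwarding callback on the already completed source2.
            source1.success(123)
          }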
          

          viktorklang Viktor Klang added a comment -

          Submitted a PR that solves the problem (completeWith): https://github.com/scala/scala/pull/4215

          satyagraha satyagraha added a comment -

          Delighted with the prompt response and proposed solution! That substitution of tryCompleteWith for completeWith fixes the AnomalyApp examples. I can't see much difference in practical semantics between these methods.
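
One case where the two styles of completion do differ is a promise that has already been completed. The following is a minimal sketch of that case using `Promise.complete` and `Promise.tryComplete` directly. It assumes, as this thread suggests, that `completeWith` forwards through the strict variant and `tryCompleteWith` through the lenient one. `CompleteVsTryComplete` and its method names are hypothetical.

```scala
import java.util.concurrent.TimeoutException
import scala.concurrent.Promise
import scala.util.{Success, Try}

object CompleteVsTryComplete {
  // Strict completion of an already completed promise throws
  // IllegalStateException; the exception is captured in a Try here.
  def strictSecondCompletion(): Try[Unit] = {
    val p = Promise[Int]()
    p.tryFailure(new TimeoutException("completed first"))
    Try { p.complete(Success(123)); () }
  }

  // Lenient completion of an already completed promise does not throw;
  // it reports via its Boolean result that nothing was changed.
  def lenientSecondCompletion(): Boolean = {
    val p = Promise[Int]()
    p.tryFailure(new TimeoutException("completed first"))
    p.tryComplete(Success(123))
  }

  def main(args: Array[String]): Unit = {
    println(s"strict second completion failed: ${strictSecondCompletion().isFailure}") // true
    println(s"lenient second completion succeeded: ${lenientSecondCompletion()}")      // false
  }
}
```

When the strict call runs inside an internal callback, as in the reproducer, the exception has no caller to return to and is reported by the executor instead.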

          viktorklang Viktor Klang added a comment -

          Thanks!

          I just added some more tests for completeWith and tryCompleteWith to the PR, and introduced a new commit which deprecates `completeWith` in favor of `tryCompleteWith` and also applies an optimization in the form of root linking for DefaultPromise.

          grek Grzegorz Kossakowski added a comment - https://github.com/scala/scala/pull/4289

            People

            • Assignee:
              viktor.klang Viktor Klang
              Reporter:
              torao Takami Torao