Skip to content
本站总访问量次, 访客数人次

Scala 与 Java 多线程对比介绍

线程池/执行上下文

单个线程顺序执行谈不上并发,自然引入多线程概念。创建线程是一个重量级操作,因为需要调用操作系统内核的API,所以最好不要频繁的创建和销毁线程,为了能够复用创建的线程,常用的办法的就是创建线程池。一个线程池对应一个或多个物理线程。

Java

Java 线程池实现是 Executor 框架。详细参考 Java Executor框架介绍 picture 0

picture 1

Executor

  • 执行器接口,也是最顶层的抽象核心接口, 分离了任务和任务的执行。

ExecutorService

  • 在Executor的基础上提供了执行器生命周期管理,任务异步执行等功能。
  • 代理类,因为直接使用 ThreadPoolExecutor 的话需要设置很多参数,ExecutorService 代理了很多开箱即用的 ThreadPoolExecutor (指定了配套参数)。

ScheduledExecutorService

  • 在ExecutorService基础上提供了任务的延迟执行/周期执行的功能。 AbstractExecutorService

  • ExecutorService的抽象实现,为各类执行器类的实现提供基础。

ThreadPoolExecutor

  • 线程池Executor,也是最常用的Executor,可以以线程池的方式管理线程。 ScheduledThreadPoolExecutor

  • 在ThreadPoolExecutor基础上,增加了对周期任务调度的支持

ForkJoinPool

  • Fork/Join线程池,在JDK1.7时引入,时实现Fork/Join框架的核心类。支持 work-stealing 机制。

Java创建线程池的例子如下

ExecutorService executor = Executors.newFixedThreadPool(10);
线程池种类:

  • newCachedThreadPool:按需创建,复用已经创建的线程
  • newFixedThreadPool:创建固定数目线程
  • newScheduledThreadPool:定时执行、周期性执行
  • newSingleThreadExecutor:单线程执行器
  • newSingleThreadScheduledExecutor:单线程、定时执行器
  • newWorkStealingPool:work-stealing 线程池,线程池中有空闲的线程时会尝试执行其他任务

Scala

Scala 多线程/并发中一个重要的概念 —— ExecutionContext,对应 Java 中的 Executor。 ExecutionContext 实现方式

  • scala.concurrent 包中有开箱即用的实现
  • 一个全局静态线程池 ExecutionContext.global,所有使用 global 变量的共享一个线程池。这个 global 会使用所有可用的线程(当然不是独占物理线程)。
  • 可以由 Executor 转成而来
     val pool = Executors.newFixedThreadPool(2)
    
     implicit val ec: ExecutionContextExecutorService = ExecutionContext.fromExecutorService(pool)
    
  • 实现 ExecutionContext 接口

Scala 线程池使用的几种方法

  1. 隐式传递:只要在定义 Future 变量所在的范围内定义了 ExecutionContext 对象,这个对象会被隐式传递过去
    implicit val ec: ExecutionContext = ...
    // scala 3 的语法: given ExecutionContext = ...
    val inverseFuture : Future[Matrix] = Future {
      fatMatrix.inverse()
    } // ec is implicitly passed
    
  2. 隐式使用全局静态的线程池
    import scala.concurrent.ExecutionContext.global
    val inverseFuture : Future[Matrix] = Future {
      fatMatrix.inverse()
    } // ec is implicitly passed
    
  3. 显式使用线程池
    import scala.concurrent.ExecutionContext.global
    
    val inverseFuture : Future[Matrix] = Future {
      fatMatrix.inverse()
    } (global)
    

ExecutionContext.global 是由 ForkJoinPool 支持的 ExecutionContext。 它应该满足大部分情况,但有几点需要注意。 ForkJoinPool 管理有限数量的线程(线程的最大数量由 parallelism level 指定)。 仅当每个阻塞调用都包装在 blocking 调用中时(更多内容见下文),并发阻塞计算的数量才能超过并行度级别。 否则,全局执行上下文中的线程池会有饥饿死锁风险,致使任何计算无法继续进行。缺省情况下,ExecutionContext.global 将其底层ForkJoin连接池的并行级别设置为可用处理器的数量(Runtime.availableProcessors)。 通过设置以下一个(或多个) VM 属性,来重载这个设置:

  • scala.concurrent.context.minThreads
    • 缺省为 Runtime.availableProcessors
  • scala.concurrent.context.numThreads
    • 可以是一个数字,或者是 “xN” 这样形式中的乘数(N);缺省为 Runtime.availableProcessors
  • scala.concurrent.context.maxThreads
    • 缺省为 Runtime.availableProcessors

线程超时

在并发场景下,经常有这样的需求:要求某个线程执行时间不超过阈值(如 3秒)。这是防止因为线程执行时间过长导致接口响应超时。 在 Java 和 Scala 中都使用 Future 接口(但不是同一个类)包装异步计算的结果,等异步计算结束后, - 如果线程有返回结果,可以通过 Future 的方法获取。 - 如果线程没有返回结果,Future 中存放着 null

Java

Future<String> future = executorService.submit(callableTask);
// 允许线程最多执行 200ms, 然后获取结果,如果没计算完会抛出异常
String result = future.get(200, TimeUnit.MILLISECONDS);

Future<String> future1 = executorService.submit(callableTask);
// 立即获取线程执行结果,否则抛出异常
String result2 = future1.get();

Scala

Scala 中使用 Await 类的 Await.ready、Await.result 来实现线程超时。

import scala.concurrent.duration._
import scala.concurrent._
import ExecutionContext.Implicits.global  // implicit execution context
val firstZebra: Future[Int] = Future {
  val words = Files.readAllLines("/etc/dictionaries-common/words").asScala
  words.indexOfSlice("zebra")
}
// 这是阻塞的方法,执行到这个语句时,如果 firstZebra 任务没有执行到 10s,会等到它执行足够10s
val ret: Int = Await.result(firstZebra, 10.seconds)

// btw: 可以注册回调函数来避免阻塞,给 Future 变量指定 .onSuccess 回调函数

Await.ready 和 Await.result 的区别如下

Both are blocking for at most the given Duration. However, Await.result tries to return the future result right away and throws an exception if the future failed while Await.ready returns the completed future from which the result (Success or Failure) can safely be extracted via the value property. The latter is very handy when you have to deal with a timeout as well:

 val future = Future { Thread.sleep(Random.nextInt(2000)); 123 }

 Try(Await.ready(future, 1.second)) match {
    case Success(f) => f.value.get match {
      case Success(res) => // handle future success 
      case Failure(e) => // handle future failure
    }
    case Failure(_) => // handle timeout
}

When using Await.result, the timeout exception and exceptions from failing futures are "mixed up".

Try(Await.result(future, 1.second)) match {
   case Success(res) => // we can deal with the result directly
   case Failure(e) => // but we might have to figure out if a timeout happened
}

参考文档

Comments