Java

Collections工具类

集合框架中一个重要的类,其实是Collection接口的伴随类,其中定义了许多实用方法,用来获取集合视图,或提供一些方便的操作集合元素的算法

由于视图是直接封装的Collection接口,因此其方法有些局限,并且由于特殊的设计,部分操作是不允许的(会抛出 UnsupportedOperationExceptin )。

...

内部类

将一个类定义在另一个类的内部,这就是内部类。

定义言简意赅 ,内涵丰富多彩。

...

死锁问题2例

Java有能力使任务为等待某些条件成立而进入阻塞状态,所以就有可能出现这样一种情况:某个任务在等待另一个任务,而后者又在等待其他的任务,这样一直等待下去,直到等待链上的最后一个任务又在等待第一个任务释放锁,这样就出现了任务之间相互等待的连续循环现象,这种情况出现之后,没有哪个任务能够执行,于是 死锁 出现。

死锁之所以难以规避,其重要的原因就在于其不确定性,可能程序运行良好,但是有潜在的死锁风险,这个风险在某些域的初始条件变化时,变得特别大,导致程序很快死锁。同时,死锁难以复现,当程序出现死锁时,往往只能通过jvm的堆栈日志来探究原因。

...

终结任务

终结任务 #

一般地,如果程序运行良好,任务执行完所需操作后自然结束,任务终结。

如果任务执行时出现异常,任务也会终结。

在设计多个线程协同工作的任务时,需要判断任务终结的条件,以便合适地终结任务,这点尤为重要。

在本节中主要讨论在多线程协同工作的情况下,如何合适的终结任务。

...

生产者-消费者与阻塞队列

在讨论线程协作的时候,已经讨论了生产者与消费者雏形,比如录音是生产者,而播放则是消费者;同样的,在汽车打蜡的模型中,打蜡可看作生产者,抛光可看作消费者;只是它们的关系是简单的生产-消费关系。

除了简单的线程协同之外,Java提供了同步队列来解决线程的协同问题,本节重点讨论这部分的内容。

...

Executors与Executor框架

Executors可以称作执行器。Java并发系列的文章到目前为止,虽然没有特别说明,但是使用执行器(Executor(s))的次数已经难以计数了,Executors提供了一些非常方便的静态方法,可以根据需要创建不同的ExecutorService,然后调用其execute(Runnable)submit(Callable<T>)方法。

在并发条件下,执行器还有一个非常明显的优势,它使用线程池管理线程,减少了系统创建和销毁线程的开销。在一般的Java并发过程中,也建议使用执行器完成任务而非显式地创建线程。

本文将从执行器开始,阐述Java中的线程池。

...

CompletionService

在提交单个任务时,使用submit()或者execute()方法或许能够满足要求,但如果需要控制多个任务时,依次提交的操作看起来“有些繁琐”,此时我们可以使用ExecutorService提供的invokeAny/invokeAll方法,在介绍CompletionService接口时,我们不妨先看看这两个方法。

之前介绍AbstractExecutorService时提到,这两个方法是在这个抽象类中实现的,其中前者在获取到一个任务的返回值时便取消其他(未执行或正在执行的任务)任务,而后者需要等待所有的任务执行完成之后才能对任务的返回进行处理,接下来我们分别来看:

invokeAll会阻塞等待所有的任务执行完成。

  1public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
  2    throws InterruptedException {
  3    if (tasks == null)
  4        throw new NullPointerException();
  5    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
  6    boolean done = false;
  7    try {
  8        for (Callable<T> t : tasks) {
  9            RunnableFuture<T> f = newTaskFor(t);
 10            futures.add(f);
 11            execute(f);
 12        }
 13        // 有序迭代
 14        for (int i = 0, size = futures.size(); i < size; i++) {
 15            Future<T> f = futures.get(i);
 16            if (!f.isDone()) {
 17                try {
 18                    // 阻塞等待任务执行完成
 19                    f.get();
 20                } catch (CancellationException ignore) {
 21                } catch (ExecutionException ignore) {
 22                }
 23            }
 24        }
 25        done = true;
 26        return futures;
 27    } finally {
 28        if (!done)
 29            // 处理因异常而未正常执行的任务
 30            for (int i = 0, size = futures.size(); i < size; i++)
 31                futures.get(i).cancel(true);
 32    }
 33}
 34
 35// invokeAny
 36public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
 37    throws InterruptedException, ExecutionException {
 38    try {
 39        return doInvokeAny(tasks, false, 0);
 40    } catch (TimeoutException cannotHappen) {
 41        assert false;
 42        return null;
 43    }
 44}
 45private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
 46                          boolean timed, long nanos)
 47    throws InterruptedException, ExecutionException, TimeoutException {
 48    if (tasks == null)
 49        throw new NullPointerException();
 50    int ntasks = tasks.size();
 51    if (ntasks == 0)
 52        throw new IllegalArgumentException();
 53    ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
 54    ExecutorCompletionService<T> ecs =
 55        new ExecutorCompletionService<T>(this);
 56
 57    try {
 58        // Record exceptions so that if we fail to obtain any
 59        // result, we can throw the last exception we got.
 60        ExecutionException ee = null;
 61        final long deadline = timed ? System.nanoTime() + nanos : 0L;
 62        Iterator<? extends Callable<T>> it = tasks.iterator();
 63
 64        // Start one task for sure; the rest incrementally
 65        futures.add(ecs.submit(it.next()));
 66        --ntasks;
 67        int active = 1;
 68
 69        for (;;) {
 70            // 并没阻塞第一个任务,此时可能第一个任务还未执行完
 71            Future<T> f = ecs.poll();
 72            if (f == null) {
 73                if (ntasks > 0) {
 74                    --ntasks;
 75                    // 不等待上一个任务的结果,直接新执行一个任务
 76                    futures.add(ecs.submit(it.next()));
 77                    ++active;
 78                }
 79                else if (active == 0)
 80                    break;
 81                else if (timed) {
 82                    f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
 83                    if (f == null)
 84                        throw new TimeoutException();
 85                    nanos = deadline - System.nanoTime();
 86                }
 87                else
 88                    // 没有可执行的任务了,则等待一个结果
 89                    f = ecs.take();
 90            }
 91            // 有结果则返回
 92            if (f != null) {
 93                --active;
 94                try {
 95                    return f.get();
 96                } catch (ExecutionException eex) {
 97                    ee = eex;
 98                } catch (RuntimeException rex) {
 99                    ee = new ExecutionException(rex);
100                }
101            }
102        }
103
104        if (ee == null)
105            ee = new ExecutionException();
106        throw ee;
107
108    } finally {
109        for (int i = 0, size = futures.size(); i < size; i++)
110            // 取消还未执行或者执行中的任务
111            // 中断任务
112            futures.get(i).cancel(true);
113    }
114}

可以看到,与invokeAll不同的是,invokeAny方法是在循环的启动任务,直到获取到任一任务的返回值为止,而未执行或正在执行的任务则会被中断。

...