一、任务和执行策略之间的隐性耦合
executor可以将任务的提交和任务的执行策略解耦
只有任务是同类型的且执行时间差别不大,才能发挥最大性能,否则,如将一些耗时长的任务和耗时短的任务放在一个线程池,除非线程池很大,否则会造成死锁等问题
1.线程饥饿死锁
类似于:将两个任务提交给一个单线程池,且两个任务之间相互依赖,一个任务等待另一个任务,则会发生死锁;表现为池不够
定义:某个任务必须等待池中其他任务的运行结果,有可能发生饥饿死锁
2.线程池大小

立即学习“Java免费学习笔记(深入)”;
注意:线程池的大小还受其他的限制,如其他资源池:数据库连接池
如果每个任务都是一个连接,那么线程池的大小就受制于数据库连接池的大小
3.配置ThreadPoolExecutor线程池
实例:
1.通过Executors的工厂方法返回默认的一些实现
2.通过实例化ThreadPoolExecutor(.....)自定义实现
线程池的队列
1.无界队列:任务到达,线程池饱满,则任务在队列中等待,如果任务无限达到,则队列会无限扩张
如:单例和固定大小的线程池用的就是此种
2.有界队列:如果新任务到达,队列满则使用饱和策略
3.同步移交:如果线程池很大,将任务放入队列后在移交就会产生延时,如果任务生产者很快也会导致任务排队
SynchronousQueue直接将任务移交给工作线程
机制:将一个任务放入,必须有一个线程等待接受,如果没有,则新增线程,如果线程饱和,则拒绝任务
如:CacheThreadPool就是使用的这种策略
饱和策略:
setRejectedExecutionHandler来修改饱和策略
1.终止Abort(默认):抛出异常由调用者处理
2.抛弃Discard
3.抛弃DiscardOldest:抛弃最旧的任务,注意:如果是优先级队列将抛弃优先级最高的任务
4.CallerRuns:回退任务,有调用者线程自行处理
4.线程工厂ThreadFactoy
每当创建线程时:其实是调用了线程工厂来完成
自定义线程工厂:implements ThreadFactory
可以定制该线程工厂的行为:如UncaughtExceptionHandler等
public class MyAppThread extends Thread { public static final String DEFAULT_NAME = "MyAppThread"; private static volatile boolean debugLifecycle = false; private static final AtomicInteger created = new AtomicInteger(); private static final AtomicInteger alive = new AtomicInteger(); private static final Logger log = Logger.getAnonymousLogger(); public MyAppThread(Runnable r) { this(r, DEFAULT_NAME);
} public MyAppThread(Runnable runnable, String name) { super(runnable, name + "-" + created.incrementAndGet()); //设置该线程工厂创建的线程的 未捕获异常的行为
setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { public void uncaughtException(Thread t,
Throwable e) {
log.log(Level.SEVERE, "UNCAUGHT in thread " + t.getName(), e);
}
});
} public void run() { // Copy debug flag to ensure consistent value throughout.
boolean debug = debugLifecycle; if (debug) log.log(Level.FINE, "Created " + getName()); try {
alive.incrementAndGet(); super.run();
} finally {
alive.decrementAndGet(); if (debug) log.log(Level.FINE, "Exiting " + getName());
}
} public static int getThreadsCreated() { return created.get();
} public static int getThreadsAlive() { return alive.get();
} public static boolean getDebug() { return debugLifecycle;
} public static void setDebug(boolean b) {
debugLifecycle = b;
}
}
5.扩展ThreadPoolExecutor
可以被自定义子类覆盖的方法:
1.afterExecute:结束后,如果抛出RuntimeException则方法不会执行
2.beforeExecute:开始前,如果抛出RuntimeException则任务不会执行
3.terminated:在线程池关闭时,可以用来释放资源等
二、递归算法的并行化
1.循环
在循环中,每次循环操作都是独立的
//串行化
void processSequentially(List elements) { for (Element e : elements)
process(e);
} //并行化
void processInParallel(Executor exec, List elements) { for (final Element e : elements)
exec.execute(new Runnable() { public void run() {
process(e);
}
});
}
2.迭代
如果每个迭代操作是彼此独立的,则可以串行执行
如:深度优先搜索算法;注意:递归还是串行的,但是,每个节点的计算是并行的
//串行 计算compute 和串行迭代
public void sequentialRecursive(List> nodes, Collection results) { for (Node n : nodes) {
results.add(n.compute());
sequentialRecursive(n.getChildren(), results);
}
} //并行 计算compute 和串行迭代
public void parallelRecursive(final Executor exec, List> nodes, final Collection results) { for (final Node n : nodes) {
exec.execute(() -> results.add(n.compute()));
parallelRecursive(exec, n.getChildren(), results);
}
} //调用并行方法的操作
public Collection getParallelResults(List> nodes) throws InterruptedException {
ExecutorService exec = Executors.newCachedThreadPool();
Queue resultQueue = new ConcurrentLinkedQueue();
parallelRecursive(exec, nodes, resultQueue);
exec.shutdown();
exec.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS); return resultQueue;
}
实例:
public class ConcurrentPuzzleSolver{ private final Puzzle
puzzle; private final ExecutorService exec; private final ConcurrentMap
seen; protected final ValueLatch
> solution = new ValueLatch >(); public ConcurrentPuzzleSolver(Puzzle puzzle) { this.puzzle = puzzle; this.exec = initThreadPool(); this.seen = new ConcurrentHashMap
(); if (exec instanceof ThreadPoolExecutor) { ThreadPoolExecutor tpe = (ThreadPoolExecutor) exec; tpe.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy()); } } private ExecutorService initThreadPool() { return Executors.newCachedThreadPool(); } public List
solve() throws InterruptedException { try { P p = puzzle.initialPosition(); exec.execute(newTask(p, null, null)); // 等待ValueLatch中闭锁解开,则表示已经找到答案 PuzzleNode solnPuzzleNode = solution.getValue(); return (solnPuzzleNode == null) ? null : solnPuzzleNode.asMoveList(); } finally { exec.shutdown();//最终主线程关闭线程池 } } protected Runnable newTask(P p, M m, PuzzleNode
n) { return new SolverTask(p, m, n); } protected class SolverTask extends PuzzleNode
implements Runnable { SolverTask(P pos, M move, PuzzleNode
prev) { super(pos, move, prev); } public void run() { //如果有一个线程找到了答案,则return,通过ValueLatch中isSet CountDownlatch闭锁实现; //为类避免死锁,将已经扫描的节点放入set集合中,避免继续扫描产生死循环 if (solution.isSet() || seen.putIfAbsent(pos, true) != null){ return; // already solved or seen this position } if (puzzle.isGoal(pos)) { solution.setValue(this); } else { for (M m : puzzle.legalMoves(pos)) exec.execute(newTask(puzzle.move(pos, m), m, this)); } } } }











