【介绍】Executors
直接上栗子
FixedThreadPool
package com.Multithreading;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link Executors#newFixedThreadPool(int)}: ten tasks are handed to a pool of five
 * worker threads. Each task prints the worker's name and the current time, then sleeps for one
 * second.
 */
public class FixedThreadPoolTest {
    /** Number of worker threads in the pool. */
    private static final int POOL_SIZE = 5;

    /** Number of tasks submitted to the pool. */
    private static final int TASK_COUNT = 10;

    /** How long each task sleeps after printing, in milliseconds. */
    private static final long TASK_SLEEP_MILLIS = 1000;

    /**
     * Submits the tasks and then shuts the pool down.
     *
     * <p>Per the JDK documentation, the threads of a fixed pool exist until the pool is explicitly
     * shut down, so without the {@code shutdown()} call the JVM would not exit after the tasks
     * finish. {@code shutdown()} still lets the already-submitted tasks run to completion.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService newFixedThreadPool = Executors.newFixedThreadPool(POOL_SIZE);
        try {
            for (int i = 0; i < TASK_COUNT; i++) {
                newFixedThreadPool.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println(
                                Thread.currentThread().getName() + " : " + System.currentTimeMillis());
                        try {
                            Thread.sleep(TASK_SLEEP_MILLIS); // sleep 1000 ms
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag so the worker thread can observe it.
                            Thread.currentThread().interrupt();
                        }
                    }
                });
            }
        } finally {
            // Always release the workers, even if a submission fails.
            newFixedThreadPool.shutdown();
        }
    }
}
执行结果:
pool-1-thread-1 : 1538407294756
pool-1-thread-2 : 1538407294757
pool-1-thread-3 : 1538407294757
pool-1-thread-4 : 1538407294758
pool-1-thread-5 : 1538407294760
pool-1-thread-2 : 1538407295758
pool-1-thread-4 : 1538407295759
pool-1-thread-3 : 1538407295760
pool-1-thread-1 : 1538407295760
pool-1-thread-5 : 1538407295764
Process finished with exit code 0
从结果中可以看到,线程池中有5个线程在跑,10个任务分两批执行。同一个线程先后执行两个任务的时间间隔是1000ms多一些。
WorkStealingPool
package com.Multithreading;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
/**
 * Demonstrates {@link Executors#newWorkStealingPool()}: a recursive task raises the price of 100
 * products by 20% while the main thread keeps printing pool statistics until the task is done.
 */
public class WorkStealingPoolTest {

    /**
     * Builds the product list, runs the price update on a work-stealing pool and reports the
     * outcome. Any product whose final price is not 12 is printed.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ForkJoinPool pool = (ForkJoinPool) Executors.newWorkStealingPool();
        List<Product> catalog = new ProductListGenerator().generate(100);
        Task root = new Task(catalog, 0, catalog.size(), 0.20);
        pool.execute(root);

        // Report the pool state at least once, and keep reporting until the root task is done.
        boolean finished = false;
        while (!finished) {
            System.out.printf("Main:Thread Count: %d\n", pool.getActiveThreadCount());
            System.out.printf("Main:Thread Steal: %d\n", pool.getStealCount());
            System.out.printf("Main:Parallelism: %d\n", pool.getParallelism());
            finished = root.isDone();
        }
        pool.shutdown();

        if (root.isCompletedNormally()) {
            System.out.println("Main:The process has completed normally.");
        }
        for (Product item : catalog) {
            if (item.getPrice() == 12) {
                continue;
            }
            System.out.printf("Product %s: %f\n", item.getName(), item.getPrice());
        }
        System.out.println("Main: End of the program.");
    }

    /**
     * Raises the price of the products in the index range {@code [first, last)}. Ranges of ten or
     * more products are split into two sub-tasks; smaller ranges are updated directly.
     */
    private static class Task extends RecursiveAction {
        private static final long serialVersionUID = 1L;

        private List<Product> products;
        private int first;
        private int last;
        private double increment;

        /**
         * @param products  list whose prices are updated in place
         * @param first     index of the first product to update (inclusive)
         * @param last      index after the last product to update (exclusive)
         * @param increment relative price increase, e.g. {@code 0.20} for 20%
         */
        public Task(List<Product> products, int first, int last, double increment) {
            this.products = products;
            this.first = first;
            this.last = last;
            this.increment = increment;
        }

        @Override
        protected void compute() {
            int span = last - first;
            if (span >= 10) {
                int middle = (last + first) / 2;
                System.out.printf("%s: Pending tasks:%s\n", Thread.currentThread().getName(), getQueuedTaskCount());
                // Both halves share the same cut point, so every index is covered exactly once.
                int cut = middle + 1;
                Task lower = new Task(products, first, cut, increment);
                Task upper = new Task(products, cut, last, increment);
                invokeAll(lower, upper);
                return;
            }
            updatePrices();
        }

        /** Multiplies the price of every product in {@code [first, last)} by {@code 1 + increment}. */
        private void updatePrices() {
            int index = first;
            while (index < last) {
                Product current = products.get(index);
                double raised = current.getPrice() * (1 + increment);
                current.setPrice(raised);
                index++;
            }
        }
    }

    /** Mutable name/price pair used as the demo payload. */
    private static class Product {
        private String name;
        private double price;

        private String getName() {
            return name;
        }

        private void setName(String name) {
            this.name = name;
        }

        private double getPrice() {
            return price;
        }

        private void setPrice(double price) {
            this.price = price;
        }
    }

    /** Creates the demo product list. */
    private static class ProductListGenerator {
        /**
         * @param size number of products to create
         * @return products named {@code Product0 .. Product(size-1)}, each priced at 10
         */
        private List<Product> generate(int size) {
            List<Product> created = new ArrayList<>();
            int serial = 0;
            while (serial < size) {
                Product fresh = new Product();
                fresh.setName("Product" + serial);
                fresh.setPrice(10);
                created.add(fresh);
                serial++;
            }
            return created;
        }
    }
}
执行结果:
Main:Thread Count: 1
ForkJoinPool-1-worker-1: Pending tasks:0
ForkJoinPool-1-worker-1: Pending tasks:1
Main:Thread Steal: 0
Main:Parallelism: 4
Main:Thread Count: 4
ForkJoinPool-1-worker-2: Pending tasks:0
ForkJoinPool-1-worker-1: Pending tasks:1
ForkJoinPool-1-worker-0: Pending tasks:0
ForkJoinPool-1-worker-2: Pending tasks:1
Main:Thread Steal: 0
Main:Parallelism: 4
ForkJoinPool-1-worker-3: Pending tasks:0
Main:Thread Count: 4
ForkJoinPool-1-worker-2: Pending tasks:1
ForkJoinPool-1-worker-0: Pending tasks:1
ForkJoinPool-1-worker-1: Pending tasks:1
ForkJoinPool-1-worker-0: Pending tasks:0
ForkJoinPool-1-worker-2: Pending tasks:0
Main:Thread Steal: 0
...
Main:Thread Count: 3
ForkJoinPool-1-worker-3: Pending tasks:1
...
Main:Thread Count: 2
ForkJoinPool-1-worker-0: Pending tasks:0
ForkJoinPool-1-worker-1: Pending tasks:0
Main:Thread Steal: 2
Main:Parallelism: 4
Main:The process has completed normally.
Main: End of the program.
Process finished with exit code 0
WorkStealingPool底层使用的是ForkJoinPool,默认并行度等于JVM可用的处理器数量(上面的运行结果中Parallelism为4)。从打印的结果中可以看到,活跃线程的数量是动态变化的,线程池会根据运行情况尽可能满足需求。
SingleThreadExecutor
package com.Multithreading;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
 * Demonstrates {@link Executors#newSingleThreadExecutor()}: five tasks are submitted to an
 * executor backed by a single worker thread, then the executor is shut down.
 */
public class SingleThreadExecutorTest {

    /**
     * Submits five identical tasks and shuts the executor down.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Runnable job = SingleThreadExecutorTest::stampThenPause;
        int remaining = 5;
        while (remaining > 0) {
            worker.execute(job);
            remaining--;
        }
        worker.shutdown();
    }

    /** Prints the current thread's name and the current time, then sleeps for one second. */
    private static void stampThenPause() {
        String threadName = Thread.currentThread().getName();
        System.out.println(threadName + " : " + System.currentTimeMillis());
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
执行结果:
pool-1-thread-1 : 1538470592027
pool-1-thread-1 : 1538470593032
pool-1-thread-1 : 1538470594034
pool-1-thread-1 : 1538470595035
pool-1-thread-1 : 1538470596040
Process finished with exit code 0
从结果中可以看到,线程池中只有1个线程在跑,任务逐个串行执行,相邻两个任务的时间间隔是1000ms多一些。
CachedThreadPool
package com.Multithreading;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates {@link Executors#newCachedThreadPool()}: five tasks are submitted at once, the pool
 * size is printed 31 seconds later, one more task is submitted, and the pool size is printed again
 * after another 31 seconds.
 */
public class CachedThreadPoolTest {

    /**
     * Runs the two submit-wait-report rounds and shuts the pool down.
     *
     * @param args ignored
     * @throws InterruptedException if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();

        int submitted = 0;
        while (submitted < 5) {
            pool.execute(new RunnableTest());
            submitted++;
        }
        waitThenReport(pool);

        pool.execute(new RunnableTest());
        waitThenReport(pool);

        pool.shutdown();
    }

    /**
     * Sleeps for 31 seconds and then prints the pool's current size.
     *
     * @param pool the pool to inspect
     * @throws InterruptedException if interrupted while sleeping
     */
    private static void waitThenReport(ThreadPoolExecutor pool) throws InterruptedException {
        TimeUnit.SECONDS.sleep(31);
        int size = pool.getPoolSize();
        System.out.println("pool size : " + size);
    }

    /** Task that prints the executing thread's name and the current time, then sleeps one second. */
    private static class RunnableTest implements Runnable {
        @Override
        public void run() {
            String threadName = Thread.currentThread().getName();
            System.out.println(threadName + " : " + System.currentTimeMillis());
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}
执行结果:
pool-1-thread-1 : 1538473539047
pool-1-thread-2 : 1538473539047
pool-1-thread-3 : 1538473539048
pool-1-thread-4 : 1538473539048
pool-1-thread-5 : 1538473539049
pool size : 5
pool-1-thread-2 : 1538473570050
pool size : 1
Process finished with exit code 0
从结果中可以看到,第一次提交5个任务时,缓存线程池创建了5个线程;31s后打印的poolsize是5,随后再提交1个任务,该任务复用了空闲线程pool-1-thread-2;再过31s后获取到的poolsize是1,因为其余4个线程的空闲时间已超过默认的60s,被线程池回收了。
ScheduledThreadPool
package com.Multithreading;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
/**
 * Demonstrates {@link Executors#newScheduledThreadPool(int)}: one task is scheduled at a fixed
 * rate and another with a fixed delay on a two-thread scheduler. The scheduler is shut down once
 * the fixed-rate task has run five times.
 */
public class ScheduledThreadPoolTest {
    /** Number of fixed-rate runs to wait for before shutting the scheduler down. */
    private static final int FIXED_RATE_RUNS = 5;

    /** Initial delay and period/delay of both scheduled tasks, in milliseconds. */
    private static final long INTERVAL_MILLIS = 1000;

    /**
     * Schedules both tasks, blocks until the fixed-rate task has completed
     * {@link #FIXED_RATE_RUNS} runs, then shuts the scheduler down.
     *
     * <p>The main thread waits on a {@link CountDownLatch} instead of spinning on a counter, so
     * it does not burn a CPU core while the tasks run.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        ScheduledExecutorService newScheduledThreadPool = Executors.newScheduledThreadPool(2);
        final CountDownLatch fixedRateRuns = new CountDownLatch(FIXED_RATE_RUNS);
        try {
            newScheduledThreadPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    System.out.println(
                            Thread.currentThread().getName() + " : " + System.currentTimeMillis());
                    // Extra runs after the latch reaches zero are harmless no-ops.
                    fixedRateRuns.countDown();
                }
            }, INTERVAL_MILLIS, INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            newScheduledThreadPool.scheduleWithFixedDelay(new Runnable() {
                @Override
                public void run() {
                    System.out.println(
                            Thread.currentThread().getName() + " : " + System.currentTimeMillis());
                }
            }, INTERVAL_MILLIS, INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
            fixedRateRuns.await();
        } catch (InterruptedException e) {
            // Restore the interrupt flag and fall through to the shutdown below.
            Thread.currentThread().interrupt();
        } finally {
            newScheduledThreadPool.shutdown();
        }
    }
}
执行结果:
pool-1-thread-1 : 1538474649680
pool-1-thread-2 : 1538474649680
pool-1-thread-1 : 1538474650679
pool-1-thread-2 : 1538474650681
pool-1-thread-1 : 1538474651676
pool-1-thread-2 : 1538474651683
pool-1-thread-1 : 1538474652678
pool-1-thread-2 : 1538474652684
pool-1-thread-1 : 1538474653680
Process finished with exit code 0
ScheduledThreadPool与Timer相比,可以用多个线程同时处理多个定时任务。