Yoshio Terada Java Evangelist http://yoshio3.com, Twitter : @yoshioterada 1
2
3
4
5
1996 JDK 1.0 (Thread, Runnable) 1997 JDK 1.1 1998 J2SE 1.2 2000 J2SE 1.3 2002 J2SE 1.4 2004 Java SE 5 (JSR-166 Concurrency Utilities) 2006 Java SE 6 (JSR-166x) 6
2011 Java SE 7 JSR-166y Fork/Join 2013 Java SE 8 JSR-166e Concurrency Utilities 7
class MyWebServer {
    public static void main(String argv[]) {
        ServerSocket socket = new ServerSocket(80);
        while (true) {
            final Socket conn = socket.accept();
            Runnable r = new Runnable() {
                public void run() {
                    addconnqueue(conn); // サーバ処理
                }
            };
            new Thread(r).start();
        }
    }
}
8
Thread-1 Thread-2 Thread-3 new Thread(r).start(); Thread-n 9
-Xss Thread-1 Thread-1 用スタック Thread-2 Thread-2 用スタック Thread-3 Thread-3 用スタック Thread-n Thread-n 用スタック スタック 10
スレッドを無制限に生成する欠点 Java VM や OS に負荷 スレッド生成ごとにメモリを消費 スレッド生成数には上限があり コンテキスト スイッチの切り替え 11
12
Java Concurrency Utilities とは 並列処理の実装をかんたんに!! 13 並列処理用の簡易 API を提供 スケーラビリティ パフォーマンス 保守性 可読性の向上 スレッド セーフ
JSR-166 で提供された機能 14 タスクの非同期実行 並行コレクション ロック シンクロナイザ アトミック処理
利用可能なクラスと拡張 Executors, Thread Pool, Futures コレクション: Queues, Blocking Queues, Concurrent HashMap ロック 同期: Semaphores, Barriers, Atomic変数 他の拡張 15
public interface Executor {
    void execute(Runnable command);
}
16
public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout,
            TimeUnit unit);
    <T> Future<T> submit(Callable<T> task);
    // 他
}
17
static int CPU_NUM =
    Runtime.getRuntime().availableProcessors();
ExecutorService pool =
    Executors.newFixedThreadPool(CPU_NUM);
public static void main(String argv[]) {
    ServerSocket socket = new ServerSocket(80);
    while (true) {
        final Socket conn = socket.accept();
        Runnable r = new Runnable() {
            public void run() { ; // 何らかの処理
            }
        };
        pool.execute(r);
    }
}
18
ExecutorService pool = Executors.newFixedThreadPool(CPU_NUM); pool.execute(r); LinkedBlockingQueue (FIFO) T1 T2 T3 T4 ThreadFactory newFixedThreadPool Tn 19
Concurrency Utilities の導入効果 効率的にスレッド実行が可能 スレッド ライフサイクル管理も可能 20
21
1998 JPE 1999 J2EE 1.2 (JMS) 2001 J2EE 1.3 (MDB) 2003 J2EE 1.4 2006 Java EE 5 2009 Java EE 6 Servlet EJB 22
23
24
25
@Stateless
public class MailAddressRegisterEJB {
    @Resource(mappedName = "java:comp/jmsconfact")
    ConnectionFactory conn;
    @Resource(mappedName = "jms/mailregistqueue")
    Queue queue;
    public void registemailaddress(String address) {
        try (JMSContext context = conn.createContext()) {
            context.createProducer().send(queue,
                address);
        }
    }
}
26
@MessageDriven(mappedName = "jms/mailregistqueue")
public class SendMessageMDB implements MessageListener {
    public SendMessageMDB() {}
    @Inject MailSender mailsender;
    @Override
    public void onMessage(Message message) {
        try {
            TextMessage msg = (TextMessage) message;
            mailsender.sendmessage(msg.getText());
        } catch (JMSException jmse) {
            jmse.printStackTrace();
        }
    }
}
27
28
@WebServlet(name = "MailSenderServlet", urlPatterns =
    {"/MailSenderServlet"}, asyncSupported = true)
public class MailSenderServlet extends HttpServlet {
    protected void processrequest(
            HttpServletRequest request,
            HttpServletResponse response)
            throws ServletException, IOException {
        AsyncContext ac = request.startAsync();
        ac.start(new MailSenderRunnable(ac));
    }
29
30
@Stateless
public class SyncEmailSenderEJB {
    @Inject
    MailSender mailsend;
    public void syncsendmessage(String email) {
        mailsend.sendmessage(email);
    }
    @Asynchronous
    public void asyncsendmessage(String email) {
        mailsend.sendmessage(email);
    }
}
31
32
Web/EJB コンテナ EJB JSP Servlet Java EE 関連機能 (JAX-RS,JavaMail, CDI など ) Runnable Callable 33
Web/EJB コンテナ EJB JSP Servlet Runnable Callable Java EE 関連機能 (JAX-RS,JavaMail, CDI など ) ManagedExecutorService ManagedScheduledExecutorService Concurrency Utilities for EE ContextService ManagedThreadFactory 34
35
タスクを実装 A implements Runnable B implements Callable サーバ側で設定を実施 デフォルト設定も利用可 非同期タスクを実装 リソース インジェクションでサーバ管理スレッドを利用 36
public class MyRunnableTask implements Runnable {
    @Override
    public void run() {
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ex) {
            logger.log(Level.SEVERE, null, ex);
        }
    }
}
37
public class MyCallableTask implements
        Callable<String> {
    @Override
    public String call() throws Exception {
        return "Hello World";
    }
}
38
39
@Stateless! public class MyManagedExecutorService {! @Resource(name =! "concurrent/defaultmanagedexecutorservice")! ManagedExecutorService managedexecsvc;! public void execexecutorservice() {! MyRunnableTask task = new MyRunnableTask();! managedexecsvc.submit(task);! MyCallableTask singletask =! new MyCallableTask("Foo Bar");! Future<String> singlefuture =! managedexecsvc.submit(singletask);}! 40
41
タスクを実装 A implements Runnable B implements Callable サーバ側で設定を実施 デフォルト設定も利用可 非同期タスクを実装 リソース インジェクションでサーバ管理スレッドを利用 42
43
@Stateless! public class MyManagedScheduledExecutorService{! @Resource(name = "concurrent/! DefaultManagedScheduledExecutorService")! ManagedScheduledExecutorService managedscheduledexecsvc;! public void execscheduledexecutorservice() {! MyRunnableTask task = new MyRunnableTask();! managedscheduledexecsvc.schedule(! task, 60L, TimeUnit.SECONDS);! }! 44
@Stateless
public class MyManagedScheduledExecutorService {
    @Resource(name = "concurrent/DefaultManagedScheduledExecutorService")
    ManagedScheduledExecutorService managedscheduledexecsvc;
    public void execscheduledexecutorservice() {
        MyRunnableTask task = new MyRunnableTask();
        managedscheduledexecsvc.schedule(
            task, new MyTrigger(new Date(), 10, 1000));
    }
45
import javax.enterprise.concurrent.Trigger;
public class MyTrigger implements Trigger {
    @Override
    public Date getNextRunTime(LastExecution le,
            Date date) {
    }
    @Override
    public boolean skipRun(LastExecution le,
            Date date) {
    }
}
46
47
submit() の実行 submit 成功 taskSubmitted Submitted taskStarting キャンセル 中止 タスクの実行準備 タスク実行 taskAborted Started キャンセル 中止 taskDone タスク実行完了 Done 48
public class MyManagedTaskListener implements
        ManagedTaskListener {
    public void taskSubmitted(Future<?> future,
            ManagedExecutorService mes, Object o) {
    }
    public void taskStarting(Future<?> future,
            ManagedExecutorService mes, Object o) {
    }
    public void taskAborted(Future<?> future,
            ManagedExecutorService mes, Object o, Throwable thrwbl) {
    }
    public void taskDone(Future<?> future,
            ManagedExecutorService mes, Object o, Throwable thrwbl) {
    }
}
49
@Resource(name = "concurrent/! MyManagedExecutorService")! ManagedExecutorService manageexecsvc;! public void invokemytasklistener() {! MyRunnableTask task = new MyRunnableTask();! MyManagedTaskListener listener =! new MyManagedTaskListener();! Runnable taskwithlistener =! ManagedExecutors.managedTask(task, listener);! manageexecsvc.execute(taskwithlistener);! }! 50
51
public class MyRunnable implements Runnable {
    @Override
    public void run() {
        System.out.println("オリジナル メソッド呼び出し");
    }
}
52
public class MyInvocationHandler implements InvocationHandler {
    private Object underlying;
    public MyInvocationHandler(Object underlying) {
        this.underlying = underlying;
    }
    @Override
    public Object invoke(Object proxy, Method method,
            Object[] args) throws Throwable {
        System.out.println("オリジナルの呼び出しの前処理");
        Object ret = method.invoke(underlying, args);
        System.out.println("オリジナルの呼び出しの後処理");
        return ret;
    }
}
53
public class MyDynamicProxy {
    public static void main(String argv[]) {
        ここで
    }
}
MyRunnable task = new MyRunnable();
InvocationHandler handler = new
    MyInvocationHandler(task);
Runnable proxy =
    (Runnable) Proxy.newProxyInstance(
        MyRunnable.class.getClassLoader(),
        new Class[]{Runnable.class}, handler);
ExecutorService exec =
    Executors.newSingleThreadExecutor();
exec.submit(proxy);
54
ExecutorService exec =! Executors.newSingleThreadExecutor();! exec.submit(proxy);! 55
56
@Stateless
public class ContextServiceManager {
    @Resource(name = "concurrent/defaultcontextservice")
    ContextService ctxsvc;
    public void execsimplecontextservice() {
        ExecutorService singlethreadexecutor =
            Executors.newSingleThreadExecutor(threadFactory);
        MyRunnableTask task = new MyRunnableTask();
        Runnable proxiedtask =
            ctxsvc.createContextualProxy(task, Runnable.class);
        singlethreadexecutor.submit(proxiedtask);
    }
}
57
58
59
60
@Resource(name = "concurrent/DefaultManagedThreadFactory")
ManagedThreadFactory threadfactory;
public void execthreadfactory() {
    MyRunnableTask task = new MyRunnableTask();
    Thread taskthread =
        threadfactory.newThread(task);
    taskthread.start();
}
61
ExecutorService threadpoolexecutor =
    Executors.newFixedThreadPool(4, threadFac);
threadpoolexecutor = new ThreadPoolExecutor(4, 4,
    0L, TimeUnit.MILLISECONDS,
    new LinkedBlockingQueue<Runnable>(),
    threadFac);
62
java.util.concurrent.ThreadPoolExecutor
@Resource(name = "concurrent/! DefaultManagedThreadFactory")! ManagedThreadFactory threadfactory;! public void execthreadfactory() {! MyRunnableTask task = new MyRunnableTask();! ExecutorService exec =! new ThreadPoolExecutor(4, 4,! 0L, TimeUnit.MILLISECONDS,! new LinkedBlockingQueue<Runnable>(),! threadfactory);! exec.submit(task);}! 63
64
65
66
67
68