Asynchronous IO သည် အမည်အတိုင်း Input Output လုပ်နေစဉ် ကြံ့ကြာတတ်သော အချိန်အား စောင့်စရာမလိုပဲ အခြားသော လုပ်ဆောင်ချက်များအား ဆက်လုပ်စေနိုင်သော API ဖြစ်ပါသည်။ Asynchronous IO အား java.nio.channels.AsynchronousChannel အင်တာဖေစ် မှတဆင့် အသုံးပြုနိုင်ပါသည်။ လက်ရှိ အချိန်တွင် Socket Connection နှင့် File IO များတွင် အသုံးပြုနိုင်ပါသည်။ ပံ့ပိုးထားသော Class များမှာ java.nio.channels.AsynchronousSocketChannel နှင့် java.nio.channels.AsynchronousFileChannel တို့ ဖြစ်ကြပါသည်။
အဆိုပါ ကလပ်စ်များတွင် read နှင့် write လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေရာတွင် ပြီးဆုံးအောင် စောင့်စရာမလိုပဲ ဆက်လက်၍ အခြားသော လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေနိုင်ပါသည်။ လုပ်ဆောင်ချက်များ ပြီးဆုံးသည်ကို သိရှိနိုင်ရန်မှာ နည်းလမ်းနှစ်ခု ရှိပါသည်။ ပဋ္ဌမနည်းမှာ Concurrency Utilities ၏ java.util.concurrent.Future အား အသုံးပြုသောနည်း ဖြစ်ပြီး၊ နောက်တစ်နည်းမှာ ပြီးဆုံးသော Event အား လက်ခံရယူသောနည်း ဖြစ်ပါသည်။
Future Interface အား အသုံးပြု၍ Asynchronous IO
Future Interface ဖြင့် Asynchronous IO ၏ လုပ်ဆောင်ချက်များ၏ ရလဒ်များအား ရရှိနိုင်ခြင်း၊ လုပ်ဆောင်ချက်များ၏ အနေအထားအား ကြည့်ရှုနိုင်ခြင်း၊ လုပ်ဆောင်ချက်များအား ရပ်တန့်စေနိုင်ခြင်း အစရှိသည်တို့ကို လုပ်ဆောင်စေနိုင်ပါသည်။ ပြီးခဲ့သော ဘလောဂ်ဖြင့် ရေးသားခဲ့သော Echo ပြန်လုပ်ပေးသည့် ဆာဗာ အပလီကေးရှင်းအား Future Interface နှင့် Asynchronous IO အား အသုံးပြု၍ ပြုပြင် ရေးသားကြည့်ပါဦးမည်။
SimpleFutureEchoServer .java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
public class SimpleFutureEchoServer {
private static final int PORT = 5000;
public SimpleFutureEchoServer() throws IOException {
AsynchronousServerSocketChannel serverChannel
= AsynchronousServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(PORT));
for (;;) {
// open asynchronous socket channel
Future<AsynchronousSocketChannel> future
= serverChannel.accept();
try {
// get Asynchronous socket channel
AsynchronousSocketChannel channel = future.get();
System.out.println("Connect to: "
+ ((InetSocketAddress)channel
.getRemoteAddress()).getHostName());
// IO Processing
startEcho(channel);
} catch (InterruptedException | ExecutionException ex) {}
}
}
private void startEcho(AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
try {
for (;;) {
// Input
buffer.clear();
Future<Integer> future = channel.read(buffer);
// get Input Result
if (future.get() < 0) {
try {
channel.close();
} catch (IOException ex) {}
}
buffer.flip();
// output
channel.write(buffer).get();
}
} catch (InterruptedException
|ExecutionException ex) {}
}
public static void main(String[] args) {
try {
new SimpleFutureEchoServer();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
စာကြောင်း ၁၉နှင့် ၂၀အား ကြည့်ပါ။ AsynchronousServerSocketChannel#accept လုပ်ဆောင်ချက်အား အသုံးပြု၍ AsynchronousSocketChannel အား အသုံးပြုနိုင်သော Future အင်တာဖေစ်၏ အော့ဘဂျက်အား ရယူပါသည်။ ဤနေရာတွင် ထူးခြားသည်မှာ ServerSocketChannel#accept ၏ ရလဒ်သည် SocketChennel ဖြစ်သော်လည်း ဤနေရာတွင်မှု Future အင်တာဖေစ် ဖြစ်သည်ဆိုသည့် အချက်ဖြစ်၏။
တဖန် Asynchronous Process ၏ ရလဒ်အားရရှိရန်မှာ Future#get အား အသုံးပြုနိုင်ပြီး၊ get လုပ်ဆောင်ချက်တွင် Timeout ဖြစ်ရန် အချိန်ကို ပါရာမီတာအဖြစ်လည်း သတ်မှတ် အသုံးပြုနိုင်ပါသည်။ ပြီးပါက စာကြောင်း ၂၉တွင် startEcho လုပ်ဆောင်ချက်ဖြင့် အထက်တွင် ရရှိထားသော AsynchronousSocketChannel ၏ အော့ဘဂျက်အား အသုံးပြု၍ Input Output အား လုပ်ဆောင်စေပါသည်။
startEcho လုပ်ဆောင်ချက် အတွင်းတွင် Input Output လုပ်ဆောင်ချက်များအား လုပ်ဆောင်စေပါသည်။ စာကြောင်း ၄၁အား ကြည့်ပါ။ AsynchronousSocketChannel#read လုပ်ဆောင်ချက်၏ရလဒ်သည် Future အင်တာဖေစ်ဖြစ်ပြီး၊ Future#get ဖြင့် Asynchronous လုပ်ဆောင်ချက်၏ ရလဒ်အား ရရှိနိုင်ပါသည်။ တဖန် စာကြောင်း ၅၂ဖြင့် Output ကို လုပ်ဆောင်စေပြီး၊ get ဖြင့် ရလဒ်အား စောင့်ယူပါသည်။ ဤကဲ့သို့ channel.write(buffer).get() ဟု တိုက်ရိုက် ရေးသားရခြင်းမှာ၊ Asynchronous ဖြင့် အလုပ်လုပ်နေစဉ် buffer အတွင်းရှိ အချက်အလက်များအား ပြီးအောင် မရေးရသေးခင်၊ for ဝါကျ၏ ထိပ်သို့ ပြန်ရောက်သွားပြီး buffer အား clear မလုပ်မိစေရန် ဖြစ်ပါသည်။
ဤနည်းအားဖြင့် Asynchronous IO တွင် Future အင်တာဖေစ်အား အသုံးပြု၍ Input output များအား အသုံးပြုနိုင်ကြောင်း တွေ့ရှိရမည် ဖြစ်သည်။ သို့ရာတွင် သေသေချာချာကြည့်မည်ဆိုပါက Thread တစ်ခုတည်းကို သာအသုံးပြုနေသည်ကို တွေ့ရပါမည်။ ထို့ကြောင့် Access များလာပါက အလားတူ Performance ကျဆင်းသွားမည်မှာ မလွဲပေ။ ဒါကြောင့်ဆိုပြီး Thread Pool ကို အသုံးပြုပြန်ရင်လည်း Asynchronous IO ကို အသုံးပြုသည်မှာ အဓိပ္ပါယ်ရှိမည် မဟုတ်။
ကျွှန်တော်တို့ ဆက်လက်၍ Work Thread နှင့်Main Thread ကို ခွဲရေးပြီး၊ Thread နှစ်ခုကြားတွင် Queue ဖြင့် အချက်အလက်များအား လက်ကမ်းပေးသည့် ပုံစံကို ဆက်လေ့လာ ကြည့်ပါဦးမည်။
Work Thread အား အသုံးပြုခြင်း
FutureEchoServer.java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class FutureEchoServer {
private static final int PORT = 5000;
private LinkedBlockingQueue<AsyncInfo> queue;
static class AsyncInfo {
private Future<Integer> future;
private AsynchronousSocketChannel channel;
private ByteBuffer buffer;
public AsyncInfo(Future<Integer> future,
AsynchronousSocketChannel channel,
ByteBuffer buffer) {
this.future = future;
this.channel = channel;
this.buffer = buffer;
}
}
public FutureEchoServer() throws IOException {
AsynchronousServerSocketChannel serverChannel
= AsynchronousServerSocketChannel.open();
serverChannel.bind(new InetSocketAddress(PORT));
for (;;) {
// open asynchronous socket channel
Future<AsynchronousSocketChannel> future
= serverChannel.accept();
try {
// get Asynchronous socket channel
AsynchronousSocketChannel channel = future.get();
System.out.println("Connect to: "
+ ((InetSocketAddress)channel
.getRemoteAddress()).getHostName());
// Watching Queue
startWatchQueue();
// IO Processing
startEcho(channel);
} catch (InterruptedException | ExecutionException ex) {}
}
}
private void startEcho(AsynchronousSocketChannel channel) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.clear();
// Asynchronous Input
Future<Integer> future = channel.read(buffer);
// Queue Processing
queue.offer(new AsyncInfo(future, channel, buffer));
}
private void startWatchQueue() {
// queue for receiving data
queue = new LinkedBlockingQueue<>();
// work thread
Runnable runnable = new Runnable() {
public void run() {
try {
for (;;) {
// get asynchronous info from queue
AsyncInfo info = (AsyncInfo) queue.take();
Future<Integer> future = info.future;
// get result
// Timeout = 100ms
int n = 0;
try {
n = future.get(100, TimeUnit.MILLISECONDS);
} catch(TimeoutException ex) {}
if (future.isDone()) {
// The process is done
if (n < 0) {
try {
info.channel.close();
} catch (IOException ex) {}
continue;
}
AsynchronousSocketChannel channel
= info.channel;
ByteBuffer buffer = info.buffer;
// Output
buffer.flip();
channel.write(buffer).get();
// Asynchronous Input
buffer.clear();
Future<Integer> nextFuture
= channel.read(buffer);
// Next input to queue
queue.offer(new AsyncInfo(nextFuture,
channel,
buffer));
} else {
// If process is still working
queue.offer(info);
}
}
} catch (InterruptedException
| ExecutionException ex) {
}
}
};
// Work Thread Start
ExecutorService service = Executors.newSingleThreadExecutor();
service.execute(runnable);
}
public static void main(String[] args) {
try {
new FutureEchoServer();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
ဦးစွာ စာကြောင်း ၁၈မှ ၃၀ အထိ Queue ဖြင့် အချက်အလက်များအား လက်ဆင့်ကမ်းရန် အတွက် ကလပ်စ် တစ်ခုကို ရေးသားပါသည်။ Work Thread နှင့် Main Thread အကြားတွင် အချက်အလက်များအား လက်ဆင့်ပေးရန် အတွက်ဖြစ်ပါသည်။ Main Thread မှ အချက်အလက်များအား Queue တွင် ထည့်ထားပါက၊ အခြားသော Work Thread မှ တဆင့် ထို Queue ထဲမှ အချက်အလက်များအား ထုတ်ယူ အသုံးပြုနိုင်မည် ဖြစ်ပါသည်။ Future Object, AsynchronousSocketChannel နှင့် ByteBuffer Object တို့ကို ပိုင်ဆိုင်ပါသည်။
Asynchronous Socket Channel ကို ခေါ်ယူသည့် နေရာအထိကတော့ အထက်ပါ နမှုနာနဲ့ အတူတူပဲ ဖြစ်ပါတယ်။ Socket Channel ကို ခေါ်ယူပြီးသည့်အခါတွင် Work Thread ကို အလုပ်လုပ်စေရန် အတွက် စာကြောင်း ၄၉တွင် startWatchQueue လုပ်ဆောင်ချက်ကို ခေါ်ယူပါသည်။ ပြီးပါက Input လုပ်ဆောင်ချက်ကို လုပ်ဆောင်စေမည့် startEcho လုပ်ဆောင်ချက်ကို ခေါ်ယူပါသည်။
startEcho လုပ်ဆောင်ချက်အတွင်းတွင် Input လုပ်ငန်းများကို လုပ်ဆောင်မည့် future Object အား ရယူကာ Queue တွင် Registration ပြုလုပ်ပါသည်။ တကယ်တန်း Input Output လုပ်ငန်းများအား လုပ်ဆောင်မည်မှာ အထက်ပါ startWatchQueue တွင် စောင့်ကြည့်နေစေသော Work Thread အတွင်းတွင် ဖြစ်ပါသည်။
startWatchQueue တွင် စာကြောင်း ၇၇ဖြင့် အခြားသော Thread တစ်ခု အနေဖြင့် အလုပ်လုပ်စေနိုင်သော Runnable အင်တာဖေစ်၏ အော့ဘဂျက်အား သတ်မှတ်ရေးသားကာ၊ စာကြောင်း ၁၂၅တွင် Single Thread အနေဖြင့် အလုပ်လုပ်စေပါသည်။ ဤနည်းအားဖြင့် Runnable အော့ဘဂျက်သည် Work Thread အဖြစ်အလုပ်လုပ်ကာ Queue အား စောင့်ကြည့်ပါသည်။
Runnable Object အတွင်းမှာမူ စာကြောင်း ၇၇ ဖြင့် Queue အတွင်းမှ အသုံးပြုလိုသည့် info အား ထုတ်ယူကာ၊ စာကြောင်း ၇၈ဖြင့် info အတွင်းမှ input ကို လုပ်ဆောင်နိုင်သော future အား ရယူပါသည်။ စာကြောင်း ၈၇ ဖြင့် future သည် အလုပ်ပြီးပြီလားဆိုသည်ကို စစ်ဆေးပြီး၊ အလုပ်ပြီးပြီဆိုပါက စာကြောင်း ၈၉ဖြင့် ဖတ်စရာကျန်မကျန် စစ်ဆေးပြီး၊ မကျန်တော့ပါက Asynchronous Socket Channel အား close လုပ်မည် ဖြစ်ပါသည်။ ကျန်သေးပါက Buffer အတွင်းရှိ အချက်အလက်များအား Output လုပ်ပြီး၊ future Object အား အသစ်ခေါ်ယူကာ စာကြောင်း ၁၁၀ ဖြင့် Queue အတွင်းသို့ ပြန်ထည့်ပေးလိုက်ပါသည်။ ဤနည်းအားဖြင့် Input အချက်အလက်များ မကုန်မချင်း အကြိမ်ကြိမ် အလုပ်လုပ်စေပါသည်။
နောက်ဘလောဂ်ဖြင့် CompletionHandler ကို အသုံးပြုကာ AsynchronousSocketChannel အား အသုံးပြုရေးသားပုံကို ဆက်လက်ဖော်ပြပါဦးမည်။
ကိုးကား
http://itpro.nikkeibp.co.jp/article/COLUMN/20110927/369451/?ST=develop&P=4
လေးစားစွာဖြင့်
မင်းလွင်
No comments:
Post a Comment