이번 장은 자바 멀티플렉싱 서버와도 관련이 있다. NIO를 다루기 때문이다. 블로그들을 찾아보다가 NIO에 대해서 아주 잘 설명해주는 사이트들을 찾았다.
jongmin92.github.io/2019/03/03/Java/java-nio/
이 블로그에서 기초적인 내용을 얻어낼 수 있었다. 링크된 사이트들이 몇 있던데,
tutorials.jenkov.com/java-nio/index.html
위에 링크된 두개의 사이트이다. 외국 사이트인데, NIO에 대한 내용뿐만이 아니라 자바의 전반적인 내용에 대해서 설명해주고 있다. netty도 설명해주는 파트가 있는걸로 알고 있다.
나중에 시간이 되면 netty쪽을 한번 살펴봐야 겠다.
이번장이 마지막이었다. 정말 괜찮은 강의였고, 꽤 깊은 내용을 다루었다. <이것이 자바다> 라는 책말고 자바의 정석을 보라는 말이 많았는데, 지극히 개인적으로 생각해봤는데, 자바의 정석보다 이 책이 더 좋은 것같다. (물론 두개 다 읽으면 더 좋고 ^^)
스레드 ->IO-> NIO -> 더 깊은 서버에 대한 이해
로 이어질 수 있는 것같다. 특히 서버와 스트림, 각종 채널에 대한 지식이 중요한 것같다. 물론 버퍼도 중요하고 !
아무튼 이쪽은 공부하면 할 수록 재밌는 것 같다.
동물책 중에 자바 네트워크 프로그래밍이 있는데, 이제 이걸 읽을 차례가 온 것같다. 그리고 디자인 패턴 -> 리팩토링 순으로 나아가야 할 것같다. 리팩토링에서 더 나은 디자인 패턴을 다루기 위해서 기본적인 디자인 패턴에 대한 지식이 필요하기 때문이다.
아무튼 이번 강의는 정말 재밌게 들었다... TCP/IP + 기본 네트워크 지식이 아직 부족해서 그쪽도 더 공부하면 될 것같다.
예전에 OKKY에서 유명하신 분이 했던 말이 기억나는데, (그분 말이 틀릴 수도 있고, 맞을 수도 있다. 누구나 객관적인 시선을 명확하게 갖긴 힘드니까)
" 특히 자바에서는 아니 객체지향 언어라면 알고리즘보다는 큰 그림을 볼 줄 아는게 중요합니다. 그 첫출발이 디자인 패턴이고, 전체적인 프로그램의 거대한 설계를 이해할 수 있는 객체지향적 이해가 '알고리즘' 보다 훨씬 중요합니다. "
라고 말했다. 어느정도 맞는 말이라고 생각한다. (나중가면 틀린 말이라고 생각될 수도 있겠지만, 일단 지금은 맞는 말 같다.) 근데 객체지향에 대한 이해는 끝이 없겠지만?ㅋㅋ 디자인 패턴이라도 잘해두자. 그렇게 어려워 보이지는 않더라.
--> 알고리즘이 중요하지 않다는 말은 절대 아니다. 다만 우선순위가 다른 것 같다.
serversocketchannel_socketchannel
package sec06.exam01_serversocketchannel_socketchannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
public class ServerExample {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
try {
serverSocketChannel = ServerSocketChannel.open(); //얘는 예외가 발생할 수 있다.
serverSocketChannel.configureBlocking(true); //명시적으로 블로킹이라고 선언
serverSocketChannel.bind(new InetSocketAddress(5001)); //포트 바인딩
while(true) { //연결 수락을 하기 위해서 무한루프를 작성한다.
System.out.println("[연결 기다림]");
SocketChannel socketChannel = serverSocketChannel.accept(); //클라이언트의 연결 요청을 기다린다. 여기서 블로킹이 된다. -> 클라이언트가 연결 요청을 해오면 accept()는 블로킹에서 벗어나서 통신용 소켓채널을 만든다.
//이제 연결이 되면 클라이언트의 정보를 한번 알아보자.
InetSocketAddress isa = (InetSocketAddress)socketChannel.getRemoteAddress(); //socketChannel.getRemoteAddress() 를 하면 실제로는 InetSocketAddress이 리턴된다. -> 타입변환필요
System.out.println("[연결 수락함]" + isa.getHostName()); //클라이언트의 ip를 출력한다.
}
} catch (IOException e) {
e.printStackTrace(); //어떤 예외가 발생하는지를 확인해보자
}
//서버 소켓 채널을 닫아준다.
if(serverSocketChannel.isOpen()) { //이 예제에서는 accept()블로킹을 해제해줄만한 코드가 없기 때문에 이 코드는 사실상 실행이 되지 않는다.그래도 그냥 한번 써준거다. 방법을 알야야 되니까
try {
serverSocketChannel.close();
} catch (IOException e) {} //보통 close()할때 예외가 발생해도 처리해줄 게 없어서 그냥 비워두는게 일반적이다.
}
}
}
package sec06.exam01_serversocketchannel_socketchannel;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
public class ClientExample {
public static void main(String[] args) {
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
System.out.println("[연결 요청]");
socketChannel.connect(new InetSocketAddress("localhost",5001)); //connect()메소드는 연결이 성공할때까지 블로킹이 된다. 그러므로 예외가 발생하지 않고 밑에 있는 문장이 출력되면 연결이 성공한 것이다.
System.out.println("[연결 성공]");
} catch (IOException e) {
e.printStackTrace();
}
//연결 성공하면 바로 끝난다.
if(socketChannel.isOpen()) {
try {
socketChannel.close();
} catch (IOException e) {}
}
}
}
data_read_write
package sec06.exam02_data_read_write;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ServerExample {
public static void main(String[] args) {
ServerSocketChannel serverSocketChannel = null;
try {
serverSocketChannel = ServerSocketChannel.open(); //얘는 예외가 발생할 수 있다.
serverSocketChannel.configureBlocking(true); //명시적으로 블로킹이라고 선언
serverSocketChannel.bind(new InetSocketAddress(5001)); //포트 바인딩
while(true) { //연결 수락을 하기 위해서 무한루프를 작성한다.
System.out.println("[연결 기다림]");
SocketChannel socketChannel = serverSocketChannel.accept(); //클라이언트의 연결 요청을 기다린다. 여기서 블로킹이 된다. -> 클라이언트가 연결 요청을 해오면 accept()는 블로킹에서 벗어나서 통신용 소켓채널을 만든다.
//이제 연결이 되면 클라이언트의 정보를 한번 알아보자.
InetSocketAddress isa = (InetSocketAddress)socketChannel.getRemoteAddress(); //socketChannel.getRemoteAddress() 를 하면 실제로는 InetSocketAddress이 리턴된다. -> 타입변환필요
System.out.println("[연결 수락함]" + isa.getHostName()); //클라이언트의 ip를 출력한다.
//소켓채널로부터 데이터를 읽어들이는 코드 -> 클라이언트가 보낸 문자열을 얻어낸다.
ByteBuffer byteBuffer = null;
Charset charset = Charset.forName("UTF-8");
byteBuffer = ByteBuffer.allocate(100);
int byteCount = socketChannel.read(byteBuffer);
byteBuffer.flip(); //이제 읽어야 되니까 flip으로 읽기모드로 바꾼다.
String message = charset.decode(byteBuffer).toString();
System.out.println("[데이터 받기 성공]: "+message);
//이번에는 서버가 클라이언트로 데이터를 보내보자.
byteBuffer = charset.encode("Hello Client");
socketChannel.write(byteBuffer);
System.out.println("[데이터 보내기 성공]");
}
} catch (IOException e) {
e.printStackTrace(); //어떤 예외가 발생하는지를 확인해보자
}
//서버 소켓 채널을 닫아준다.
if(serverSocketChannel.isOpen()) { //이 예제에서는 accept()블로킹을 해제해줄만한 코드가 없기 때문에 이 코드는 사실상 실행이 되지 않는다.그래도 그냥 한번 써준거다. 방법을 알야야 되니까
try {
serverSocketChannel.close();
} catch (IOException e) {} //보통 close()할때 예외가 발생해주면 처리해줄 게 없어서 그냥 비워두는게 일반적이다.
}
}
}
package sec06.exam02_data_read_write;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
public class ClientExample {
public static void main(String[] args) {
SocketChannel socketChannel = null;
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true);
System.out.println("[연결 요청]");
socketChannel.connect(new InetSocketAddress("localhost",5001)); //connect()메소드는 연결이 성공할때까지 블로킹이 된다. 그러므로 예외가 발생하지 않고 밑에 있는 문장이 출력되면 연결이 성공한 것이다.
System.out.println("[연결 성공]");
//여기 있는 다섯줄이 서버로 데이터를 보내는 코드이다.
ByteBuffer byteBuffer = null;
Charset charset = Charset.forName("UTF-8");
byteBuffer = charset.encode("Hello Server");
socketChannel.write(byteBuffer); //소켓채널을 통해 서버로 보내자.
System.out.println("[데이터 보내기 성공]");
//데이터를 받는 코드를 작성해보자.
byteBuffer = ByteBuffer.allocate(100);
int byteCount = socketChannel.read(byteBuffer);
byteBuffer.flip();
String message = charset.decode(byteBuffer).toString();
System.out.println("[데이터 받기 성공]: "+message);
} catch (IOException e) {
e.printStackTrace();
}
//연결 성공하면 바로 끝난다.
if(socketChannel.isOpen()) {
try {
socketChannel.close();
} catch (IOException e) {}
}
}
}
chatting(FXtest3)
package application;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ServerExample extends Application {
ExecutorService executorService;
ServerSocketChannel serverSocketChannel;
List<Client> connections = new Vector<Client>();
void startServer() { //서버 시작 코드
executorService = Executors.newFixedThreadPool( //스레드풀 생성하자.
Runtime.getRuntime().availableProcessors()
);
try {
serverSocketChannel = ServerSocketChannel.open(); //서버 소켓 채널객체를 생성하자.
serverSocketChannel.configureBlocking(true); //명시적으로 블로킹을 사용할게요. -> 예외 처리 코드 필요
serverSocketChannel.bind(new InetSocketAddress(5001)); //포트 바인딩 시작하기. -> 예외 처리 코드 필요
} catch(Exception e) {
if(serverSocketChannel.isOpen()) { stopServer(); } // 만약 예외 발생한다면, 채널 닫는다.
return; // 종료한다.
}
Runnable runnable = new Runnable() { //익명 구현 객체로 만든다. -> 연결 수락 작업을 runnable 로 정의
@Override
public void run() { //run메소드 재정의
Platform.runLater(()->{
displayText("[서버 시작]");
btnStartStop.setText("stop"); //stop으로 바꾼다. 버튼을
});
while(true) { //무한 루프 생성. 이 runnable 객체가 바로 연결 수락 작업을 정의하는 것이므로, 계속 클라이언트의 연결 요청을 기다려야 하므로 무한 루프로 작성한다.
try {
SocketChannel socketChannel = serverSocketChannel.accept(); //소켓 채널을 연결 수락 후에 생성하게 된다. 즉 이 작업을 계속한다... -> 예외 처리 코드 필요함.
String message = "[연결 수락: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message)); //출력창에 나타내기
Client client = new Client(socketChannel); //클라이언트와 데이터를 주고 받을 수 있는 클라이언트 객체를 생성한다. 소켓채널 객체를 매개값으로 준다.
connections.add(client); //컬렉션에 저장한다.
Platform.runLater(()->displayText("[연결 개수: " + connections.size() + "]")); //총 클라이언트 개수를 알려준다. 몇개의 클라이언트가 연결되있는지...
} catch (Exception e) {
if(serverSocketChannel.isOpen()) { stopServer(); } //예외가 발생하면 정상적으로 stopServer를 통해 서버를 종료한다.
break; //무한 루프를 빠져나간다.
}
}
}
};
executorService.submit(runnable); //스레드풀에 작업을 전달해서 스레드가 처리할 수 있도록 한다. -> 작업 처리 요청
}
void stopServer() {
try {
Iterator<Client> iterator = connections.iterator(); //반복자를 얻어낸다. connections에 있는 모든 클라이언트를 하나씩 가져와서 소켓 채널을 닫고, connections에서 클라이언트를 제거하기 위함이다.
while(iterator.hasNext()) { //가져올게 있다면?
Client client = iterator.next(); //클라이언트를 하나 가지고 오구요
client.socketChannel.close(); //그 클라이언트의 소켓 채널을 close() 합니다. -> 예외 처리 코드 필요
iterator.remove(); //connections 에서 해당 클라이언트 객체를 제거한다.
}
if(serverSocketChannel!=null && serverSocketChannel.isOpen()) {
serverSocketChannel.close(); //서버 소켓 채널을 닫는다.
}
if(executorService!=null && !executorService.isShutdown()) {
executorService.shutdown(); //스레드 풀을 닫는다.
}
Platform.runLater(()->{
displayText("[서버 멈춤]"); //정상적으로 종료가 되면 이 내용들을 출력한다.
btnStartStop.setText("start"); //버튼을 start로 다시 바꾼다.
});
} catch (Exception e) {}
}
class Client { //여기에는 데이터 통신 코드가 들어간다. 클라이언트로부터 데이터를 받는 receive 메소드가 있고, 데이터를 보내는 send 메소드가 있다.
SocketChannel socketChannel;
Client(SocketChannel socketChannel) {
this.socketChannel = socketChannel;
receive(); //항상 받을 준비를 한다. 언제 보낼지 모르니까 .
}
void receive() {
Runnable runnable = new Runnable() { //데이터 읽기 작업을 하나 정의한다. 스레드 풀에서 사용할 수 있도록! 익명 객체로 생성함.
@Override
public void run() {
while(true) { //지속적으로 클라이언트의 데이터를 받아야 하므로 무한루프로 만듬.
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(100); //읽은 데이터를 저장할 바이트버퍼
//클라이언트가 비정상 종료를 했을 경우 IOException 발생
int readByteCount = socketChannel.read(byteBuffer); //예외가 발생할 수 있으므로 예외처리해줌. (read메소드 땜시) read에서는 블로킹이 된다.
//클라이언트가 정상적으로 SocketChannel의 close()를 호출했을 경우
if(readByteCount == -1) {
throw new IOException(); //일부로 예외를 발생시켜서 catch로 넘긴다.
}
//정상적으로 받았다면 아래의 코드를...
String message = "[요청 처리: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message)); //출력창에 보이기.
byteBuffer.flip(); //성공적으로 데이터를 읽었다면 바이트 버퍼에 데이터가 저장이 되어있으므로 그 데이터를 읽기 위해 flip()한다. limit을 position으로 position을 0으로 보낸다.
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString(); //디코딩해서 문자열을 얻는다.
for(Client client : connections) { //얻은 문자열을 모든 클라이언트에게 보낸다.
client.send(data);
}
} catch(Exception e) {
try { //예외가 발생하는 경우는 클라이언트가 비정상종료일경우, 정상적으로 종료했울 경우의 2가지이다.
connections.remove(Client.this); //해당 클라이언트 객체를 제거한다. this를 하게 되면 runnable 객체를 이야기한다. 그러므로 Client.this를 해야 Client객체가 얻어진다.
String message = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]"; //출력할 메시지 작성 / 이곳의 예외도 e2로 잡아준다.
Platform.runLater(()->displayText(message)); //메시지 표시
socketChannel.close(); //자 이제 진짜 닫아주자.
} catch (IOException e2) {}
break;
}
}
}
};
executorService.submit(runnable); //이제 읽기 작업을 스레드 풀에서 처리할 수 있도록 매개값으로 제공한다. -> 작업 큐에 저장한다는 뜻.
}
void send(String data) { //클라이언트로 데이터를 보낸다.
Runnable runnable = new Runnable() { //보내는 작업 객체를 하나 만들어준다.
@Override
public void run() {
try {
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data); //받은 문자열을 바이트버퍼로 만들자.
socketChannel.write(byteBuffer); //이제 클라이언트로 보내자. -> 이곳의 예외를 처리해준다. e로 처리 ->클라이언트와 통신이 안될 경우 예외 발생
} catch(Exception e) {
try {
String message = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]"; //여기서도 예외 발생해서 처리해줘야 됨.
Platform.runLater(()->displayText(message));
connections.remove(Client.this); //클라이언트와 통신이 안되니 해당 클라이언트와 통신하는 클라이언트 객체를 제거한다.
socketChannel.close(); //클라이언트의 소켓 채널을 진짜 닫아준다.
} catch (IOException e2) {}
}
}
};
executorService.submit(runnable); //작업 객체를 스레드풀의 작업 큐에 저장해야 하므로 이렇게 작성해준다.
}
}
//////////////////////////////////////////////////////
TextArea txtDisplay;
Button btnStartStop;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
btnStartStop = new Button("start");
btnStartStop.setPrefHeight(30);
btnStartStop.setMaxWidth(Double.MAX_VALUE);
btnStartStop.setOnAction(e->{
if(btnStartStop.getText().equals("start")) {
startServer();
} else if(btnStartStop.getText().equals("stop")){
stopServer();
}
});
root.setBottom(btnStartStop);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Server");
primaryStage.setOnCloseRequest(event->stopServer());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
package application;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ClientExample extends Application {
SocketChannel socketChannel;
//연결시작 코드 / 새로운 스레드를 생성해서 코드를 실행시킨다.그 이유는 소켓채널이 서버에 연결요청을 할때 connect() 메소드를 실행하는데,
//이 connect() 메소드가 블로킹 메소드이기 때문이다. 또한 서버에서 언제 데이터를 줄지 모르기 때문에 항상 read() 메소드를 실행해야 한다.
//그래서 자바 fx어플리케이션 스레드가 이 startClient를 실행하면 안된다. 그래서 새로운 스레드를 생성해서 connect와 read 메소드를 실행할 수 있도록 해줘야 한다.
void startClient() {
Thread thread = new Thread() { //스레드를 익명 객체로 생성한다.
@Override
public void run() { //run메소드 재정의
try {
socketChannel = SocketChannel.open(); //소켓채널을 생성하자. -> 예외 처리 필요
socketChannel.configureBlocking(true); //명시적으로 블로킹을 설정한다. -> 예외 처리 필요
socketChannel.connect(new InetSocketAddress("localhost", 5001)); //연결 요청을 하자.
Platform.runLater(()->{ //출력창에 연결 완료 내용을 보인다.
try {
displayText("[연결 완료: " + socketChannel.getRemoteAddress() + "]");//어떤 서버와 연결됬는지출력. -> 여기서 예외 발생하므로 잡아준다.
btnConn.setText("stop"); //버튼을 start에서 stop으로 바꿔준다.
btnSend.setDisable(false); // 보내기 버튼을 활성화 해준다.
} catch (Exception e) {}
});
} catch(Exception e) { //connect에서 예외가 발생할 수 있다.
Platform.runLater(()->displayText("[서버 통신 안됨]"));
if(socketChannel.isOpen()) { stopClient(); } //정상적으로 클라이언트를 종료해준다.
return; //빠져나간다.
}
receive(); //정상적으로 연결이 되었다면 receive메소드를 호출해서 서버에서 보낸 데이터를 받을 준비를 한다.
}
};
thread.start(); // 이제 스레드를 시작해준다.
}
void stopClient() { //연결 끊기 코드
try {
Platform.runLater(()->{ //일단 연결을 끊겠다고 출력한다.
displayText("[연결 끊음]");
btnConn.setText("start");
btnSend.setDisable(true); //다시 send 버튼을 비활성화 한다.
});
if(socketChannel!=null && socketChannel.isOpen()) {
socketChannel.close(); //이제 소켓채널을 닫아주다. -> 예외처리해줘야됨.
}
} catch (IOException e) {}
}
void receive() { //데이터 받기 코드
while(true) { //무한루프 작성 //서버에서 언제 데이터를 보낼지 모르니까 항상 받을 준비를 해야 한다.
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
//서버가 비정상적으로 종료했을 경우 IOException 발생
int readByteCount = socketChannel.read(byteBuffer); //예외처리 코드 작성 필요 / 블로킹이 된다.
//서버가 정상적으로 Socket의 close()를 호출했을 경우
if(readByteCount == -1) {
throw new IOException();
}
byteBuffer.flip(); //읽기 위해서 flip한다. limit은 position으로 position은 0으로 간다.
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString(); //문자열을 얻는다.
Platform.runLater(()->displayText("[받기 완료] " + data)); //해당 문자열을 출력한다.
} catch (Exception e) { //위에서 뭔가 발생하면,
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient(); //정상적으로 클라이언트를 중지한다.
break; //더이상 통신이 안되는데,굳이 read를 실행할 필요가 없으므로 무한루프를 빠져나간다.
}
}
}
void send(String data) { //데이터 전송 코드 / 서버로 데이터를 보낸다. 역시 스레드를 새로 만들어야 한다. 스레드가 서버로 데이터를 보내야 한다. javafx 스레드가 send 메소드를 실행하면 데이터를 보내는 동안 ui가 먹통이 되기 때문이다..
Thread thread = new Thread() { //새로운 스레드
@Override
public void run() { //run 재정의
try {
Charset charset = Charset.forName("UTF-8"); //문자셋을 얻자.
ByteBuffer byteBuffer = charset.encode(data); // 바이트버퍼를 얻는다.
socketChannel.write(byteBuffer); // 바이트 버퍼의 데이터를 서버로 보낸다. 이곳의 예외를 잡아준다.
Platform.runLater(()->displayText("[보내기 완료]")); //write가 정상실행됬다면 잘 보내진 것이므로 이 코드가 실행된다.
} catch(Exception e) { //write에서 예외가 발생한다먄 -> 서버통신이 안된다면
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient(); //정상적으로 중지시킨다.
}
}
};
thread.start(); //스레드를 시작한다.
}
//////////////////////////////////////////////////////
TextArea txtDisplay;
TextField txtInput;
Button btnConn, btnSend;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
BorderPane bottom = new BorderPane();
txtInput = new TextField();
txtInput.setPrefSize(60, 30);
BorderPane.setMargin(txtInput, new Insets(0,1,1,1));
btnConn = new Button("start");
btnConn.setPrefSize(60, 30);
btnConn.setOnAction(e->{
if(btnConn.getText().equals("start")) {
startClient();
} else if(btnConn.getText().equals("stop")){
stopClient();
}
});
btnSend = new Button("send");
btnSend.setPrefSize(60, 30);
btnSend.setDisable(true);
btnSend.setOnAction(e->send(txtInput.getText()));
bottom.setCenter(txtInput);
bottom.setLeft(btnConn);
bottom.setRight(btnSend);
root.setBottom(bottom);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Client");
primaryStage.setOnCloseRequest(event->stopClient());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
/*text-area 배경색*/
.text-area {
-fx-background-color: gold;
}
/*scroll-pane 배경색*/
.text-area .scroll-pane {
-fx-background-color: transparent;
}
/*viewport 배경색*/
.text-area .scroll-pane .viewport{
-fx-background-color: transparent;
}
/*content 배경색*/
.text-area .scroll-pane .content{
-fx-background-color: transparent;
}
nonblocking_tcpchannel (FXtest4)
package application;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.Vector;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ServerExample extends Application {
//tcp 넌블로킹은 주로 서버를 구현할떄 쓰는 방식이기에 이번 예제에서는 이전 예제의 클라이언트 예제를 그대로 사용한다.
Selector selector; //선택자 -> 넌블로킹의 핵심객체
ServerSocketChannel serverSocketChannel; //
List<Client> connections = new Vector<Client>();
void startServer() { //서버 시작 코드
try {
selector = Selector.open(); //셀렉터를 얻는다. 즉 생성만 한다. -> 예외처리를 해준다.
serverSocketChannel = ServerSocketChannel.open(); //서버소켓 채널을 얻는다.
serverSocketChannel.configureBlocking(false); //넌블로킹 모드로 설정한다.
serverSocketChannel.bind(new InetSocketAddress(5001)); //서버소켓을 포트에 바인딩한다.
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //서버소켓채널을 셀렉터에 등록한다. (작업유형을 OP_ACCEPT로 지정한다.) -> 즉 해당 채널의 연결 수락작업이 처리 준비가 되었는지를 감시한다.
} catch (Exception e) {
if(serverSocketChannel.isOpen()) { stopServer(); } //정상종료한다.
return;
}
//셀렉터의 관심키셋에서 작업 처리 준비된 키를 가져와서 처리하는 싱글 스레드이다.
Thread thread = new Thread() {
@Override
public void run() {
while(true) { //무한루프를 돈다. / 작업 처리 준비된 키를 항상 가져와서 처리를 해야 되기 때문에 필요하다.
try {
int keyCount = selector.select(); //관심키셋에서 작업처리준비된 키가 몇개가 있는지를 조사한다. select()메서드는 관심키셋에서 작업 처리 준비가 된 키가 통보를 해올때까지 블로킹이 된다. 만약 있다면 그 키의 수를 리턴한다.
//wakeup()이란 메소드가 호출될 때도 리턴이 된다. 대신 keyCount = 0이다. 만약 그렇다면 다시 while로 가야하므로 continue 를 밑에 코드에서 선언한다.
if(keyCount == 0) { continue; } //wakeup으로 이 셀렉터가 블로킹이 해제되었다면 다시 while문 처음으로 돌아가서 다시 셀렉터를 실행시킨다.
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isAcceptable()) { //연결 수락작업이냐?
accept(selectionKey);
} else if (selectionKey.isReadable()) { //셀렉션키가 읽기 작업이냐?
Client client = (Client)selectionKey.attachment(); //읽기 작업을 할 때는 일단 클라이언트 객체가 필요하다. (첨부객체를 얻어낸다. attachment로 ) -> 얘는 리턴타입이 Object라 형변환해줘야 한다.
client.receive(selectionKey);
} else if (selectionKey.isWritable()) { //셀렉션키가 쓰기 작업이냐?
Client client = (Client)selectionKey.attachment();
client.send(selectionKey);
}
iterator.remove(); //작업 처리가 완료가 되면 셀렉션키를 선택된 키셋에서 제거해야 한다. -> 완료됬으니까 이제 필요가 없는거죠
}
} catch (Exception e) {
if(serverSocketChannel.isOpen()) { stopServer(); } //정상종료한다.
break; //더이상 while문이 실행할 필요가 없으므로 break; 한다.
}
}
}
};
thread.start(); //스레드를 시작한다.
Platform.runLater(()->{ //시작한다는 로그를 출력한다.
displayText("[서버 시작]");
btnStartStop.setText("stop"); //버튼의 글자를 stop으로 바꾼다.
});
}
void stopServer() { //서버 종료 코드 /연결된 모든 클라이언트의 연결을 닫고 서버소켓을 닫아주고, 셀렉터를 닫아주는 기능을 한다.
try {
Iterator<Client> iterator = connections.iterator(); //반복자를 얻어낸다.
while(iterator.hasNext()) { //가져올게 있는지 묻는다
Client client = iterator.next(); //클라이언트 하나를 가져온다.
client.socketChannel.close();// 필드로 선언된 소켓채널에서 close()한다. -> 모든 클라이언트의 연결을 끊어준다. // 얘는 예외처리가 필요하다/
iterator.remove(); //컬렉션으로부터 클라이언트를 제거한다.
}
if(serverSocketChannel!=null && serverSocketChannel.isOpen()) {
serverSocketChannel.close(); //이제 서버 소켓 채널을 닫아준다.
}
if(selector!=null && selector.isOpen()) {
selector.close(); //이제 셀렉터를 닫아준다.
}
Platform.runLater(()->{ //출력함.
displayText("[서버 멈춤]");
btnStartStop.setText("start"); //버튼의 글자를 start로 바꾼다.
});
} catch (Exception e) {}
}
void accept(SelectionKey selectionKey) { //연결 수락 코드 /매개값으로 셀렉션키가 들어옴. / 작업 스레드가 연결 수락작업을 할때 호출하는 메서드이다.
try {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel(); //서버소켓 채널을 얻어낸다.
SocketChannel socketChannel = serverSocketChannel.accept(); //서버소켓 채널의 accept()메서드를 호출해서 연결 수락작업을 해준다. 이 accept는 블로킹이 되지 않는다. 연결 수락 준비가 이미 된 상태에서 accept가 호출되기 때문에 바로 통신용 소켓채널을 리턴한다.
//위의 코드는 예외처리가 필요하다.
String message = "[연결 수락: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]"; //연결이 수락됬다는 메시지
Platform.runLater(()->displayText(message)); //메시지 출력
Client client = new Client(socketChannel); //클라이언트 객체를 만든다. 클라이언트와 데이터 통신을 하기 위해 사용된다.
connections.add(client); //해당 클라이언트를 connections 컬렉션에 추가한다.
Platform.runLater(()->displayText("[연결 개수: " + connections.size() + "]")); //connections에 있는 클라이언트 수를 출력한다. 즉 연결되어 있는 개수이다.
} catch(Exception e) { //예외 처리를 진행한다.
if(serverSocketChannel.isOpen()) { stopServer(); } //정상종료한다.
}
}
class Client { //데이터 통신 코드 /클라이언트와 데이터를 주고 받는 코드가 작성이 된다.
SocketChannel socketChannel;
String sendData;
Client(SocketChannel socketChannel) throws IOException {
this.socketChannel = socketChannel;
socketChannel.configureBlocking(false); //소켓채널을 넌블로킹 모드로 동작 / 예외처리를 해줘야 한다.
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ); //이 소켓 채널을 셀렉터에 등록한다.
selectionKey.attach(this); //이 셀렉션키에 Client 객체를 첨부객체로 저장을 한다. 그 이유는 작업 스레드가 읽기 작업을 할 때 Client객체가 필요하기 때문이다.
}
void receive(SelectionKey selectionKey) { //클라이언트에서 보낸 데이터를 받는다. / 이 셀렉션키는 작업 스레드가 이들 메소드를 호출할 때 매개값으로 대입을 해준다.
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(100); //받은 데이터를 저장할 버퍼 객체 생성
//상대방이 비정상 종료를 했을 경우 자동 IOException 발생
int byteCount = socketChannel.read(byteBuffer); //여기는 예외처리가 필요함. 이 read메소드는 블로킹이 되지 않는다. 그래서 읽기 준비가 되면 바로 receive메소드가 호출되기 때문에 이 read는 바로 데이터를 읽어서 바이트버퍼에 저장한다.
//상대방이 SocketChannel의 close() 메소드를 호출할 경우
if(byteCount == -1) {
throw new IOException(); //catch 절로 넘긴다.
}
String message = "[요청 처리: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]"; //정상적으로 읽었다면 메시지를 작성한다.
Platform.runLater(()->displayText(message)); //출력창에 출력함.
byteBuffer.flip(); //limit을 position으로 position을 0으로 이동시킨다.
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString();
for(Client client : connections) { //받은 데이터를 모든 클라이언트에게 전송해야 하므로 for문으로 가져간다. //하나씩 가져온다,
client.sendData = data; //Client의 sendData라는 필드에 보내고자 하는 데이터를 저장한다.
SelectionKey key = client.socketChannel.keyFor(selector); // -> 이 소켓 채널이 셀렉터에 등록할때 사용한 셀렉션 키를 얻을 수 있다.
//클라이언트가 문자열을 보낼 수 있도록 하기 위해서 Client의 소켓 채널의 작업 유형을 OP_WRITE로 변경해줘야 한다. 현재는 OP_READ이다. 데이터를 보내려면 OP_WRITE가 되어야 한다.
//작업 유형을 변경하려면 소켓 채널이 셀렉터에 등록할 때 사용한 키를 얻어야 한다. 셀렉션 키는 위의 코드와 같이 얻어낸다.
key.interestOps(SelectionKey.OP_WRITE); //위의 키를 이용해서 작업을 변경한다. //데이터를 보낼때는 OP_WRITE이다.
}
selector.wakeup();
//작업 유형이 변경이 되면 반드시 셀렉터의 wakeup을 호출해야 한다.
//그 이유는 셀렉터는 셀렉트 메소드로 현재 블로킹되어 있는 상태인데, 그 셀렉트 메소드를 빠져나와서 다시 변경된 새로운 작업 유형으로 감시를 하기 위해서
//다시 셀렉트를 호출해야 한다. 그래서 wakeup메소드를 호출해서 기존의 셀렉트의 블로킹을 빠져나오도록 해줘야 한다.
} catch(Exception e) { //위의 read 메소드에서 예외가 발생할 수 있다. -> 즉 클라이언트가 통신이 안된다는 뜻이다.
try {
connections.remove(this); //더 이상 컬렉션에서 관리할 필요가 없다. 그러니 제거한다.
String message = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]"; // 출력할 메시지 작성
Platform.runLater(()->displayText(message)); //표시
socketChannel.close(); //통신이 안되니 사용할 수 없으니까 소켓 채널을 닫아준다.
} catch (IOException e2) {}
}
}
void send(SelectionKey selectionKey) { //클라이언트로 데이터를 보내는 메서드이다.
try {
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(sendData); //바이트 버퍼 객체 생성 / 보낼 문자열을 갖고 있는 필드인 sendData를 매개값으로 준다.
socketChannel.write(byteBuffer); //소켓 채널로 데이터를 보내준다. /예외 처리 필요함.
selectionKey.interestOps(SelectionKey.OP_READ); //데이터를 보내고 나서는 다시 원래 작업 유형인 OP_READ로 가야한다. 이번에는 매개값으로 셀렉션키가 있으니 그냥 이걸 사용해서 바로 바꾸면 된다.
selector.wakeup(); // 작업 유형이 변경됬으니 이걸 호출했다. 필수적이다.
} catch(Exception e) { //write에서 예외가 발생할 수 있다. -> 통신이 안된다.
try {
String message = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
connections.remove(this);
socketChannel.close();
} catch (IOException e2) {}
}
}
}
///////////////////////////////////////////
TextArea txtDisplay;
Button btnStartStop;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
btnStartStop = new Button("start");
btnStartStop.setPrefHeight(30);
btnStartStop.setMaxWidth(Double.MAX_VALUE);
btnStartStop.setOnAction(e->{
if(btnStartStop.getText().equals("start")) {
startServer();
} else if(btnStartStop.getText().equals("stop")){
stopServer();
}
});
root.setBottom(btnStartStop);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Server");
primaryStage.setOnCloseRequest(event->stopServer());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
package application;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ClientExample extends Application {
//지금 이예제가 아니라 이전예제의 클라이언트 예제를 사용해도 문제없다. 이 예제는 셀렉터를 사용해서 짠 코드이다. 이걸 활용해도 무방하다. 설명은 없다. 강의에서 설명을 안해줘서
// 근데 읽어봐도 무난히 읽힌다. send 메소드가 2개가 정의되어있는 것에 주의하라. 각각 기능도 다르다.
Selector selector;
SocketChannel socketChannel;
void startClient() {
try {
selector = Selector.open();
} catch(Exception e) {
if(socketChannel.isOpen()) { stopClient(); }
return;
}
try {
socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
socketChannel.register(selector, SelectionKey.OP_CONNECT);
socketChannel.connect(new InetSocketAddress("localhost", 5001));
} catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
if(socketChannel.isOpen()) { stopClient(); }
return;
}
Runnable runnable = new Runnable() {
@Override
public void run() {
while(true) {
try {
int keyCount = selector.select();
if(keyCount == 0) { continue; }
Set<SelectionKey> selectedKeys = selector.selectedKeys();
Iterator<SelectionKey> iterator = selectedKeys.iterator();
while (iterator.hasNext()) {
SelectionKey selectionKey = iterator.next();
if (selectionKey.isConnectable()) {
connect(selectionKey);
} else if(selectionKey.isReadable()) {
receive(selectionKey);
} else if(selectionKey.isWritable()) {
send(selectionKey);
}
iterator.remove();
}
} catch (Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
if(socketChannel.isOpen()) { stopClient(); }
break;
}
}
}
};
new Thread(runnable).start();
}
void stopClient() {
try {
Platform.runLater(()->{
displayText("[연결 끊음]");
btnConn.setText("start");
btnSend.setDisable(true);
});
if(socketChannel!=null && socketChannel.isOpen()) {
socketChannel.close();
}
} catch (IOException e) {}
}
void connect(SelectionKey selectionKey) {
try {
socketChannel.finishConnect(); // public abstract boolean finishConnect() : 소켓 채널의 접속 처리를 완료한다.
Platform.runLater(()->{
try {
displayText("[연결 완료: " + socketChannel.getRemoteAddress() + "]");
btnConn.setText("stop");
btnSend.setDisable(false);
} catch (Exception e) {}
});
selectionKey.interestOps(SelectionKey.OP_READ);
} catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
if(socketChannel.isOpen()) { stopClient(); }
}
}
void receive(SelectionKey selectionKey) {
try {
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
//서버가 비정상 종료를 했을 경우 자동 IOException 발생
int byteCount = socketChannel.read(byteBuffer);
//서버가 정상적으로 SocketChannel의 close() 메소드를 호출할 경우
if(byteCount == -1) {
throw new IOException();
}
byteBuffer.flip();
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString();
Platform.runLater(()->displayText("[받기 완료] " + data));
} catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient();
}
}
void send(SelectionKey selectionKey) {
try {
ByteBuffer byteBuffer = (ByteBuffer) selectionKey.attachment();
socketChannel.write(byteBuffer);
Platform.runLater(()->displayText("[보내기 완료]"));
selectionKey.interestOps(SelectionKey.OP_READ);
} catch(Exception e) {
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient();
}
}
void send(String data) {
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data);
SelectionKey key = socketChannel.keyFor(selector);
key.attach(byteBuffer);
key.interestOps(SelectionKey.OP_WRITE);
selector.wakeup();
}
///////////////////////////////////////////
TextArea txtDisplay;
TextField txtInput;
Button btnConn, btnSend;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
BorderPane bottom = new BorderPane();
txtInput = new TextField();
txtInput.setPrefSize(60, 30);
BorderPane.setMargin(txtInput, new Insets(0,1,1,1));
btnConn = new Button("start");
btnConn.setPrefSize(60, 30);
btnConn.setOnAction(e->{
if(btnConn.getText().equals("start")) {
startClient();
} else if(btnConn.getText().equals("stop")){
stopClient();
}
});
btnSend = new Button("send");
btnSend.setPrefSize(60, 30);
btnSend.setDisable(true);
btnSend.setOnAction(e->send(txtInput.getText()));
bottom.setCenter(txtInput);
bottom.setLeft(btnConn);
bottom.setRight(btnSend);
root.setBottom(bottom);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Client");
primaryStage.setOnCloseRequest(event->stopClient());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
/*text-area 배경색*/
.text-area {
-fx-background-color: gold;
}
/*scroll-pane 배경색*/
.text-area .scroll-pane {
-fx-background-color: transparent;
}
/*viewport 배경색*/
.text-area .scroll-pane .viewport{
-fx-background-color: transparent;
}
/*content 배경색*/
.text-area .scroll-pane .content{
-fx-background-color: transparent;
}
asynchronous_tcpchannel (FXtest5)
package application;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Vector;
import java.util.concurrent.Executors;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ServerExample extends Application {
AsynchronousChannelGroup channelGroup; //비동기 채널 그룹
AsynchronousServerSocketChannel serverSocketChannel; //비동기 서버소켓채널
List<Client> connections = new Vector<Client>(); //리스트 컬렉션
void startServer() { //서버 시작 코드
try {
channelGroup = AsynchronousChannelGroup.withFixedThreadPool( //먼저 비동기 채널 그룹을 생성해준다.
Runtime.getRuntime().availableProcessors(), //스레드풀 개수 지정하기. CPU가 지원하는 스레드만큼
Executors.defaultThreadFactory() // 이건 잘 모르겟네
);
serverSocketChannel = AsynchronousServerSocketChannel.open(channelGroup);//비동기 서버소켓채널을 생성해줍시다. 매개값으로 위에서 생성한 채널 그룹을준다.
serverSocketChannel.bind(new InetSocketAddress(5001)); //포트에 바인딩한다.
} catch(Exception e) {
if(serverSocketChannel.isOpen()) { stopServer(); } //예외 발생하면 정상종료해준다.
return; //더이상 startServer를 실행할 필요가 없으므로 return을 통해 종료시킨다.
}
Platform.runLater(()->{ //출력창에 출력 시킨다.
displayText("[서버 시작]");
btnStartStop.setText("stop"); //버튼의 글자를 바꿉니다.
});
//밑의 accept 메소드는 즉시 리턴이 된다. 그리고 실제로 연결 수락작업을 하는 것은 스레드 풀의 스레드이다. 스레드가 성공적으로 연결을 수락하면 Completed가 실행된다. 아니면 failed 실행함.
serverSocketChannel.accept(null, new CompletionHandler<AsynchronousSocketChannel, Void>() { //서버소켓채널을 가지고 연결수락작업을 한다. 첨부객체는 null이다. 두번째는 콜백메서드를 갖고 있는 객체이다. -> 익명 구현객체로 만든다.
@Override
public void completed(AsynchronousSocketChannel socketChannel, Void attachment) { // 메소드 재정의
try {
String message = "[연결 수락: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]"; //클라이언트의 ip주소와 담당 스레드를 출력
Platform.runLater(()->displayText(message));
} catch (IOException e) {}
Client client = new Client(socketChannel); //클라이언트와 통신하는 클라이언트 객체를 만든다. 매개값으로 AsynchronousSocketChannel 객체를 제공한다.
connections.add(client); //컬렉션에 클라이언트를 추가한다.
Platform.runLater(()->displayText("[연결 개수: " + connections.size() + "]")); //개수를 출력한다.
serverSocketChannel.accept(null, this); //null은 첨부 객체이다. this는 바로 CompletionHandler 객체를 말한다. 즉 그 객체를 반복해서 쓰겠다는 말이다. (반복적으로 연결 수락하기 위해서)
}
@Override
public void failed(Throwable exc, Void attachment) { //예외가 발생하면 안전하게 종료한다.
if(serverSocketChannel.isOpen()) { stopServer(); }
}
});
}
void stopServer() {
// clear()를 바로 한다? 모든 객체를 clear한다? 조금 이상하다고 느낄 수 있다. connections에는 클라이언트 객체가 저장되어 있는데, 그 클라이언트 객체의 소켓 채널을 close()하지 않고 그냥 객체를 다 지워버렸다. 문제가 될 것 같은데,....
// 밑에 코드를 보면 알게 된다. 바로 shutdownNow()에 해답이 있다. (현재 채널그룹에 속한 모든 채널들을 강제적으로 닫아버린다. ) 그렇기 때문에 connections에 있는 클라이언트를 모두 제거하더라도 클라이언트 안에 있는 채널들이
// 자동적으로 닫히게 된다. 그렇게 보면 된다...
try {
connections.clear(); //connections에 있는 모든 클라이언트를 모두 제거
if(channelGroup!=null && !channelGroup.isShutdown()) {
channelGroup.shutdownNow(); //현재 채널그룹에 속한 모든 채널들을 강제적으로 닫아버린다.
}
Platform.runLater(()->{ //출력창에 출력해줍시당
displayText("[서버 멈춤]");
btnStartStop.setText("start"); //버튼 글자 바꿔줍시당
});
} catch (Exception e) {}
}
class Client { //클라이언트와 데이터를 주고 받는 코드
AsynchronousSocketChannel socketChannel;
Client(AsynchronousSocketChannel socketChannel) {
this.socketChannel = socketChannel;
receive(); //클라이언트가 생성과 동시에 받을 준비를 하겠다는 뜻이다.
}
void receive() { //클라이언트에서 보낸 데이터를 받는다.
ByteBuffer byteBuffer = ByteBuffer.allocate(100); //받은 데이터를 저장할 바이트 버퍼 생성
//첫번째는 읽은 바이트가 저장될 바이트버퍼이다. 두번쨰는 byteBuffer를 콜백객체에 제공할 첨부객체로 준다. 세번쨰를 보면 read가 읽은 바이트 수를 리턴하기에 Integer이고 첨부객체타입이 byteBuffer이므로 이게 두번째 타입 파라미터로 간다.
socketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer attachment) { //읽기가 성공했을 때 자동으로 콜백
try {
String message = "[요청 처리: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
attachment.flip(); //받은 첨부객체를 사용한다. byteBuffer를 받았으니까 .
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(attachment).toString(); //문자열을 얻어낸다.
for(Client client : connections) { //이제 이 데이터를 모든 클라이언트에게 전달한다.
client.send(data);
}
ByteBuffer byteBuffer = ByteBuffer.allocate(100); //계속적으로 read메소드를 실행해야 하기 때문에 밑의 코드의 매개값으로 들어갈 바이트버퍼를 새로 생성한다.
socketChannel.read(byteBuffer, byteBuffer, this); //CompletionHandler의 객체는 재활용을 해서 this라고 준다.
} catch(Exception e) {}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {//읽기 실패했을 때 자동으로 콜백
try {
String message = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
connections.remove(Client.this); //해당 클라이언트를 제거합니다.
socketChannel.close(); // 소켓채널도 close해준다.
} catch (IOException e) {}
}
});
}
void send(String data) { //클라이언트로 데이터를 보낸다.
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data);
socketChannel.write(byteBuffer, null, new CompletionHandler<Integer, Void>() { //두번째는 첨부객체인데 null이다. 세번째는 콜백 메소드를 갖고 있는 CompletionHandler 이다. 첨부 타입은 Void이다. 결과값은 Integer이고,
@Override
public void completed(Integer result, Void attachment) { //쓰기 작업이 완료됬을때
//별도로 할 작업이 없다.
}
@Override
public void failed(Throwable exc, Void attachment) { //클라이언트와 통신이 안된다고 코딩한다.
try {
String message = "[클라이언트 통신 안됨: " + socketChannel.getRemoteAddress() + ": " + Thread.currentThread().getName() + "]";
Platform.runLater(()->displayText(message));
connections.remove(Client.this); //컬렉션에서 제거한다.
socketChannel.close(); //소켓 채널 close() 한다.
} catch (IOException e) {}
}
});
}
}
///////////////////////////////////////////
TextArea txtDisplay;
Button btnStartStop;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
btnStartStop = new Button("start");
btnStartStop.setPrefHeight(30);
btnStartStop.setMaxWidth(Double.MAX_VALUE);
btnStartStop.setOnAction(e->{
if(btnStartStop.getText().equals("start")) {
startServer();
} else if(btnStartStop.getText().equals("stop")){
stopServer();
}
});
root.setBottom(btnStartStop);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Server");
primaryStage.setOnCloseRequest(event->stopServer());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
package application;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousChannelGroup;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.nio.charset.Charset;
import java.util.concurrent.Executors;
import javafx.application.Application;
import javafx.application.Platform;
import javafx.geometry.Insets;
import javafx.scene.Scene;
import javafx.scene.control.Button;
import javafx.scene.control.TextArea;
import javafx.scene.control.TextField;
import javafx.scene.layout.BorderPane;
import javafx.stage.Stage;
public class ClientExample extends Application {
AsynchronousChannelGroup channelGroup; //비동기 채널 그룹
AsynchronousSocketChannel socketChannel; //비동기 소켓 채널
void startClient() {
try {
channelGroup = AsynchronousChannelGroup.withFixedThreadPool( //채널그룹을 하나 생성한다.
Runtime.getRuntime().availableProcessors(), //스레드풀의 최대 스레드 개수 ...CPU가 지원하는 코어의 수를 넣어준다.
Executors.defaultThreadFactory() //스레드 팩토리를 넣는다.
);
socketChannel = AsynchronousSocketChannel.open(channelGroup); //비동기 소켓 채널 생성한다.
socketChannel.connect(new InetSocketAddress("localhost", 5001), null, new CompletionHandler<Void, Void>() { //서버에 연결을 요청한다. 첨부객체는 없다. (NULL)/ 세번째는 CompletionHandler 객체이다. 결과타입은 Void이다. 첨부객체가 없어서 또 Void이다.
@Override
public void completed(Void result, Void attachment) { //연결이 성공했을 때 콜백
Platform.runLater(()->{
try {
displayText("[연결 완료: " + socketChannel.getRemoteAddress() + "]");
btnConn.setText("stop");
btnSend.setDisable(false); //send 버튼 활성화
} catch (Exception e) {}
});
receive(); //서버의 데이터를 항상 받기 위해서 / 즉 연결이 성공이 되면 서버의 데이터를 받겠다는 뜻이다.
}
@Override
public void failed(Throwable e, Void attachment) { //연결이 실패되었을 때 콜백
Platform.runLater(()->displayText("[서버 통신 안됨]"));
if(socketChannel.isOpen()) { stopClient(); } //정상종료시켜준다.
}
});
} catch (IOException e) {}
}
void stopClient() {
try {
Platform.runLater(()->{
displayText("[연결 끊음]");
btnConn.setText("start");
btnSend.setDisable(true); //보내기 버튼을 비활성화 시킨다.
});
if(channelGroup!=null && !channelGroup.isShutdown()) {
channelGroup.shutdownNow(); //채널그룹을 닫아준다. 예외처리도 따로 해준다.
}
} catch (IOException e) {}
}
void receive() {
ByteBuffer byteBuffer = ByteBuffer.allocate(100); //받은 데이터를 저장할 바이트 버퍼 객체 생성
socketChannel.read(byteBuffer, byteBuffer, new CompletionHandler<Integer, ByteBuffer>() { //이제 읽자. 첫번째는 읽은 데이터를 저장할 바이트버퍼, 두번쨰는 첨부객체로 바이트 버퍼를 준다. 세번째는 ...첨부타입은 ByteBuffer, 읽은 바이트 수는 Integer
@Override
public void completed(Integer result, ByteBuffer attachment) { //읽기가 성공했을 때 콜백되는 메소드
try {
attachment.flip(); //바이트 버퍼에 저장된 데이터를 읽어야 하므로 flip()
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(attachment).toString(); //문자열로 변환해주자.
Platform.runLater(()->displayText("[받기 완료] " + data));
ByteBuffer byteBuffer = ByteBuffer.allocate(100); //밑의 코드를 위해 새로운 버퍼를 생성해준다.
socketChannel.read(byteBuffer, byteBuffer, this); //반복적으로 계속 read를 실행하기 위해서 이 코드를 쓴다. this는 이 CompletionHandler객체를 의미한다.
} catch(Exception e) {}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) { //읽기가 실패했을 때 콜백되는 메소드
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient();
}
});
}
void send(String data) {
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data); //바이트버퍼를 생성한다. encode해서 만든다.
socketChannel.write(byteBuffer, null, new CompletionHandler<Integer, Void>() { //이제 보낸다. 첨부객체는 없으니 null이다. 세번째는 늘상 보던거... write결과 타입은 Integer/ 첨부객체는 없으므로 Void
@Override
public void completed(Integer result, Void attachment) { //write가 성공하면
Platform.runLater(()->displayText("[보내기 완료]"));
}
@Override
public void failed(Throwable exc, Void attachment) { //write가 실패하면 !
Platform.runLater(()->displayText("[서버 통신 안됨]"));
stopClient(); //종료해줄게요
}
});
}
///////////////////////////////////////////
TextArea txtDisplay;
TextField txtInput;
Button btnConn, btnSend;
@Override
public void start(Stage primaryStage) throws Exception {
BorderPane root = new BorderPane();
root.setPrefSize(500, 300);
txtDisplay = new TextArea();
txtDisplay.setEditable(false);
BorderPane.setMargin(txtDisplay, new Insets(0,0,2,0));
root.setCenter(txtDisplay);
BorderPane bottom = new BorderPane();
txtInput = new TextField();
txtInput.setPrefSize(60, 30);
BorderPane.setMargin(txtInput, new Insets(0,1,1,1));
btnConn = new Button("start");
btnConn.setPrefSize(60, 30);
btnConn.setOnAction(e->{
if(btnConn.getText().equals("start")) {
startClient();
} else if(btnConn.getText().equals("stop")){
stopClient();
}
});
btnSend = new Button("send");
btnSend.setPrefSize(60, 30);
btnSend.setDisable(true);
btnSend.setOnAction(e->send(txtInput.getText()));
bottom.setCenter(txtInput);
bottom.setLeft(btnConn);
bottom.setRight(btnSend);
root.setBottom(bottom);
Scene scene = new Scene(root);
scene.getStylesheets().add(getClass().getResource("app.css").toString());
primaryStage.setScene(scene);
primaryStage.setTitle("Client");
primaryStage.setOnCloseRequest(event->stopClient());
primaryStage.show();
}
void displayText(String text) {
txtDisplay.appendText(text + "\n");
}
public static void main(String[] args) {
launch(args);
}
}
/*text-area 배경색*/
.text-area {
-fx-background-color: gold;
}
/*scroll-pane 배경색*/
.text-area .scroll-pane {
-fx-background-color: transparent;
}
/*viewport 배경색*/
.text-area .scroll-pane .viewport{
-fx-background-color: transparent;
}
/*content 배경색*/
.text-area .scroll-pane .content{
-fx-background-color: transparent;
}
udp
package sec09.exam01_udp;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
public class UdpReceiveExample {
public static void main(String[] args) throws Exception {
//수신 프로그램 먼저 실행시킨다.
DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET); // open 메소드는 예외처리가 필요함.
datagramChannel.bind(new InetSocketAddress(5001));// 포트 바인딩하기
Thread thread = new Thread() { // 익명 객체로 만들어주기.
@Override
public void run() {
System.out.println("[수신 시작]");
try {
while(true) {//계속 데이터를 받아야 하므로
ByteBuffer byteBuffer = ByteBuffer.allocateDirect(100);
SocketAddress socketAddress = datagramChannel.receive(byteBuffer); //데이터를 받는다.
//다 읽었다면 이제 문자열로 변환해야 한다.
byteBuffer.flip();// 일단 플립부터 하자
Charset charset = Charset.forName("UTF-8");
String data = charset.decode(byteBuffer).toString();
System.out.println("[받은 내용:"+socketAddress.toString()+"]"+data);
}
}catch(Exception e) {
System.out.println("[수신 종료]");
}
}
};
thread.start();
//프로그램이 너무 빨리 끝나면 수신을 할 수 없기 떄문에 메인스레드를 10초간 대기시킨다.
Thread.sleep(10000);
datagramChannel.close();
}
}
package sec09.exam01_udp;
import java.net.InetSocketAddress;
import java.net.StandardProtocolFamily;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.charset.Charset;
public class UdpSendExample {
public static void main(String[] args) throws Exception{
DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
System.out.println("[발신 시작]");
for(int i = 1; i<= 3; i++) {
String data = "메시지" + i;
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode(data);
int byteCount = datagramChannel.send(byteBuffer, new InetSocketAddress("localhost",5001));// 두번째매개변수는 어디로 데이터를 보낼지임.
System.out.println("[보낸 바이트 수]"+byteCount + "bytes");
}
System.out.println("[발신 종료]");
datagramChannel.close();
}
}
필기
package nionetworking;
/*
크게 요약하면
tcp 블로킹
tcp 넌블로킹 (셀렉터가 등장함.)
tcp 비동기
로 나누면 됨.
udp는 워낙 내용이 적어서...
TCP 서버/클라이언트 세가지 구현 방식
블로킹: 연결 요청, 연결 수락, 입출력 작업시 블로킹
넌블로킹: 연결요청, 연결 수락, 입출력 작업시 넌블로킹
작업 처리 준비된 것만 셀렉터가 선택해서 처리하는 방식
비동기: 연결 요청, 연결 수락, 입출력 작업시 넌블로킹
스레드풀에서 처리후 콜백 메소드 호출
서버소켓 채널과 소켓채널의 용도
ServerSocketChannel
SocketChannel
서버소켓 채널 생성과 연결 수락
ServerSocketChannel 생성
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(true); //블로킹 방식인지 넌블로킹방식인지를 설정함 디폴드값은 블로킹임. 이건 명시적으로 블로킹방식으로 사용하겠다는 뜻이다.
serverSocketChannel.bind(new InetSocketAddress(5001)); //바인딩하기
연결 수락
SocketChannel socketChannel = serverSocketChannel.accept();
닫기
serverSocketChannel.close();
소켓 채널 생성과 연결 요청
SocketChannel 생성과 연결 요청
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(true); //얘도 디폴트값은 블로킹이다.
socketChannel.connect(new InetSocketAddress("localhost",5001));
닫기
socketChannel.close();
<소켓 채널 데이터 통신 >
통신은 버퍼를 이용한다. 상대방으로 보낼 데이터가 있다면 버퍼에 저장하고 그 버퍼에 있는 내용을 상대방에게 출력한다. 반대도 마찬가지... 그냥 버퍼 사용한다고 보면됨.
보내는 코드
Charset charset = Charset.forName("UTF-8");
ByteBuffer byteBuffer = charset.encode("Hello Server");
socketChannel.write(byteBuffer);
받는코드
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
int byteCount = socketChannel.read(byteBuffer);
byteBuffer.flip();
Charset charset = Charset.forName("UTF-8");
String message = charset.decode(byteBuffer).toString();
read()가 블로킹이 해제되는 경우
1. 상대방이 데이터를 보냄 읽은 바이트 수 리턴
2. 상대방이 정상적으로 SocketChannel의 close() 를 호출 -1 리턴
3. 상대방이 비정상적으로 종료 IOException 발생
--> 2와 3의 경우에는 소켓채널을 close()해줘야 한다.
tip:
tcp서버는 거의 필수적으로 병렬 처리가 필요하다.
클라이언트 수가 갑자기 증가하면 서버 내의 스레드도 갑자기 증가하여 서버가 다운되는 경우가 생길 수 있다.
--> 이런 문제를 해결하기 위해 스레드풀을사용한다. (즉 제한된 스레드를 가지고 작업을 처리하기 때문에 갑자기 성능이 저하되지 않는다. )
작업의 양만 늘어날뿐... 스레드의 개수는 일정하다. < - 스레드풀의 이점...
병렬 처리의 핵심은 스레드풀의 사용이다.
-------------------------------------------------------------------------------------------------------
넌블로킹(non-blocking) 방식의 특징
connect(), accept(), read(), write() 메소드는 블로킹없이 즉시 리턴된다.
-> 작업 처리 준비가 되지 않은 상태에서 이들 메소드를 실행하면 안된다.
-> 클라이언트 연결 요청이 없는 상태에서 accept()를 실행시키면 즉시 null을 리턴한다.
읽을 데이터가 없는 상태에서 read()메소드를 호출하면 즉시 0을 리턴하고 ByteBuffer에는 어떤 데이터도 저장되지 않는다.
따라서 작업 처리 준비가 된 채널만 선택에서 처리해야 한다.
클라이언트 연결 요청이 들어올 때, accept() 메소드를 실행해서 연결 수락해야 한다.
데이터를 읽을 수 있을 때 read()메소드를 실행해서 데이터를 얻어야 한다.
셀렉터 -> 넌블로킹의 핵심 객체
셀렉터가 작업 처리 준비된 채널을 선택한다.
넌 블로킹 채널은 이벤트 리스너 역할을 하는 셀렉터를 사용한다.
채널이 작업 처리가 필요할 경우 셀렉터에 통보하고.
셀렉터는 통보한 채널을 선택한다.
멀티 채널 작업을 싱글 스레드에서 처리할 수 있다.
작업 스레드가 블로킹되지 않기 때문에 셀렉터가 선택한 채널들을 싱글 스레드에서 모두 처리할 수 있다. -> 즉 대기상태가 되지 않으니 하나의 스레드가 다 처리할 수 있다는 뜻
스레드풀을 사용할 경우, 적은 수의 스레드로 많은 양의 작업을 처리할 수 있다.
<셀렉터(Selector) 동작 원리>
채널은 자신의 작업 유형을 키(SelectionKey)로 생성한다.
셀렉터의 관심키셋(interst-set)에 키를 등록한다.
셀렉터는 작업 처리 준비가 된 키를 선택하고 선택된 키셋에 별도로 저장한다.
작업 스레드는 선택된 키셋에서 키를 하나씩 꺼내어 연관된 채널 작업을 처리한다.
셀렉터 생성
try{
Selector selector = Selector.open();
}catch(IOException e){}
넌블로킹 채널 생성
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false); // 즉 false를 주면 넌블로킹채널로 동작한다. 소켓채널도 마찬가지이다.
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);
셀렉터 등록
SelectionKey selectionKey = serverSocketChannel.register(Selector sel,int ops);
SelectionKey selectionKey = socketChannel.register(Selector sel, int ops);
셀렉터 등록시에 사용하는 SelectionKey의 상수 <- 어떤 작업을 셀렉터가 관심을 가지고 감시할지를 설정하는 것이다.
OP_ACCEPT ServerSocketChannel의 연결을 수락 작업
OP_CONNECT SocketChannel의 서버 연결 작업 (클라이언트가 서버로 연결 요청을 할떄.)
OP_READ SocketChannel의 데이터 읽기 작업
OP_WRITE SocketChannel의 데이터 쓰기 작업
<SelectionKey>
register()는 채널과 작업 유형 정보를 가지고 SelectionKey를 생성한 후, 관심키셋에 저장시킨다.
작업 유형 변경, 첨부 객체 저장, 채널 등록 취소할 떄 사용할 수 있다. 작업 스레드가 채널 작업을 처리할 때 SelectionKey가 필요하다.
채널의 KeyFor()메소드로 얻을 수도 있다.
SelectionKey key = socketChannel.keyFor(selector); // -> keyFor로 바로 얻을 수 있기 때문에 별도로 관리해주지 않아도 된다.
<선택된 키셋>
Selector의 select()메소드
관심 키셋의 SelectionKey로부터 작업 처리 준비가 되었다는 통보가 올 때까지 select는 블로킹
-> 최소한 하나의 SelectionKey로부터 작업 처리 준비가 되었다는 통보가 오면 리턴
-> 리턴값은 통보를 해온 SelectionKey의 수
종류
리턴타입 메소드명 설명
int select() 최소한 하나의 채널이 작업 처리 준비가 될 때까지 블로킹된다.
int select(long timeout) select()와 동일한데, 주어진 시간(밀리세컨)동안만 블로킹된다.
int selectNow() 작업을 처리할 준비가 된 채널만 선택하고 즉시 리턴된다.
select()가 리턴되는 경우
-> 최소한 하나는 채널이 작업 처리 준비가 되었다는 통보를 할 떄
-> Selector의 wakeup()메소드를 호출할때
-> select()를 호출한 스레드가 인터럽트될 때
Selector의 wakeup()
채널의 작업 유형이 변경되면 SelectionKey의 작업 유형을 interestOps()로 변경
selectionKey.interestOps(SelectionKey.OP_WRITE);
selector.wakeup();
SelectionKey의 작업 유형을 변경하면 Selector의 wakeup()메소드를 호출한다.
-> 블로킹되어 있는 select()는 즉시 리턴하고, 변경된 작업 유형을 감시하도록 select()를 재실행하는 역할
<선택된 키셋 얻기>
select()메소드가 1이상의 값을 리턴할 경우에만
selectedKeys() 메소드로 작업 처리 준비된 SelectionKey들을 Set 컬렉션으로 얻으면 된다.
이것이 선택된 키셋이다.
int KeyCount = selector.select();
if(KeyCount > 0){
Set<SelectionKey> selectedKeys = selector.selectedKeys();
}
<작업 스레드에서 채널 작업 처리>
선택된 키셋에서 SelectionKey를 하나씩 꺼내어 작업 유형별로 채널 작업을 처리
SelectionKey가 어떤 작업 유형인지 알아내는방법
boolean isAcceptable() 작업 유형이 OP_ACCEPT 인 경우
boolean isConnectable() 작업 유형이 OP_CONNECT 인 경우
boolean isReadable() 작업 유형이 OP_READ 인 경우
boolean isWritable() 작업 유형이 OP_WRITE인 경우
int KeyCount = selector.select(); //작업 처리 준비된 키 감지
if(KeyCount == 0) {continue;} //키가 없을 경우 루프 처음으로 돌아감.
Set<SelectionKey> selectedKeys = selector.selectedKeys(); // 선택된 키셋 얻기
Iterator<SelectionKey> iterator = selectedKeys.iterator(); // 반복자 얻기
while(iterator.hasNext()){
SelectionKey selectionKey = iterator.next(); // 키를 하나씩 꺼내옴
if(selectionKey.isAcceptable()){ // 연결 수락 작업 처리 }
else if(selectionKey.isReadable()) {// 읽기 작업 처리}
else if(selectionKey.isWritable()) {// 쓰기 작업 처리}
iterator.remove();
}
SelectionKey 로부터 채널 객체 얻기
작업 스레드가 채널 작업을 처리하려면 채널 객체가 필요하다.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) selectionKey.channel();
첨부 객체 저장과 얻기
작업 스레드가 채널 작업을 처리하다보면 채널 객체 이외에 다른 객체가 필요
이러한 객체는 SelectionKey에 첨부해두고, 사용
attach() 메소드는 객체를 첨부하고, attachment() 메소드는 첨부된 객체를 얻을 때 사용
[객체 첨부하기]
Client client = new Client(socketChannel);
SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_READ);
selectionKey.attach(client);
[첨부된 객체 얻기]
if(selectionKey.isReadable()){
Client client = (Client) selectionKey.attachment();
}
--------------------------------------------------------------------------------------
TCP 비동기 채널의 특징
connect(), accept(), read(), write(), 를 호출하면 즉시 리턴
실질적인 입출력 작업 처리는 스레드 풀의 스레드가 담당한다.
스레드가 작업 처리를 완료하면 콜백 메소드를 호출한다.
read() 메소드 호출 예
애플리케이션에서 read()메소드를 호출하면 즉시 리턴
내부적으로 스레드풀의 작업 스레드가 read() 메소드를 실질적으로 실행
작업 스레드가 read() 메소드를 모두 실행하고 나면 completed() 콜백
-> completed() 메소드를 실행하는 스레드는 스레드풀의 작업 스레드
<비동기 채널 그룹 생성 및 종료>
비동기 채널 그룹(AsynchronousChannelGroup)
같은 스레드풀을 공유하는 비동기 채널들의 묶음을 말한다.
하나의 스레드풀을 사용한다면 모든 비동기 채널은 같은 채널 그룹에 속해야 한다.
-> 이들채널이 사용하는 스레드풀을 공유하기 위해서 같은 비동기 채널 그룹에 포함시키는 것이다.
-> 일반적으로 어플리케이션에서는 스레드풀을 하나만 사용한다. 그래서 각각의 비동기 채널들이 하나의 비동기 채널 그룹에 포함되어야 한다.
비동기 채널 그룹 생성
비동기 채널을 생성할 때 채널 그룹을 지정하지 않으면 기본 비동기 채널 그룹에 속한다. -> 기본비동기 채널그룹은 잘 쓰이지 않는다. 보통 새로 만들어서 사용한다.
기본 비동기 채널 그룹은 내부적으로 생성되는 스레드풀을 이용한다.
new ThreadPoolExecutor{
0, Integer.MAX_VALUE,
Long.MAX_VALUE, TimeUnit.MILLISECONS,
new SynchronousQueue<Runnable>(),
threadFactory()
};
비동기 채널 그룹이 사용하는 스레드의 개수를 지정하려면 새로운 비동기 채널 그룹을 생성한다.
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
최대 스레드 수,
Executors.defaultThreadFactory()
);
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
Executors.defaultThreadFactory()
);
비동기 채널 그룹 종료
channelGroup.shutdown();
channelGroup.shutdownNow();
shutdown()
비동기 채널 그룹을 종료하겠다는 의사만 전달할 뿐 즉시 비동기 채널 그룹을 종료하지는 않는다.
비동기 채널 그룹에 포함된 모든 비동기 채널이 닫히면 비로소 비동기 채널 그룹이 종료된다.
새로운 비동기 채널을 포함시키려고 하면 ShutdownChannelGroupException 이 발생한다.
shutdownNow()
강제적으로 비동기 채널 그룹에 포함된 모든 비동기 채널을 닫고 비동기 채널 그룹을 종료한다.
완료 콜백을 실행하고 있는 스레드는 종료되거나, 인터럽트 되지 않는다. -> 이미 작업이 다 끝나서 완료 콜백을 하는 애는 냅둔다는 뜻.
<비동기 서버소켓 채널 생성 및 연결 수락 >
기본 비동기 채널 그룹에 포함되는 비동기 서버 채널 생성
AsynchronousServerSocketChannel asynchronousServerSocketChannel =
AsynchronousServerSocketChannel.open(); // <- 이걸 사실상 잘 쓰이지 않는다.
일반적으로 이 방식이 더 자주 쓰인다.
새로 생성한 비동기 채널 그룹에 포함되는 비동기 서버 채널 생성
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
Executors.defaultThreadFactory()
);
AsynchronousServerSocketChannel asynchronousServerSocketChannel =
AsynchronousServerSocketChannel.open(channelGroup);
포트 바인딩
asynchronousServerSocketChannel.bind(new InetSocketAddress(5001));
닫기
asynchronousServerSocketChannel.close();
연결수락
accept(A attachment, CompletionHandler<AsynchronousSocketChannel, A> handler);
CompletionHandler<AsynchronousSocketChannel, A> 에서 첫번째는 리턴타입이고, 두번째는 첨부 객체 타입이다.
accept에서
첫번쟤 매개값은 콜백 메소드의 매개값으로 제공할 첨부 객체
두번째 매개값은 콜백 메소드를 가지고 있는 CompletionHandler<AsynchronousSocketChannel,A> 구현 객체
asynchronousServerSocketChannel.accept(null,new CompletionHandler<AsynchronousSocketChannel,Void> () { //첨부객체 없다는 뜻,
@Override
public void completed(AsynchronousSocketChannel asynchronousSocketChannel, Void attachment){
//연결 수락 후 실행할 코드
//클라이언트의 연결 수락 작업은 1회로 끝나는 것이 아니다. 계속해서 다른 클라이언트의 연결을 수락해야 하기 때문에
//completed 메소드 에서는 마지막에 또 다른 클라이언트의 연결 수락을 위해서 다시 accept()를 호출해줘야만 한다.
//끝부분에 accept()가 들어가지 않으면 딱 한번만 클라이언트의 연결 수락을 하고 끝나버린다. 그래서 반드시 재호출 해줘야 한다.
asynchronousServerSocketChannel.accept(null,this); //accept() 재호출 // 여기서 this는 해당 CompletionHandler 구현 객체를 재사용하겠다는 뜻이다.
}
@Override
public void failed(Throwable exc, Void attachment){
//연결 수락 실패시 실행할 코드
}
});
<클라이언트에서 사용하는 비동기 소켓 채널 생성 및 닫기 >
기본 비동기 채널 그룹에 포함되는 비동기 소켓 채널 생성
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open(); // 이 방식은 잘 쓰이지 않는다.
새로 생성한 비동기 채널 그룹에 포함되는 비동기 소켓 채널 생성
AsynchronousChannelGroup channelGroup = AsynchronousChannelGroup.withFixedThreadPool(
Runtime.getRuntime().availableProcessors(),
Executors.defaultThreadFactory()
);
AsynchronousSocketChannel asynchronousSocketChannel = AsynchronousSocketChannel.open(channelGroup);
닫기
asynchronousSocketChannel.close();
<연결 요청>
connect(SocketAddress remote, A attachment, CompletionHandler<Void,A> handler); //여기서 Void는 고정값이다. 고정되어 있다. 아무것도 리턴하지 않는다.
첫번쨰 매개값은 서버IP와 연결 포트 정보를 가진 InetSocketAddress 객체
두번째 매개값은 콜백 메소드의 매개값으로 제공할 첨부 객체
세번째 매개값은 CompletionHandler<Void,A> 구현 객체
asynchronousSocketChannel.connect(new InetSocketAddress("localhost",5001),null, //서버에 연결 요청하고 싶다면 connect를 호출한다. 두번째 매개값은 (콜백메소드에 제공할)첨부객체가 없어서 null이다.
new CompletionHandler<Void,Void>(){ //connect는 결과타입이 없기 때문에 항상 첫번째 타입 파라미터는 Void이다. 두번째는 첨부객체가 없어서 Void로 했다.
@Override
public void completed(Void result, Void attachment){ //성공할 경우 자동으로 실행// 매개로 둘다 null이 대입될것이다.
//연결 성공후 실행할 코드
}
@Override
public void failed(Throwable exc, Void attachment){ //실패했을 때 자동으로 실행 //첫번째로 예외객체가 들어오고, 두번째는 null이 들어올 것이다.
//연결 실패후 실행할 코드
}
});
<비동기 소켓 채널 데이터 통신>
AsynchronousSocketChannel의 read()와 write() 메소드로 데이터 통신
read(ByteBuffer dst, A attachment, CompletionHandler<Integer, A> handler);
write(ByteBuffer src, A attachment, CompletionHandler<Integer, A> handler);
첫번째 매개값은 읽고 쓰기 위한 ByteBuffer 객체
두번째 매개값은 콜백 메소드의 매개값으로 제공할 첨부 객체
세번째 매개값은 CompletionHandler<Integer,A> 구현 객체
<read 메소드>
asynchronousSocketChannel.read(byteBuffer, attachment, new CompletionHandler<Integer,A>(){
@Override
public void completed(Integer result, A attachment){
//받은 데이터를 처리하는 코드
//계속적으로 read메소드를 실행하고 싶으면 completed 완료 콜백 끝에서 다시 read()를 호출해주면 된다. 그러면 한번의 read로 계속지속적으로 데이터를 읽을 수 있다.
asynchronousSocketChannel.read(byteBuffer, attachment, this); //read() 재호출
}
@Override
public void failed(Throwable exc, A attachment){
//실패된 경우 실행할 코드
}
});
<write메소드>
asynchronousSocketChannel.write(byteBuffer, attachment, new CompletionHandler<Integer,A>(){
@Override
public void completed(Integer result, A attachment){
//성공한 경우 실행할 코드
}
@Override
public void failed(Throwable exc, A attachment){
//실패된 경우 실행할 코드
}
});
---------------------------------------------------------------------------------------------------
<UDP 채널 (DatagramChannel)>
NIO에서의 UDP 채널
DatagramChannel
동기(블로킹)과 넌블로킹 방식 모두 사용 가능
<발신자 만들기>
DatagramChannel 생성
DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
StandardProtocolFamily 열거 상수
IPv4 -> StandardProtocolFamily.INET
IPv6 -> StandardProtocolFamily.INET6
데이터 보내기
int byteCount = datagramChannel.send(byteBuffer, new InetSocketAddress("localhost",5001));
닫기
datagramChannel.close();
<수신자 만들기>
DatagramChannel 생성 및 포트 바인딩
DatagramChannel datagramChannel = DatagramChannel.open(StandardProtocolFamily.INET);
datagramChannel.bind(new InetSocketAddress(5001));
데이터 받기
SocketAddress socketAddress = datagramChannel.receive(ByteBuffer dst);
데이터를 받기 전까지 receive()메소드는 블로킹되고, 데이터를 받으면 리턴
작업 스레드르 생성해서 receive() 메소드를 반복적으로 호출 해야 한다. (이벤트를 처리하는 스레드에서 이 메소드를 사용하면 안되기 때문이다. )
작업 스레드 종료 방법:
작업 스레드의 interrupt()를 호출시켜 ClosedByInterruptException 예외를 발생
DatagramChannel의 close() 를 호출시켜 AsynchronousCloseException 예외를 발생
닫기:
datagramChannel.close();
*/
'Java' 카테고리의 다른 글
[Java] 객체지향 UML (0) | 2021.01.02 |
---|---|
[Java] 자바 리플렉션 - java Reflection이란? (0) | 2021.01.02 |
[이것이 자바다] 자바 NIO 공부 정리 (0) | 2020.12.30 |
[이것이 자바다] 자바 네트워크1 공부 정리 (0) | 2020.12.27 |
[이것이 자바다] 자바 IO패키지 공부 정리 (0) | 2020.12.26 |