본문으로 바로가기
반응형

한빛미디어에서 펴낸 "이것이 자바다" 책에 대한 강의가 유튜브에 올라와있는데, 상당히 친절하게 깊게 설명해준다. 조금 오래되긴 했는데, 그 점과는 별개로 내용이 좋다. 쓰레드 관련 강의부분에 대한 코드와 필기 내용을 올려둔다. 새로 접한 내용이 많아서 재밌다. 

 

 

 

Create Thread 

package sec02.exam01_createthread;

import java.awt.Toolkit;

public class BeepPrintExample1 {

	public static void main(String[] args) {
		//비프음을 5번 반복해서 소리나게 하는 작업. 
		Toolkit toolkit = Toolkit.getDefaultToolkit();
		for(int i = 0;i<5;i++) {
			toolkit.beep();
			try {
				Thread.sleep(500);
			}catch(Exception e) {
				e.printStackTrace();
			}
		}
		//띵 문자열을 5번 출력하는 작업 
		for(int i = 0;i<5;i++) {
			System.out.println("띵");
			try {
				Thread.sleep(500);
			}catch(Exception e) {
				e.printStackTrace();
			}
		}
	}

}
package sec02.exam01_createthread;

import java.awt.Toolkit;

public class BeepPrintExample2 {

	public static void main(String[] args) {
//		//how1
//		Runnable beepTask = new BeepTask();
//		Thread thread = new Thread(beepTask);
//		thread.start();
		
		//how2
//		Thread thread = new Thread(new Runnable() {
//
//			@Override
//			public void run() {
//				// 비프음을 5번 반복해서 소리나게 하는 작업.
//				Toolkit toolkit = Toolkit.getDefaultToolkit();
//				for (int i = 0; i < 5; i++) {
//					toolkit.beep();
//					try {
//						Thread.sleep(500);
//					} catch (Exception e) {
//						e.printStackTrace();
//					}
//				}
//			}		
//		});
		//how3
		
		Thread thread = new Thread(() -> {
			// 비프음을 5번 반복해서 소리나게 하는 작업.
			Toolkit toolkit = Toolkit.getDefaultToolkit();
			for (int i = 0; i < 5; i++) {
				toolkit.beep();
				try {
					Thread.sleep(500);
				} catch (Exception e) {
					e.printStackTrace();
				}
			}
		});
		
		
		
		thread.start();
		
		//띵 문자열을 5번 출력하는 작업 
		for(int i = 0;i<5;i++) {
			System.out.println("띵");
			try {
				Thread.sleep(500);
			}catch(Exception e) {
				e.printStackTrace();
			}
		}
	}

}
package sec02.exam01_createthread;

import java.awt.Toolkit;

public class BeepPrintExample3 {

	public static void main(String[] args) {	
		
		//how1
//		Thread thread = new BeepThread(); //BeepThread thread로 해도 상관없음. 
//		thread.start();
		
		//how2
		Thread thread = new Thread(){
			public void run() {
				// 비프음을 5번 반복해서 소리나게 하는 작업.
				Toolkit toolkit = Toolkit.getDefaultToolkit();
				for (int i = 0; i < 5; i++) {
					toolkit.beep();
					try {
						Thread.sleep(500);
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			}
		};
		thread.start();
		
		//띵 문자열을 5번 출력하는 작업 
		for(int i = 0;i<5;i++) {
			System.out.println("띵");
			try {
				Thread.sleep(500);
			}catch(Exception e) {
				e.printStackTrace();
			}
		}
	}

}
package sec02.exam01_createthread;

import java.awt.Toolkit;

public class BeepTask implements Runnable {

	@Override
	public void run() {
		// 비프음을 5번 반복해서 소리나게 하는 작업.
		Toolkit toolkit = Toolkit.getDefaultToolkit();
		for (int i = 0; i < 5; i++) {
			toolkit.beep();
			try {
				Thread.sleep(500);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}

}
package sec02.exam01_createthread;

import java.awt.Toolkit;

public class BeepThread extends Thread {
	@Override
	public void run() {
		// 비프음을 5번 반복해서 소리나게 하는 작업.
		Toolkit toolkit = Toolkit.getDefaultToolkit();
		for (int i = 0; i < 5; i++) {
			toolkit.beep();
			try {
				Thread.sleep(500);
			} catch (Exception e) {
				e.printStackTrace();
			}
		}
	}
}

 

Thread Name 

package sec02.exam02_threadname;

public class ThreadA extends Thread{
	
	public ThreadA(){
		setName("ThreadA");
	}
	@Override
	public void run() {
		for(int i = 0;i<2;i++) {
			System.out.println(getName()+"가 출력한 내용");
		}
	}
}
package sec02.exam02_threadname;

public class ThreadB extends Thread{
	
	@Override
	public void run() {
		for(int i = 0;i<2;i++) {
			System.out.println(getName()+"가 출력한 내용");
		}
	}
}
package sec02.exam02_threadname;

public class ThreadNameExample {

	public static void main(String[] args) {
		Thread thread = Thread.currentThread();
		System.out.println("프로그램 시작 스레드 이름: "+thread.getName());
		
		Thread threadA = new ThreadA();
		System.out.println("작업 스레드 이름: "+threadA.getName());
		threadA.start();
		
		Thread threadB = new ThreadB();
		System.out.println("작업 스레드 이름: "+threadB.getName());
		threadB.start();
		
	}
}

 

Priority

package sec03.exam01_priority;

public class CalcThread extends Thread{
	public CalcThread(String name) {
		setName(name);
	}
	
	@Override
	public void run() {
		for(int i = 0;i<2000000000;i++) {}
		System.out.println(getName());
	}
	
}
package sec03.exam01_priority;

public class PriorityExample {

	public static void main(String[] args) {
		for(int i = 1;i<=10;i++) {
			Thread thread = new CalcThread("Thread" +i);
			if(i != 10) {
				thread.setPriority(Thread.MIN_PRIORITY);
			}else {
				thread.setPriority(Thread.MAX_PRIORITY);
			} 
			thread.start();
		}	
	}
}

 

 

State

package sec05.exam01_state;

public class TargetThread extends Thread{
	
	@Override
	public void run() {
		for(long i = 0; i< 1000000000;i++) {}
		
		try {  //이때 TIMED_WAITING이 출력된다. 
			Thread.sleep(1500);
		}catch(InterruptedException e) {}
		
		for(long i = 0; i< 1000000000;i++) {}
	}
	
}
package sec05.exam01_state;

public class StatePrintThread extends Thread {
	private Thread targetThread;

	public StatePrintThread(Thread targetThread) {
		this.targetThread = targetThread;
	}

	@Override
	public void run() {
		while (true) {
			Thread.State state = targetThread.getState();
			System.out.println("타겟 스레드 상태: " + state);

			if (state == Thread.State.NEW) {//처음 만들어지면, 
				targetThread.start();
			}

			if (state == Thread.State.TERMINATED) {
				break;
			}
			try {
				Thread.sleep(500);
			} catch (InterruptedException e) {}
		}
	}

}
package sec05.exam01_state;

public class ThreadStateExample {

	public static void main(String[] args) {
		StatePrintThread statePrintThread = new StatePrintThread(new TargetThread());
		statePrintThread.start();
	}
}

 

 

synchronized

package sec05.exam02_synchronized;

public class Calculator {
	private int memory;
	
	public int getMemory() {
		return memory;
	}
	
	public synchronized void setMemory(int memory) {
		this.memory = memory;
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {}
		
		System.out.println(Thread.currentThread().getName() + ": " + this.memory);
	}
	
}

 

package sec05.exam02_synchronized;

public class User1 extends Thread{
	private Calculator calculator;

	public void setCalculator(Calculator calculator) {
		this.setName("User1");
		this.calculator = calculator;
	}

	@Override
	public void run() {
		calculator.setMemory(100);
	}
}
package sec05.exam02_synchronized;

public class User2 extends Thread{
	private Calculator calculator;

	public void setCalculator(Calculator calculator) {
		this.setName("User2");
		this.calculator = calculator;
	}

	@Override
	public void run() {
		calculator.setMemory(50);
	}
}
package sec05.exam02_synchronized;

public class MainThreadExample {

	public static void main(String[] args) {
		Calculator calculator = new Calculator(); //공유 객체 생성
		
		User1 user1 = new User1(); //쓰레드 객체 생성 
		user1.setCalculator(calculator);
		user1.start();
		
		User2 user2  = new User2();
		user2.setCalculator(calculator);
		user2.start();
		
	}

}

 

 

sleep

package sec06.exam01_sleep;

import java.awt.Toolkit;

public class SleepExample {

	public static void main(String[] args) {
		Toolkit toolkit = Toolkit.getDefaultToolkit();
		for(int i = 0;i<10;i++) {
			toolkit.beep();
			try {
				Thread.sleep(3000);
			}catch(InterruptedException e) {}
		}
	}
}

 

 

 

yield

package sec06.exam02_yield;

public class ThreadA extends Thread{
	public boolean stop = false; //쓰레드를 종료할 목적 
	public boolean work = true;
	
	@Override
	public void run() {
		while(!stop) {
			if(work) {
				System.out.println("ThreadA 작업 내용");
			}else {
				Thread.yield(); //실행대기상태로 들어간다. 
			}
		}
		System.out.println("ThreadA 종료");
	}

}
package sec06.exam02_yield;

public class ThreadB extends Thread{
	public boolean stop = false; //쓰레드를 종료할 목적 
	public boolean work = true;
	
	@Override
	public void run() {
		while(!stop) {
			if(work) {
				System.out.println("ThreadB 작업 내용");
			}else {
				Thread.yield();
			}
		}
		System.out.println("ThreadB 종료");
	}

}
package sec06.exam02_yield;

public class YieldExample {

	public static void main(String[] args) {
		ThreadA threadA = new ThreadA();
		ThreadB threadB = new ThreadB();
		
		threadA.start();
		threadB.start();
		
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {}
		threadA.work = false;
		
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {}
		threadA.work = true; 
		
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {}
		
		
		threadA.stop = true;
		threadB.stop = true;
		
						
	}

}

 

 

 

join

package sec06.exam03_join;

public class SumThread extends Thread{
	private long sum;

	public long getSum() {
		return sum;
	}

	public void setSum(long sum) {
		this.sum = sum;
	}
	
	@Override
	public void run() {
		for(int i = 1;i<=100;i++) {
			sum += i;
		}
	}
}
package sec06.exam03_join;

public class JoinExample {

	public static void main(String[] args) {
		SumThread sumThread = new SumThread();
		sumThread.start();
		try {
			sumThread.join();
		} catch (InterruptedException e) {}

		System.out.println("1~100의 합: "+sumThread.getSum());
	}
}

 

 

 

wait_notify

package sec06.exam04_wait_notify;

public class WorkObject { //공유 객체 //동기화처리가 되어야 한다. synchronized 
	public synchronized void methodA(){
		System.out.println("ThreadA의 methodA() 작업 실행");
		notify(); //다른 쓰레드는 실행대기로 바뀐다. 
		try {
			wait();   //나는 일시정지한다. 
		} catch (InterruptedException e) {}
	}
	public synchronized void methodB(){
		System.out.println("ThreadB의 methodB() 작업 실행");
		notify();
		try {
			wait();
		} catch (InterruptedException e) {}
	}
}
package sec06.exam04_wait_notify;

public class ThreadA extends Thread{
	private WorkObject workObject;
	public ThreadA(WorkObject workObject) {
		this.workObject = workObject;
	}
	
	@Override
	public void run() {
		for(int i = 0;i<10;i++) {
			workObject.methodA();
		}
	}
}
package sec06.exam04_wait_notify;

public class ThreadB extends Thread{
	private WorkObject workObject;
	public ThreadB(WorkObject workObject) {
		this.workObject = workObject;
	}
	
	@Override
	public void run() {
		for(int i = 0;i<10;i++) {
			workObject.methodB();
		}
	}
}
package sec06.exam04_wait_notify;

public class WaitNotifyExample { //실행 메소드 

	public static void main(String[] args) {
		WorkObject sharedObject = new WorkObject();
		ThreadA threadA = new ThreadA(sharedObject);
		ThreadB threadB = new ThreadB(sharedObject);
		
		threadA.start();
		threadB.start();
	}
}

 

 

 

wait_noftify2

package sec06.exam05_wait_notify;

public class DataBox {
	private String data;

	public synchronized String getData() {
		if(this.data == null) {
			try {
				wait();
			} catch (InterruptedException e) {}
		}
		
		String returnValue = data;
		System.out.println("ConsumerThread가 읽은 데이터: "+returnValue);
		data = null;
		
		notify();
		return returnValue; 
	}

	public synchronized void setData(String data) { 
		if(this.data != null) { //아직 소비자가 읽지 않았다면, 
			try {
				wait();
			} catch (InterruptedException e) {}
		}
		this.data = data;
		System.out.println("ProducerThread가 읽은 데이터: "+data);
		notify();
		
	}
	
}
package sec06.exam05_wait_notify;

public class ProducerThread extends Thread{ //생성자 스레드 
	private DataBox dataBox;
	
	public ProducerThread(DataBox dataBox) {
		this.setName("ProducerThread");
		this.dataBox = dataBox;
	}
	@Override
	public void run() {
		for(int i = 1;i<=3;i++) {
			String data = "Data-" + i;
			dataBox.setData(data);
		}
	}
	
}
package sec06.exam05_wait_notify;

public class ConsumerThread extends Thread{ //소비자 스레드 
	private DataBox dataBox;
	
	public ConsumerThread(DataBox dataBox) {
		this.setName("ConsumerThread");
		this.dataBox = dataBox;
	}
	@Override
	public void run() {
		for(int i = 1;i<=3;i++) {
			String data = dataBox.getData();
			
		}
	}
}
package sec06.exam05_wait_notify;

public class WaitNotifyExample {

	public static void main(String[] args) { //메인 실행 메소드 
		DataBox dataBox = new DataBox();
		ProducerThread producerThread = new ProducerThread(dataBox);
		ConsumerThread consumerThread = new ConsumerThread(dataBox);
		
		producerThread.start();
		consumerThread.start();
			
	
	}
}

 

 

 

stop

package sec06.exam06_stop;

public class PrintThread1 extends Thread{
	private boolean stop; //이게 stop 플래그 

	public void setStop(boolean stop) {
		this.stop = stop;
	}
	@Override
	public void run() {
		while(!stop) {
			System.out.println("실행 중");
		}
		System.out.println("자원 정리");
		System.out.println("실행 종료");
	}
}
package sec06.exam06_stop;

public class PrintThread2 extends Thread {
	@Override
	public void run() {
//		try {
//			while (true) {
//				System.out.println("실행 중");
//				Thread.sleep(1);
//			}
//		} catch (InterruptedException e) {}
		
		//두번째 방법임. 
		while (true) {
			System.out.println("실행 중");
			if(Thread.interrupted()){ //얘는 정적메소드이고, 이거 대신에 isInterrupted() 하나만 써줘도 됨. 인스턴스메소드임 얘는. 
				break;
			}
		}
		
		System.out.println("자원 정리");
		System.out.println("실행 종료");
	}
}
package sec06.exam06_stop;

public class InterruptedExample { 

	public static void main(String[] args) {
		Thread thread = new PrintThread2();
		thread.start();
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {}
		
		thread.interrupt();
	}
}
package sec06.exam06_stop;

public class StopFlagExample { //stop 플래그를 이용한 중지/ 대신 이건 일시정지 상태에 있는 스레드를 종료하지는 못함. 

	public static void main(String[] args) {
		PrintThread1 printThread = new PrintThread1();
		printThread.start();
	
		try {
			Thread.sleep(1000);
		}catch(InterruptedException e) {}
		
		printThread.setStop(true);
	}

}

 

 

 

Daemon

package sec07.exam01_daemon;

public class AutoSaveThread extends Thread{
	public void save() {
		System.out.println("작업 내용을 저장함.");
	}
	@Override
	public void run() {
		while(true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {break;}
			save();
		}
	}
}
package sec07.exam01_daemon;

public class DaemonExample { //데몬 스레드는 메인이 종료될때 같이 종료됨. 

	public static void main(String[] args) { //데몬 스레드 생성. 
		AutoSaveThread autoSaveThread = new AutoSaveThread();
		autoSaveThread.setDaemon(true);
		autoSaveThread.start();
		
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {}
		
		System.out.println("메인 스레드 종료");
	}

}

 

 

 

ThreadGroup

package sec08.exam01_threadgroup;

public class AutoSaveThread extends Thread{
	public void save() {
		System.out.println("작업 내용을 저장함.");
	}
	@Override
	public void run() {
		while(true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {break;}
			save();
		}
	}
}
package sec08.exam01_threadgroup;

public class WorkThread extends Thread{
	//그룹에 포함되려면 쓰레드의 생성자를 다음과 같이 선언한다. 
	public WorkThread(ThreadGroup threadGroup, String threadName) {
		super(threadGroup,threadName);
	}
	
	@Override
	public void run() {
		while(true) {
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				System.out.println(getName() + " interrupted");
				break;
			}
		}
		System.out.println(getName() + "종료됨.");
	}
}
package sec08.exam01_threadgroup;

import java.util.Map;
import java.util.Set;
/*관련된 스레드를 묶어서 관리할 목적으로 이욯나다. 
  스레드 그룹은 계층적으로 하위 스레드 그룹을 가질 수 있다. 
  system그룹 : JVM 운영에 필요한 스레드들을 포함,. 
  system/main그룹: 메인 스레드를 포함. 
  
  스레드는 반드시 하나의 스레드 그룹에 포함도니다. 
  기본적으로 자신을 생성한 스레드와 같은 스레드 그룹에 속하게 된다. 
  명시적으로 스레드 그룹에 포함시키지 않으면, 기본적으로 system/main그룹에 속하게 된다. 
*/

public class ThreadInfoExample {

	public static void main(String[] args) {
		AutoSaveThread autoSaveThread = new AutoSaveThread();
		autoSaveThread.setName("AutoSaveThread");
		autoSaveThread.setDaemon(true);
		autoSaveThread.start();

		// 스택에 있는 모든 스레드에 대한 정보를 가져온다. (Map<컬렉션>타입 리턴)
		Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();
		Set<Thread> threads = map.keySet();
			
		for(Thread thread : threads) {
			System.out.println("Name: "+thread.getName()+((thread.isDaemon())?"(데몬)":"(주)"));
			System.out.println("\t" + "소속그룹: "+thread.getThreadGroup().getName()); //스레드 그룹의 이름 얻기
			System.out.println();
			
		}
	}

}
package sec08.exam01_threadgroup;

public class ThreadGroupExample {

	public static void main(String[] args) {
		ThreadGroup myGroup = new ThreadGroup("myGroup");//메인 그룹의 하위 그룹으로 만들어진다. 
		WorkThread workThreadA = new WorkThread(myGroup,"workThreadA");
		WorkThread workThreadB = new WorkThread(myGroup,"workThreadB");
		
		workThreadA.start();
		workThreadB.start();
		
		System.out.println("[main 스레드 그룹의 list() 메소드 출력 내용]");
		ThreadGroup mainGroup = Thread.currentThread().getThreadGroup();
		mainGroup.list(); //해당 그룹 내의 상태들을 모두 출력함. 실제로 직접 출력을 해줌! 
		System.out.println();
		
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {}
		
		System.out.println("[myGroup 스레드 그룹의 interrupt() 메소드 호출]");
		myGroup.interrupt(); //그룹에 속한 모든 스레드가 interrupt 된다. 
		
		
	}

}

 

 

 

Execute submit 

package sec09.exam01_execute_submit;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class ExecuteVsSubmitExample {

	public static void main(String[] args) throws Exception {
		ExecutorService executorService = Executors.newFixedThreadPool(2);

		for (int i = 0; i < 10; i++) {
			Runnable runnable = new Runnable() {
				@Override
				public void run() {
					ThreadPoolExecutor threadPoolExecutor = (ThreadPoolExecutor) executorService;
					int poolSize = threadPoolExecutor.getPoolSize();
					String threadName = Thread.currentThread().getName();
					System.out.println("[총 스레드 개수: "+poolSize + "] 작업 스레드 이름: " + threadName);
				
					int value = Integer.parseInt("삼");//여기서 일부러 에러를 발생시킴. 
				}
			}; 
			
			//execute와 submit의 차이 
//			executorService.execute(runnable);// 10개의 Runnable 객체가 들어감.//작업객체를 작업 큐에 넣어준다. 
			executorService.submit(runnable); //작업큐에 넣어줍시다. 
			//submit은 스레드를 재사용한다. 
			
			Thread.sleep(10); //throws로 던졌음. (JVM이 처리하세요.) 
		}
		executorService.shutdown(); //끝났으면 종료 
	}

}

 

 

 

blocking

package sex09.exam02_blocking;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NoResultExample {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors() // 프로그램에서 사용할 수 있는 코어의 수 
		);
		
		System.out.println("[작업 처리 요청]");
		Runnable runnable = new Runnable() { //작업 객체
			@Override
			public void run() {
				int sum = 0;
				for(int i = 1;i<=10;i++) {
					sum += i;
				}
				System.out.println("[처리 결과]" + sum);
			}
		};
		
		//submit은 Future를 리턴한다. future는 지연완료객체이다.  Future는 결과값이 아니다. 
		//future에서 get메소드를 호출하면, 스레드가 작업을 완료할때까지 기다렸다가 결과값을 얻는다. 
		//작업이 완료될 때까지 blocking 된다. 
		Future future = executorService.submit(runnable); 
		
		
		//어차피 runnable객체는 리턴값이 없는 객체라서 null을 리턴한다. get은 blocking이 된다. 
		//풀에서 다 실행해야만 작동한다. 
		try {
			future.get();
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {System.out.println("[예외 발생함]"+e.getMessage());}
		
		executorService.shutdown();
	}
}
package sex09.exam02_blocking;

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultByCallableExample {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors() // 프로그램에서 사용할 수 있는 코어의 수 
		);
		
		System.out.println("[작업 처리 요청]");
		Callable<Integer> task = new Callable<Integer>() { //작업 객체
			@Override
			public Integer call() {
				int sum = 0;
				for(int i = 1;i<=10;i++) {
					sum += i;
				}
				return sum;
			}
		};
		
		Future<Integer> future = executorService.submit(task); 
		
		try {
			int sum = future.get();
			System.out.println("[처리 결과]"+sum);
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {System.out.println("[예외 발생함]"+e.getMessage());}
		
		executorService.shutdown();
	}
}
package sex09.exam02_blocking;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ResultByRunnableExample { //스레드의 결과를 취합한 걸 확인할 수 있는 예제임. 

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
		System.out.println("[작업 처리 요청]");
		
		class Task implements Runnable{
			Result result; //공유객체
			Task(Result result){
				this.result = result;
			}
			@Override
			public void run() {
				int sum = 0;
				for(int i = 1;i<=10;i++) {
					sum+=i;
				}
				result.addValue(sum);
			}
		};
		
		Result result = new Result(); // 공유객체이다. 스레드의 외부에 있으므로, 외부객체이다. 
		//두개의 작업을 정의한다. 
		Runnable task1 = new Task(result);
		Runnable task2 = new Task(result);
				
		Future<Result> future1 = executorService.submit(task1,result); //스레드풀에 작업 처리 요청 
		Future<Result> future2 = executorService.submit(task2,result);
		
		
		try {
			result = future1.get();//여기선 Result객체가 반환된다. 
			result = future2.get();//여기선 Result객체가 반환된다. 
			//task1 과 task2가 취합된 결과를 밑에서 확인할 수 있다. 
			System.out.println("[처리 결과]"+result.accumValue);
			System.out.println("[작업 처리 완료]");
		} catch (Exception e) {
			System.out.println("[실행 예외 발생함]"+e.getMessage());
		} 
	}

}
class Result { //공유객체다. 동기화의 개념을 떠올려라!. 공유객체 -> 동기화 염두하기. 
	int accumValue;
	synchronized void addValue(int value) { //여러 스레드에의해 접근되므로 동기화시켜준다. 
		accumValue += value;
	}
}
package sex09.exam02_blocking;

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
//블로킹 방식의 작업 완료 통보 방법. 
//take 메소드는 작업이 완료된 순서대로 그 작업의 결과를 가져오는 future를 얻는다. 

//이 부분은 조금 내용이 어렵다. 스레드 풀에서의 블로킹방식..? 다시 한번 봐야 할듯. 
public class CompletionServiceExample {

	public static void main(String[] args) {
		ExecutorService executorService = Executors.newFixedThreadPool(
			Runtime.getRuntime().availableProcessors()
		);
		CompletionService<Integer> completionService = 
				new ExecutorCompletionService<Integer>(executorService);
		
		System.out.println("[작업 처리 요청]");
		for(int i = 0;i<3;i++) { //작업 객체를 3개를 만들게요. 작업큐에 3개의 callable객체를 저장할게요. 
			completionService.submit(new Callable<Integer>() { // completionService의 submit임. 명심하기. 
				@Override
				public Integer call() throws Exception {
					int sum = 0;
					for(int i =1;i<=10;i++) {
						sum += i;}
					return sum;
				}
			});
		} 
		System.out.println("[처리 완료된 작업 확인]");
		
		//take메소드는 블로킹이 된다.take를 호출하는 스레드가 이벤트orUI작업하는 거라면  위험하기 때문에 항상 다른 쓰레드에서 사용해야만 한다. 
		//그래서 굳이 executorService에 넣어서 사용하는거다. take를 사용하려고 스레드를 만들었다! 
		executorService.submit(new Runnable() {
			@Override
			public void run() {
				while(true) {
					try {
						Future<Integer> future = completionService.take(); //take는 while문안에서 사용하는게 맞다. 
						int value = future.get(); //이때의 get은 블로킹이 되지 않는다. 이미 take에서 결과값을 가져오기 때문에 / get이 이미 완료된 상태이기 때문에 
						System.out.println("[처리 결과]"+value);
					} catch (Exception e) { break;}
				}
			}
		});
		
		//3초 정지한 이유 --
		//completionService가 실행할 시간을 준다. 
		// 작업 큐에 callable을 저장했다고 해서 작업이 끈난 것은 아니다. 
		//스레드가 작업 큐에서 작업을 가져와서 실행을 하기 위해  시간이 필요하다. 
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {}
		executorService.shutdownNow();
	}

}

 

 

callback

package sec09.exam03_callback;

import java.nio.channels.CompletionHandler;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackExample { //콜백  방식 
	private ExecutorService executorService;
	
	public CallbackExample(){ 
		executorService = Executors.newFixedThreadPool(
				Runtime.getRuntime().availableProcessors()
		);
	}
	
	//콜백객체 생성. 결과 타입은 Integer이고, 첨부객체는 사용하지 않으면, Void라고 지정한다.  
	private CompletionHandler<Integer, Void> callback = 
		new  CompletionHandler<Integer, Void>() {
		@Override
		public void completed(Integer result, Void attachment) {
			System.out.println("completed() 실행 "+result);
		}
		@Override
		public void failed(Throwable exc, Void attachment) {
			System.out.println("failed() 실행 " +exc.toString());
		}
	};
	
	//스레드 풀 작업큐에 작업 넣기. 
	public void doWork(String x, String y) {
		Runnable task = new Runnable() {
			@Override
			public void run() {
				try {
					int intX = Integer.parseInt(x);
					int intY = Integer.parseInt(y);
					int result = intX + intY;
					callback.completed(result,null);
				}catch(NumberFormatException e) { //parseInt에서 예외 발생. 
					callback.failed(e, null); //실패 골백 호출하기. 
				}
				
			}
		};
		executorService.submit(task); //작업 큐에 Runnable 객체를 넣어준다. 
	}
	public void finish() {
		executorService.shutdown();
	}
	
	public static void main(String[] args) {
		CallbackExample example = new CallbackExample();
		example.doWork("3","3");  //정상 처리 
		example.doWork("3", "삼"); //예외 발생함. 
		
		example.finish();
	}

}
/*
					스레드 풀
<스레드 폭증>
병렬작업처리가 많아지면 스레드의 개수가 증가한다. 
스레드 생성과 스케쥴링으로 인해 cpu가 바빠지고, 메모리 사용량이 늘어난다. 
따라서 애플리케이션의 성능이 급격히 저하된다. 

<스레드 풀>
작업 처리에 사용되는 스레드를 제한된 개수만큼 미리 생성한다. 
작업 큐(Queue)에 들어오는 작업들을 하나씩 스레트가 맡아 처리. 
 작업 처리가 끝난 스레드는 작업 결과를 애플리케이션으로 전달한다. 
 스레드는 다시 작업큐에서 새로운 작업을 가져와 처리한다. 
 
 <ExecutorService 인터페이스와 Executors 클래스 >
 스레드 풀을 생성하고 사용할 수 있도록 java.util.concurrent 패키지에서 제공
 Executors 의 정적 메소드를 이용해서 ExecutorService 구현 객체 생성
 스레드 풀 = ExecutorService 객체 
 
 --> 서버프로그램을 만들 때 많이 사용한다. 
 
 									초기 스레드 수      코어 스레드 수    최대 스레드 수
 newCachedThreadPool()   		 	0             0           Integer.MAX_VALUE
 newFixedThreadPool(int nThreads) 	0    		  nThreads    nThreads
 
newCachedThreadPool() 
int 값이 가질 수 있는 최대 값만큼 스레드가 추가되나, 운영체제의 상황에 따라 달라진다.
1개 이상의 스레드가 추가되었을 경우
60초 동안 추가된 스레드가 아무 작업을 하지 않을 경우
추가된 스레드를 종료하고, 풀에서 제거한다. 
ExecutorService executorService = Executors.newCachedThreadPool();

newFixedThreadPool(int nThreads)
코어 스레드 개수와 최대 스레드 개수가 매개값으로 준 nThreads이다. 
스레드가 작업을 처리하지 않고 놀고 있더라도 스레드 개수가 줄 지 않는다.
 
ExecutorService executorService = Executors.newFixedThreadPool(
	Runtime.getRuntime().availableProcessors(); //cpu가 가지고 있는 코어의 수만큼 줌. 
);


<ThreadPoolExecutor을 이용한 직접 생성. >
newCachedThreadPool() 와 newFixedThreadPool(int nThreads)가 내부적으로 생성
스레드의 수를 자동으로 관리하고 싶을 경우에 직접 생성해서 사용한다. 

ExecutorService threadPool = new ThreadPoolExecutor(
	3,                                //코어 스레드 개수 
	100, 							  //최대 스레드 개수 
	120L, 							  //놀고 있는 시간
	TimeUnit.SECONDS,				  //놀고 있는 시간 단위
	new SynchronousQueue<Runnable>()  //작업큐 
);

<스레드풀 종료> 
스레드풀의 스레드는 기본적으로 데몬 스레드가 아니다. 
main스레드가 종료되더도 스레드풀의 스레드는 작업을 처리하기 위해 계속 실행되므로
애플리케이션은 종료되지 않는다. 
따라서 스레드풀을 종료해서 모든 스레드를 종료시켜야 한다. 

스레드풀 종료 메소드 
shutdown()
shutdownNow() -> 가급적이면 이건 쓰지 마세용. 
...


<작업 생성>
하나의 작업은 Runnable 또는 Callable객체로 표현한다. 
(둘의 차이점은 작업 처리 완료 후 리턴값이 있느냐 없느냐 이다. )

<스레드풀에서 작업 처리>
작업 큐에서 Runnable 또는 Callable 객체를 가져와
스레드로 하여금 run()와 call()메소드를 실행토록 하는 것이다. 

<작업 처리 요청>
ExecutorService 의 작업 큐에 Runnable 또는 Callable 객체를 넣는 행위를 말한다. 
execute(Runnable command) // 작업 처리 결과를 받지 못한다 
submit(Runnable task)     // 리턴된 결과값을 받을 수 있음. ->3개로 오버로딩되있음. 

 <작업 처리 도중 예외가 발생한 경우>
 execute()
 스레드가 종료되고, 해당 스레드는 제거된다.
  따라서 스레드풀은 다른 작업 처리를 위해 새로운 스레드를 생성한다. 
  
 submit()  -> 더 효율적임. -> 스레드는 재사용하는 것이 좋다. 일반적으로 얘를 더 많이 사용한다. 
 스레드가 종료되지 않고, 다음 작업을 위해 재사용된다. 
 
 
 
< 블로킹 방식의 작업 완료 통보 받기.> 
 <Future>
 작업 결과가 아니라 지연완료 객체 
 작업이 완료될때까지 기다렸다가 최종 결과를 얻기 위해 get() 메소드 사용 
 
 Future의 get()은 UI스레드에서 호출하면 안된다. 
 UI를 변경하고, 이벤트를 처리하는 스레드가 get()메서드를 호출하면,
 작업을 완료하기 전까지는 UI를 변경할 수도 없고, 이벤트도 처리할 수 없게 된다. 
 
<작업 완료순으로 통보 받기>
작업 요청 순서대로 작업 처리가 완료되는 것은 아니다. 
작업의 양과 스레드 스케쥴링에 따라서 먼저 요청한 작업이 나중에 완료되는
경우도 발생한다. 
여러 개의 작업들이 순차적으로 처리될 필요성이 없고, 
처리 결과도 순차적으로 이용될 필요가 없다면 -> 작업처리가 완료된 것부터 결과를 얻어 이용하는 것이 좋다. 

<CompletionService 객체 얻기>
poll()과 take()메소드를 이용해서 처리 완료된 작업의 Future을 얻을려면 
CompletionService의 submit()메소드로 작업 처리 요청을 해야 한다. (ExcutorService의 submit과는 구분되는 별개의 것이다./ 근데 하는일은 비슷함. )


<콜백 방식의 작업 완료 통보 받기. >
콜백이란? 애플리케이션이 스레드에게 작업 처리를 요청한 후, 다른 기능을 수행할 동안,
스레드가 작업을 완료하면 애플리케이션의 메서드를 자동 실행하는 기법을 말한다.
이때 자동실행되는 메소드를 콜백 메소드라고 한다. 

콜백객체를  생성을 해야 한다. 
콜백객체: 콜백 메소드를 가지고 있는 객체 
콜백하기: 스레드에서 콜백 객체의 메소드 호출 
콜백메서드를 실행하는 것은 스레드풀의 스레드이다. 
 */

<추가 설명>

1. CompletionService의 submit을 이용하는 이유는 poll()과 take()를 사용하기 위함이다. 

2. take()메소드를 반복 실행해서 완료된 작업을 계속 통보 받을 수 있도록 한다. 

 

반응형

댓글을 달아 주세요

Razelo 기술노트
블로그 이미지 Razelo 님의 블로그
VISITOR 오늘23 / 전체59,370