본문으로 바로가기

1. Runnable vs Callable

Runnable

  • 스레드를 사용하려면 Runnable을 구현해야 합니다. 그러나 Runnable의 run 메소드는 리턴 값이 없습니다. 
@FunctionalInterface
public interface Runnable {
    public abstract void run();
}

 

Callable

  • 그러나 리턴값이 필요한 경우를 위해 Callable이 등장합니다.
  • Runnable과 거의 비슷하지만 Callable은 작업의 결과를 받을 수 있습니다.  
  • 단 Thread 클래스에 바로 구현을 할 수는 없고 Executors 같은 스레드 풀과 Future를 이용해야 합니다.
  • Callable 객체를 만들고 ExecutorService에 등록한 다음 Future 객체를 반환받아서 핸들링하는 형태로 사용합니다.
@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

 

[구현 형태]

// 익명 구현 클래스
Callable<String> implementAnonymousClass = new Callable<String>() {
    @Override
    public String call() throws Exception {
        return "AnonymousClass";
    }
};
String call1 = implementAnonymousClass.call();

// 람다
Callable<String> implementLambda = () -> "Lambda";
String call2 = implementLambda.call();

2. Callable 사용법

  1. Callable 작업(구현체) 생성
  2. Thread Pool 생성
  3. ExecutorService 인스턴스의 submit()을 이용해 Callable 작업을 전달하고 Future 인스턴스를 받음(작업이 시작됨)
  4. 받은 Future 인스턴스를 이용하여 Callable 작업 결과를 받음
  5. 잊지 말고 Thread Pool 종료
// 1. Callable 작업(구현체) 생성
Callable<String> implementLambda = () -> "Lambda";

// 2. Thread Pool 생성
ExecutorService executorService = Executors.newCachedThreadPool();

// 3. submit()을 이용해 Callable 작업을 전달하고 Future 인스턴스를 리턴받음 (작업이 시작됨)
Future<String> submit = executorService.submit(implementLambda);

// 4. Future 인스턴스를 이용하여 Callable 작업 결과를 받음
String result = submit.get();

// 5. 잊지말고 Thread Pool 종료
executorService.shutdown();

 

ExecutorService submit()

public interface ExecutorService extends Executor {
   <T> Future<T> submit(Callable<T> task);
   
   // (. . .) 생략
}

 

Future get()

public interface Future<V> {
    V get() throws InterruptedException, ExecutionException;
    
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
        
    //(. . .) 생략
}

 

3. Future get()

  • get()을 이용하여 Callable 작업의 결과 값을 받아 올 수 있습니다.
  • get() 메소드 호출 시 블로킹 콜(Blocking Call)이 발생하여 결괏값을 가지고 올 때까지 멈춥니다. 
  • 따라서 get() 메소드 호출 뒤에 작업은 Callable 작업이 끝날 때까지 기다려야 되는 문제가 생기게 됩니다.
// 0. 시간 체크
System.out.println("start: " + LocalDateTime.now());

// 1. Callable 작업(구현체) 생성 (5초 뒤 값 리턴)
Callable<String> implementLambda = () -> {
    Thread.sleep(5000);
    return "Lambda";
};

// 2. Thread Pool 생성
ExecutorService executorService = Executors.newCachedThreadPool();

// 3. submit()을 이용해 Callable 작업을 전달하고 Future 인스턴스를 리턴받음 (작업이 시작됨)
Future<String> submit = executorService.submit(implementLambda);

// 4. Future 인스턴스를 이용하여 Callable 작업 결과를 받음
String result = submit.get();

// 5. main Thread Job
System.out.println("main Thread Job: " + LocalDateTime.now()); // get() 호출 시 main Thread는 작업을 계속 기다려야 함

// 6. 잊지말고 Thread Pool 종료
executorService.shutdown();

 

[결과 값]

  • 결과 값을 보면 Callable 작업이 끝날 때까지 기다린 것을 알 수 있습니다. 
start: 2022-01-18T23:53:08.454156
main Thread Job: 2022-01-18T23:53:13.490110

3. Future isDone()

isDone() 메소드를 사용하여 Thread Pool에 submit 되어 실행 중인 Callable 작업이 언제 끝난 지를 확인할 수 있습니다. 

isDone() 반환 값이 true인 경우 get()으로 데이터를 가지고 갈 수 있다는 의미입니다.

// 0. 시간 체크
System.out.println("start: " + LocalDateTime.now());

// 1. Callable 작업(구현체) 생성 (5초 뒤 값 리턴)
Callable<String> implementLambda = () -> {
    Thread.sleep(5000);
    return "Lambda";
};

// 2. Thread Pool 생성
ExecutorService executorService = Executors.newCachedThreadPool();

// 3. submit()을 이용해 Callable 작업을 전달하고 Future 인스턴스를 리턴받음 (작업이 시작됨)
Future<String> submit = executorService.submit(implementLambda);

// 4. 작업이 끝난지 체크 후 끝난 경우 Future 인스턴스를 이용하여 Callable 작업 결과를 받음
if(submit.isDone()) {
    String result = submit.get();
    System.out.println("Callable Thread Job: " + result + LocalDateTime.now());
}

// 5. main Thread Job
System.out.println("main Thread Job: " + LocalDateTime.now()); // get() 호출 시 main Thread는 작업을 계속 기다려야 함

// 6. 다시 확인
while (true) {
    if (submit.isDone()) {
        String result = submit.get();
        System.out.println("Callable Thread Job: " + result + LocalDateTime.now());
        break;
    }
}

// 7. 잊지말고 Thread Pool 종료
executorService.shutdown();

 

[결과 값]

  • 결과 값을 보면 IsDone을 이용하여 Callable 작업이 끝나지 않으면 다른 작업을 먼저 처리하는 것을 알 수 있습니다.
start: 2022-01-19T00:13:32.656119
main Thread Job: 2022-01-19T00:13:32.689086
Callable Thread Job: 2022-01-19T00:13:37.695002

4. Future cancel()

  • cancel() 메소드 호출 시 해당 Callable 작업을 멈추고 isDone()를 호출할 경우 true로 반환합니다.
  • cancel() 메소드 호출로 인해 isDone 반환 값이 true가 된 것은 값을 가지고 갈 수 있다는 의미가 아닙니다. cancel() 호출해서 작업이 종료된 것입니다. 
// 1. Callable 작업(구현체) 생성 (5초 뒤 값 리턴)
Callable<String> implementLambda = () -> {
    Thread.sleep(5000);
    return "Lambda";
};

// 2. Thread Pool 생성
ExecutorService executorService = Executors.newCachedThreadPool();

// 3. submit()을 이용해 Callable 작업을 전달하고 Future 인스턴스를 리턴받음 (작업이 시작됨)
Future<String> submit = executorService.submit(implementLambda);

// 4. Callable 작업 끝 여부 확인
System.out.println("Callable Thread Job end? : " + submit.isDone());

// 5. Callable 작업 취소
boolean cancel = submit.cancel(false);

// 6. Callable 작업 끝 여부 확인
System.out.println("Callable Thread Job end? : " + submit.isDone());

// 7. 잊지말고 Thread Pool 종료
executorService.shutdown();

 

[결과값]

Callable Thread Job end? : false
Callable Thread Job end? : true

5. ExecutorService invokeAll()

  • 여러 Callable을 작업을 실행할 수 있습니다.
  • 모든 작업이 끝날 때까지 기다립니다.
ExecutorService executorService = Executors.newCachedThreadPool();

Callable<String> java = () -> {
    Thread.sleep(2000L);
    return "java";
};

Callable<String> spring = () -> {
    Thread.sleep(3000L);
    return "spring";
};

Callable<String> veneas = () -> {
    Thread.sleep(1000L);
    return "veneas";
};

// invokeAll은 모두 끝나길 기다렸다가 결과값을 가지고 옴
List<Future<String>> futures = executorService.invokeAll(Arrays.asList(java, spring, veneas));
for(Future<String> f : futures) {
    System.out.println(f.get());
}

executorService.shutdown();

 

[결과 값]

java
spring
veneas

6. ExecutorService invokeAny()

  • 여러 Callable을 작업을 실행할 수 있습니다.
  • 가장 먼저 작업이 종료된 결괏값만 반환합니다.
  • 싱글 스레드의 경우에는 먼저 실행된 것이 반환하게 됩니다.
  • 작업은 여러 개이지만 스레드가 부족하여 먼저 들어간 작업이 수행될 것이고 나머지 작업들은 Blocking Queue에서 대기하고 있게 됩니다. 
ExecutorService executorService = Executors.newCachedThreadPool();

Callable<String> java = () -> {
    Thread.sleep(2000L);
    return "java";
};

Callable<String> spring = () -> {
    Thread.sleep(3000L);
    return "spring";
};

Callable<String> veneas = () -> {
    Thread.sleep(1000L);
    return "veneas";
};

// invokeAny는 가장 빠르게 처리된 것만 반환하고 종료합니다. (싱글스레드의 경우에는 먼저 실행 된 것이 반환하게 됩니다.)
String firstResult = executorService.invokeAny(Arrays.asList(java, spring, veneas));
System.out.println(firstResult);

executorService.shutdown();

 

[결과 값]

[Executors.newCachedThreadPool() 경우]
veneas

[Executors.newSingleThreadExecutor() 경우]
java