스레드란 무엇인가: 생산자 소비자 문제-2 (8)

728x90

이전글

 

스레드란 무엇인가: 생산자 소비자 문제 (7)

이전글 스레드란 무엇인가: Spring Boot 모니터링을 하다 생긴 궁금증(6)단2025.05.18 - [Developer/JAVA] - 스레드란 무엇인가: Spring Boot 모니터링을 하다 생긴 궁금증(5) 스레드란 무엇인가: Spring Boot 모니터

doitwojae.tistory.com

 

섬네일

생산자 소비자 문제를 해결하기 위해서 자바 Object의 wait, notify에 대해서 이전 글에서 다루었습니다. Obejct의 문제점으로는 락을 반납하고 다음 스레드에게 신호를 주는 시점입니다. 생산자가 생산자를 깨워 비효율이 발생하거나, 한 스레드가 락을 획득하지 못해 기아 문제가 발생합니다. 

이 문제를 해결하기 위해 Lock의 Condition과 관련해 포스트를 작성하겠습니다.

 

🐻 Lock과 Condition

이전 포스트에서는 wait 메서드를 통해 스레드를 waiting set에 보관하였습니다. 이때 notify를 수행하면 대기하고 있는 스레드 중 하나가 깨어나게 됩니다. 만약 해당 스레드가 여전히 조건에 부합하지 않다면 다시 대기 상태로 돌아가게 됩니다.

 

🐻 기아 상태와 경쟁 상태

waiting set에서 대기 중인 스레드가 계속 notify를 받지 못하고 오랫동안 기다리는 것을 기아현상이라고 표현합니다. 이 현상을 막기 위해 notifyAll을 사용할 수 있습니다. 이는 불필요한 스레드까지 통지 알림을 받아 lock을 얻기 위한 경쟁상태에 들어가게 됩니다. 

이 경쟁 상태를 개선하기 위해 필요한 스레드를 구별해서 통지하는 것이 필요합니다. 이때 Lock과 Condition을 사용하여 선별적인 통지를 할 수 있습니다.

 

🐻 Lock 구현체

ReentrantLock

재진입이 가능한 Lock입니다. 가장 일반적인 Lock으로 특정 조건에서는 lock을 풀고 나중에 다시 lock을 얻고 임계영역으로 들어와서 이후의 작업을 수행할 수 있습니다.

생성자의 인자로 true를 주게 되면 lock이 풀렸을 때 가장 오래 기다린 스레드 순서대로 (FIFO) lock을 획득할 수 있게 공정 처리합니다. 단, 성능이 떨어지게 되는 단점이 있습니다.

Lock lock = new ReentrantLock();
Lock fairLock = new ReentrantLock(true);

 

ReentrantReadWriteLock

읽기를 위한 lock과 쓰기를 위한 lock을 제공합니다. 

 

StampedLock

lock을 걸거나 해지할 때 스탬프라는 정수값을 사용하며 읽기와 쓰기를 위한 Lock 외에 optimistic reading lock (낙관적 읽기)가 추가된 것입니다. 

 

🐻 Condition

Condition은 이미 생성된 Lock으로부터 waiting set을 생성합니다.

private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();

wait(), notify()의 경우 동일한 waiting pool을 공유하여 지정된 스레드로 notify() 지정이 불가능했습니다. condition을 사용하면 스레드 별로 지정하여 wait()notify()가 가능합니다.

Condition에서 제공하는 await(), signal()을 사용하면 됩니다.

 

🐻 Lock Condition 예제 코드

자바는 1.0부터 존재한 synchronized와 BLOCKED 상태를 통한 임계 영역 관리의 단점을 해결하기 위해 자바 1.5부터 Lock 인터페이스와 ReentrantLock 구현체를 제공합니다.

기존 코드에서 ReentrantLock 구현체를 사용해서 동일하게 구현합니다.

 

public class BoundedQueueV4 implements BoundedQueue {
	private final Lock lock = new ReentrantLock();
    private final Condition condition = lock.newCondition();
    
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;
    
    @Override
    public void put(String data) {
    	lock.lock();
        try {
        	while(queue.size() == max) {
            	try {
                	condition.await();
                } catch (InterruptedException e) {
                	throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            condition.signal();
        } finally {
        	lock.unlock();
        }
    }
    
    public String take() {
    	lock.lock();
        try {
        	while(queue.isEmpty()) {
            	try {
                	condition.await();
                } catch (InterruptedException e) {
                	throw new RuntimeException(e);
                }
            }
            String data = queue.poll();
            condition.signal();
            return data;
        } finally {
        	lock.unlock();
        }
    }
}

코드의 뎁스가 깊어지면서 가독성이 많이 떨어졌습니다. 코드의 핵심은 condition의 await과 signal입니다. 반복문을 통해서 스레드가 공유자원에 데이터를 삽입하거나 소비할 수 있는지를 대기하는 코드와 임계영역에 접근해서 실제 작업하는 로직을 분리해서 살펴보면 됩니다.

 

🐻 Condition

condition은 Lock을 사용하는 스레드가 대기하는 스레드 대기 공간입니다. 

condition.await()

Object.wait()와 유사한 기능입니다. 지정한 condition에 현재 스레드를 대기 상태로 보관합니다. 이때 획득한 락을 반납하고 대기 상태로 condition에 보관합니다.

 

condition.signal()

Object.notify()와 유사한 기능입니다. 지정한 condition에서 대기 중인 스레드를 하나 깨웁니다.

 

condition 스레드 대기 공간

아직 condition 스레드 대기 공간을 분리하지 않았기 때문에 기존의 결과와 동일합니다.

 

🐻 생상자 소비자 대기 공간 분리

분리된 condition 대기 공간

public class BoundedQueueV5 implements BoundedQueue {
	private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();
    private final Queue<String> queue = new ArrayDeque<>();
    private final int max;
    
    @Override
    public void put(String data) {
    	lock.lock();
        try {
        	while(queue.size() == max) {
            	try {
                	notFull.await();
                } catch(InterruptedException e) {
                	throw new RuntimeException(e);
                }
            }
            queue.offer(data);
            notEmpty.signal();
        } finally {
        	lock.unlock();
        }
    }
    
    @Override
    public String take() {
    	lock.lock();
        try {
        	while(queue.isEmpty()) {
            	try {
                	notEmpty.await();
                } catch (InterruptedException e) {
                	throw new RuntimeException(e);
                }
            }
            String data = queue.poll();
            notFull.signal();
            return data;
        } finally {
        	lock.unlock();
        }
    }
}

 

🐻 Condition 분리

  • notFull : 생산자 스레드를 위한 대기 공간
  • notEmpty : 소비자 스레드를 위한 대기 공간

변수명이 위와 같은 이유는 signal 메서드라고 생각됩니다. 대기 영역에서 대기하는 스레드에게 signal을 보낸다라는 뜻으로 notEmpty 더 이상 비어있지 않아라고 신호를 보내는 것으로 생각하면 이해가 됩니다. 반대로 생산자에게 notFull 더 이상 꽉 차지 않았다고 시그널을 보내는 것으로 생각하면 됩니다.

 

🐻 Object.notify() vs Condition.signal()

  • Object.notify()
    • 대기 중인 스레드 중 임의의 하나를 선택해서 깨웁니다. 스레드가 깨어나는 순서는 정의되어 있지 않으며, JVM 구현에 따라 다릅니다. 보통은 먼저 들어온 스레드가 먼저 수행되지만 구현에 따라 다를 수 있습니다.
    • synchronized 블록 내에서 모니터 락을 가지고 있는 스레드가 호출해야 합니다.
  • Condition.signal()
    • 대기 중인 스레드 중 하나를 깨우며, 일반적으로는 FIFO 순서로 깨웁니다. 이 부분은 자바 버전과 구현에 따라 달라질 수 있지만 보통 Queue 구조를 가지기 때문에 FIFO 순서로 깨웁니다.
    • ReentrantLock을 가지고 있는 스레드가 호출해야 합니다.

🐻 정리

지금까지 임계영역에 접근하고 생산자-소비자 문제에 대해서 다루었습니다. synchronized의 모니터 락을 이용하였을 경우의 문제점을 살펴보았고, 이를 해결하기 위해 Lock Condition을 분리하여 문제를 해결했습니다. 이러한 구현은 자바 라이브러리 java.util.concurrent 에 BlockingQueue라는 인터페이스와 구현체가 있습니다.

문제를 해결하는 과정에서 어떤 것이 문제이고 이를 해결하기 위해서 어떻게 하였는지를 살펴보며 자바에서 제공하는 다양한 동시성 컬렉션에 대해서 알 수 있는 기초를 다졌습니다. 다음으로는 자바 동시성 컬렉션에 대해서 다루어보겠습니다.

 

🐻 참조

티스토리 - Thread를 알아보자

인프런 - 김영한 자바 스레드