19장. 동시성

동시성concurrency을 지원하는 것은 점점 더 중요해지고 있다. 과거에는 일반적으로 동시성 프로그래밍이라고 하면, 상대적으로 속도가 느린 네트워크, 디스크, 데이터베이스, 기타 입출력 자원과 상호 작용함으로 인하여 전반적인 성능이 저하되지 않도록 하는 것을 의미했다. 병렬성이란 슈퍼 컴퓨터에서 실행되는 과학 계산과 같은 영역에 한정된 것이었다.

그러나 이제는 업무에 새로운 변수가 생겨났다. 반도체 산업은 무어의 법칙을 따라 기하 급수적으로 칩의 밀도를 증가시켜가고 있다. 또한 마찬가지로 칩의 설계자들은 개별 CPU의 속도를 점점 높여가고 있다. 여러 이유로 해서 이러한 낡은 접근 방법은 더 이상 먹히지 않게 되었고, 이제 칩 설계자는 더 많은 CPU와 하드웨어 스레드를 가진 칩을 만들어내고 있다. 실행 속도를 높인다는 것은 하드웨어적인 병렬 처리를 뜻하게 되었고, 이제 우리 소프트웨어 개발자들도 그러한 일을 하게 되었다.

여기서 자바 환경이 도움이 될 수 있다. 오늘날 자바 플랫폼은 코드를 동시에 수행하는데 있어 더할 나위 없이 든든한 환경을 제공하며, 자이썬에서도 이러한 점을 활용할 수 있다. 문제는 동시성 코드를 작성하는 것이 그리 쉽지만은 않다는 점이다. 오늘날 하드웨어가 직면한 스레드 기반의 동시성 모델을 감안할 때 이것은 특히 그러하다.

스레드들 간에 공유하는 가변mutable개체의 존재로 인하여 스레드의 안전에 문제가 일어날 수 있음을 고려해야 한다. (함수형 프로그래밍에서는 가변 상태를 회피할 수 있으나, 파이썬에서는 아무리 하찮은 코드에서도 피해나가기가 쉽지 않을 것이다.) 동기화synchronization를 통하여 동시성 문제를 풀어나가려고 하면 또 다른 문제에 빠져들게 된다. 잠재적인 성능 문제는 제쳐두더라도, 데드락과 라이브락이 발생할 수 있는 것이다.

Note

(핫스팟과 같은) JVM 구현은 때로 동기화로 인한 부하를 피할 수 있다. 이 장의 후반부에서 이러한 시나리오를 위하여 무엇이 필요한지 논의하도록 하겠다.

이러한 모든 점을 감안할 때, 스레딩을 올바로 구현하기는 너무나 어렵다는 것이 중론이지만, 우리 생각은 다르다. JVM 상에서 쓸만한 동시성 시스템이 있으며, 그것은 자이썬으로 작성된 앱도 포함한다. 코드를 성공적으로 작성하기 위한 요건은 다음과 같다.

  • 동시성을 단순하게 유지한다.
  • 스레드 풀에 대응할 수 있도록 태스크를 사용한다.
  • 가능한 한 불변immutable 개체를 사용한다.
  • 가변 개체를 불필요하게 공유하지 않는다.
  • 가변 개체의 공유를 최소화한다. 대기열queue 및 그에 관련된 개체, 예컨대 동기화 장벽synchronization barrier과 같은 것은 스레드 간에 개체를 넘겨줄 수 있는 구조를 제공한다. 이를 통하여 어떤 개체의 상태가 변경되는 순간에는 단 하나의 스레드에만 노출시키는 것이 가능하다.
  • 방어적으로 코딩한다. 태스크에 끼어들거나interrupt 취소cancel할 수 있도록 작성한다. 시간 제한(timeout)을 둔다.

자바 API와 파이썬 API

동시성 코드를 작성하는데 있어서 자바 플랫폼에 대한 의존성을 어느 정도까지 갖도록 구현할 것인가를 고려해야 한다. 우리의 권고사항은 다음과 같다.

  • 동시성을 사용하는 기존의 파이썬 코드 기반을 이식하는 경우라면, 단지 파이썬 표준 스레딩 모듈을 사용하면 된다. 자이썬 스레드는 항상 자바 스레드와 대응되기 때문에, 그러한 코드도 자바와 상호 운용 가능하다. (이전에 자바로 개발해왔다면, 자바에 실질적인 기초를 두고 있는 이러한 API를 알고 있을 것이다).
  • 자이썬은 자바의 ConcurrentHashMap를 사용하여 사전dict과 집합set을 구현한다. 이는, 이러한 표준 파이썬 타입을 사용하기만 해도 고성능의 동시성을 구현할 수 있음을 의미한다. (뒤에서 설명하겠지만, 그것들은 CPython에서와 마찬가지로 원자적이다.)
  • 또한 java.util.concurrent의 컬렉션을 사용할 수도 있다. 애플리케이션의 요구에 맞다면 CopyOnWriteArrayList와 ConcurrentSkipListMap과 같은 컬렉션을 사용할 것을 고려해보기 바란다(자바 6에서 지원). 구글 컬렉션 라이브러리도 자이썬에 잘 어울리므로 고려해볼 만하다.
  • 직접 만들기보다는 자바에서 제공하는 고수준 원시 타입을 사용한다. 특히 스레드 풀에 대하여 태스크를 수행 및 관리하는 실행자 서비스의 경우에는 두말할 필요가 없다. 예를 들자면, threading.Timer의 사용을 피하는 것이다. 왜냐면 timed execution services로 대체 가능하기 때문이다. threading.Condition 및 threading.Lock은 여전히 사용한다. 특히, 이러한 구조체들은 with 문에서 사용하는 것에 최적화되어 있다.

실용적인 관점에서, 자바에서 지원하는 고수준의 원시 타입을 사용하는 것이 코드의 이식성에 많은 영향을 끼치지 않는다. 국부적으로 태스크를 사용하는 것은 이를 잘 분리되도록 하며, 그러한 스레드 안전에 대한 고려로서의 스레드 한정thread confinement이나 안전 발행safe publication도 마찬가지이다.

끝으로, 언제나 믹스 & 매치를 할 수 있음을 기억하기 바란다.

스레드로 작업하기

스레드를 생성하는 것은, 어떻게 보면 너무나 쉽다. 이 예제는 웹 페이지를 동시에 다운로드하는 것이다.

예제 19-1. test_thread_creation.py

from threading import Thread
import urllib2

downloaded_page = None # 전역

def download(url):
    """Download ``url`` as a single string"""
    global downloaded_page

    downloaded_page = urllib2.urlopen(url).read()
    print "Downloaded", downloaded_page[:200]


def main_work():
    # 바쁜 작업을 병렬로 수행
    print "Started main task"
    x = 0
    for i in xrange(100000000):
        x += 1
    print "Completed main task"

if __name__ == '__main__':
    # 뒤(background)에서 내려받기를 수행
    Thread(target=lambda: download("http://www.jython.org")).start()
    main_work()

함수를 부주의하게 호출하지 않도록 주의한다. target 인자는 함수 개체에 대한 참조(보통의 함수라면 이름)를 가진다. 여러분의 대상target 함수가 재미난 버그를 만들지는 않기 때문에 처음에는 모든 것이 괜찮아 보인다. 그러나 동시성은 일어나지 않는데, 그것은 함수가 실제로는 새로운 스레드가 아니라 이 새로운 스레드에 의해서 수행이 되기 때문이다.

대상 함수는 일반 함수가 될 수도 있고, 호출 가능한 개체(__call__을 구현하는 개체)가 될 수도 있다. 후자의 경우는 대상이 함수 개체라는 것을 알아보기가 더 어려울 수도 있다!

스레드가 완료될 때까지 기다리려면, join을 호출한다. 이를 통하여 동시성의 결과를 얻을 수 있다. 유일한 문제는 결과를 구하는 것이다. 앞으로 살펴보겠지만, 자이썬에서 값을 변수에 넣는 것은 안전하기는 해도 썩 좋은 방법은 아니다.

Note

데몬 스레드는 스레드의 생명주기를 관리하는 좋은 대안이다. 스레드는 시작되기 전에 데몬 스레드로 지정된다.

# 스레드 t를 생성

t.setDaemon(True)t.start()

데몬 상태는 임의의 자식 스레드에 의해 상속된다. JVM이 종료되면 어떤 데몬 스레드가 클린업이나 순서에 의한 정지를 할 새가 없이 —또는 그럴 필요없이— 단순히 종료된다.

이렇듯 클린업이 부족하다는 것은 데몬 스레드가 데이터베이스 연결이나 파일 핸들과 같은 어떠한 외부 자원도 점유하지 않는 것이 중요함을 뜻한다. 그러한 자원들은 JVM 종료시 제대로 닫히지 못할 것이다. 비슷한 이유로, 데몬 스레드가 import를 시도할 경우 자이썬의 정상적인 정지를 방해할 수 있기 때문에 절대 그렇게 하지 않도록 한다.

프로덕션에서는, 데몬 스레드의 유일한 용례는 대개 하우스키핑과 같은 일을 위해 메모리 내의 개체에 대하여 제한적으로 사용되는 것이다. 예를 들어, 캐시를 유지하거나 색인을 처리하는 데 사용할 수 있다.

그렇기는 해도, 데몬 스레드는 어떤 아이디어를 구현함에 있어서는 확실히 편리하다. 아마도 여러분의 프로그램에 대한 수명주기 관리 방법은 바로 “컨트롤-C”를 눌러서 종료하기일 것이다. 일반적인 스레드와 달리, 데몬 스레드를 실행하는 것은 JVM 정지를 방해하지는 않는다. 마찬가지로, 마지막 예제는 데드락이 이러한 데드락 걸린 스레드를 기다리지 않고 셧다운 할 수 있도록 하기 위해 데몬 스레드를 사용하는 것을 보여준다.

일반적으로 데몬 스레드를 사용하지 않는 것이 최선임을 기억하라. 최소한, 그것들을 사용하기 전에는 진지하게 검토하기 바란다.

스레드 로컬

threading.local 클래스는 각 스레드가 공유 환경 내에서도 자신만의 인스턴스를 가질 수 있도록 해준다. 사용법은 믿기지 않을 만큼 간단하다. 그저 threading.local의 인스턴스나 하위 클래스를 생성하고, 변수 또는 다른 이름을 할당하는 것이다. 이 변수는 글로벌일 수도 있고, 다른 이름 공간의 일부일 수도 있다. 지금까지는, 이것은 파이썬 내의 여느 개체들과 다를 바 없다.

그리하여 스레드는 변수를 공유할 수 있는데, 그 방법이 특별하다. 각 스레드에게는 개체가 서로 다른 모습으로 보이게 된다. 이 개체는 다른 스레드에서는 보이지 않는 임의의 속성을 추가할 수 있다.

다른 옵션으로는 threading.local의 하위 클래스를 만들 수 있다. 늘 그렇듯이, 이는 기본값을 정의하고, 또 보다 뉘앙스를 띠는 속성 모델을 정의하도록 해준다. 하지만 한 가지 특별하면서도 잠재적으로 상당한 유용성을 갖고 있는 것은, __slots__에 정의된 어떠한 속성이라도 스레드 간에 공유할 수 있다는 점이다.

그렇지만 스레드 로컬을 가지고 작업하는데에는 큰 문제점이 있다. 스레드가 올바른 범위 내에 있지 않기 때문에 납득이 되지 않는 것이 보통이지만, 개체 또는 함수는 특별히 괄호를 통한다. 스레드 로컬을 사용한다는 것은, 암시적으로 스레드가 작업을 분할하는 모델을 채택하고 있다는 것을 의미한다. 그런 다음 주어진 작업의 일부를 스레드에 맡기는 것이다. 이는 스레드를 사용한 후에 뒤처리를 필요로 하므로 스레드 풀의 사용에 문제를 일으킨다.

Note

사실, 우리는 바로 이 문제를 자이썬 실행환경에서 겪었다. 일정 부분은 파이썬 코드를 실행할 수 있도록 만들어져야 한다. 과거에는, 우리는 스레드로부터 이 “ThreadState”를 보았다. 역사적으로, 이는 사실상 빠른 것이었을 수도 있으나, 이제는 속도저하를 일으키며, 스레드가 할 수 있는 것에 대하여 불필요한 제약이다. 자이썬의 future 리팩토링을 통하여 “ThreadState”를 완전히 제거함으로써 보다 빠르고 깔끔하게 만들 것이다. 특정 케이스에서는 스레드 로컬이 유용할 수 있다고 한다. 일반적인 시나리오 하나는 여러분이 작성하지 않은 구성요소에 의해 여러분의 코드가 호출되는 것이다. 또는 스레드 로컬 싱글톤에 접근할 필요가 있을 수도 있다. 또한 스레드 로컬을 꼭 필요로 하는 아키텍처 때문에 어쩔 수 없이 사용해야 하는 경우도 물론 있을 것이다.

그렇지만 이것은 종종 불필요하다. 독자의 코드는 다를 수도 있겠지만, 파이썬에는 먼 곳에서의 행위를 피할 수 있는 좋은 도구가 있다. 괄호, 장식자, 때로는 원숭이 패치 모듈까지도 선별적으로 사용할 수 있다. 동적 언어인 동시에, 메타프로그래밍을 강력하게 지원하는 파이썬을 십분 활용하고, 자이썬 구현은 자바 코드에 접근할 수 있다는 점을 기억하라.

끝으로, 스레드 로컬은 흥미롭다. 태스크 기반 모델에서는 잘 작동하지 않는데, 그것은 장차 임의의 태스크에 할당되어버릴 컨텍스트를 작업자 스레드와 연관시키고 싶지 않기 때문이다. 신경을 바짝 쓰지 않으면, 엉망이 되어버리기 쉽다.

전역 인터프리터 잠금의 부재

자이썬에는 CPython에 구현되어 있는 전역 인터프리터 잠금 (GIL, Global Interpreter Lock)이 없다. CPython에 있어서, GIL이란 한 시점에 단 하나의 스레드만이 파이썬 코드를 수행시킬 수 있음을 뜻한다. 이러한 제한은 또한 런타임 지원뿐만 아니라 GIL을 릴리스하지 않은 확장 모듈에서도 똑같이 적용된다. (CPython으로부터 GIL을 제거하기 위해 애쓰고 있지만, 안타깝게도 그로 인하여 파이썬의 실행 속도가 현저히 감소되는 결과를 가져왔다.)

GIL로 인하여 CPython 프로그래밍에서는 스레드가 자이썬에서만큼 유용하지 않다. 동시성이 적용될 수 있는 곳은, 계산이 CPython의 런타임 외부에 있는 자료 구조에 의하여 관리되는 확장 모듈에서만 수행되는 것과 같은 시나리오에 한정될 것이다. 대신에, 개발자는 GIL의 엄격함을 교묘하게 피하기 위하여 프로세스 기반 모델을 사용하려고 할 것이다.

다시 말하지만, 자이썬에는 GIL의 구속이 없다. 이것은 모든 파이썬 스레드가 자바 스레드에 대응되며 자바 표준 쓰레기 수거(GC, garbage collection)의 지원을 받기 때문이다(CPython에서 GIL을 필요로 하는 주된 이유가 참조를 세는 GC 시스템 때문이다). 파이썬으로 쓰여진 계산 집약적인 태스크에 스레드를 사용할 수 있다는 중요한 결론에 도달하게 된다.

모듈 가져오기 잠금

그런데 파이썬에서는 모듈 가져오기 잠금(module import lock)을 정의하며, 이는 자이썬에서 구현되어 있다. 이러한 잠금은 어떠한 이름이든 가져올 때마다 이루어지게 된다. 이것은 import 문을 통하든, 그와 동등한 __import__ 빌트인을 쓰든, 혹은 관련된 코드를 쓰든 간에 성립한다. 중요한 점이 있는데, 간단히 말해서, 해당 모듈을 이미 가져온 이후라 하더라도, 모듈 가져오기 잠금은 여전히 이루어질 것이다.

그러므로 핫 루프에서 이러한 코드는 작성하면 안된다. 특히 스레드를 쓰는 코드에서는.

예제 19-2.

def slow_things_way_down():
    from foo import bar, baz
    ...

위의 코드는 가져오기를 지연시킨다는 점에서는 의미가 있을 것이다. 이러한 지연을 통하여 앱의 초기 시작 시간을 줄일 수 있기 때문이다. 하지만 잠금으로 인한 문제를 예방하려면 스레드가 이런 식으로 가져오기를 수행하는 것은 단일 스레드에 한정되어야 함을 염두에 두기 바란다. 지연된 import를 백그라운드 스레드에서 수행하도록 코드를 작성한다면 괜찮을 것이다.

예제 19-3. background_import.py

from threading import Thread
import time

def make_imports():
    import unicodedata

background_import = Thread(target=make_imports)
background_import.start()

print "Do something else while we wait for the import"
for i in xrange(10):
    print i
    time.sleep(0.1)
print "Now join..."
background_import.join()

print "And actually use unicodedata"
import unicodedata

위에서 보듯이, 가져오기를 최소 두 번 - 한번은 백그라운드에서, 또 한번은 모듈의 이름공간이 사용되는 실제 위치에서 - 수행할 필요가 있다.

여기서 우리는 모듈 가져오기 잠금이 왜 필요한지 알 수 있다. import가 처음 수행될 때, import 프로시져는 모듈의 최상위 함수를 수행한다. 많은 모듈이 선언적이기는 하지만, 파이썬에서는 모든 정의가 실행시간에 수행된다. 이러한 정의는 잠재적으로 더 많은 가져오기 (재귀적인 가져오기)를 포함할 것이며, 최상위 함수는 필시 더 복잡한 일을 수행하게 될 것이다. 모듈 가져오기 잠금이 있음으로해서 이러한 셋업을 단순화시켜 안전한 발행을 이끈다. 이 개념에 대해서는 이 장의 뒤에서 더 논의하겠다.

현재의 구현에서, 모듈 가져오기 잠금은 자이썬 런타임 전체에 대하여 글로벌하다는 것을 기억하라. 향후에는 변경될 수도 있다.

태스크를 가지고 작업하기

스레드의 생명주기를 직접 관리하는 것은 되도록 피하는 것이 좋으며, 그보다는 태스크 방식의 추상화가 바람직하다.

태스크(tasks)란 비동기적으로 수행되는 계산을 말한다. 다른 방법도 있기는 하지만, 개체를 수행시키기 위해서는 자바의 Callable 인터페이스를 구현하여야 한다. 그것이 파이썬의 메소드 또는 함수에 가장 잘 대응되기 때문이다. 태스크의 상태는 생성, (실행자에게) 제출, 시작, 완료로 변해간다. 또한 취소하거나 끼어드는 것도 가능하다.

실행자(executors)는 스레드의 집합을 사용하여 태스크를 수행시킨다. 이것은 한개의 스레드일 수도 있고, 스레드 풀일 수도 있고, 또는 현재 제출되어 있는 모든 태스크를 돌리는 데 필요한 만큼 많은 스레드일 수도 있다. 어느 것을 선택하느냐는 실행자의 정책에 따르지만, 일반적으로 하나의 스레드 풀을 써서 동시성을 어느 정도 제어하기를 원할 것이다.

Futures는 코드가 필요로 할 때에만 태스크 내에서 계산 결과에, 혹은 예외가 발생한 경우에는 예외에 접근할 수 있도록 해준다. 그 시점까지, 사용하는 코드는 그 태스크와 동시에 수행될 수 있다. 준비가 되어있지 않으면, wait-on 종속성이 도입된다.

우리는 웹 페이지를 다운로드하는 예제를 통하여 이러한 기능을 어떻게 사용하는지 살펴보려고 한다. 작업하기 쉽고, 다운로드 상태를 추적하며, 타이밍에 대한 정보도 알 수 있도록 구현해보겠다.

예제 19-4. downloader.py

import threading
import time
import urllib2
from java.util.concurrent import Callable

class Downloader(Callable):
    def __init__(self, url):
        self.url = url
        self.started = None
        self.completed = None
        self.result = None
        self.thread_used = None
        self.exception = None

    def __str__(self):
        if self.exception:
             return "[%s] %s download error %s in %.2fs" % \
                (self.thread_used, self.url, self.exception,
                 self.completed - self.started, ) #, self.result)
        elif self.completed:
            return "[%s] %s downloaded %dK in %.2fs" % \
                (self.thread_used, self.url, len(self.result)/1024,
                 self.completed - self.started, ) #, self.result)
        elif self.started:
            return "[%s] %s started at %s" % \
                (self.thread_used, self.url, self.started)
        else:
            return "[%s] %s not yet scheduled" % \
                (self.thread_used, self.url)

    # Callable 인터페이스 구현에 필요.
    # 어떠한 예외든지 ExecutionException 또는 InterruptedException으로 감싸짐
    def call(self):
        self.thread_used = threading.currentThread().getName()
        self.started = time.time()
        try:
            self.result = urllib2.urlopen(self.url).read()
        except Exception, ex:
            self.exception = ex
        self.completed = time.time()
        return self

자이썬에서는 데이터베이스 질의이건 파이썬으로 작성된 계산 집약적인 태스크이건, 어떠한 태스크라도 이런 식으로 할 수 있다. Callable 인터페이스를 지원하기만 하면 된다.

다음으로 우리는 futures를 생성할 필요가 있다. future가 완료될 때, 호출자에게는 결과가 반화되거나, 예외가 던져진다. 그 중 한 가지 경우로서 이러한 예외가 발생할 수 있다.

  • InterruptedException
  • ExecutionException. 코드에서는 원인의 속성을 가지고 밑바탕에 깔린 예외를 검출할 수 있다.

(이렇게 비동기적 호출자로 예외를 밀어내는 것은, send가 호출되었을 때 동시 루틴이 작동하는 방식과 비슷하다.)

우리는 이제 여러 개의 웹 페이지를 동시에 다운로드하는 데에 필요한 것을 갖추었다.

예제 19-5. test_futures.py

from downloader import Downloader
from shutdown import shutdown_and_await_termination
from java.util.concurrent import Executors, TimeUnit

MAX_CONCURRENT = 3
SITES = [
    "http://www.cnn.com/",
    "http://www.nytimes.com/",
    "http://www.washingtonpost.com/",
    "http://www.dailycamera.com/",
    "http://www.timescall.com/",
    ]

pool = Executors.newFixedThreadPool(MAX_CONCURRENT)
downloaders = [Downloader(url) for url in SITES]
futures = pool.invokeAll(downloaders)

for future in futures:
    print future.get(5, TimeUnit.SECONDS)

shutdown_and_await_termination(pool, 5)

반환된 future에 대하여 메소드를 얻을 때까지, 호출자는 이 태스크와 동시에 실행된다. get이 호출된 다음 태스크 완료에 대한 wait-on 종속성을 도입한다. (그래서 이것은 지원 스레드에 join을 호출하는 것과 비슷하다.)

스레드 풀을 닫는 것은 풀에 shutdown 메소드를 호출하는 것만큼 간단하다. 그렇지만 이 shutdown이 코드 내의 특별한 시점에 일어날 수 있음에 주의를 기울일 필요가 있다. 여기, 표준 자바 문서에 제공된 자이썬 버전의 튼튼한 shutdown 함수인, shutdown_and_await_termination 함수가 있다.

예제 19-6. shutdown.py

from java.util.concurrent import TimeUnit

def shutdown_and_await_termination(pool, timeout):
    pool.shutdown()
    try:
        if not pool.awaitTermination(timeout, TimeUnit.SECONDS):
            pool.shutdownNow()
            if (not pool.awaitTermination(timeout, TimeUnit.SECONDS)):
                print >> sys.stderr, "Pool did not terminate"
    except InterruptedException, ex:
        # 현재 스레드도 끼어들기를 당하면 (다시) 취소
        pool.shutdownNow()
        # 끼어들기 상태를 보존
        Thread.currentThread().interrupt()

CompletionService 인터페이스는 future를 가지고 작업하는 데에 있어 훌륭한 추상화를 제공한다. 시나리오는 다음과 같다. 우리의 코드가 invokeAll을 가지고 했던 것처럼 모든 futures가 완료될 때까지 기다린다든지, 그것들을 폴링하는 것이 아니라, 그것들이 완료되는 대로 completion 서비스가 동기화된 대기열에 밀어넣는 것이다. 그러면 이 대기열은 한 개 또는 그 이상의 스레드를 실행시키는 소비자에 의해 소비가능하게 된다.

예제 19-7. test_completion.py

from downloader import Downloader
from shutdown import shutdown_and_await_termination
from java.util.concurrent import Executors, ExecutorCompletionService
import os
import hashlib

MAX_CONCURRENT = 3
SITES = [
    "http://www.cnn.com/",
    "http://www.nytimes.com/",
    "http://www.washingtonpost.com/",
    "http://www.dailycamera.com/",
    "http://www.timescall.com/",
    # 존재할 가능성이 희박한 웹 사이트 이름을 무작위로 생성
    "http://" + hashlib.md5(
        "unlikely-web-site-" + os.urandom(4)).hexdigest() + ".com",
    ]

pool = Executors.newFixedThreadPool(MAX_CONCURRENT)
ecs = ExecutorCompletionService(pool)

# 이 함수는 다음의 루트들로부터 링크를 spider할 수 있다.
# 여기서는 이 루트들을 직접 스케줄한다.
def scheduler(roots):
    for site in roots:
        yield site

# 무기한으로 작업을 제출
for site in scheduler(SITES):
    ecs.submit(Downloader(site))

# 사용할 수 있게 되는대로 result를 가지고 작업
submitted = len(SITES)
while submitted > 0:
    result = ecs.take().get()
    # 여기서는 result를 어떻게 처리할 지에 대하여 그다지 상상력을 발휘하지 않았지만,
    # bautiful soup과 같은 도구를 사용하여 구문 분석을 할 수 있을 것이다.
    print result
    submitted -= 1

print "shutting pool down..."
shutdown_and_await_termination(pool, 5)
print "done"

이 설정은 자연스러운 흐름을 가능하게 한다. 완료 서비스의 대기열을 통해 모든 것을 스케줄링하고 싶은 유혹이 있겠지만, 한계가 있다. 예를 들어, 확장성 있는 웹 스파이더를 작성한다면, 이 작업(work) 대기열을 외부로 빼고 싶을 것이지만, 단순한 관리를 위해서는 이 정도면 족하다.

Note

왜 스레드 대신에 태스크를 사용하는가? 프로덕션 코드에서 너무나 자주 나타나는 공통적인 용례는 스레드를 위험한 방법으로 추가하는 것이다.

이기종 스레드. 데이터베이스를 쿼리 스레드를 하나쯤 만들 것이다. 또 다른 스레드는 연관된 인덱스를 재구축한다. 여기에 또 다른 질의를 추가하면 무슨 일이 벌어질까?

종속성은 포멀하게 구조화되기보다는 여러 경로를 통해 관리된다. 타이머와 기타 이벤트 소스가 뒤섞인 다양한 개체에 대하여 스레드가 동기화되다보면 결국 쥐덫과 같은 결과로 돌아올 수 있다.

명시적인 wait-on 종속성과 시간 스케줄링을 갖는 태스크를 사용하는 것보다는, 이러한 셋업 작업을 통하여 보다 구축하기 쉽고 확장성 있는 단순한 시스템을 구축할 수 있다.

스레드 안전

스레드 안전은 다음과 같은 질문을 제기한다.

  • (의도하지 않은) 상호작용이 둘 이상의 스레드에서 일어남으로 인하여 가변 개체에 손상을 끼칠수 있는가? 이것은 목록이나 사전과 같은 컬렉션에 대하여 특히 더 위험한데, 그 이유는 그러한 손상이 잠재적으로 기반 자료 구조의 사용을 불가능하게 만들거나 혹은 자료를 읽어들이는 동안 무한 반복을 야기할 수 있기 때문이다.
  • 갱신을 처리하던 것을 잃어버릴 수 있는가? 카운터를 증가시키는 것이 뚜렷한 예가 될 것이다. 이 경우, 현재의 값을 읽어들이고 나서 증가된 값으로 갱신하려는 동안, 다른 스레드와 자료 경합이 벌어질 수 있다.

자이썬은 사전, 목록, 집합과 같은 가변 개체가 손상되지 않음을 보증한다. 그렇지만 자료 경합으로 인하여 갱신 처리를 잃어버릴 수는 있다.

그렇지만, 아마도 여러분의 코드에서 사용하고 있을 지 모르는, 다른 자바 컬렉션 개체는 그러한 무손상 보증을 하지 않는다. 정렬된 사전을 지원하기 위하여 LinkedHashMap을 사용할 필요가 있다면, 그것이 공유도 되면서 변이도 일으킨다면 스레드 안전에 대한 고려가 필요할 것이다.

여기에 우리 예제에서 사용할 간단한 테스트 장치가 있다. ThreadSafetyTestCase는 unittest.TestCase에서 파생되며, assertContended 메소드를 추가로 갖는다.

예제 19-8. threadsafety.py

import threading
import unittest

class ThreadSafetyTestCase(unittest.TestCase):

    def assertContended(self, f, num_threads=20, timeout=2., args=()):
        threads = []
        for i in xrange(num_threads):
            t = threading.Thread(target=f, args=args)
            t.start()
            threads.append(t)
        for t in threads:
            t.join(timeout)
            timeout = 0.
        for t in threads:
            self.assertFalse(t.isAlive())

이 새로운 방법은 대상 함수를 실행하고 모든 스레드가 제대로 종료될 것으로 단언assert한다. 그런 다음 테스트 코드는 다른 불변식이 있는지 확인한다.

일례로, 우리는 이 아이디어를 자이썬에서 목록의 유형이 원자적인 어떤 연산을 테스트하는데에 사용한다. 아이디어는 한 가지 연산을 수행하는 여러 연산을 순서대로 실행한 다음, 그것을 거꾸로 실행하는 것이다. 한 걸음 앞으로, 뒤로 한 걸음. 최종 결과는 처음 시작한, 테스트 코드가 단언하는, 빈 목록으로 돌아와야한다.

예제 19-9. test_list.py

from threadsafety import ThreadSafetyTestCase
import threading
import time
import unittest

class ListThreadSafety(ThreadSafetyTestCase):

    def test_append_remove(self):
        lst = []
        def tester():
            # adding에 의하여 invariant를 보존한 다음, 고유한 값
            # (이 경우에는, 이 함수를 수행시키는 일꾼 스레드에
            # 대한 참조)을 제거
            ct = threading.currentThread()
            for i in range(1000):
                lst.append(ct)
                time.sleep(0.0001)
                lst.remove(ct)
        self.assertContended(tester)
        self.assertEqual(lst, [])


if __name__ == '__main__':
    unittest.main()

물론 불변 개체에 대해서는 이러한 우려를 할 필요가 전혀 없다. 문자열처럼 일반적으로 사용되는 객체, 숫자, datetimes는 튜플, 그리고 동결 집합은 불변이며, 여러분도 자신만의 불변 개체를 만들 수 있다.

스레드 안전 문제를 해결하는 여러 가지 다른 전략이 있다. 우리는 다음과 같은 것들을 살펴볼 것이다.

  • 동기화
  • 원자성
  • 스레드 한정
  • 안전 발행

동기화

우리는 동기화된 자원에 대응하는 코드 블록으로 진입하는 스레드를 제어하기 위하여 동기화를 사용한다. 올바른 동기화 프로토콜을 사용한다고 가정한다면, 이러한 제어를 통하여 자료 경합을 방지할 수 있다. (이러한 가정에는 무리가 따를 수도 있다!)

threading.Lock은 단 하나의 스레드만이 진입하는 것을 보장한다. (자이썬에서는 Cpython과는 달리, 그러한 잠금은 늘 reentrant이다.threading.Lock과 threading.RLock 사이에 구분이 없다.)다른 스레드는 그 스레드가 잠금을 끝낼 때까지 기다려야한다. 그러한 명시적인 잠금이 가장 단순하면서도 이식성이 높은 동기화 방법일 것이다.

일반적으로 그러한 잠금의 진입과 종료를 with 문으로 관리해야한다. 여의치 않을 경우에는 try-finally를 반드시 사용하여 코드 블록을 빠져나올 때에는 항상 잠금이 해제됨을 확실히 해두어야한다.

다음은 with 구문을 사용하는 예제 코드이다. 이 코드는 잠금을 하나 정해두고, 그것을 몇몇 태스크와 나눠갖는다.

예제 19-10. test_lock.py—LockTestCase.test_with_lock

def test_with_lock(self):
        counter = [0]
        lock = Lock()
        def loop100(counter):
            for i in xrange(100):
                with lock:
                    counter[0] += 1
                # 테스트 내의 모든 스레드가 수행되도록 하기 위하여 sleep을 사용
                time.sleep(0.0001)

        self.assertContended(loop100, args=(counter,), num_threads=20)
        self.assertEqual(counter[0], 2000) # 20 threads * 100 loops/thread

대안으로서, try-finally를 사용하여 구현할 수도 있다.

예제 19-11. test_lock.py—LockTestCase.test_try_finally_lock

def test_try_finally_lock(self):
        counter = [0]
        lock = Lock()
        def loop100(counter):
            for i in xrange(100):
                lock.acquire()
                try:
                    counter[0] += 1
                finally:
                    lock.release()
                time.sleep(0.0001)

        self.assertContended(loop100, args=(counter,), num_threads=20)
        self.assertEqual(counter[0], 2000)

하지만 이렇게는 하지 않도록 하라. with 구문에 비하여 느리기도 하고, with 구문을 사용한 버전이 보다 파이썬 코드답다.

자이썬에서만 적용 가능한 또 다른 방법으로서, synchronize 모듈을 사용할 수 있다. 이 모듈은 장식자(decorator) 함수인 “make_synchronized”를 제공하여, 자이썬에서 동기화된 블록 내의 어떠한 호출 가능한 것이든 감싸준다.

예제 19-12. test_synchronized.py

from synchronize import make_synchronized
from threadsafety import ThreadSafetyTestCase
import time
import unittest

@make_synchronized
def increment_counter(counter):
    counter[0] += 1
    # 테스트 내의 모든 스레드가 수행되도록 하기 위하여 sleep을 사용
    time.sleep(0.0001)

class SynchronizedTestCase(ThreadSafetyTestCase):

    def test_counter(self):
        def loop100(counter):
            for i in xrange(100):
                increment_counter(counter)

        counter = [0]
        self.assertContended(loop100, args=(counter,), num_threads=20)
        self.assertEqual(counter[0], 2000) # 20 스레드 * 100 루프/스레드

if __name__ == '__main__':
    unittest.main()

이 경우에는 아무 것도 명시적으로 해제하지 않아도된다. 예외에 대해서도, 함수가 복귀할 때에는 항상 동기화 잠금을 해제하게 된다. 다시 말하지만, 이 버전은 with 문 형태에 비하여 느리며, 명시적인 잠금을 사용하지 않는다.

Note

동기화와 with 문

자이썬의 현재 런타임(2.5.1)은 런타임 지원 및 컴파일 측면 양쪽을 통하여 보다 효율적으로 with 문을 실행할 수 있다. 그 이유는 다음의 두 가지 조건을 만족하는 한, 대부분의 JVM들은 동기화 오버헤드를 피하기 위해 코드 (모든 인라이닝을 포함하여 컴파일 단위)의 청크에 대한 분석을 수행할 수 있기 때문이다. 첫째, 청크는 lock과 unlock을 모두 가져야 한다. 둘째, 청크는 JVM이 분석을 수행하기에 너무 길지 않아야 한다. with 구문은 자바 런타임 reflection의 오버헤드를 피하는 데에 있어서 threading.Lock과 같은 내장 유형을 가지고 작업하는 것을 상대적으로 쉽게 만들어준다.

앞으로, 새로운 invokedynamic 바이트 코드의 지원은 이러한 성능 차이를 축소해야 한다.

threading 모듈은 이식성을 제공하지만 그것도 최소한에 그친다. threadin의 래퍼 대신에 Java.util.concurrent의 synchronizers를 사용하기를 원할 수도 있다. 특히, 시간제한을 두어 잠금을 대기하고자 하는 경우 이러한 접근이 필수적이다. 또한 기본 자바 개체가 원하는 동기화를 가지고 있음을 확실히 하기 위해 Collections.synchronizedMap과 같은 팩토리를 사용할 수 있을 때도 있다.

교착 상태

그러나 동기화의 사용에는 신중을 기해야 한다. 다음과 같은 코드는 갑자기 교착상태deadlock에 빠져들 것이 틀림없다.

예제 19-13. deadlock.py

from __future__ import with_statement
from threading import Thread, Lock
from java.lang.management import ManagementFactory
import time, threading

def cause_deadlock():
    counter = [0]
    lock_one = Lock()
    lock_two = Lock()
    threads = [
        Thread(
            name="thread #1", target=acquire_locks,
            args=(counter, lock_one, lock_two)),
        Thread(
            name="thread #2 (reversed)", target=acquire_locks,
            args=(counter, lock_two, lock_one))]
    for thread in threads:
        thread.setDaemon(True) # 데드락 후에 shutdown을 가능하게 함
        thread.start()

    thread_mxbean = ManagementFactory.getThreadMXBean()
    while True:
        time.sleep(1)
        print "monitoring thread", counter[0]
        thread_ids = thread_mxbean.findDeadlockedThreads()
        if thread_ids:
            print "monitoring thread: deadlock detected, shutting down", list(thread_ids)
            break

def acquire_locks(counter, lock1, lock2):
    # 다른 순서로 잠금을 얻으면 갑자기 데드락이 걸려버린다
    name = threading.currentThread().getName()
    while True:
        with lock1:
            with lock2:
                counter[0] += 1
                print name, counter[0]


if __name__ == '__main__':
    cause_deadlock()

교착상태는 wait-on 의존성의 어떠한 길이의 사이클에서든지 발생한다. 예를 들어, 영희는 철수를 기다리고, 철수는 영희를 기다리는 것이다. 시간 제한이라든지 다른 전략상의 변화 없이는, 영희는 철수를 기다리다 나가떨어질 것이므로, 이러한 교착상태는 풀리지 않을 것이다.

그와 같은 순환적인 잠금을 절대로 얻지 않음으로써 교착상태를 회피할 수 있다. 예제를 수정하여 잠금이 동일한 순서로 얻어지도록 한다면(철수가 항상 영희를 먼저 보낸다면) 교착상태가 일어나지 않을 것이다. 그렇지만, 순서라는 것이 그리 쉽지많은 않다. 때로는 시간 제한이 가장 튼튼한 전략이 될 수 있다.

그밖의 동기화 개체

Queue 모듈은 동기화된 선입선출 대기열을 구현한 것이다(동기화된 대기열은, java.util.concurrent에서는 blocking queue라고 부른다). 이러한 대기열은 개체를 하나 이상의 생산자 스레드로부터 하나 이상의 소비자 스레드로 보내는, 스레드에 안전한 방법을 나타낸다.

때때로, 여러분은 대기열을 정지시키는 독극물 개체를 정의한다. 이것은 대기 중인 어떠한 소비자 스레드든 즉시 종료시켜버릴 수 있도록 한다. 혹은 있는 방법을 그대로 사용하려면 실행자에 대한 자바의 지원을 사용하도록 한다.

후입선출이라든지 우선순위와 같은 다른 정책을 구현할 필요가 있다면, 비교 가능한 동기화된 대기열을 java.util.concurrent에서 구해다 쓸 수 있을 것이다. (이것들은 파이썬 2.6에 구현되어 있으므로, 자이썬 2.6이 릴리스될 때면 사용할 수 있을 것이다.)

조건 개체condition object는 하나의 스레드가 다른 스레드에게 깨어날 조건이 되었음을 알려줄 수 있도록 허용한다. 이러한 스레드를 모두 깨울 때에는 notifyAll을 사용한다. 대기열과 함께, 이것은 아마도 가장 실제 사용에 대한 동기화 개체의 다목적이다.

조건 개체는 항상 잠금lock과 연관되어 있다. 여러분의 코드는 대기하고 있다가 해당 잠금을 획득함으로서 조건을 알리는 bracket을 필요로 하며, 최종적으로는 (항상!) 그것을 해제한다. 늘 그렇듯이, 이것은 with 문을 통하여 더할 나위 없이 쉬운 방법으로 처리된다.

예를 들어, 다음과 같은 방법으로 자이썬의 표준 라이브러리에서 대기열을 실제로 구현할 수 있다(with 문을 사용하기 위한 부분만 수정하였다). 우리는 표준 자바 blocking queue를 사용할 수 없다. 왜냐하면 더 이상 수행할 작업이 없을 때 대기열에 join하기 위한 요구사항은 제 3의 조건 변수를 필요로 하기 때문이다.

예제 19-15. Queue.py

"""다중 생산자, 다중 소비자 대기열."""

from __future__ import with_statement
from time import time as _time
from collections import deque

__all__ = ['Empty', 'Full', 'Queue']

class Empty(Exception):
    "Exception raised by Queue.get(block=0)/get_nowait()."
    pass

class Full(Exception):
    "Exception raised by Queue.put(block=0)/put_nowait()."
    pass

class Queue:
    """Create a queue object with a given maximum size.

    If maxsize is <= 0, the queue size is infinite.
    """
    def __init__(self, maxsize=0):
        try:
            import threading
        except ImportError:
            import dummy_threading as threading
        self._init(maxsize)
        # 대기열을 변경하려면 그때마다 뮤텍스를 가지고 있어야 한다.
        # 뮤텍스를 얻은 메소드는 복귀하기 전에 뮤텍스를 반드시 내놓아야 한다.
        # 뮤텍스는 세 조건 사이에 공유되므로, 조건의 취득과 반납은
        # 곧 뮤텍스의 취득과 반납이 된다.
        self.mutex = threading.Lock()
        # 대기열에 항목이 추가될 때마다 not_empty를 통지.
        # get을 하려고 기다리는 스레드에게 알린다.
        self.not_empty = threading.Condition(self.mutex)
        # 대기열로부터 항목이 제거될 때마다 not_full을 통지.
        # put을 하려고 기다리는 스레드에게 알린다.
        self.not_full = threading.Condition(self.mutex)
        # 완료하지 않은 작업의 갯수가 0으로 떨어질 때마다 all_tasks_done을 통지.
        # join() 하려고 기다리는 스레드에게 복귀를 알린다.
        self.all_tasks_done = threading.Condition(self.mutex)
        self.unfinished_tasks = 0

    def task_done(self):
        """Indicate that a formerly enqueued task is complete.

        Used by Queue consumer threads.  For each get() used to fetch a task,
        a subsequent call to task_done() tells the queue that the processing
        on the task is complete.

        If a join() is currently blocking, it will resume when all items
        have been processed (meaning that a task_done() call was received
        for every item that had been put() into the queue).

        Raises a ValueError if called more times than there were items
        placed in the queue.
        """
        with self.all_tasks_done:
            unfinished = self.unfinished_tasks - 1
            if unfinished <= 0:
                if unfinished < 0:
                    raise ValueError('task_done() called too many times')
                self.all_tasks_done.notifyAll()
            self.unfinished_tasks = unfinished

    def join(self):
        """Blocks until all items in the Queue have been gotten and processed.

        The count of unfinished tasks goes up whenever an item is added to the
        queue. The count goes down whenever a consumer thread calls task_done()
        to indicate the item was retrieved and all work on it is complete.

        When the count of unfinished tasks drops to zero, join() unblocks.
        """
        with self.all_tasks_done:
            while self.unfinished_tasks:
                self.all_tasks_done.wait()

    def qsize(self):
        """Return the approximate size of the queue (not reliable!)."""
        self.mutex.acquire()
        n = self._qsize()
        self.mutex.release()
        return n

    def empty(self):
        """Return True if the queue is empty, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = self._empty()
        self.mutex.release()
        return n

    def full(self):
        """Return True if the queue is full, False otherwise (not reliable!)."""
        self.mutex.acquire()
        n = self._full()
        self.mutex.release()
        return n

    def put(self, item, block=True, timeout=None):
        """Put an item into the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until a free slot is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Full exception if no free slot was available within that time.
        Otherwise ('block' is false), put an item on the queue if a free slot
        is immediately available, else raise the Full exception ('timeout'
        is ignored in that case).
        """
        with self.not_full:
            if not block:
                if self._full():
                    raise Full
            elif timeout is None:
                while self._full():
                    self.not_full.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._full():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Full
                    self.not_full.wait(remaining)
            self._put(item)
            self.unfinished_tasks += 1
            self.not_empty.notify()

    def put_nowait(self, item):
        """Put an item into the queue without blocking.

        Only enqueue the item if a free slot is immediately available.
        Otherwise raise the Full exception.
        """
        return self.put(item, False)

    def get(self, block=True, timeout=None):
        """Remove and return an item from the queue.

        If optional args 'block' is true and 'timeout' is None (the default),
        block if necessary until an item is available. If 'timeout' is
        a positive number, it blocks at most 'timeout' seconds and raises
        the Empty exception if no item was available within that time.
        Otherwise ('block' is false), return an item if one is immediately
        available, else raise the Empty exception ('timeout' is ignored
        in that case).
        """
        with self.not_empty:
            if not block:
                if self._empty():
                    raise Empty
            elif timeout is None:
                while self._empty():
                    self.not_empty.wait()
            else:
                if timeout < 0:
                    raise ValueError("'timeout' must be a positive number")
                endtime = _time() + timeout
                while self._empty():
                    remaining = endtime - _time()
                    if remaining <= 0.0:
                        raise Empty
                    self.not_empty.wait(remaining)
            item = self._get()
            self.not_full.notify()
            return item

    def get_nowait(self):
        """Remove and return an item from the queue without blocking.

        Only get an item if one is immediately available. Otherwise
        raise the Empty exception.
        """
        return self.get(False)

    # 다른 대기열 조직(스택 또는 우선순위 대기열 등)을
    # 구현하기 위해 이러한 메소드들을 덮어씀.
    # 이러한 것들은 적절한 잠금을 가지고 호출해야 한다.

    # 대기열을 초기화
    def _init(self, maxsize):
        self.maxsize = maxsize
        self.queue = deque()

    def _qsize(self):
        return len(self.queue)

    # 대기열이 비어있는지 확인
    def _empty(self):
        return not self.queue

    # 대기열이 꽉 차있는지 확인
    def _full(self):
        return self.maxsize > 0 and len(self.queue) == self.maxsize

    # 새로운 항목을 대기열에 넣음
    def _put(self, item):
        self.queue.append(item)

    # 대기열로부터 항목을 얻음
    def _get(self):
        return self.queue.popleft()

교환기(exchangers), 장벽(barriers), 래치(latch) 등, 동기화를 위한 다른 기제들이 있다. 다중 스레드의 진입을 가능하게 하는 시나리오를 기술하기 위해 세마포어를 사용할 수도 있고, 읽기와 쓰기를 구별하는 잠금을 사용할 수도 있다. 자바 플랫폼에는 큰 가능성이 있다. 우리의 경험에 따르면, 그 어느 것이라도 자이썬에서 활용할 수 있다.

원자적 연산

원자적 연산은 본질적으로 스레드에 안전하다. 자료 경합이나 개체 손상이 발생하지 않으며, 그리고 다른 스레드의 관점에서 일관성이 결여되어 보이는 일도 없다.

그러므로 원자적 연산이 동기화보다 간단하게 사용할 수 있다. 덧붙여, 원자적 연산은 비교 후 치환 연산과 같은 CPU에 대한 기반 지원을 활용하곤 한다. 또한 원자적 연산은 잠금을 사용할 수 있다. 중요한 점은 잠금이 직접적으로 드러나지 않는다는 것이다. 또한, 동기화를 사용하는 경우, 동기화의 범위를 확장할 수 없다. 특히, 콜백 및 반복이 가능하지 않다.

비공식적인 문서에서나 언급되는 것이기는 하지만, 파이썬은 특정 연산의 원자성을 보장한다. 프레드릭 룬드(Fredrik Lundh)의 “파이썬에서의 스레드 동기화 방법(Thread Synchronization Methods in Python)”에 대한 문서는 메일링 리스트 상에서의 토론 및 CPython 구현의 상태를 요약한 것이다. 그의 글을 인용하자면, 파이썬 코드에서의 원자적인 연산은 다음과 같다.

  • 단일 인스턴스 속성을 읽거나 치환
  • 단일 전역 변수를 읽거나 치환
  • 목록으로부터 항목을 얻기
  • 목록을 수정 (append를 사용하여 항목을 추가하는 것 등)
  • 사전으로부터 항목을 얻기
  • 사전을 수정 (항목을 추가하거나, clear 메소드를 호출)

기술되어 있지는 않지만, 이는 내장된 set 유형에 대해서도 동일하게 적용된다.

CPython에 있어서, 이러한 원자성은 전역 인터프리터 잠금(Global Interpreter Lock, GIL), 즉 파이썬 바이트코드 가상 기계 실행 루프와 함께, dict와 list와 같은 형은 C에 내재된 것이며 GIL을 release하지 않는다는 사실이 결합됨으로 인하여 발현되는 것이다.

이는 어찌 보면 창발적이기는 하나, 개발자에게 유용한 단순화이다. 이는 기존 파이썬 코드가 기대하는 것이므로, 자이썬에 구현되어 있는 것이다.

특히 dict는 ConcurrentHashMap이므로, 우리는 사전을 자동으로 업데이트 하는 다음과 같은 메소드에 노출된다.

  • setifabsent
  • update

ConcurrentHashMap에 있어서도 반복은 원자적이지 않는 다는 점이 중요하다.

원자적 연산은 유용하지만, 상당히 제한적이기도 하다. 종종, 당신은 여전히 데이터 레이스를 방지하기 위해 동기화를 사용할 필요가, 그리고 이것은 교착 상태와 기아를 피하기 위해주의 일을해야 한다.

스레드 한정

스레드 한정thread confinement은 가변 개체와 관련된 문제를 해결하는 최선의 방법이다. 실제로, 당신은 아마 당신의 코드에 사용되는 가변 개체를 공유하는 비율을 높일 필요가 없을 것이다. 매우 단순한 입력을 사용하고 이를 공유하지 않는다면 스레드 안전에 대하여 걱정할 일이 없을 것이다.

스레드 한정 기법이 만병통치약은 아니다. 여러분의 시스템에서 일부 개체는 공유하겠지만, 실제로는 대부분은 제거할 수 있다. 종종 공유 상태는 다른 사람의 문제가 될 수 있다.

  • 매개체intermediate object는 공유를 필요로 하지 않는다. 예를 들어, 지역 변수로만 사용할 버퍼를 구축하는 경우에는 동기화를 필요로 하지 않는다. 이것이 따르기 쉬운 처방이기 때문에, 할당 오버헤드를 피하기 위해 이러한 매개체 주위에 있으려고 하지 않는가. 그렇게 하지 말라.
  • 생산자-소비자. 한 스레드가 개체를 만들어서 다른 스레드에 넘겨준다. 여러분이 할 일은 그저 Queue와 같은 적절한 동기화 개체를 사용하는 것이다.
  • 애플리케이션 컨테이너. 전형적인 데이터베이스 기반 웹 애플리케이션이 일으키는 고전적인 문제가 있다. 예를 들어, modjy를 사용한다면, 데이터베이스 접속 풀 및 스레드 풀은 서블릿 컨테이너의 책임 하에 있다. 그러므로 그것들을 직접 관찰할 수 없다. (그렇다고 해서 스레드들 간에 데이터베이스 접속을 공유하는 일은 금기이다.) 따라서 캐시와 데이터베이스의 공유 상태를 보게 될 것이다.
  • 행위자. 행위자actor 모델은 또 다른 좋은 예이다. 행위자(독립 스레드와 같은 효과를 지닌다)에게 메시지를 보내고 받을 때, 여러분 대신에 그 행위자가 개체를 조작하도록 하자. 이는 가변 개체의 일종인 메시지 대기열을 공유할 때 발생하는 문제를 효과적으로 줄일 수 있다. 그러면 메시지 대기열은 어떠한 프로세스라도 적절하게 직렬화됨을 보장함으로써 어떠한 스레드 안전 문제도 발생하지 않을 것이다.

안타깝게도 자이썬은 스레드 한정 문제로부터 자유로울 수 없다. 예를 들어, 여러분이 StringIO를 사용한다면 이 클래스가 동기화되는 목록list을 사용함에 따른 댓가를 치루어야 한다. 파이썬 표준 라이브러리에 대한 자이썬 구현이 최적화될 여지가 있기는 하지만, 코드의 섹션이 충분히는 뜨겁다면, 동기화 오버헤드를 일으키지 않음을 보장하기 위해 자바로 재작성하는 것을 고려해봄직하다.

마지막으로, 프레임 개체에 인트로스펙팅introspecting의 가능성으로 인하여, 파이썬에는 스레드 한정은 완벽하지 않다. 이는 여러분의 코드가 지역변수를 다른 스레드에서 볼 수 있으며, 그것이 가리키는 개체도 볼 수 있음을 뜻한다. 그러나 이것은 자이썬을 JVM 상에서 실행함에 있어서 어떻게 최적화할 것인가에 대한 이슈에 가깝다. 이 허점을 악용하지 않는다면 스레드 안전 문제를 발생시키지 않을 것이다. 다음 절에서 파이썬 메모리 모델에 대하여 좀 더 살펴보도록 하겠다.

파이썬 메모리 모델

파이썬의 동시성 원리는 자바에 비해 쉽다. 그 이유는 메모리 모델이라고 해서 프로그램이 작동하는 전통적인 원리에 비해 대단할 것이 없기 때문이다. 그렇지만, 이것은 또한 파이썬 코드가 그러한 단순함을 유지하기 위해 성능을 상당 부분 희생시킨다는 뜻이기도 하다.

그 이유는 다음과 같다. 자바 성능을 최대로 끌어올리기 위하여, happens-beforesynchronizes-with관계에 의하여 CPU가 자바 코드의 연산 순서를 임의로 변경re-order하는 것을 허용한다(이러한 제약에 대해서는 published 자바 메모리 모델이 보다 깊이 들어간다). 주어진 스레드 내에서 이러한 순서변경이 보이지 않는다 하더라도, 다른 스레드에서는 보인다는 것이 문제이다. 당연히, 가시성은 비지역적 개체에 대해서만 변경을 가하며, 스레드 한정은 여전히 적용된다. 특히, 이것은 두 개 이상의 스레드에 대해서는 자바 코드의 순차 지시sequential ordering에 의존할 수 없음을 의미한다.

파이썬은 다르다. 파이썬에 대하여 알아야 할 근본적인 점인 동시에 우리가 자이썬에 구현해 둔 점은, 파이썬에서 어떠한 속성을 지정하는 것은 휘발성의 쓰기라는 것이다. 그리고 어떠한 속성을 얻는 것은 휘발성의 읽기이다. 이것은 파이썬의 속성은 사전에 저장되기 때문이며, 자이썬에서는 기반이 되는ConcurrentHashMap의 의미를 따른다. 따라서 get과 set은 휘발성이다. 이는 파이썬 코드가 순차 일관성을 가지고 있음을 의미한다. 실행 순서는 코드의 구문 순서를 따른다. 뒤통수를 얻어맞을 걱정은 없다.

자바와 비교하여, 파이썬에서는 안전 발행은 일도 아니라는 것이다. 안전 발행safe publication이란 스레드에 대하여 안전한, 개체와 이름의 연관을 뜻한다. 파이썬에서는 언제나 메모리에 울타리가 쳐진 연산이 일어나므로, 개체 그 자체가 스레드에 대하여 안전한지만 확실히 해두면 된다. 그 다음에는 이 개체에 대하여 적당한 변수를 지정해주기만 하면 발행은 한번에 해결된다.

만약에 모듈 수준의 개체-싱글톤을 만들 필요가 있다면 모듈의 최상위 수준의 스크립트에서 만듦으로써 import lock이 효과가 있게끔 하라.

끼어들기

길이가 긴 스레드는 취소를 할 수 있는 여지를 남겨두어야 한다. 전형적인 패턴은 다음 예제와 비슷한 것이다.

예제 19-16.

class DoSomething(Runnable):
    def __init__(self):
        cancelled = False

    def run(self):
        while not self.cancelled:
            do_stuff()

파이썬 변수는 자바와는 달리, 항상 휘발성이 있음에 유의하라. 이런 식으로 cancelled 플래그를 사용함에 있어 아무런 문제가 없다.

스레드 끼어들기interruption를 통하여 보다 rosponsive한 취소가 가능하다. 특히, 스레드가 예컨대 조건 변수 혹은 파일 입출력과 같은 싱크로나이저를 대기하고 있다면, 이 액션은 waited-on 메소드가 InterruptedException과 함께 종료되는 원인을 유발할 것이다. (불행히도 interruptible되지 않는다, 그러한 기본 자바 잠금에 lockInterruptibly 사용하는 것과 특정한 경우를 제외하고 아래에, 인수를 잠금.)

파이썬의 스레딩 모듈은 자체가 끼어들기를 지원하지 않지만, 그것은 표준 자바 스레드 API를 통해 사용할 수 있다. 우선, 다음과 같이 클래스를 들여오도록 하자(파이썬 버전의 Thread와 충돌하지 않도록 하기 위해 JThread로 이름을 바꾼다).

예제 19-17.

from java.lang import Thread as JThread

위와 같이, 자바 스레드를 마치 파이썬 스레드인 것처럼 사용할 수 있다. 그렇다면, 역으로 파이썬 스레드를 자바 스레드처럼 사용하는 것도 가능해야 할 것이다. JThread.interrupt(obj)와 같이 호출할 수 있다면 좋을 것이다.

Note

말이 나온 김에, obj.interrupt() 대신에 위와 같이 표기해놓으니, 우리가 첫 인자로서 개체에 전달하는 한은, 마치 클래스의 정적 메소드처럼 보인다. 이러한 적응은 파이썬의 명시적인 self의 좋은 용례이다.

그러나 한 가지 문제가 있다. 최근에 출시된 버전(자이썬 2.5.1)에서, 우리는 Thread 클래스에 합당한 __tojava__ 메소드를 포함하는 것을 잊어버렸다! 그래서 결국에 이러한 술수를 부릴 수 없을 것 같다.

혹은 가능하다면? 이 버그가 수정될 때까지 기다릴 필요가 없다면 어떻게 하겠는가? 소스 코드를 뒤져보거나, dir로 클래스를 살펴볼 수 있다. 하나의 가능성은 Thread 개체의 private 속성인 _thread 속성을 사용하는 것이다. 결국 _thread는 기반이 되는 자바 스레드에 대한 속성이다. 그렇다. 이것은 구현의 세부사항이다. 그렇지만 아마 사용해도 괜찮을 것이다. 이것이 바뀌지는 않을 것으로 보인다.

하지만 더 좋은 방법이 있다. Thread 클래스가 적절한 __tojava__ 메소드를 갖도록 원숭이 패치(monkey patch)를 할 수도 있는데, 그것이 존재하지 않는 경우에만 가능하다. 따라서 이러한 패치를 통하여 미래의 자이썬 버전을 사용하게 된다고 하겠다. 왜냐하면 그 구현을 변경하여 _thread를 제거하는 것을 고려하기도 전에 빠뜨린 이 메소드를 수정할 것이기 때문이다.

다음은 귀도 반 로섬의 방법을 따라서 원숭이 패치하는 방법이다.

예제 19-18. monkeypatch.py

# http://mail.python.org/pipermail/python-dev/2008-January/076194.html
# - 귀도 반 로섬의 방법

def monkeypatch_method(cls):
    def decorator(func):
        setattr(cls, func.__name__, func)
        return func
    return decorator

# and a useful variant, with a good ugly name

def monkeypatch_method_if_not_set(cls):
    def decorator(func):
        if not hasattr(cls, func.__name__):
            setattr(cls, func.__name__, func)
        return func
    return decorator

이 monkeypatch_method 장식자는 사후에 클래스에 메소드를 추가할 수 있도록 해준다. (이것은 루비 개발자들이 클래스를 연다고 일컫는 방법이다.) 강력한 만큼 주의를 요한다. 그러나 다시, 당신은 본질적으로 이와 같은 버그 수정, 특히 때 당신은 최소한으로 이러한 수정 프로그램을 계속 할 때 걱정을 너무 많이해서는 안된다. 우리의 경우에는 이후 버전에서 고쳐지지 않았을 경우에만 패치함을 확실히 하기 위해서 변종, monkeypatch_method_if_not_set 장식자를 사용할 것이다.

한 곳에 모아놓으면 다음과 같은 코드가 된다.

예제 19-19. interrupt.py

from __future__ import with_statement
from threading import Condition, Lock, Thread
from java.lang import Thread as JThread, InterruptedException
from monkeypatch import monkeypatch_method_if_not_set
import time, threading

@monkeypatch_method_if_not_set(Thread)
def __tojava__(self, java_type):
    return self._thread

def be_unfair():
    unfair_condition = Condition()
    threads = [
        Thread(
            name="thread #%d" % i,
            target=wait_until_interrupted,
            args=(unfair_condition,))
        for i in xrange(5)]
    for thread in threads:
        thread.start()
    time.sleep(5)

    # 지금은 스레드가 아무 것도 하고 있지 않아야 하며, 어떤 공유상태를 보고 식별할 수 있다.

    # 알려주는(notifying) 대신에 스레드에 끼어든다.
    for thread in threads:
        JThread.interrupt(thread)
        # 또는 다음과 같은 동등한 연산을 할 수도 있다
        # thread.__tojava__(JThread).interrupt()
    for thread in threads:
        thread.join()

def wait_until_interrupted(cv):
    name = threading.currentThread().getName()
    with cv:
        while not JThread.currentThread().isInterrupted():
            try:
                print "Waiting pointlessly %s" % name
                cv.wait()
            except InterruptedException, e:
                break
    print "Finished %s" % name


if __name__ == '__main__':
    be_unfair()

(그것은 기다림을 위하여 threading.Condition의 사용에 의존하지 않는다.)

마침내 우리는 Future에서 제공하는 cancel 메소드를 통해 끼어들기에 간단히 접근할 수 있다. 원숭이 패치는 할 필요가 없다!

요약

자이썬에서는 동시성을 지원하는 자바 플랫폼의 이점을 한껏 누릴 수 있다. 파이썬의 표준 스레드 구조도 활용할 수 있는데, 이는 대부분의 경우에 자바의 기능을 감싸고 있다. 파이썬의 표준 가변 타입이 자이썬에서는 동시성을 염두에 두고 구현되어 있으며, 파이썬의 순차적 일관성이 잠재적인 버그를 제거한다.

그렇지만 동시성을 올바로 프로그래밍하는 것은 파이썬에서든 자바에서든 그리 쉽지 않은 일이다. 보다 고수준의 동시성, 이를테면 태스크와 같은 것을 고려하여야 하며, 가변 상태의 공유를 잘 관리해야 한다.