IT

여러 프로세스 간에 결과 대기열 공유

itgroup 2023. 7. 27. 21:52
반응형

여러 프로세스 간에 결과 대기열 공유

에 대한 설명서multiprocessing모듈은 다음으로 시작된 프로세스에 큐를 전달하는 방법을 보여줍니다.multiprocessing.Process그러나 어떻게 하면 다음으로 시작된 비동기 작업자 프로세스와 대기열을 공유할 수 있습니까?apply_async저는 역동적인 가입이나 다른 것이 필요하지 않습니다. 단지 근로자들이 그들의 결과를 기지로 (반복적으로) 보고할 수 있는 방법입니다.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    q = multiprocessing.Queue()
    workers = pool.apply_async(worker, (33, q))

다음과 같은 경우 실패:RuntimeError: Queue objects should only be shared between processes through inheritance이것이 무엇을 의미하는지 잘 알고 있으며, 피클링/피클링 해제(및 모든 특별한 Windows 제한 사항)를 요구하는 것보다 상속을 받으라는 조언을 잘 알고 있습니다.하지만 어떻게 하면 작동하는 방식으로 대기열을 전달할 수 있을까요?저는 사례를 찾을 수 없고, 여러 가지 방법으로 실패한 대안들을 시도해 보았습니다.도와주시겠습니까?

다중 처리를 사용해 보십시오.관리자: 대기열을 관리하고 다른 작업자가 대기열에 액세스할 수 있도록 합니다.

import multiprocessing
def worker(name, que):
    que.put("%d is done" % name)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=3)
    m = multiprocessing.Manager()
    q = m.Queue()
    workers = pool.apply_async(worker, (33, q))

multiprocessing.Pool이미 공유된 결과 변수가 있으므로 추가적으로 사용할 필요가 없습니다.Manager.Queue.Manager.Queue는 별도의 서버 프로세스에 위치하고 프록시를 통해 노출되는 후드 아래의 (멀티스레딩-프로세스)입니다.이렇게 하면 풀의 내부 대기열에 비해 오버헤드가 추가됩니다.Pool의 기본 결과 처리에 의존하는 것과 달리, 결과는Manager.Queue또한 주문이 보장되지 않습니다.

작업자 프로세스는 다음으로 시작되지 않습니다..apply_async()이것은 당신이 인스턴스화할 때 이미 발생합니다.Pool호출 시 시작되는 항목pool.apply_async()는 새로운 "작업"입니다.풀의 worker-process는multiprocessing.pool.worker- 후드 아래의 기능이 기능은 풀의 내부를 통해 전송되는 새로운 "작업"을 처리합니다.Pool._inqueue그리고 부모님께 결과를 다시 보내는 것.Pool._outqueue지정한 항목func다음 시간 내에 실행됩니다.multiprocessing.pool.worker.func하기만 하면 되는return결과가 자동으로 부모에게 전송됩니다.

.apply_async() 즉시(연속적으로) 객체를 반환합니다(다음 시간 동안).ApplyResult) 전화를 하셔야 합니다..get()실제 결과를 수신하기 위해 해당 개체에 대해 (차단 중입니다).또 다른 옵션은 콜백 함수를 등록하는 것입니다. 콜백 함수는 결과가 준비되는 즉시 실행됩니다.

from multiprocessing import Pool

def busy_foo(i):
    """Dummy function simulating cpu-bound work."""
    for _ in range(int(10e6)):  # do stuff
        pass
    return i

if __name__ == '__main__':

    with Pool(4) as pool:
        print(pool._outqueue)  # DEMO
        results = [pool.apply_async(busy_foo, (i,)) for i in range(10)]
        # `.apply_async()` immediately returns AsyncResult (ApplyResult) object
        print(results[0])  # DEMO
        results = [res.get() for res in results]
        print(f'result: {results}')       

출력 예:

<multiprocessing.queues.SimpleQueue object at 0x7fa124fd67f0>
<multiprocessing.pool.ApplyResult object at 0x7fa12586da20>
result: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

참고: 지정timeout-에 대한 매개 변수..get()작업자 내에서 작업의 실제 처리를 중지하지 않습니다. 대기 중인 부모를 차단 해제할 뿐입니다.multiprocessing.TimeoutError.

언급URL : https://stackoverflow.com/questions/9908781/sharing-a-result-queue-among-several-processes

반응형