Параллельное программирование Python (шесть): многопоточная очередь синхронизации (очередь) для реализации модели производитель-потребитель

Python
Параллельное программирование Python (шесть): многопоточная очередь синхронизации (очередь) для реализации модели производитель-потребитель

удобная очередь

Когда мы имеем дело с многопоточным совместным использованием ресурсов, управление многопоточными модулями может усложниться. Как мы видели, модуль потоковой обработки Python предоставляет множество примитивов синхронизации, включая блокировки, семафоры, условные переменные, события и т. д. Хотя существует так много вариантов, использование очереди может быть лучшим партнером для управления синхронизацией потоков. Очереди просты в использовании, поскольку модуль обеспечивает синхронизированную безопасную последовательность пар, включая очередь FIFO (первым пришел, первым обслужен), очередь LIFO (последним пришел, первым обслужен) LifoQueue и приоритетную очередь PriorityQueue. Все эти очереди реализуют примитивы блокировки. которые можно использовать непосредственно в многопоточности. Очереди можно использовать для связи между потоками: Общие методы в модуле Queue:

  • Queue.qsize(): возвращает размер очереди
  • Queue.empty(): если очередь пуста, вернуть True, иначе False
  • Queue.full(): если очередь заполнена, вернуть True, иначе False.
  • Queue.full: соответствует максимальному размеру
  • Queue.get([block[ timeout]]): Получить очередь, время ожидания тайм-аута
  • Queue.get_nowait(): эквивалентно Queue.get(False)
  • Queue.put(item): запись в очередь, время ожидания тайм-аута
  • Queue.put_nowait(item): эквивалентно Queue.put(item, False)
  • Queue.task_done(): после завершения задания функция Queue.task_done() отправляет в очередь сигнал о том, что задание выполнено.
  • Queue.join(): на самом деле означает ожидание, пока очередь не станет пустой, прежде чем выполнять другие операции.

Модель производитель-потребитель

Реализуйте модель производитель-потребитель с помощью очередей:


import threading
import queue
import random
import time

# 创建一个队列
q = queue.Queue()

# 假定商品序号
item = 0


def produecr():
    global item
    while True:
        time.sleep(1)
        item = random.randint(1, 10)
        # 将一个“商品”推到队列中
        q.put(item)
        print('producer {}th gooos append to q.'.format(item))
        time.sleep(1)


def consumer():
    while True:
        # 在队列中删除一个“商品”,并返回该“商品”
        item = q.get()
        print(threading.currentThread().getName() +
              'consumer get {}th goods from q.'.format(item))
        q.task_done()


if __name__ == "__main__":
    threads_consumr = []
    for i in range(3):
        t = threading.Thread(target=consumer)
        t.start()
        threads_consumr .append(t)

    thread_producer = threading.Thread(target=produecr)
    thread_producer.start()
    q.join()
    for t in threads_consumr:
        t.join()
    thread_producer.join()

Рабочий скриншот выглядит следующим образом:

运行结果
Мы видим, что использование очередей подходит для таких часто встречающихся сценариев: например, когда нужно обработать тысячи данных, а потом на обработку вывозится по одной порции данных, как использовать многопоточность распределять задачи обработки для повышения эффективности обработки? Мы можем разделить данные и предоставить их для запуска нескольким потокам, но это не самый разумный подход. Здесь мы можем использовать комбинацию очередей и потоков для назначения задач. Можно использовать идею потоков очереди: во-первых, создайте глобально разделяемую очередь с ограниченным числом элементов в очереди, добавьте все данные в очередь один за другим и вызовите функцию соединения очереди для ждать. После этого может быть открыто несколько потоков, и задача потока — непрерывно извлекать данные из очереди для обработки.