6

Как получить возвращаемое значение из потока?

5

У меня есть функция foo, которая возвращает строку 'foo'. Я пытаюсь получить значение 'foo', которое возвращается из целевой функции потока. Вот мой код:

from threading import Thread

def foo(bar):
    print('hello {}'.format(bar))
    return 'foo'
    
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()

Я ожидал, что thread.join() вернет значение 'foo', но, как видно из моего примера, это не так: thread.join() возвращает None.

Как мне получить возвращаемое значение из функции, выполняемой в потоке?

5 ответ(ов)

4

Один из способов, который я видел, это передать изменяемый объект, такой как список или словарь, в конструктор потока вместе с индексом или другим идентификатором. Затем поток может сохранить свои результаты в выделенной ячейке этого объекта. Например:

def foo(bar, result, index):
    print('hello {0}'.format(bar))
    result[index] = "foo"

from threading import Thread

threads = [None] * 10
results = [None] * 10

for i in range(len(threads)):
    threads[i] = Thread(target=foo, args=('world!', results, i))
    threads[i].start()

# делаем что-то еще

for i in range(len(threads)):
    threads[i].join()

print(" ".join(results))  # какой звук издает метасинтаксический локомотив?

Если вы действительно хотите, чтобы метод join() возвращал значение, возвращаемое вызываемой функцией, вы можете сделать это с помощью подкласса Thread, как показано ниже:

from threading import Thread

def foo(bar):
    print('hello {0}'.format(bar))
    return "foo"

class ThreadWithReturnValue(Thread):
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs, Verbose)
        self._return = None
        
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args,
                                                **self._Thread__kwargs)
    def join(self):
        Thread.join(self)
        return self._return

twrv = ThreadWithReturnValue(target=foo, args=('world!',))

twrv.start()
print(twrv.join())   # выводит 'foo'

Это может быть немного сложно из-за некоторых особенностей именования и доступа к "частным" структурам данных, специфичным для реализации Thread, но это работает.

Для Python 3:

class ThreadWithReturnValue(Thread):
    
    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs={}, Verbose=None):
        Thread.__init__(self, group, target, name, args, kwargs)
        self._return = None

    def run(self):
        if self._target is not None:
            self._return = self._target(*self._args,
                                        **self._kwargs)

    def join(self, *args):
        Thread.join(self, *args)
        return self._return
3

Для справки, модуль multiprocessing имеет удобный интерфейс для этого, используя класс Pool. Если вы хотите использовать потоки вместо процессов, вы можете просто воспользоваться классом multiprocessing.pool.ThreadPool как заменой.

Вот пример:

def foo(bar, baz):
    print('hello {0}'.format(bar))
    return 'foo' + baz

from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)

async_result = pool.apply_async(foo, ('world', 'foo'))  # кортеж аргументов для foo

# выполняем какие-то другие операции в главном потоке

return_val = async_result.get()  # получаем возвращаемое значение от вашей функции.

Этот код создает пул потоков и асинхронно вызывает функцию foo, передавая ей аргументы. После этого в основном процессе можно выполнять дополнительные задачи, а затем получить результат вызова функции с помощью метода get().

0

Вот решение, которое не требует изменения вашего существующего кода:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x

from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)     # Python 2.x
    #print('hello {0}'.format(bar))   # Python 3.x
    return 'foo'

que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result             # Python 2.x
#print(result)           # Python 3.x

Это решение также можно легко адаптировать для многопоточной среды:

import Queue             # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread

def foo(bar):
    print 'hello {0}'.format(bar)     # Python 2.x
    #print('hello {0}'.format(bar))   # Python 3.x
    return 'foo'

que = Queue.Queue()      # Python 2.x
#que = Queue()           # Python 3.x

threads_list = list()

t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)

# Здесь можно добавить больше потоков
...
threads_list.append(t2)
...
threads_list.append(t3)
...

# Дождаться завершения всех потоков
for t in threads_list:
    t.join()

# Проверить значение, возвращаемое потоками
while not que.empty():
    result = que.get()
    print result         # Python 2.x
    #print(result)       # Python 3.x

В этом коде реализовано создание потоков и использование очереди для получения результатов вычислений из разных потоков, что обеспечивает чистоту и безопасность данных в многопоточной среде.

0

Вот перевод вашего текста в стиле ответа на StackOverflow:


Я адаптировал ответ от Kindall и немного его улучшил.

Ключевым моментом является добавление *args и **kwargs в метод join(), чтобы обеспечить обработку параметра timeout.

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super(ThreadWithReturn, self).__init__(*args, **kwargs)
        
        self._return = None
    
    def run(self):
        if self._Thread__target is not None:
            self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
    
    def join(self, *args, **kwargs):
        super(ThreadWithReturn, self).join(*args, **kwargs)
        
        return self._return

ОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ

Это мой самый популярный ответ, поэтому я решил обновить его, добавив код, который будет работать как на Python 2, так и на Python 3.

Кроме того, я заметил множество ответов на этот вопрос, показывающих нехватку понимания работы метода Thread.join(). Многие из них полностью игнорируют параметр timeout. Также существует особый случай, о котором вам следует знать, когда вы имеете (1) целевую функцию, которая может возвращать None, и (2) вы также передаете параметр timeout в join(). Пожалуйста, смотрите "ТЕСТ 4", чтобы понять этот особый случай.

Вот класс ThreadWithReturn, который работает как с Python 2, так и с Python 3:

import sys
from threading import Thread
from builtins import super  # https://stackoverflow.com/a/30159479

_thread_target_key, _thread_args_key, _thread_kwargs_key = (
    ('_target', '_args', '_kwargs')
    if sys.version_info >= (3, 0) else
    ('_Thread__target', '_Thread__args', '_Thread__kwargs')
)

class ThreadWithReturn(Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._return = None
    
    def run(self):
        target = getattr(self, _thread_target_key)
        if target is not None:
            self._return = target(
                *getattr(self, _thread_args_key),
                **getattr(self, _thread_kwargs_key)
            )
    
    def join(self, *args, **kwargs):
        super().join(*args, **kwargs)
        return self._return

Вот несколько тестов:

import time, random

# ТЕСТОВАЯ ЦЕЛЕВАЯ ФУНКЦИЯ
def giveMe(arg, seconds=None):
    if seconds is not None:
        time.sleep(seconds)
    return arg

# ТЕСТ 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')

# ТЕСТ 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)

# ТЕСТ 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # потому что join() вышел по таймауту до завершения giveMe()

# ТЕСТ 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

Можете ли вы определить особый случай, который мы можем встретить в ТЕСТЕ 4?

Проблема в том, что мы ожидаем, что giveMe() вернет None (смотрите ТЕСТ 2), но также ожидаем, что join() вернет None, если он вышел по таймауту.

returned is None означает либо:

(1) это то, что вернула giveMe(), либо

(2) join() вышел по таймауту.

Этот пример тривиален, так как мы знаем, что giveMe() всегда вернет None. Но в реальных случаях (где целевая функция может легитимно возвращать None или что-то другое) нам следует явно проверить, что произошло.

Вот как можно решить этот особый случай:

# ТЕСТ 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))

if my_thread.isAlive():
    # returned is None, потому что join() вышел по таймауту
    # это также означает, что giveMe() все еще выполняется в фоновом режиме
    pass
    # обработайте логику в зависимости от вашей программы
else:
    # join() завершился, и giveMe() тоже
    # НО мы также можем находиться в состоянии гонки, так что нам нужно обновить returned на случай
    returned = my_thread.join()

Если у вас есть дополнительные вопросы или требуется помощь, не стесняйтесь спрашивать!

0

Ваш код на Python использует многопоточность и очередь для вычисления квадратов чисел в заданном массиве. Давайте разберем его подробнее:

  1. Импортируем необходимые модули: Вы импортируете threading для работы с потоками и queue для создания очереди, в которую вы будете помещать результаты.

  2. Определяем функцию calc_square: Эта функция принимает список чисел и очередь, в которую она будет помещать результаты. В цикле она вычисляет квадрат каждого числа и добавляет его в локальный список l.

  3. Создаем массив arr: Этот массив содержит числа от 1 до 10, для которых вы будете вычислять квадраты.

  4. Создаем очередь out_queue1: Это экземпляр класса Queue, куда будет помещен результат вычислений.

  5. Создаем и запускаем поток: Вы создаете объект Thread, передавая ему целевую функцию calc_square и аргументы (массив и очередь). Затем вы запускаете поток с помощью start().

  6. Ожидаем завершения потока: Вы используете join(), чтобы дождаться завершения потока перед продолжением выполнения основной программы.

  7. Получаем и выводим результат: После завершения потока вызывается get() на очереди, чтобы получить список квадратов и вывести его на экран.

Вот код с комментариями для лучшего понимания:

import threading
import queue

# Функция для вычисления квадратов чисел
def calc_square(num, out_queue1):
    l = []  # Локальный список для хранения квадратов
    for x in num:
        l.append(x*x)  # Вычисляем квадрат и добавляем в список
    out_queue1.put(l)  # Помещаем результат в очередь

# Исходный массив чисел
arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
out_queue1 = queue.Queue()  # Создаем очередь для передачи результата
t1 = threading.Thread(target=calc_square, args=(arr, out_queue1))  # Создаем поток
t1.start()  # Запускаем поток
t1.join()  # Ждем завершения потока
print(out_queue1.get())  # Получаем и выводим результат из очереди

Примечания:

  • Использование очередей в многопоточных приложениях обеспечивает безопасность передачи данных между потоками.
  • Вы можете добавить больше потоков для параллельной обработки массивов, что позволит вам использовать ресурсы системы более эффективно.
Чтобы ответить на вопрос, пожалуйста, войдите или зарегистрируйтесь