Как получить возвращаемое значение из потока?
У меня есть функция foo
, которая возвращает строку 'foo'
. Я пытаюсь получить значение 'foo'
, которое возвращается из целевой функции потока. Вот мой код:
from threading import Thread
def foo(bar):
print('hello {}'.format(bar))
return 'foo'
thread = Thread(target=foo, args=('world!',))
thread.start()
return_value = thread.join()
Я ожидал, что thread.join()
вернет значение 'foo'
, но, как видно из моего примера, это не так: thread.join()
возвращает None
.
Как мне получить возвращаемое значение из функции, выполняемой в потоке?
5 ответ(ов)
Один из способов, который я видел, это передать изменяемый объект, такой как список или словарь, в конструктор потока вместе с индексом или другим идентификатором. Затем поток может сохранить свои результаты в выделенной ячейке этого объекта. Например:
def foo(bar, result, index):
print('hello {0}'.format(bar))
result[index] = "foo"
from threading import Thread
threads = [None] * 10
results = [None] * 10
for i in range(len(threads)):
threads[i] = Thread(target=foo, args=('world!', results, i))
threads[i].start()
# делаем что-то еще
for i in range(len(threads)):
threads[i].join()
print(" ".join(results)) # какой звук издает метасинтаксический локомотив?
Если вы действительно хотите, чтобы метод join()
возвращал значение, возвращаемое вызываемой функцией, вы можете сделать это с помощью подкласса Thread
, как показано ниже:
from threading import Thread
def foo(bar):
print('hello {0}'.format(bar))
return "foo"
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs, Verbose)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args,
**self._Thread__kwargs)
def join(self):
Thread.join(self)
return self._return
twrv = ThreadWithReturnValue(target=foo, args=('world!',))
twrv.start()
print(twrv.join()) # выводит 'foo'
Это может быть немного сложно из-за некоторых особенностей именования и доступа к "частным" структурам данных, специфичным для реализации Thread
, но это работает.
Для Python 3:
class ThreadWithReturnValue(Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs={}, Verbose=None):
Thread.__init__(self, group, target, name, args, kwargs)
self._return = None
def run(self):
if self._target is not None:
self._return = self._target(*self._args,
**self._kwargs)
def join(self, *args):
Thread.join(self, *args)
return self._return
Для справки, модуль multiprocessing
имеет удобный интерфейс для этого, используя класс Pool
. Если вы хотите использовать потоки вместо процессов, вы можете просто воспользоваться классом multiprocessing.pool.ThreadPool
как заменой.
Вот пример:
def foo(bar, baz):
print('hello {0}'.format(bar))
return 'foo' + baz
from multiprocessing.pool import ThreadPool
pool = ThreadPool(processes=1)
async_result = pool.apply_async(foo, ('world', 'foo')) # кортеж аргументов для foo
# выполняем какие-то другие операции в главном потоке
return_val = async_result.get() # получаем возвращаемое значение от вашей функции.
Этот код создает пул потоков и асинхронно вызывает функцию foo
, передавая ей аргументы. После этого в основном процессе можно выполнять дополнительные задачи, а затем получить результат вызова функции с помощью метода get()
.
Вот решение, которое не требует изменения вашего существующего кода:
import Queue # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar) # Python 2.x
#print('hello {0}'.format(bar)) # Python 3.x
return 'foo'
que = Queue.Queue() # Python 2.x
#que = Queue() # Python 3.x
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
t.join()
result = que.get()
print result # Python 2.x
#print(result) # Python 3.x
Это решение также можно легко адаптировать для многопоточной среды:
import Queue # Python 2.x
#from queue import Queue # Python 3.x
from threading import Thread
def foo(bar):
print 'hello {0}'.format(bar) # Python 2.x
#print('hello {0}'.format(bar)) # Python 3.x
return 'foo'
que = Queue.Queue() # Python 2.x
#que = Queue() # Python 3.x
threads_list = list()
t = Thread(target=lambda q, arg1: q.put(foo(arg1)), args=(que, 'world!'))
t.start()
threads_list.append(t)
# Здесь можно добавить больше потоков
...
threads_list.append(t2)
...
threads_list.append(t3)
...
# Дождаться завершения всех потоков
for t in threads_list:
t.join()
# Проверить значение, возвращаемое потоками
while not que.empty():
result = que.get()
print result # Python 2.x
#print(result) # Python 3.x
В этом коде реализовано создание потоков и использование очереди для получения результатов вычислений из разных потоков, что обеспечивает чистоту и безопасность данных в многопоточной среде.
Вот перевод вашего текста в стиле ответа на StackOverflow:
Я адаптировал ответ от Kindall и немного его улучшил.
Ключевым моментом является добавление *args
и **kwargs
в метод join()
, чтобы обеспечить обработку параметра timeout
.
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super(ThreadWithReturn, self).__init__(*args, **kwargs)
self._return = None
def run(self):
if self._Thread__target is not None:
self._return = self._Thread__target(*self._Thread__args, **self._Thread__kwargs)
def join(self, *args, **kwargs):
super(ThreadWithReturn, self).join(*args, **kwargs)
return self._return
ОБНОВЛЕННЫЙ ОТВЕТ НИЖЕ
Это мой самый популярный ответ, поэтому я решил обновить его, добавив код, который будет работать как на Python 2, так и на Python 3.
Кроме того, я заметил множество ответов на этот вопрос, показывающих нехватку понимания работы метода Thread.join()
. Многие из них полностью игнорируют параметр timeout
. Также существует особый случай, о котором вам следует знать, когда вы имеете (1) целевую функцию, которая может возвращать None
, и (2) вы также передаете параметр timeout
в join()
. Пожалуйста, смотрите "ТЕСТ 4", чтобы понять этот особый случай.
Вот класс ThreadWithReturn
, который работает как с Python 2, так и с Python 3:
import sys
from threading import Thread
from builtins import super # https://stackoverflow.com/a/30159479
_thread_target_key, _thread_args_key, _thread_kwargs_key = (
('_target', '_args', '_kwargs')
if sys.version_info >= (3, 0) else
('_Thread__target', '_Thread__args', '_Thread__kwargs')
)
class ThreadWithReturn(Thread):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._return = None
def run(self):
target = getattr(self, _thread_target_key)
if target is not None:
self._return = target(
*getattr(self, _thread_args_key),
**getattr(self, _thread_kwargs_key)
)
def join(self, *args, **kwargs):
super().join(*args, **kwargs)
return self._return
Вот несколько тестов:
import time, random
# ТЕСТОВАЯ ЦЕЛЕВАЯ ФУНКЦИЯ
def giveMe(arg, seconds=None):
if seconds is not None:
time.sleep(seconds)
return arg
# ТЕСТ 1
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',))
my_thread.start()
returned = my_thread.join()
# (returned == 'stringy')
# ТЕСТ 2
my_thread = ThreadWithReturn(target=giveMe, args=(None,))
my_thread.start()
returned = my_thread.join()
# (returned is None)
# ТЕСТ 3
my_thread = ThreadWithReturn(target=giveMe, args=('stringy',), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=2)
# (returned is None) # потому что join() вышел по таймауту до завершения giveMe()
# ТЕСТ 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
Можете ли вы определить особый случай, который мы можем встретить в ТЕСТЕ 4?
Проблема в том, что мы ожидаем, что giveMe()
вернет None
(смотрите ТЕСТ 2), но также ожидаем, что join()
вернет None
, если он вышел по таймауту.
returned is None
означает либо:
(1) это то, что вернула giveMe()
, либо
(2) join()
вышел по таймауту.
Этот пример тривиален, так как мы знаем, что giveMe()
всегда вернет None
. Но в реальных случаях (где целевая функция может легитимно возвращать None
или что-то другое) нам следует явно проверить, что произошло.
Вот как можно решить этот особый случай:
# ТЕСТ 4
my_thread = ThreadWithReturn(target=giveMe, args=(None,), kwargs={'seconds': 5})
my_thread.start()
returned = my_thread.join(timeout=random.randint(1, 10))
if my_thread.isAlive():
# returned is None, потому что join() вышел по таймауту
# это также означает, что giveMe() все еще выполняется в фоновом режиме
pass
# обработайте логику в зависимости от вашей программы
else:
# join() завершился, и giveMe() тоже
# НО мы также можем находиться в состоянии гонки, так что нам нужно обновить returned на случай
returned = my_thread.join()
Если у вас есть дополнительные вопросы или требуется помощь, не стесняйтесь спрашивать!
Ваш код на Python использует многопоточность и очередь для вычисления квадратов чисел в заданном массиве. Давайте разберем его подробнее:
Импортируем необходимые модули: Вы импортируете
threading
для работы с потоками иqueue
для создания очереди, в которую вы будете помещать результаты.Определяем функцию
calc_square
: Эта функция принимает список чисел и очередь, в которую она будет помещать результаты. В цикле она вычисляет квадрат каждого числа и добавляет его в локальный списокl
.Создаем массив
arr
: Этот массив содержит числа от 1 до 10, для которых вы будете вычислять квадраты.Создаем очередь
out_queue1
: Это экземпляр классаQueue
, куда будет помещен результат вычислений.Создаем и запускаем поток: Вы создаете объект
Thread
, передавая ему целевую функциюcalc_square
и аргументы (массив и очередь). Затем вы запускаете поток с помощьюstart()
.Ожидаем завершения потока: Вы используете
join()
, чтобы дождаться завершения потока перед продолжением выполнения основной программы.Получаем и выводим результат: После завершения потока вызывается
get()
на очереди, чтобы получить список квадратов и вывести его на экран.
Вот код с комментариями для лучшего понимания:
import threading
import queue
# Функция для вычисления квадратов чисел
def calc_square(num, out_queue1):
l = [] # Локальный список для хранения квадратов
for x in num:
l.append(x*x) # Вычисляем квадрат и добавляем в список
out_queue1.put(l) # Помещаем результат в очередь
# Исходный массив чисел
arr = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
out_queue1 = queue.Queue() # Создаем очередь для передачи результата
t1 = threading.Thread(target=calc_square, args=(arr, out_queue1)) # Создаем поток
t1.start() # Запускаем поток
t1.join() # Ждем завершения потока
print(out_queue1.get()) # Получаем и выводим результат из очереди
Примечания:
- Использование очередей в многопоточных приложениях обеспечивает безопасность передачи данных между потоками.
- Вы можете добавить больше потоков для параллельной обработки массивов, что позволит вам использовать ресурсы системы более эффективно.
Как завершить поток в Java?
Как получить имя функции в виде строки?
Каковы преимущества использования лямбд? [закрыто]
Как применить функцию к двум столбцам DataFrame в Pandas
Как получить исходный код функции Python?