10

Как завершить поток в Java?

14

Можно ли завершить работающий поток, не устанавливая и не проверяя какие-либо флаги, семафоры и т.д.?

5 ответ(ов)

1

На данный момент нет официального API для этого.

Вам нужно использовать платформенный API для завершения потока, например, pthread_kill или TerminateThread. Доступ к такому API можно получить, например, через pythonwin или с использованием модуля ctypes.

Обратите внимание, что это inherently небезопасно. Это может привести к образованию не собираемого мусора (из локальных переменных стековых фреймов, которые становятся мусором) и может вызвать взаимные блокировки, если завершенный поток владел GIL в момент его завершения.

0

Как уже упоминали другие, обычная практика - устанавливать флаг остановки потока. Для легковесного решения (без создания подклассов Thread и без глобальных переменных) можно использовать лямбда-обратный вызов. (Обратите внимание на скобки в if stop().)

Вот пример кода:

import threading
import time

def do_work(id, stop):
    print("Я поток", id)
    while True:
        print("Я поток {} делаю что-то".format(id))
        if stop():
            print("  Выход из цикла.")
            break
    print("Поток {}, завершаю работу".format(id))

def main():
    stop_threads = False
    workers = []
    for id in range(0, 3):
        tmp = threading.Thread(target=do_work, args=(id, lambda: stop_threads))
        workers.append(tmp)
        tmp.start()
    time.sleep(3)
    print('main: закончили спать; время остановить потоки.')
    stop_threads = True
    for worker in workers:
        worker.join()
    print('Конец.')

if __name__ == '__main__':
    main()

Замена print() на функцию pr(), которая всегда сбрасывает буфер (sys.stdout.flush()), может повысить точность вывода в консоль.

(Тестировалось только на Windows/Eclipse/Python 3.3.)

0

Ни в коем случае не следует forcibly убивать поток без его согласия.

Принудительное завершение потока нарушает любые гарантии, установленные блоками try/finally, что может привести к тому, что замки останутся заблокированными, файлы останутся открытыми и т.д.

Единственное оправдание для принудительного завершения потоков - это необходимость быстрой остановки программы, но никогда не стоит убивать отдельные потоки.

0

Если вы явно вызываете time.sleep() в вашем потоке (например, для опроса какого-либо внешнего сервиса), улучшением метода Филиппа будет использование таймаута в методе wait() события, вместо того чтобы использовать sleep().

Вот пример:

import threading

class KillableThread(threading.Thread):
    def __init__(self, sleep_interval=1):
        super().__init__()
        self._kill = threading.Event()
        self._interval = sleep_interval

    def run(self):
        while True:
            print("Do Something")

            # Если сигнал остановки не установлен, "спим" в течение интервала,
            # Если сигнал остановки приходит во время сна, мы сразу
            # пробуждаемся и обрабатываем его
            is_killed = self._kill.wait(self._interval)
            if is_killed:
                break

        print("Killing Thread")

    def kill(self):
        self._kill.set()

Для его запуска используйте следующий код:

t = KillableThread(sleep_interval=5)
t.start()
# Каждые 5 секунд выводится:
# : Do Something
t.kill()
# : Killing Thread

Преимущество использования wait() вместо sleep() с регулярной проверкой события заключается в том, что вы можете задавать более длительные интервалы сна, при этом поток будет остановлен почти мгновенно (в то время как вы иначе "спали") и, на мой взгляд, код для обработки завершения становится значительно проще.

0

Да, можно реализовать метод Thread.stop, как показано в следующем примере кода:

import sys
import threading
import time

class StopThread(StopIteration):
    pass

threading.SystemExit = SystemExit, StopThread

class Thread2(threading.Thread):

    def stop(self):
        self.__stop = True

    def _bootstrap(self):
        if threading._trace_hook is not None:
            raise ValueError('Нельзя запустить поток с трассировкой!')
        self.__stop = False
        sys.settrace(self.__trace)
        super()._bootstrap()

    def __trace(self, frame, event, arg):
        if self.__stop:
            raise StopThread()
        return self.__trace

class Thread3(threading.Thread):

    def _bootstrap(self, stop_thread=False):
        def stop():
            nonlocal stop_thread
            stop_thread = True
        self.stop = stop

        def tracer(*_):
            if stop_thread:
                raise StopThread()
            return tracer
        sys.settrace(tracer)
        super()._bootstrap()

###############################################################################

def main():
    test1 = Thread2(target=printer)
    test1.start()
    time.sleep(1)
    test1.stop()
    test1.join()
    test2 = Thread2(target=speed_test)
    test2.start()
    time.sleep(1)
    test2.stop()
    test2.join()
    test3 = Thread3(target=speed_test)
    test3.start()
    time.sleep(1)
    test3.stop()
    test3.join()

def printer():
    while True:
        print(time.time() % 1)
        time.sleep(0.1)

def speed_test(count=0):
    try:
        while True:
            count += 1
    except StopThread:
        print('Count =', count)

if __name__ == '__main__':
    main()

Класс Thread3 работает примерно на 33% быстрее, чем класс Thread2.


Дополнение:

При достаточном знании C API Python и использовании модуля ctypes можно написать намного более эффективный способ остановки потока по мере необходимости. Проблема использования sys.settrace заключается в том, что функция трассировки запускается после каждой инструкции. Если вместо этого будет вызвано асинхронное исключение в потоке, который нужно прервать, не будет накладных расходов на скорость выполнения. Следующий код предоставляет гибкость в этом отношении:

#! /usr/bin/env python3
import _thread
import ctypes as _ctypes
import threading as _threading

_PyThreadState_SetAsyncExc = _ctypes.pythonapi.PyThreadState_SetAsyncExc
# noinspection SpellCheckingInspection
_PyThreadState_SetAsyncExc.argtypes = _ctypes.c_ulong, _ctypes.py_object
_PyThreadState_SetAsyncExc.restype = _ctypes.c_int

if __debug__:
    def _set_async_exc(id, exc):
        if not isinstance(id, int):
            raise TypeError(f'{id!r} не является экземпляром int')
        if not isinstance(exc, type):
            raise TypeError(f'{exc!r} не является экземпляром type')
        if not issubclass(exc, BaseException):
            raise SystemError(f'{exc!r} не является подклассом BaseException')
        return _PyThreadState_SetAsyncExc(id, exc)
else:
    _set_async_exc = _PyThreadState_SetAsyncExc

def set_async_exc(id, exc, *args):
    if args:
        class StateInfo(exc):
            def __init__(self):
                super().__init__(*args)

        return _set_async_exc(id, StateInfo)
    return _set_async_exc(id, exc)

def interrupt(ident=None):
    if ident is None:
        _thread.interrupt_main()
    else:
        set_async_exc(ident, KeyboardInterrupt)

def exit(ident=None):
    if ident is None:
        _thread.exit()
    else:
        set_async_exc(ident, SystemExit)

class ThreadAbortException(SystemExit):
    pass

class Thread(_threading.Thread):
    def set_async_exc(self, exc, *args):
        return set_async_exc(self.ident, exc, *args)

    def interrupt(self):
        self.set_async_exc(KeyboardInterrupt)

    def exit(self):
        self.set_async_exc(SystemExit)

    def abort(self, *args):
        self.set_async_exc(ThreadAbortException, *args)

Этот подход предоставляет более эффективный способ прерывания потоков в Python.

Чтобы ответить на вопрос, пожалуйста, войдите или зарегистрируйтесь