Python выборка с помощью генератора / итератора / итерируемого объекта
Описание проблемы на StackOverflow
У меня возникла проблема с использованием функции random.sample
в Python для генератора. Я пытаюсь получить случайную выборку из очень большого текстового корпуса, но встречаю следующую ошибку:
TypeError: object of type 'generator' has no len()
Я думал, что можно решить эту задачу с помощью модуля itertools
, но пока не смог найти подходящее решение. Привожу немного вымышленный пример кода:
import random
def list_item(ls):
for item in ls:
yield item
random.sample(list_item(range(100)), 20)
Обновление
По просьбе MartinPieters
я провел замеры времени выполнения трех предложенных методов. Результаты следующие:
Выборка 1000 из 10000:
Используя iterSample: 0.0163 с
Используя sample_from_iterable: 0.0098 с
Используя iter_sample_fast: 0.0148 с
Выборка 10000 из 100000:
Используя iterSample: 0.1786 с
Используя sample_from_iterable: 0.1320 с
Используя iter_sample_fast: 0.1576 с
Выборка 100000 из 1000000:
Используя iterSample: 3.2740 с
Используя sample_from_iterable: 1.9860 с
Используя iter_sample_fast: 1.4586 с
Выборка 200000 из 1000000:
Используя iterSample: 7.6115 с
Используя sample_from_iterable: 3.0663 с
Используя iter_sample_fast: 1.4101 с
Выборка 500000 из 1000000:
Используя iterSample: 39.2595 с
Используя sample_from_iterable: 4.9994 с
Используя iter_sample_fast: 1.2178 с
Выборка 2000000 из 5000000:
Используя iterSample: 798.8016 с
Используя sample_from_iterable: 28.6618 с
Используя iter_sample_fast: 6.6482 с
Как оказалось, метод array.insert
имеет серьезный недочет при работе с большими размерами выборки. Ниже привожу код, который я использовал для замеров времени выполнения:
from heapq import nlargest
import random
import timeit
def iterSample(iterable, samplesize):
results = []
for i, v in enumerate(iterable):
r = random.randint(0, i)
if r < samplesize:
if i < samplesize:
results.insert(r, v) # добавляем первые items в случайном порядке
else:
results[r] = v # с уменьшающейся вероятностью заменяем случайные элементы
if len(results) < samplesize:
raise ValueError("Sample larger than population.")
return results
def sample_from_iterable(iterable, samplesize):
return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))
def iter_sample_fast(iterable, samplesize):
results = []
iterator = iter(iterable)
# Заполняем первые samplesize элементов:
for _ in range(samplesize):
results.append(next(iterator))
random.shuffle(results) # перемешиваем их
for i, v in enumerate(iterator, samplesize):
r = random.randint(0, i)
if r < samplesize:
results[r] = v # с уменьшающейся вероятностью заменяем случайные элементы
if len(results) < samplesize:
raise ValueError("Sample larger than population.")
return results
if __name__ == '__main__':
pop_sizes = [int(10e+3), int(10e+4), int(10e+5), int(10e+5), int(10e+5), int(10e+5) * 5]
k_sizes = [int(10e+2), int(10e+3), int(10e+4), int(10e+4) * 2, int(10e+4) * 5, int(10e+5) * 2]
for pop_size, k_size in zip(pop_sizes, k_sizes):
pop = range(pop_size)
k = k_size
t1 = timeit.Timer(stmt='iterSample(pop, %i)' % (k_size), setup='from __main__ import iterSample,pop')
t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)' % (k_size), setup='from __main__ import sample_from_iterable,pop')
t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)' % (k_size), setup='from __main__ import iter_sample_fast,pop')
print('Выборка', k, 'из', pop_size)
print('Используя iterSample', '%1.4f с' % (t1.timeit(number=100) / 100.0))
print('Используя sample_from_iterable', '%1.4f с' % (t2.timeit(number=100) / 100.0))
print('Используя iter_sample_fast', '%1.4f с' % (t3.timeit(number=100) / 100.0))
print('')
Я также проверил, что все методы действительно производят непредвзятую выборку из генератора, проводя выборку 1000
элементов из 10000
, 100000
раз и вычисляя среднюю частоту появления каждого элемента в популяции, которая оказалась равной ~0.1
, как и следовало ожидать для всех трех методов.
5 ответ(ов)
Ответ на вопрос:
Хотя ответ Мартина Питерса правильный, его решение замедляется при больших значениях samplesize
, так как использование list.insert
в цикле может привести к квадратичной сложности.
Вот альтернатива, которая, на мой взгляд, сохраняет равномерность выборки и при этом повышает производительность:
def iter_sample_fast(iterable, samplesize):
results = []
iterator = iter(iterable)
# Заполнение первых samplesize элементов:
try:
for _ in range(samplesize):
results.append(next(iterator))
except StopIteration:
raise ValueError("Размер выборки больше, чем популяция.")
random.shuffle(results) # Перемешивание позиций
for i, v in enumerate(iterator, samplesize):
r = random.randint(0, i)
if r < samplesize:
results[r] = v # С уменьшающейся вероятностью заменяем случайные элементы
return results
Разница начинает проявляться при значениях samplesize
, превышающих 10000
. Время выполнения для вызовов с параметрами (1000000, 100000)
:
- iterSample: 5.05с
- iter_sample_fast: 2.64с
Таким образом, iter_sample_fast
показывает значительно лучшую производительность с увеличением размера выборки.
Вот однострочное решение, которое выбирает k элементов без замены из n элементов с временной сложностью O(n lg k):
from heapq import nlargest
import random
def sample_from_iterable(it, k):
return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))
Этот код использует функцию nlargest
из модуля heapq
, чтобы получить k элементов с наибольшим значением случайного числа, сгенерированного для каждого элемента из итератора it
. В результате вы получите выборку из k элементов, при этом каждый элемент выбирается случайно и без повторений.
Если размер популяции n известен, вот пример эффективного по памяти кода, который проходит по генератору, извлекая только целевые образцы:
from random import sample
from itertools import count, compress
targets = set(sample(range(n), k=10))
for selection in compress(pop, map(targets.__contains__, count())):
print(selection)
Этот код выводит выборки в том порядке, в котором они производятся генератором популяции.
Суть техники заключается в использовании стандартной библиотеки random.sample() для случайного выбора индексов целевых образцов. Вторая строка проверяет, находится ли данный индекс среди целевых, и если да, возвращает соответствующее значение из генератора.
Например, если целевые индексы — это {6, 2, 4}
:
0 1 2 3 4 5 6 7 8 9 10 ... вывод count()
F F T F T F T F F F F ... является ли индекс в целях?
A B C D E F G H I J K ... выходные данные генератора популяции
- - C - E - G - - - - ... выборки, сгенерированные compress
Эта техника подходит для перебора корпуса, слишком большого, чтобы поместиться в памяти (в противном случае можно было бы просто использовать sample() непосредственно на популяции).
Если количество элементов в итераторе известно (например, было посчитано ранее), вы можете использовать следующий подход:
def iter_sample(iterable, iterlen, samplesize):
if iterlen < samplesize:
raise ValueError("Размер выборки больше, чем популяция.")
indexes = set()
while len(indexes) < samplesize:
indexes.add(random.randint(0, iterlen))
indexesiter = iter(sorted(indexes))
current = next(indexesiter)
ret = []
for i, item in enumerate(iterable):
if i == current:
ret.append(item)
try:
current = next(indexesiter)
except StopIteration:
break
random.shuffle(ret)
return ret
Лично я нахожу этот метод более быстрым, особенно когда размер выборки (samplesize) невелик по сравнению с общим количеством элементов (iterlen). Однако, если запрашивается вся выборка или близкая к ней, возникают определенные проблемы.
Вот результаты времени выполнения для iter_sample
и его оптимизированной версии iter_sample_fast
для различных размеров выборки:
iter_sample (iterlen=10000, samplesize=100)
время: (1, 'ms')iter_sample_fast (iterlen=10000, samplesize=100)
время: (15, 'ms')iter_sample (iterlen=1000000, samplesize=100)
время: (65, 'ms')iter_sample_fast (iterlen=1000000, samplesize=100)
время: (1477, 'ms')iter_sample (iterlen=1000000, samplesize=1000)
время: (64, 'ms')iter_sample_fast (iterlen=1000000, samplesize=1000)
время: (1459, 'ms')iter_sample (iterlen=1000000, samplesize=10000)
время: (86, 'ms')iter_sample_fast (iterlen=1000000, samplesize=10000)
время: (1480, 'ms')iter_sample (iterlen=1000000, samplesize=100000)
время: (388, 'ms')iter_sample_fast (iterlen=1000000, samplesize=100000)
время: (1521, 'ms')iter_sample (iterlen=1000000, samplesize=1000000)
время: (25359, 'ms')iter_sample_fast (iterlen=1000000, samplesize=1000000)
время: (2178, 'ms')
Как можно заметить, подход iter_sample
показывает значительно лучшее время выполнения при меньших размерах выборки в сравнении с полным набором данных. При увеличении размера выборки эффективность метода несколько ухудшается.
Ваш код для выборки элементов из генератора с заданным размером выборки и длиной итерации действительно кажется оптимальным, особенно когда у вас есть представление о длине генератора. Вот перевод ответа с StackOverflow:
Вы предложили отличный подход для выборки элементов из генератора, основываясь на том, насколько вы знаете длину генератора. Ваш метод использует случайное распределение для выбора элементов, и результаты действительно показывают его эффективность как для небольших, так и для больших итерабельных объектов.
Вот краткий обзор вашего кода и его эффективности:
def gen_sample(generator_list, sample_size, iterlen):
num = 0
inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
results = []
iterator = iter(generator_list)
gotten = 0
while gotten < sample_size:
try:
b = iterator.next()
if inds[num]:
results.append(b)
gotten += 1
num += 1
except:
num = 0
iterator = iter(generator_list)
inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
return results
Ваш метод сохраняет свою скорость как для небольших (например, xrange(10000)
) выборок, так и для очень больших (например, xrange(5000000)
), что описано в ваших тестах времени выполнения:
# Huge
res = gen_sample(xrange(5000000), 200000, 5000000)
timing: 1.22s
# Small
z = gen_sample(xrange(10000), 1000, 10000)
timing: 0.000441
Это действительно впечатляющие результаты и подтверждает, что ваш метод может быть одним из самых быстрых, пока он не будет опровергнут другими подходами!
Если у вас есть дополнительные идеи по улучшению кода или советы по его применению в других сценариях, было бы интересно их услышать!
Как случайно выбрать элемент из списка?
Генерация случайной строки с заглавными буквами и цифрами
Что делает yield без значения в контекстном менеджере?
Как задать верхние и нижние границы при использовании numpy.random.normal
Как работает это выражение с лямбдой/yield/генератором?