0

Python выборка с помощью генератора / итератора / итерируемого объекта

17

Описание проблемы на StackOverflow

У меня возникла проблема с использованием функции random.sample в Python для генератора. Я пытаюсь получить случайную выборку из очень большого текстового корпуса, но встречаю следующую ошибку:

TypeError: object of type 'generator' has no len()

Я думал, что можно решить эту задачу с помощью модуля itertools, но пока не смог найти подходящее решение. Привожу немного вымышленный пример кода:

import random

def list_item(ls):
    for item in ls:
        yield item

random.sample(list_item(range(100)), 20)

Обновление

По просьбе MartinPieters я провел замеры времени выполнения трех предложенных методов. Результаты следующие:

Выборка 1000 из 10000:
Используя iterSample: 0.0163 с
Используя sample_from_iterable: 0.0098 с
Используя iter_sample_fast: 0.0148 с

Выборка 10000 из 100000:
Используя iterSample: 0.1786 с
Используя sample_from_iterable: 0.1320 с
Используя iter_sample_fast: 0.1576 с

Выборка 100000 из 1000000:
Используя iterSample: 3.2740 с
Используя sample_from_iterable: 1.9860 с
Используя iter_sample_fast: 1.4586 с

Выборка 200000 из 1000000:
Используя iterSample: 7.6115 с
Используя sample_from_iterable: 3.0663 с
Используя iter_sample_fast: 1.4101 с

Выборка 500000 из 1000000:
Используя iterSample: 39.2595 с
Используя sample_from_iterable: 4.9994 с
Используя iter_sample_fast: 1.2178 с

Выборка 2000000 из 5000000:
Используя iterSample: 798.8016 с
Используя sample_from_iterable: 28.6618 с
Используя iter_sample_fast: 6.6482 с

Как оказалось, метод array.insert имеет серьезный недочет при работе с большими размерами выборки. Ниже привожу код, который я использовал для замеров времени выполнения:

from heapq import nlargest
import random
import timeit

def iterSample(iterable, samplesize):
    results = []
    for i, v in enumerate(iterable):
        r = random.randint(0, i)
        if r < samplesize:
            if i < samplesize:
                results.insert(r, v)  # добавляем первые items в случайном порядке
            else:
                results[r] = v  # с уменьшающейся вероятностью заменяем случайные элементы

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")

    return results

def sample_from_iterable(iterable, samplesize):
    return (x for _, x in nlargest(samplesize, ((random.random(), x) for x in iterable)))

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Заполняем первые samplesize элементов:
    for _ in range(samplesize):
        results.append(next(iterator))
    random.shuffle(results)  # перемешиваем их
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # с уменьшающейся вероятностью заменяем случайные элементы

    if len(results) < samplesize:
        raise ValueError("Sample larger than population.")
    return results

if __name__ == '__main__':
    pop_sizes = [int(10e+3), int(10e+4), int(10e+5), int(10e+5), int(10e+5), int(10e+5) * 5]
    k_sizes = [int(10e+2), int(10e+3), int(10e+4), int(10e+4) * 2, int(10e+4) * 5, int(10e+5) * 2]

    for pop_size, k_size in zip(pop_sizes, k_sizes):
        pop = range(pop_size)
        k = k_size
        t1 = timeit.Timer(stmt='iterSample(pop, %i)' % (k_size), setup='from __main__ import iterSample,pop')
        t2 = timeit.Timer(stmt='sample_from_iterable(pop, %i)' % (k_size), setup='from __main__ import sample_from_iterable,pop')
        t3 = timeit.Timer(stmt='iter_sample_fast(pop, %i)' % (k_size), setup='from __main__ import iter_sample_fast,pop')

        print('Выборка', k, 'из', pop_size)
        print('Используя iterSample', '%1.4f с' % (t1.timeit(number=100) / 100.0))
        print('Используя sample_from_iterable', '%1.4f с' % (t2.timeit(number=100) / 100.0))
        print('Используя iter_sample_fast', '%1.4f с' % (t3.timeit(number=100) / 100.0))
        print('')

Я также проверил, что все методы действительно производят непредвзятую выборку из генератора, проводя выборку 1000 элементов из 10000, 100000 раз и вычисляя среднюю частоту появления каждого элемента в популяции, которая оказалась равной ~0.1, как и следовало ожидать для всех трех методов.

5 ответ(ов)

0

Ответ на вопрос:

Хотя ответ Мартина Питерса правильный, его решение замедляется при больших значениях samplesize, так как использование list.insert в цикле может привести к квадратичной сложности.

Вот альтернатива, которая, на мой взгляд, сохраняет равномерность выборки и при этом повышает производительность:

def iter_sample_fast(iterable, samplesize):
    results = []
    iterator = iter(iterable)
    # Заполнение первых samplesize элементов:
    try:
        for _ in range(samplesize):
            results.append(next(iterator))
    except StopIteration:
        raise ValueError("Размер выборки больше, чем популяция.")
    random.shuffle(results)  # Перемешивание позиций
    for i, v in enumerate(iterator, samplesize):
        r = random.randint(0, i)
        if r < samplesize:
            results[r] = v  # С уменьшающейся вероятностью заменяем случайные элементы
    return results

Разница начинает проявляться при значениях samplesize, превышающих 10000. Время выполнения для вызовов с параметрами (1000000, 100000):

  • iterSample: 5.05с
  • iter_sample_fast: 2.64с

Таким образом, iter_sample_fast показывает значительно лучшую производительность с увеличением размера выборки.

0

Вот однострочное решение, которое выбирает k элементов без замены из n элементов с временной сложностью O(n lg k):

from heapq import nlargest
import random

def sample_from_iterable(it, k):
    return (x for _, x in nlargest(k, ((random.random(), x) for x in it)))

Этот код использует функцию nlargest из модуля heapq, чтобы получить k элементов с наибольшим значением случайного числа, сгенерированного для каждого элемента из итератора it. В результате вы получите выборку из k элементов, при этом каждый элемент выбирается случайно и без повторений.

0

Если размер популяции n известен, вот пример эффективного по памяти кода, который проходит по генератору, извлекая только целевые образцы:

from random import sample
from itertools import count, compress

targets = set(sample(range(n), k=10))
for selection in compress(pop, map(targets.__contains__, count())):
    print(selection)

Этот код выводит выборки в том порядке, в котором они производятся генератором популяции.

Суть техники заключается в использовании стандартной библиотеки random.sample() для случайного выбора индексов целевых образцов. Вторая строка проверяет, находится ли данный индекс среди целевых, и если да, возвращает соответствующее значение из генератора.

Например, если целевые индексы — это {6, 2, 4}:

0  1  2  3  4  5  6  7  8  9  10   ...  вывод count()
F  F  T  F  T  F  T  F  F  F  F    ...  является ли индекс в целях?
A  B  C  D  E  F  G  H  I  J  K    ...  выходные данные генератора популяции
-  -  C  -  E  -  G  -  -  -  -    ...  выборки, сгенерированные compress

Эта техника подходит для перебора корпуса, слишком большого, чтобы поместиться в памяти (в противном случае можно было бы просто использовать sample() непосредственно на популяции).

0

Если количество элементов в итераторе известно (например, было посчитано ранее), вы можете использовать следующий подход:

def iter_sample(iterable, iterlen, samplesize):
    if iterlen < samplesize:
        raise ValueError("Размер выборки больше, чем популяция.")
    indexes = set()
    while len(indexes) < samplesize:
        indexes.add(random.randint(0, iterlen))
    indexesiter = iter(sorted(indexes))
    current = next(indexesiter)
    ret = []
    for i, item in enumerate(iterable):
        if i == current:
            ret.append(item)
            try:
                current = next(indexesiter)
            except StopIteration:
                break
    random.shuffle(ret)
    return ret

Лично я нахожу этот метод более быстрым, особенно когда размер выборки (samplesize) невелик по сравнению с общим количеством элементов (iterlen). Однако, если запрашивается вся выборка или близкая к ней, возникают определенные проблемы.

Вот результаты времени выполнения для iter_sample и его оптимизированной версии iter_sample_fast для различных размеров выборки:

  • iter_sample (iterlen=10000, samplesize=100) время: (1, 'ms')

  • iter_sample_fast (iterlen=10000, samplesize=100) время: (15, 'ms')

  • iter_sample (iterlen=1000000, samplesize=100) время: (65, 'ms')

  • iter_sample_fast (iterlen=1000000, samplesize=100) время: (1477, 'ms')

  • iter_sample (iterlen=1000000, samplesize=1000) время: (64, 'ms')

  • iter_sample_fast (iterlen=1000000, samplesize=1000) время: (1459, 'ms')

  • iter_sample (iterlen=1000000, samplesize=10000) время: (86, 'ms')

  • iter_sample_fast (iterlen=1000000, samplesize=10000) время: (1480, 'ms')

  • iter_sample (iterlen=1000000, samplesize=100000) время: (388, 'ms')

  • iter_sample_fast (iterlen=1000000, samplesize=100000) время: (1521, 'ms')

  • iter_sample (iterlen=1000000, samplesize=1000000) время: (25359, 'ms')

  • iter_sample_fast (iterlen=1000000, samplesize=1000000) время: (2178, 'ms')

Как можно заметить, подход iter_sample показывает значительно лучшее время выполнения при меньших размерах выборки в сравнении с полным набором данных. При увеличении размера выборки эффективность метода несколько ухудшается.

0

Ваш код для выборки элементов из генератора с заданным размером выборки и длиной итерации действительно кажется оптимальным, особенно когда у вас есть представление о длине генератора. Вот перевод ответа с StackOverflow:


Вы предложили отличный подход для выборки элементов из генератора, основываясь на том, насколько вы знаете длину генератора. Ваш метод использует случайное распределение для выбора элементов, и результаты действительно показывают его эффективность как для небольших, так и для больших итерабельных объектов.

Вот краткий обзор вашего кода и его эффективности:

def gen_sample(generator_list, sample_size, iterlen):
    num = 0
    inds = numpy.random.random(iterlen) <= (sample_size * 1.0 / iterlen)
    results = []
    iterator = iter(generator_list)
    gotten = 0
    while gotten < sample_size: 
        try:
            b = iterator.next()
            if inds[num]: 
                results.append(b)
                gotten += 1
            num += 1    
        except: 
            num = 0
            iterator = iter(generator_list)
            inds = numpy.random.random(iterlen) <= ((sample_size - gotten) * 1.0 / iterlen)
    return results

Ваш метод сохраняет свою скорость как для небольших (например, xrange(10000)) выборок, так и для очень больших (например, xrange(5000000)), что описано в ваших тестах времени выполнения:

# Huge
res = gen_sample(xrange(5000000), 200000, 5000000)
timing: 1.22s

# Small
z = gen_sample(xrange(10000), 1000, 10000) 
timing: 0.000441    

Это действительно впечатляющие результаты и подтверждает, что ваш метод может быть одним из самых быстрых, пока он не будет опровергнут другими подходами!

Если у вас есть дополнительные идеи по улучшению кода или советы по его применению в других сценариях, было бы интересно их услышать!

Чтобы ответить на вопрос, пожалуйста, войдите или зарегистрируйтесь