
How To Spawn Future Only If Free Worker Is Available

I am trying to send information extracted from lines of a big file to a process running on some server. To speed this up, I would like to do this with some threads in parallel. Usi

Solution 1:

You could iterate over chunks of the file using

for chunk in zip(*[f]*chunksize):

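(This is an application of the grouper recipe, which collects items from the iterator f into groups of size chunksize. It does not read the entire file at once, since zip returns an iterator in Python 3. Note that plain zip discards a final group with fewer than chunksize lines; itertools.zip_longest pads that group with None instead.)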
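A minimal sketch of how the grouping behaves, using a hypothetical in-memory list of lines in place of a real file. The helper names grouper and grouper_keep_tail are illustrative only. It shows that the zip form drops a trailing group shorter than n, while a zip_longest variant can keep it:

```python
import itertools

def grouper(iterable, n):
    """Collect items into tuples of length n; an incomplete tail is dropped."""
    it = iter(iterable)
    # n references to the SAME iterator: each tuple takes n consecutive items.
    return zip(*[it] * n)

def grouper_keep_tail(iterable, n):
    """Like grouper, but keep the incomplete tail (without the None padding)."""
    it = iter(iterable)
    for group in itertools.zip_longest(*[it] * n):
        yield tuple(item for item in group if item is not None)

lines = ["a\n", "b\n", "c\n", "d\n", "e\n"]  # hypothetical stand-in for a file

short = list(grouper(lines, 2))            # the fifth line is lost
full = list(grouper_keep_tail(lines, 2))   # the fifth line forms its own group
print(short)
print(full)
```

Filtering on None is safe here because lines read from a text file are always strings, never None.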


import concurrent.futures as CF
import itertools as IT
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

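chunksize = 1024
with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    # zip_longest pads the final, shorter chunk with None instead of dropping it
    for chunk in IT.zip_longest(*[f]*chunksize):
        futures = [executor.submit(worker, line)
                   for line in chunk if line is not None]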
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)

Now, in the comments you rightly point out that this is not optimal. A single call to worker that takes a long time holds up the whole chunk: the other threads sit idle until it finishes.

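If each call to worker takes roughly the same amount of time, this is not a big deal. Otherwise, here is a way to advance the filehandle on demand. A separate "sprinkler" thread reads the file and feeds lines to the workers through a queue, and each worker uses a threading.Condition to notify the sprinkler when it is free, so the sprinkler advances the filehandle only then.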

import logging
import threading
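import queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')
SENTINEL = object()
in_flight = 0  # lines handed out but not yet finished; guarded by cond

def worker(cond, q):
    global in_flight
    # iter(q.get, SENTINEL) keeps calling q.get() until it returns SENTINEL
    for line in iter(q.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            # this worker is free again: let the sprinkler advance
            in_flight -= 1
            cond.notify()
            logger.info('notify')

def sprinkler(cond, q, num_workers):
    global in_flight
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle')
            with cond:
                q.put(line)
                in_flight += 1
                # read the next line only once some worker is free
                while in_flight >= num_workers:
                    logger.info('waiting')
                    cond.wait()
        # one sentinel per worker so that every worker leaves its loop
        for _ in range(num_workers):
            q.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
q = queue.Queue()
feeder = threading.Thread(target=sprinkler, args=[cond, q, num_workers])
feeder.start()

threads = [threading.Thread(target=worker, args=[cond, q])
           for _ in range(num_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()
feeder.join()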
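
The same back-pressure can probably also be had with the ThreadPoolExecutor from the first example, using a semaphore in place of the condition. This is an alternative sketch, not the code above: submit_when_free and handle are hypothetical names, and the lines come from an in-memory list rather than big_file. A slot is acquired before each submit and released in a done-callback, so the loop only pulls the next line once a worker is free.

```python
import concurrent.futures as CF
import threading

def handle(line):
    # Hypothetical stand-in for the real per-line work.
    return line.strip().upper()

def submit_when_free(lines, func, max_workers=4):
    """Submit func(line) for each line, blocking while all workers are busy."""
    slots = threading.BoundedSemaphore(max_workers)
    futures = []
    with CF.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for line in lines:
            slots.acquire()  # blocks until a running job has finished
            try:
                future = executor.submit(func, line)
            except BaseException:
                slots.release()  # nothing was submitted, so give the slot back
                raise
            # Runs when the job finishes (immediately if it already has).
            future.add_done_callback(lambda _f: slots.release())
            futures.append(future)
    # Leaving the with-block waits for the outstanding jobs.
    return [f.result() for f in futures]

results = submit_when_free(["a\n", "b\n", "c\n"], handle, max_workers=2)
print(results)
```

For a very large file, keeping every future in a list may use too much memory; in that case the results could be handled inside the callback instead of being collected.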
