How To Spawn Future Only If Free Worker Is Available
I am trying to send information extracted from lines of a big file to a process running on some server. To speed this up, I would like to do this with some threads in parallel. Usi
Solution 1:
You could iterate over chunks of the file using
for chunk in zip(*[f]*chunksize):
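(This is an application of the grouper recipe, which collects items from the iterator f into groups of size chunksize. Note: this does not consume the entire file at once, since zip returns an iterator in Python 3. Because zip stops as soon as one of its inputs is exhausted, a final group with fewer than chunksize lines is dropped.)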
import concurrent.futures as CF
import logging

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

def worker(line):
    line = line.strip()
    logger.info(line)

chunksize = 1024

with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f:
    # zip(*[f]*chunksize) pulls chunksize lines at a time from the same file
    # iterator; a trailing group shorter than chunksize is dropped
    for chunk in zip(*[f]*chunksize):
        futures = [executor.submit(worker, line) for line in chunk]
        # wait for these futures to complete before processing another chunk
        CF.wait(futures)
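If the last few lines of the file matter, the chunking step can be swapped for one based on itertools.islice, which keeps a short final chunk. This is only a sketch of that alternative, not part of the original answer; the helper name iter_chunks is made up for illustration.

```python
import itertools as IT

def iter_chunks(iterable, chunksize):
    """Yield lists of up to chunksize items; the last list may be shorter."""
    it = iter(iterable)
    while True:
        # islice pulls at most chunksize items from the shared iterator
        chunk = list(IT.islice(it, chunksize))
        if not chunk:
            return  # iterator exhausted
        yield chunk

if __name__ == "__main__":
    for chunk in iter_chunks(range(5), 2):
        print(chunk)
```

In the loop above, `for chunk in iter_chunks(f, chunksize):` would take the place of `for chunk in zip(*[f]*chunksize):`, and the file is still read lazily, one chunk at a time.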
Now, in the comments you rightly point out that this is not optimal. There could be some worker which takes a long time, and holds up a whole chunk of jobs.
Usually, if each call to worker takes roughly the same amount of time, then this is not a big deal. However, here is a way to advance the filehandle on demand. It uses a threading.Condition to notify the sprinkler to advance the filehandle.
import logging
import threading
from queue import Queue  # Python 3; in Python 2 the module is named Queue

logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.DEBUG,
                    format='[%(asctime)s %(threadName)s] %(message)s',
                    datefmt='%H:%M:%S')

SENTINEL = object()

def worker(cond, queue):
    # keep pulling lines until the sentinel arrives
    for line in iter(queue.get, SENTINEL):
        line = line.strip()
        logger.info(line)
        with cond:
            # tell the sprinkler that this line is done
            cond.notify()
            logger.info('notify')

def sprinkler(cond, queue, num_workers):
    with open("big_file") as f:
        for line in f:
            logger.info('advancing filehandle')
            with cond:
                queue.put(line)
                logger.info('waiting')
                # blocks until a worker has finished the line just queued
                cond.wait()
    # one sentinel per worker so that every worker loop terminates
    for _ in range(num_workers):
        queue.put(SENTINEL)

num_workers = 4
cond = threading.Condition()
queue = Queue()

t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers])
t.start()

threads = [threading.Thread(target=worker, args=[cond, queue])
           for _ in range(num_workers)]
for t in threads:
    t.start()
for t in threads:
    t.join()
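One thing to watch in the listing above: as written, the sprinkler waits for each line to be finished before it queues the next one, so it looks like only one line is in flight at a time. A different technique, not the one used in the answer, is to count free slots with a threading.BoundedSemaphore and release a slot from each future's done callback. The sketch below assumes that approach; submit_when_free is a hypothetical helper name, and it keeps every future so the results can be returned, which you would probably not want for a really big file.

```python
import concurrent.futures as CF
import threading

def submit_when_free(lines, worker, max_workers=4):
    """Submit worker(line) only while fewer than max_workers calls are pending."""
    slots = threading.BoundedSemaphore(max_workers)
    futures = []
    with CF.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for line in lines:
            # blocks here until some earlier call has finished
            slots.acquire()
            try:
                future = executor.submit(worker, line)
            except BaseException:
                slots.release()
                raise
            # free the slot as soon as this call completes or fails
            future.add_done_callback(lambda _f: slots.release())
            futures.append(future)
    # results come back in submission order
    return [f.result() for f in futures]

if __name__ == "__main__":
    print(submit_when_free(["a\n", "b\n", "c\n"], lambda s: s.strip().upper()))
```

With `open("big_file") as f`, calling `submit_when_free(f, worker)` advances the filehandle only when a slot frees up, so a slow call delays one slot rather than a whole chunk.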