Amy Rose (d75ddb6d) at 21 Jul 17:06
Amy Rose (03fb568d) at 21 Jul 17:06
Merge branch 'parallel-jobs' into 'main'
... and 27 more commits
Coordinator currently only supports running one job at a time. Given that jobs can take a very long time to process, it is probably better to try to run several at once.
This MR implements parallel job execution for coordinator. The previous batching and queueing system has been completely replaced and simplified with a first-come, first-served + priority system on a per-job basis.
This feature adds the MAX_JOBS environment variable for configuration, with a default value of 5.
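A rough sketch of the intended behaviour (illustrative only — the helper name and error messages here are not the actual coordinator code):

    import os

    DEFAULT_MAX_JOBS = 5

    def read_max_jobs() -> int:
        # Fall back to the default when MAX_JOBS isn't set; reject
        # non-integer or non-positive values with a readable error.
        raw = os.environ.get("MAX_JOBS")
        if raw is None:
            return DEFAULT_MAX_JOBS
        try:
            value = int(raw)
        except ValueError:
            raise RuntimeError(f"MAX_JOBS must be an integer, got {raw!r}")
        if value < 1:
            raise RuntimeError("MAX_JOBS must be at least 1")
        return value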
Resolves #6
REMAINING TO DO:
LGTM!
Amy Rose (d75ddb6d) at 21 Jul 16:53
use python resource acquisition for locks, add unsafe methods to Cu...
Resolved in 448772ba with custom subclassed PriorityQueue
Amy Rose (448772ba) at 21 Jul 03:29
add custom PriorityQueue to specify sorting key instead of using co...
Amy Rose (333f95df) at 20 Jul 18:59
fix MAX_JOBS_OVERRIDE mistake
Amy Rose (4d8f3862) at 20 Jul 18:58
add friendlier error messages for MAX_JOBS, add upper limit with ov...
... and 1 more commit
Oooh both of those seem like great ideas! I'm partial to the second, but I think either could make sense
That makes sense, but I want to run that through stress testing before committing to it, so I'll get back to you on that.
Edit: it immediately breaks, because we have Processor threads blocked waiting on next_job(), so if we have them blocking on JobQueue's mutex then nothing else can use it until they all release it, including essential functions like jobs_for_tag(). It would also mean that jobs couldn't be marked as finished unless all threads were actively running jobs.
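To make the failure mode concrete, this is roughly the shape that breaks (a sketch of the change under test, not the code as merged; job.tags is a stand-in attribute):

    def next_job(self):
        # Each idle Processor thread parks here, holding the JobQueue-level
        # mutex while it waits for work to arrive...
        with self.mutex:
            job = self.pqueue.get(block=True)
            self.in_progress.append(job)
            return job

    def jobs_for_tag(self, tag):
        # ...so this (and marking jobs finished) can't take the same mutex
        # until a job arrives and a Processor finally releases it.
        with self.mutex:
            return [job for job in self.pqueue.queue if tag in job.tags]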
Oops, I thought I sent an email reply to this, but I guess GL didn't process it. I just said that that sounds like a great idea, and I think either of those solutions would work very well. I'm partial to the second, but either case seems good!
Were I implementing it, I'd probably say the tuple-based solution was neater and go with that, but since you're implementing it, if you'd rather subclass PQ you definitely can. It'd be a handy data structure to have on hand.
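For reference, the tuple-based version would look something like this (Job's fields and the lower-number-wins priority direction are assumptions for the sketch):

    import itertools
    from dataclasses import dataclass
    from queue import PriorityQueue

    @dataclass
    class Job:
        priority: int
        name: str

    counter = itertools.count()   # tie-breaker so Job objects are never compared
    pqueue = PriorityQueue()

    def submit(job):
        # Lower number = served first; the counter keeps FIFO order within a
        # priority level and keeps heapq from ever comparing two Jobs.
        pqueue.put((job.priority, next(counter), job))

    def take():
        _, _, job = pqueue.get()
        return job

(Every put/get site has to know about the wrapping, which is the ugliness being objected to.)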
Yes. Although given how the other comments resolved, we could probably just use self.mutex. PQ's mutex protects its internal state, but not the state of the objects it's used in. Our state (PQ + in_progress) has the implicit guarantee that a job is in one list or the other until it finishes or errors. The brief period of time where the job is being moved between them breaks that guarantee, even if it doesn't break the internal state of the PQ.
That could manifest as, e.g., a client not seeing a job as running even though it is, and so being told that the graphs are done rendering when they aren't. It would also look like a new job appearing out of nowhere when the page was refreshed.
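i.e. the hand-off itself would need to happen entirely under the JobQueue-level lock, something like this sketch (non-blocking get, with queue.Empty handling left to the caller, to sidestep the wait-while-holding-the-lock issue above):

    def next_job(self):
        with self.mutex:
            # Pop and append under the same lock, so no reader can observe
            # the moment where the job is in neither pqueue nor in_progress.
            job = self.pqueue.get(block=False)   # raises queue.Empty if idle
            self.in_progress.append(job)
        return job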
Also worth mentioning that the @total_ordering isn't actually needed at all. I just tested it as-is but without the @total_ordering, and the priority sorting test still passes.
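(PriorityQueue is built on heapq, and heapq only ever uses <, so a lone __lt__ is all the ordering it exercises. A quick illustration with a toy Job class, not the MR's:)

    from queue import PriorityQueue

    class Job:
        def __init__(self, priority, name):
            self.priority = priority
            self.name = name

        def __lt__(self, other):
            # The only comparison heapq (and therefore PriorityQueue) calls.
            return self.priority < other.priority

    q = PriorityQueue()
    q.put(Job(2, "render-later"))
    q.put(Job(1, "render-now"))
    assert q.get().name == "render-now"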
The misuse of @total_ordering is a valid point, but I feel like the ideal solution here would be for PriorityQueue to let you specify a key for sorting objects, the way nearly all other sorting-related Python functions do. What would your thoughts be on instead subclassing PriorityQueue to implement that? I have to admit I'm a bit averse to the tuple solution because it uglifies a lot of otherwise clean code.
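Something along these lines is what I'm picturing — just a sketch, not necessarily what would land:

    import itertools
    from queue import PriorityQueue

    class KeyedPriorityQueue(PriorityQueue):
        """PriorityQueue that orders items by key(item), like sorted() does."""

        def __init__(self, key, maxsize=0):
            super().__init__(maxsize)
            self.key = key
            self._counter = itertools.count()  # tie-breaker; items never compared

        # queue.Queue subclasses customise storage via _put/_get; the public
        # put()/get() still do all the locking and blocking.
        def _put(self, item):
            super()._put((self.key(item), next(self._counter), item))

        def _get(self):
            return super()._get()[-1]

    jobs = KeyedPriorityQueue(key=lambda job: job.priority)

One wrinkle: anything that reads .queue directly, like jobs_for_tag(), would then see the wrapped tuples rather than the jobs themselves.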
The reason I didn't explicitly acquire pqueue's mutex for the pqueue.get() is that the Queue classes (which PriorityQueue extends) are already internally locking and thread-safe. submit_job() has no mutex acquisitions for the same reason. The only time I think we ever need to explicitly acquire pqueue's mutex is when we're poking at its internal list directly, like in jobs_for_tag().
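i.e. the split I had in mind is roughly this (sketch only; job.tags is a stand-in attribute):

    def submit_job(self, job):
        # No explicit lock: PriorityQueue.put() takes the queue's internal
        # mutex itself, so concurrent submitters are already serialised.
        self.pqueue.put(job)

    def jobs_for_tag(self, tag):
        # Reading the backing list (pqueue.queue) bypasses put()/get(), so
        # here the queue's mutex does have to be held for the scan.
        with self.pqueue.mutex:
            return [job for job in self.pqueue.queue if tag in job.tags]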
Unless I'm misunderstanding, and what you're saying is we need:
def next_job(self):
    self.mutex.acquire(blocking=True)
    self.pqueue.mutex.acquire(blocking=True)
    job = self.pqueue.get(block=True)
    assert len(self.in_progress) < self.job_limit
    self.in_progress.append(job)
    self.pqueue.mutex.release()
    self.mutex.release()
    return job
like how it is in jobs_for_tag(), to have the whole thing ensure that it has exclusive control of both pqueue and in_progress for the duration of the transaction?
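(For comparison, the same acquisition written with `with` blocks — presumably the shape the later "python resource acquisition for locks" commit moves toward. pqueue.get() takes pqueue's own non-reentrant mutex internally, so that lock is left out here to avoid self-deadlock:)

    def next_job(self):
        # `with` releases the lock even if get() raises or the assert fires;
        # pqueue's internal locking covers the get() call itself.
        with self.mutex:
            job = self.pqueue.get(block=True)
            assert len(self.in_progress) < self.job_limit
            self.in_progress.append(job)
        return job

(This still blocks on get() while holding self.mutex, which is the stress-test failure mentioned elsewhere in this thread, so it's only meant to show the with form.)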