better handling of job-count and completed-jobs

pull/218/head
Matthew Butterick 5 years ago
parent 20ab9371d8
commit 0345260119

@ -30,9 +30,9 @@
markup-source?
markdown-source?
pagetree-source?))])
(proc path))
(proc path))
#:unless (path-cached? path))
(path->complete-path path)))
(path->complete-path path)))
(cond
[(null? uncached-paths)
@ -40,22 +40,24 @@
[wants-parallel-setup
(define job-count
(match wants-parallel-setup
[#true (processor-count)]
[(? exact-positive-integer? count) count]
[_ (raise-argument-error 'preheat-cache "exact positive integer" wants-parallel-setup)]))
(min
(length uncached-paths)
(match wants-parallel-setup
[#true (processor-count)]
[(? exact-positive-integer? count) count]
[_ (raise-argument-error 'preheat-cache "exact positive integer" wants-parallel-setup)])))
(define worker-evts
(for/list ([wpidx (in-range job-count)])
(define wp
(place ch
(let loop ()
(define path (place-channel-put/get ch (list 'want-job)))
(place-channel-put ch (list 'job-finished path
(with-handlers ([exn:fail? (λ (e) #f)])
(path->hash path))))
(loop))))
(handle-evt wp (λ (val) (list* wpidx wp val)))))
(define wp
(place ch
(let loop ()
(define path (place-channel-put/get ch (list 'want-job)))
(place-channel-put ch (list 'job-finished path
(with-handlers ([exn:fail? (λ (e) #f)])
(path->hash path))))
(loop))))
(handle-evt wp (λ (val) (list* wpidx wp val)))))
(let loop ([paths uncached-paths][actives null])
(unless (and (null? paths) (null? actives))
@ -73,7 +75,7 @@
(message (format "caching failed on job ~a: ~a" (add1 wpidx) (find-relative-path starting-dir path))))
(loop paths (remq wpidx actives))])))]
[else (for ([path (in-list uncached-paths)])
(message (format "caching: ~a" (find-relative-path starting-dir path)))
(match (with-handlers ([exn:fail? (λ (e) #f)]) (path->hash path))
[#false (message (format "caching failed: ~a" (find-relative-path starting-dir path)))]
[result (cache-ref! (paths->key path) (λ () result))]))]))
(message (format "caching: ~a" (find-relative-path starting-dir path)))
(match (with-handlers ([exn:fail? (λ (e) #f)]) (path->hash path))
[#false (message (format "caching failed: ~a" (find-relative-path starting-dir path)))]
[result (cache-ref! (paths->key path) (λ () result))]))]))

@ -1 +1 @@
1572998291
1573004144

@ -67,10 +67,12 @@
[_ (loop (cdr paths) acc)])))
(define job-count
(match wants-parallel-render
[#true (processor-count)]
[(? exact-positive-integer? count) count]
[_ (raise-argument-error 'render-batch "exact positive integer" wants-parallel-render)]))
(min
(length source-paths)
(match wants-parallel-render
[#true (processor-count)]
[(? exact-positive-integer? count) count]
[_ (raise-argument-error 'render-batch "exact positive integer" wants-parallel-render)])))
;; initialize the workers
(define worker-evts
@ -93,11 +95,14 @@
(define poly-target (current-poly-target))
;; `locks` and `blocks` are (listof (cons/c evt? path?))
(define starting-source-paths (sort source-paths string<? #:key path->string))
(define crashed-jobs
(let loop ([source-paths (sort source-paths string<? #:key path->string)]
(let loop ([source-paths starting-source-paths]
[locks-in null]
[blocks-in null]
[crashed-jobs null])
;; `completed-jobs` is (listof (cons/c path? boolean?))
[completed-jobs null]
[completed-job-count 0])
;; try to unblock blocked workers
(define-values (locks blocks)
(for/fold ([locks locks-in]
@ -109,20 +114,21 @@
(values locks (cons block blocks))]
[else
(place-channel-put wp 'lock-approved)
(values (cons block locks) blocks)])))
;; no source paths means all jobs have been assigned
;; no locks means no jobs are in progress
;; therefore we must be done (other than crashed jobs)
(values (cons block locks) blocks)])))
(cond
[(and (null? source-paths) (null? locks)) crashed-jobs]
[(eq? completed-job-count (length starting-source-paths))
;; crashed jobs are completed jobs that weren't finished
(for/list ([(path finished?) (in-dict completed-jobs)]
#:unless finished?)
path)]
[else
(match (apply sync worker-evts)
[(list wpidx wp 'wants-job)
(match source-paths
[(? null?) (loop null locks blocks crashed-jobs)]
[(? null?) (loop null locks blocks completed-jobs completed-job-count)]
[(cons path rest)
(place-channel-put wp (cons path poly-target))
(loop rest locks blocks crashed-jobs)])]
(loop rest locks blocks completed-jobs completed-job-count)])]
[(list wpidx wp (and (or 'finished-job 'crashed-job) tag) path ms)
(match tag
['finished-job
@ -141,11 +147,10 @@
[#false locks]
[lock (remove lock locks)])
blocks
(match tag
['crashed-job (cons path crashed-jobs)]
[_ crashed-jobs]))]
(cons (cons path (eq? tag 'finished-job)) completed-jobs)
(add1 completed-job-count))]
[(list wpidx wp 'wants-lock path)
(loop source-paths locks (append blocks (list (cons wp path))) crashed-jobs)])])))
(loop source-paths locks (append blocks (list (cons wp path))) completed-jobs completed-job-count)])])))
;; second bite at the apple for crashed jobs.
;; 1) many crashes that arise in parallel rendering are
;; a result of concurrency issues (e.g. shared files not being readable at the right moment).

Loading…
Cancel
Save