small refac

pull/218/head
Matthew Butterick 5 years ago
parent 0345260119
commit 618c410062

@ -1 +1 @@
1573004144 1573008313

@ -48,6 +48,110 @@
(define (list-of-pathish? x) (and (list? x) (andmap pathish? x))) (define (list-of-pathish? x) (and (list? x) (andmap pathish? x)))
(define (parallel-render paths-in job-count-arg)
(define source-paths
(let loop ([paths paths-in] [acc null])
(match (and (pair? paths) (->complete-path (car paths)))
[#false (remove-duplicates acc)]
[(? pagetree-source? pt) (loop (append (pagetree->paths pt) (cdr paths)) acc)]
[(app ->source-path (and (not #false) (? file-exists?) sp)) (loop (cdr paths) (cons sp acc))]
[_ (loop (cdr paths) acc)])))
(define job-count
(min
(length source-paths)
(match job-count-arg
[#true (processor-count)]
[(? exact-positive-integer? count) count]
[_ (raise-argument-error 'render-batch "exact positive integer" job-count-arg)])))
;; initialize the workers
(define worker-evts
(for/list ([wpidx (in-range job-count)])
(define wp (place ch
(let loop ()
(match-define (cons path poly-target)
(place-channel-put/get ch (list 'wants-job)))
(parameterize ([current-poly-target poly-target])
(place-channel-put/get ch (list 'wants-lock (->output-path path)))
;; trap any exceptions and pass them back as crashed jobs.
;; otherwise, a crashed rendering place can't recover, and the parallel job will be stuck.
(with-handlers ([exn:fail? (λ (e) (place-channel-put ch (list 'crashed-job path #f)))])
(match-define-values (_ _ ms _)
(time-apply render-from-source-or-output-path (list path)))
(place-channel-put ch (list 'finished-job path ms))))
(loop))))
(handle-evt wp (λ (val) (list* wpidx wp val)))))
(define poly-target (current-poly-target))
;; `locks` and `blocks` are (listof (cons/c evt? path?))
(define starting-source-paths (sort source-paths string<? #:key path->string))
(let loop ([source-paths starting-source-paths]
[locks-in null]
[blocks-in null]
;; `completed-jobs` is (listof (cons/c path? boolean?))
[completed-jobs null]
[completed-job-count 0])
;; try to unblock blocked workers
(define-values (locks blocks)
(for/fold ([locks locks-in]
[blocks null])
([block (in-list blocks-in)])
(match-define (cons wp path) block)
(cond
[(member path (dict-values locks))
(values locks (cons block blocks))]
[else
(place-channel-put wp 'lock-approved)
(values (cons block locks) blocks)])))
(cond
[(eq? completed-job-count (length starting-source-paths))
;; second bite at the apple for crashed jobs.
;; 1) many crashes that arise in parallel rendering are
;; a result of concurrency issues (e.g. shared files not being readable at the right moment).
;; That is, they do not appear under serial rendering.
;; 2) even if a crash is legit (that is, there is a real flaw in the source)
;; and should be raised, we don't want to do it inside a parallel-rendering `place`
;; because then the place will never return, and the whole parallel job will never finish.
;; so we take the list of crashed jobs and try rendering them again serially.
;; if it was a concurrency-related error, it will disappear.
;; if it was a legit error, the render will stop and print a trace.
;; crashed jobs are completed jobs that weren't finished
(for/list ([(path finished?) (in-dict completed-jobs)]
#:unless finished?)
path)]
[else
(match (apply sync worker-evts)
[(list wpidx wp 'wants-job)
(match source-paths
[(? null?) (loop null locks blocks completed-jobs completed-job-count)]
[(cons path rest)
(place-channel-put wp (cons path poly-target))
(loop rest locks blocks completed-jobs completed-job-count)])]
[(list wpidx wp (and (or 'finished-job 'crashed-job) tag) path ms)
(match tag
['finished-job
(message
(format "rendered parallel @ job ~a /~a ~a"
(add1 wpidx)
(find-relative-path (current-project-root) (->output-path path))
(if (< ms 1000) (format "(~a ms)" ms) (format "(~a s)" (/ ms 1000.0)))))]
[_
(message
(format "render crash @ job ~a /~a (retry pending)"
(add1 wpidx)
(find-relative-path (current-project-root) (->output-path path))))])
(loop source-paths
(match (assoc wp locks)
[#false locks]
[lock (remove lock locks)])
blocks
(cons (cons path (eq? tag 'finished-job)) completed-jobs)
(add1 completed-job-count))]
[(list wpidx wp 'wants-lock path)
(loop source-paths locks (append blocks (list (cons wp path))) completed-jobs completed-job-count)])])))
(define+provide/contract (render-batch #:parallel [wants-parallel-render #false] . paths-in) (define+provide/contract (render-batch #:parallel [wants-parallel-render #false] . paths-in)
((#:parallel any/c) #:rest list-of-pathish? . ->* . void?) ((#:parallel any/c) #:rest list-of-pathish? . ->* . void?)
;; Why not just (for-each render ...)? ;; Why not just (for-each render ...)?
@ -55,114 +159,10 @@
;; And with render, they would be rendered repeatedly. ;; And with render, they would be rendered repeatedly.
;; Using reset-modification-dates is sort of like session control. ;; Using reset-modification-dates is sort of like session control.
(reset-mod-date-hash!) (reset-mod-date-hash!)
(cond (for-each render-from-source-or-output-path (if wants-parallel-render
[wants-parallel-render ;; returns crashed jobs for serial rendering
(parallel-render paths-in wants-parallel-render)
(define source-paths paths-in)))
(let loop ([paths paths-in] [acc null])
(match (and (pair? paths) (->complete-path (car paths)))
[#false (remove-duplicates acc)]
[(? pagetree-source? pt) (loop (append (pagetree->paths pt) (cdr paths)) acc)]
[(app ->source-path (and (not #false) (? file-exists?) sp)) (loop (cdr paths) (cons sp acc))]
[_ (loop (cdr paths) acc)])))
(define job-count
(min
(length source-paths)
(match wants-parallel-render
[#true (processor-count)]
[(? exact-positive-integer? count) count]
[_ (raise-argument-error 'render-batch "exact positive integer" wants-parallel-render)])))
;; initialize the workers
(define worker-evts
(for/list ([wpidx (in-range job-count)])
(define wp (place ch
(let loop ()
(match-define (cons path poly-target)
(place-channel-put/get ch (list 'wants-job)))
(parameterize ([current-poly-target poly-target])
(place-channel-put/get ch (list 'wants-lock (->output-path path)))
;; trap any exceptions and pass them back as crashed jobs.
;; otherwise, a crashed rendering place can't recover, and the parallel job will be stuck.
(with-handlers ([exn:fail? (λ (e) (place-channel-put ch (list 'crashed-job path #f)))])
(match-define-values (_ _ ms _)
(time-apply render-from-source-or-output-path (list path)))
(place-channel-put ch (list 'finished-job path ms))))
(loop))))
(handle-evt wp (λ (val) (list* wpidx wp val)))))
(define poly-target (current-poly-target))
;; `locks` and `blocks` are (listof (cons/c evt? path?))
(define starting-source-paths (sort source-paths string<? #:key path->string))
(define crashed-jobs
(let loop ([source-paths starting-source-paths]
[locks-in null]
[blocks-in null]
;; `completed-jobs` is (listof (cons/c path? boolean?))
[completed-jobs null]
[completed-job-count 0])
;; try to unblock blocked workers
(define-values (locks blocks)
(for/fold ([locks locks-in]
[blocks null])
([block (in-list blocks-in)])
(match-define (cons wp path) block)
(cond
[(member path (dict-values locks))
(values locks (cons block blocks))]
[else
(place-channel-put wp 'lock-approved)
(values (cons block locks) blocks)])))
(cond
[(eq? completed-job-count (length starting-source-paths))
;; crashed jobs are completed jobs that weren't finished
(for/list ([(path finished?) (in-dict completed-jobs)]
#:unless finished?)
path)]
[else
(match (apply sync worker-evts)
[(list wpidx wp 'wants-job)
(match source-paths
[(? null?) (loop null locks blocks completed-jobs completed-job-count)]
[(cons path rest)
(place-channel-put wp (cons path poly-target))
(loop rest locks blocks completed-jobs completed-job-count)])]
[(list wpidx wp (and (or 'finished-job 'crashed-job) tag) path ms)
(match tag
['finished-job
(message
(format "rendered parallel @ job ~a /~a ~a"
(add1 wpidx)
(find-relative-path (current-project-root) (->output-path path))
(if (< ms 1000) (format "(~a ms)" ms) (format "(~a s)" (/ ms 1000.0)))))]
[_
(message
(format "render crash @ job ~a /~a (retry pending)"
(add1 wpidx)
(find-relative-path (current-project-root) (->output-path path))))])
(loop source-paths
(match (assoc wp locks)
[#false locks]
[lock (remove lock locks)])
blocks
(cons (cons path (eq? tag 'finished-job)) completed-jobs)
(add1 completed-job-count))]
[(list wpidx wp 'wants-lock path)
(loop source-paths locks (append blocks (list (cons wp path))) completed-jobs completed-job-count)])])))
;; second bite at the apple for crashed jobs.
;; 1) many crashes that arise in parallel rendering are
;; a result of concurrency issues (e.g. shared files not being readable at the right moment).
;; That is, they do not appear under serial rendering.
;; 2) even if a crash is legit (that is, there is a real flaw in the source)
;; and should be raised, we don't want to do it inside a parallel-rendering `place`
;; because then the place will never return, and the whole parallel job will never finish.
;; so we take the list of crashed jobs and try rendering them again serially.
;; if it was a concurrency-related error, it will disappear.
;; if it was a legit error, the render will stop and print a trace.
(for-each render-from-source-or-output-path crashed-jobs)]
[else (for-each render-from-source-or-output-path paths-in)]))
(define (pagetree->paths pagetree-or-path) (define (pagetree->paths pagetree-or-path)
(define pagetree (if (pagetree? pagetree-or-path) (define pagetree (if (pagetree? pagetree-or-path)

Loading…
Cancel
Save