parallel-rendering fixes

dev-jobs-flag
Matthew Butterick 5 years ago
parent 88fe03e83b
commit 7b47cac273

@ -21,36 +21,43 @@
(define (preheat-cache starting-dir)
(unless (and (path-string? starting-dir) (directory-exists? starting-dir))
(raise-argument-error 'preheat-cache "directory" starting-dir))
(define worker-places (for/list ([i (in-range (processor-count))])
(place ch
(let loop ()
(define result (with-handlers ([exn:fail? (λ (e) #false)])
(path->hash (place-channel-get ch))))
(place-channel-put ch result)
(loop)))))
(define paths-that-should-be-cached
;; if a file is already in the cache, no need to hit it again.
;; this allows partially completed preheat jobs to resume.
(define uncached-paths
(for/list ([path (in-directory starting-dir)]
#:when (for/or ([proc (in-list (list preproc-source?
markup-source?
markdown-source?
pagetree-source?))])
(proc path)))
(proc path))
#:unless (path-cached? path))
path))
;; if a file is already in the cache, no need to hit it again.
;; this allows partially completed preheat jobs to resume.
(define uncached-paths (filter-not path-cached? paths-that-should-be-cached))
(define worker-evts
(for/list ([wpidx (in-range (processor-count))])
(define wp
(place ch
(let loop ()
(define path (place-channel-put/get ch (list 'want-job)))
(place-channel-put ch (list 'job-finished path
(with-handlers ([exn:fail? (λ (e) #f)])
(path->hash path))))
(loop))))
(handle-evt wp (λ (val) (list* wpidx wp val)))))
;; compile the paths in groups, so they can be incrementally saved.
;; that way, if there's a failure, the progress is preserved.
;; but the slowest file in a group will prevent further progress.
(for ([path-group (in-list (slice-at uncached-paths (length worker-places)))])
(for ([path (in-list path-group)]
[wp (in-list worker-places)])
(message (format "caching: ~a" (find-relative-path starting-dir path)))
(place-channel-put wp path))
(for ([path (in-list path-group)]
[wp (in-list worker-places)])
(match (place-channel-get wp)
[#false (message (format "compile failed: ~a" path))]
[result (cache-ref! (paths->key path) (λ () result))]))))
(let loop ([paths uncached-paths][actives null])
(unless (and (null? paths) (null? actives))
(match (apply sync worker-evts)
[(list wpidx wp 'want-job)
(match paths
[(? null?) (loop null actives)]
[(cons path rest)
(place-channel-put wp path)
(message (format "caching on core ~a: ~a" (add1 wpidx) (find-relative-path starting-dir path)))
(loop rest (cons wpidx actives))])]
[(list wpidx wp 'job-finished path result)
(if result
(cache-ref! (paths->key path) (λ () result))
(message (format "caching failed on core ~a: ~a" (add1 wpidx) (find-relative-path starting-dir path))))
(loop paths (remq wpidx actives))]))))

@ -1 +1 @@
1560481034
1561571116

@ -4,6 +4,7 @@
racket/match
racket/place
racket/list
racket/dict
sugar/test
sugar/define
sugar/file
@ -55,55 +56,79 @@
;; Using reset-modification-dates is sort of like session control.
(reset-mod-date-hash!)
(cond
;; disable parallel processing until concurrency problems are sorted
#;[parallel?
(define worker-places
(for/list ([i (in-range (processor-count))])
(place ch
(let loop ()
(match-define (list path poly-target) (place-channel-get ch))
(define render-result
(let render ([failures 0][exn-msg #f])
(cond
[(= 3 failures) exn-msg]
[else
(with-handlers ([exn:fail?
(λ (e)
(sleep 0.01)
(render (add1 failures) (exn-message e)))])
(parameterize ([current-poly-target poly-target])
(match-define-values (_ _ ms _)
(time-apply render-from-source-or-output-path (list path)))
ms))])))
(place-channel-put ch render-result)
(loop)))))
[parallel?
(define source-paths
(let ()
(define flattened-paths
(remove-duplicates
(sort
(let loop ([paths paths])
(if (null? paths)
null
(match (->complete-path (car paths))
[(? pagetree-source? pt) (append (loop (pagetree->paths pt)) (loop (cdr paths)))]
[path (cons path (loop (cdr paths)))]))))
(define source-paths (for*/list ([p (in-list flattened-paths)]
[path (cons path (loop (cdr paths)))])))
string<?
#:key path->string)))
(for*/list ([p (in-list flattened-paths)]
[maybe-source-path (in-value (->source-path p))]
#:when (and maybe-source-path (file-exists? maybe-source-path)))
maybe-source-path))
(for ([source-path-group (in-list (slice-at (shuffle source-paths) (length worker-places)))])
(for ([source-path (in-list source-path-group)]
[(wp wpidx) (in-indexed worker-places)])
(place-channel-put wp (list source-path (current-poly-target))))
(for ([source-path (in-list source-path-group)]
[(wp wpidx) (in-indexed worker-places)])
(match (place-channel-get wp)
[(? number? ms)
(message (format "rendered parallel on core ~a /~a ~a"
maybe-source-path)))
;; initialize the workers
(define worker-evts
(for/list ([wpidx (in-range (processor-count))])
(define wp (place ch
(let loop ()
(match-define (cons path poly-target)
(place-channel-put/get ch (list 'wants-job)))
(parameterize ([current-poly-target poly-target])
(place-channel-put/get ch (list 'wants-lock (->output-path path)))
(match-define-values (_ _ ms _)
(time-apply render-from-source-or-output-path (list path)))
(place-channel-put ch (list 'finished-job path ms)))
(loop))))
(handle-evt wp (λ (val) (list* wpidx wp val)))))
(define poly-target (current-poly-target))
;; `locks` and `blocks` are (listof (cons/c evt? path?))
(let loop ([source-paths source-paths][locks-in null][blocks-in null])
;; try to unblock blocked workers
(define-values (locks blocks)
(for/fold ([locks locks-in]
[blocks null])
([block (in-list blocks-in)])
(match-define (cons wp path) block)
(cond
[(member path (dict-values locks))
(values locks (cons block blocks))]
[else
(place-channel-put wp 'lock-approved)
(values (cons block locks) blocks)])))
;; no source paths means all jobs have been assigned
;; no locks means no jobs are in progress
;; therefore we must be done.
(unless (and (null? source-paths) (null? locks))
(match (apply sync worker-evts)
[(list wpidx wp 'wants-job)
(match source-paths
[(? null?) (loop null locks blocks)]
[(cons path rest)
(place-channel-put wp (cons path poly-target))
(loop rest locks blocks)])]
[(list wpidx wp 'finished-job path ms)
(message
(format "rendered parallel on core ~a /~a ~a"
(add1 wpidx)
(find-relative-path (current-project-root) (->output-path source-path))
(if (< ms 1000) (format "(~a ms)" ms) (format "(~a s)" (/ ms 1000.0)))))]
[exn-msg (raise (exn:fail exn-msg (current-continuation-marks)))])))]
(find-relative-path (current-project-root) (->output-path path))
(if (< ms 1000) (format "(~a ms)" ms) (format "(~a s)" (/ ms 1000.0)))))
(loop source-paths (match (assoc wp locks)
[#false locks]
[lock (remove lock locks)]) blocks)]
[(list wpidx wp 'wants-lock path)
(loop source-paths locks (append blocks (list (cons wp path))))])))]
[else (for-each render-from-source-or-output-path paths)]))
(define (pagetree->paths pagetree-or-path)
@ -273,21 +298,27 @@
(unless template-path
(raise-argument-error 'render-markup-or-markdown-source (format "valid template path~a" (if (has-inner-poly-ext? source-path) (format " for target ~a" (current-poly-target)) "")) template-path))
(render-from-source-or-output-path template-path) ; because template might have its own preprocessor source
;; use a temp file so that multiple (possibly parallel) renders
;; do not compete for write access to the same template
(define temp-template (make-temporary-file "pollentmp~a"
(or (->source-path template-path) template-path)))
(render-from-source-or-output-path temp-template) ; because template might have its own preprocessor source
(parameterize ([current-output-port (current-error-port)]
[current-namespace (make-base-namespace)])
(define outer-ns (namespace-anchor->namespace render-module-ns))
(namespace-attach-module outer-ns 'pollen/setup)
(begin0
(eval (with-syntax ([MODNAME (gensym)]
[SOURCE-PATH-STRING (->string source-path)]
[TEMPLATE-PATH-STRING (->string template-path)])
[TEMPLATE-PATH-STRING (->string temp-template)])
#'(begin
(module MODNAME pollen/private/render-helper
#:source SOURCE-PATH-STRING
#:template TEMPLATE-PATH-STRING
#:result-id result)
(require 'MODNAME)
result)))))
result)))
(delete-file temp-template))))
(define (templated-source? path)
(or (markup-source? path) (markdown-source? path)))

Loading…
Cancel
Save