From 618c4100628b3a3c38c82d8a8362a1991337a0c0 Mon Sep 17 00:00:00 2001 From: Matthew Butterick Date: Tue, 5 Nov 2019 18:45:13 -0800 Subject: [PATCH] small refac --- pollen/private/ts.rktd | 2 +- pollen/render.rkt | 216 ++++++++++++++++++++--------------------- 2 files changed, 109 insertions(+), 109 deletions(-) diff --git a/pollen/private/ts.rktd b/pollen/private/ts.rktd index cddf5eb..9dd51b6 100644 --- a/pollen/private/ts.rktd +++ b/pollen/private/ts.rktd @@ -1 +1 @@ -1573004144 +1573008313 diff --git a/pollen/render.rkt b/pollen/render.rkt index a25e602..9dedf0d 100644 --- a/pollen/render.rkt +++ b/pollen/render.rkt @@ -48,6 +48,110 @@ (define (list-of-pathish? x) (and (list? x) (andmap pathish? x))) +(define (parallel-render paths-in job-count-arg) + (define source-paths + (let loop ([paths paths-in] [acc null]) + (match (and (pair? paths) (->complete-path (car paths))) + [#false (remove-duplicates acc)] + [(? pagetree-source? pt) (loop (append (pagetree->paths pt) (cdr paths)) acc)] + [(app ->source-path (and (not #false) (? file-exists?) sp)) (loop (cdr paths) (cons sp acc))] + [_ (loop (cdr paths) acc)]))) + + (define job-count + (min + (length source-paths) + (match job-count-arg + [#true (processor-count)] + [(? exact-positive-integer? count) count] + [_ (raise-argument-error 'render-batch "exact positive integer" job-count-arg)]))) + + ;; initialize the workers + (define worker-evts + (for/list ([wpidx (in-range job-count)]) + (define wp (place ch + (let loop () + (match-define (cons path poly-target) + (place-channel-put/get ch (list 'wants-job))) + (parameterize ([current-poly-target poly-target]) + (place-channel-put/get ch (list 'wants-lock (->output-path path))) + ;; trap any exceptions and pass them back as crashed jobs. + ;; otherwise, a crashed rendering place can't recover, and the parallel job will be stuck. + (with-handlers ([exn:fail? (λ (e) (place-channel-put ch (list 'crashed-job path #f)))]) + (match-define-values (_ _ ms _) + (time-apply render-from-source-or-output-path (list path))) + (place-channel-put ch (list 'finished-job path ms)))) + (loop)))) + (handle-evt wp (λ (val) (list* wpidx wp val))))) + + (define poly-target (current-poly-target)) + + ;; `locks` and `blocks` are (listof (cons/c evt? path?)) + (define starting-source-paths (sort source-paths stringstring)) + (let loop ([source-paths starting-source-paths] + [locks-in null] + [blocks-in null] + ;; `completed-jobs` is (listof (cons/c path? boolean?)) + [completed-jobs null] + [completed-job-count 0]) + ;; try to unblock blocked workers + (define-values (locks blocks) + (for/fold ([locks locks-in] + [blocks null]) + ([block (in-list blocks-in)]) + (match-define (cons wp path) block) + (cond + [(member path (dict-values locks)) + (values locks (cons block blocks))] + [else + (place-channel-put wp 'lock-approved) + (values (cons block locks) blocks)]))) + (cond + [(eq? completed-job-count (length starting-source-paths)) + ;; second bite at the apple for crashed jobs. + ;; 1) many crashes that arise in parallel rendering are + ;; a result of concurrency issues (e.g. shared files not being readable at the right moment). + ;; That is, they do not appear under serial rendering. + ;; 2) even if a crash is legit (that is, there is a real flaw in the source) + ;; and should be raised, we don't want to do it inside a parallel-rendering `place` + ;; because then the place will never return, and the whole parallel job will never finish. + ;; so we take the list of crashed jobs and try rendering them again serially. + ;; if it was a concurrency-related error, it will disappear. + ;; if it was a legit error, the render will stop and print a trace. + ;; crashed jobs are completed jobs that weren't finished + (for/list ([(path finished?) (in-dict completed-jobs)] + #:unless finished?) + path)] + [else + (match (apply sync worker-evts) + [(list wpidx wp 'wants-job) + (match source-paths + [(? null?) (loop null locks blocks completed-jobs completed-job-count)] + [(cons path rest) + (place-channel-put wp (cons path poly-target)) + (loop rest locks blocks completed-jobs completed-job-count)])] + [(list wpidx wp (and (or 'finished-job 'crashed-job) tag) path ms) + (match tag + ['finished-job + (message + (format "rendered parallel @ job ~a /~a ~a" + (add1 wpidx) + (find-relative-path (current-project-root) (->output-path path)) + (if (< ms 1000) (format "(~a ms)" ms) (format "(~a s)" (/ ms 1000.0)))))] + [_ + (message + (format "render crash @ job ~a /~a (retry pending)" + (add1 wpidx) + (find-relative-path (current-project-root) (->output-path path))))]) + (loop source-paths + (match (assoc wp locks) + [#false locks] + [lock (remove lock locks)]) + blocks + (cons (cons path (eq? tag 'finished-job)) completed-jobs) + (add1 completed-job-count))] + [(list wpidx wp 'wants-lock path) + (loop source-paths locks (append blocks (list (cons wp path))) completed-jobs completed-job-count)])]))) + (define+provide/contract (render-batch #:parallel [wants-parallel-render #false] . paths-in) ((#:parallel any/c) #:rest list-of-pathish? . ->* . void?) ;; Why not just (for-each render ...)? @@ -55,114 +159,10 @@ ;; And with render, they would be rendered repeatedly. ;; Using reset-modification-dates is sort of like session control. (reset-mod-date-hash!) - (cond - [wants-parallel-render - - (define source-paths - (let loop ([paths paths-in] [acc null]) - (match (and (pair? paths) (->complete-path (car paths))) - [#false (remove-duplicates acc)] - [(? pagetree-source? pt) (loop (append (pagetree->paths pt) (cdr paths)) acc)] - [(app ->source-path (and (not #false) (? file-exists?) sp)) (loop (cdr paths) (cons sp acc))] - [_ (loop (cdr paths) acc)]))) - - (define job-count - (min - (length source-paths) - (match wants-parallel-render - [#true (processor-count)] - [(? exact-positive-integer? count) count] - [_ (raise-argument-error 'render-batch "exact positive integer" wants-parallel-render)]))) - - ;; initialize the workers - (define worker-evts - (for/list ([wpidx (in-range job-count)]) - (define wp (place ch - (let loop () - (match-define (cons path poly-target) - (place-channel-put/get ch (list 'wants-job))) - (parameterize ([current-poly-target poly-target]) - (place-channel-put/get ch (list 'wants-lock (->output-path path))) - ;; trap any exceptions and pass them back as crashed jobs. - ;; otherwise, a crashed rendering place can't recover, and the parallel job will be stuck. - (with-handlers ([exn:fail? (λ (e) (place-channel-put ch (list 'crashed-job path #f)))]) - (match-define-values (_ _ ms _) - (time-apply render-from-source-or-output-path (list path))) - (place-channel-put ch (list 'finished-job path ms)))) - (loop)))) - (handle-evt wp (λ (val) (list* wpidx wp val))))) - - (define poly-target (current-poly-target)) - - ;; `locks` and `blocks` are (listof (cons/c evt? path?)) - (define starting-source-paths (sort source-paths stringstring)) - (define crashed-jobs - (let loop ([source-paths starting-source-paths] - [locks-in null] - [blocks-in null] - ;; `completed-jobs` is (listof (cons/c path? boolean?)) - [completed-jobs null] - [completed-job-count 0]) - ;; try to unblock blocked workers - (define-values (locks blocks) - (for/fold ([locks locks-in] - [blocks null]) - ([block (in-list blocks-in)]) - (match-define (cons wp path) block) - (cond - [(member path (dict-values locks)) - (values locks (cons block blocks))] - [else - (place-channel-put wp 'lock-approved) - (values (cons block locks) blocks)]))) - (cond - [(eq? completed-job-count (length starting-source-paths)) - ;; crashed jobs are completed jobs that weren't finished - (for/list ([(path finished?) (in-dict completed-jobs)] - #:unless finished?) - path)] - [else - (match (apply sync worker-evts) - [(list wpidx wp 'wants-job) - (match source-paths - [(? null?) (loop null locks blocks completed-jobs completed-job-count)] - [(cons path rest) - (place-channel-put wp (cons path poly-target)) - (loop rest locks blocks completed-jobs completed-job-count)])] - [(list wpidx wp (and (or 'finished-job 'crashed-job) tag) path ms) - (match tag - ['finished-job - (message - (format "rendered parallel @ job ~a /~a ~a" - (add1 wpidx) - (find-relative-path (current-project-root) (->output-path path)) - (if (< ms 1000) (format "(~a ms)" ms) (format "(~a s)" (/ ms 1000.0)))))] - [_ - (message - (format "render crash @ job ~a /~a (retry pending)" - (add1 wpidx) - (find-relative-path (current-project-root) (->output-path path))))]) - (loop source-paths - (match (assoc wp locks) - [#false locks] - [lock (remove lock locks)]) - blocks - (cons (cons path (eq? tag 'finished-job)) completed-jobs) - (add1 completed-job-count))] - [(list wpidx wp 'wants-lock path) - (loop source-paths locks (append blocks (list (cons wp path))) completed-jobs completed-job-count)])]))) - ;; second bite at the apple for crashed jobs. - ;; 1) many crashes that arise in parallel rendering are - ;; a result of concurrency issues (e.g. shared files not being readable at the right moment). - ;; That is, they do not appear under serial rendering. - ;; 2) even if a crash is legit (that is, there is a real flaw in the source) - ;; and should be raised, we don't want to do it inside a parallel-rendering `place` - ;; because then the place will never return, and the whole parallel job will never finish. - ;; so we take the list of crashed jobs and try rendering them again serially. - ;; if it was a concurrency-related error, it will disappear. - ;; if it was a legit error, the render will stop and print a trace. - (for-each render-from-source-or-output-path crashed-jobs)] - [else (for-each render-from-source-or-output-path paths-in)])) + (for-each render-from-source-or-output-path (if wants-parallel-render + ;; returns crashed jobs for serial rendering + (parallel-render paths-in wants-parallel-render) + paths-in))) (define (pagetree->paths pagetree-or-path) (define pagetree (if (pagetree? pagetree-or-path)