This file is indexed.

/usr/share/racket/collects/setup/parallel-do.rkt is in racket-common 6.7-3.

This file is owned by root:root, with mode 0o644.

The actual contents of the file can be viewed below.

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
#lang racket/base

(require racket/file
         racket/future
         racket/place
         racket/port
         racket/fasl
         racket/match
         racket/path
         racket/class
         racket/stxparam
         setup/dirs
         (for-syntax syntax/parse
                     racket/base))

(provide parallel-do
         current-executable-path
         current-collects-path
         match-message-loop
         send/success
         send/error
         send/report
         send/msg
         send/log
         recv/req
         worker/die
         work-queue<%>
         define/class/generics
         list-queue)

(define-syntax-rule (mk-generic func clss method args ...)
  (begin
    (define g (generic clss method))
    (define (func obj args ...)
      (send-generic obj g args ...))))

(define-syntax-rule (define/class/generics class (func method args ...) ...)
  (begin
    (mk-generic func class method args ...) ...))

(define-syntax-rule (define/class/generics/provide class (func method args ...) ...)
  (begin
    (begin
      (mk-generic func class method args ...)
      (provide func)) ...))

(define-syntax-rule (DEBUG_COMM a ...)
  (void)
;  (begin a ...)
)

(define worker<%> 
  (interface ()
    spawn
    send/msg
    kill
    break
    wait
    recv/msg
    read-all
    get-id
    get-out))

(define worker% 
  (class* object% (worker<%>)
    (field [id 0]
           [process-handle null]
           [out null]
           [in null]
           [err null]
           [module-path null]
           [funcname null])

    (define/public (spawn _id _module-path _funcname [initialmsg #f])
      (set! module-path _module-path)
      (set! funcname _funcname)
      (define worker-cmdline-list (list (current-executable-path)
                                        "-X" (path->string (current-collects-path))
                                        "-G" (path->string (find-config-dir))
                                        "-e" "(eval(read))"))
      (define dynamic-require-cmd `((dynamic-require (string->path ,module-path) (quote ,funcname)) #f))
      (let-values ([(_process-handle _out _in _err) (apply subprocess #f #f (current-error-port) worker-cmdline-list)])
        (set! id _id)
        (set! process-handle _process-handle)
        (set! out _out)
        (set! in _in)
        (set! err _err)
        (send/msg dynamic-require-cmd)
        (when initialmsg (send/msg (initialmsg id)))))
    (define/public (send/msg msg) 
      (with-handlers ([exn:fail?
                       (lambda (x)
                         (eprintf "While sending message to parallel-do worker: ~a ~a\n"
                                  id (exn-message x))
                         (exit 1))])
        (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" id msg))
        (write (convert-paths msg) in)
        (flush-output in)))
    (define/public (recv/msg)
      (with-handlers ([exn:fail?
                       (lambda (x)
                         (eprintf (string-append
                                   "While receiving message from parallel-do worker ~a ~a\n"
                                   "  input continues: ~s\n")
                                  id (exn-message x)
                                  (let ([bstr (make-bytes 32)])
                                    (define n (peek-bytes-avail!* bstr 0 #f out))
                                    (if (number? n)
                                        (subbytes bstr 0 n)
                                        n)))
                         (exit 1))])
        (define r (deconvert-paths (read out)))
        (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" id r))
        r))
    (define/public (read-all) (port->string out))
    (define/public (get-id) id)
    (define/public (get-out) out)
    (define/public (kill)
      (DEBUG_COMM (eprintf "KILLING WORKER ~a\n" id))
      (close-output-port in)
      (close-input-port out)
      (subprocess-kill process-handle #t))
    (define/public (break) (kill))
    (define/public (kill/respawn worker-cmdline-list [initialmsg #f])
      (kill)
      (spawn id module-path funcname [initialmsg #f]))
    (define/public (wait) (subprocess-wait process-handle))
    (super-new)))

(define place-worker% 
  (class* object% (worker<%>)
    (init-field [id 0]              
                [pl null])
               
    (define/public (spawn _id module-path funcname [initialmsg #f])
      (set! id _id)
      (set! pl (dynamic-place (string->path module-path) funcname))
      (when initialmsg (send/msg (initialmsg id))))
    (define/public (send/msg msg)
      (DEBUG_COMM (eprintf "CSENDING ~v ~v\n" pl msg))
      (place-channel-put pl msg))
    (define/public (recv/msg)
      (define r (place-channel-get pl))
      (DEBUG_COMM (eprintf "CRECEIVNG ~v ~v\n" pl r))
      r)
    (define/public (read-all) "")
    (define/public (get-id) id) 
    (define/public (get-out) pl)
    (define/public (kill) #f)
    (define/public (break) (place-break pl))
    (define/public (wait) (place-wait pl))
    (super-new))) 

(define work-queue<%> 
  (interface ()
    get-job
    work-done
    has-jobs?
    jobs-cnt
    get-results))

(define/class/generics/provide worker<%>
  (wrkr/spawn spawn id worker-cmdline-list initialcode initialmsg)
  (wrkr/send  send/msg msg)
  (wrkr/kill  kill)
  (wrkr/break break)
  (wrkr/recv  recv/msg)
  (wrkr/read-all  read-all)
  (wrkr/id    get-id)
  (wrkr/out   get-out)
  (wrkr/wait  wait))


(define/class/generics/provide work-queue<%>
  (queue/get        get-job wrkrid)
  (queue/work-done  work-done node wrkr msg)
  (queue/has        has-jobs?)
  (queue/count      jobs-cnt)
  (queue/results    get-results))

(define (current-executable-path) 
 (parameterize ([current-directory (find-system-path 'orig-dir)])
  (find-executable-path (find-system-path 'exec-file) #f)))

(define (current-collects-path)
 (let ([p (find-system-path 'collects-dir)])
  (if (complete-path? p)
      p
      (path->complete-path p (or (path-only (current-executable-path))
                                 (find-system-path 'orig-dir))))))

(define (parallel-do-event-loop module-path funcname initialmsg work-queue nprocs [stopat #f])
  (define use-places? (place-enabled?)) ; set to #f to use processes instead of places
  
  (define (spawn id)
    ;; spawns a new worker
    (define wrkr (if use-places? (new place-worker%) (new worker%)))
    (wrkr/spawn wrkr id module-path funcname initialmsg)
    wrkr)

  (define workers null)
  (define (spawn! id)
    ;; spawn a worker and add it to the list;
    ;; disable breaks because we want to make sure
    ;; that a new worker is added to the list of workers
    ;; before a break exception is raised:
    (parameterize-break 
     #f
     (let ([w (spawn id)])
       (set! workers (cons w workers))
       w)))
  (define (unspawn! wkr)
    (wrkr/kill wkr)
    (set! workers (remq wkr workers)))

  (define (jobs?) (queue/has work-queue))
  (define (empty?) (not (queue/has work-queue)))

  ;; If any exception (including a break exception) happens before
  ;; the work loop ends, then send a break to interrupt each worker;
  ;; the `normal-finish?' flag is set to #t when the working loop ends
  ;; normally.
  (define normal-finish? #f)

  (define log-exn (lambda (exn [msg #f])
                    (log-error (let ([s (if (exn? exn)
                                            (let ([p (open-output-string)])
                                              (parameterize ([current-error-port p])
                                                ((error-display-handler) (exn-message exn) exn))
                                              (get-output-string p))
                                            (format "exception: ~v" exn))])
                                 (if msg
                                     (format "~a; ~a" msg s)
                                     s)))))

  (dynamic-wind
    (lambda () (void))
    (lambda ()
      (define (check-error-threshold x) 
        (when (x . >= . 4)
          (error 'parallel-do "error count reached ~a, exiting" x)))
      (for/list ([i (in-range nprocs)]) 
        (spawn! i))
      (let loop ([idle workers]
                 [inflight null]
                 [count 0]
                 [error-count 0])
        (check-error-threshold error-count)
        (cond 
          ;; Reached stopat count STOP
          [(and stopat (= count stopat)) ; ???
           (log-error "done at limit")]
          ;; Queue empty and all workers idle, we are all done
          [(and (empty?) (null? inflight)) 
           ;; done
           (void)]
          ;; Send work to idle worker
          [(and (jobs?) (pair? idle))
           (match-define (cons wrkr idle-rest) idle)
           (define-values (job cmd-list) (queue/get work-queue (wrkr/id wrkr)))
           (let retry-loop ([wrkr wrkr]
                            [error-count error-count]) 
             (check-error-threshold error-count)
             (with-handlers* ([exn:fail? (lambda (e) 
                                           (log-exn e (format "error writing to worker: ~v" 
                                                              (wrkr/id wrkr)))
                                           (unspawn! wrkr)
                                           (retry-loop (spawn! (wrkr/id wrkr)) (add1 error-count)))])
               (wrkr/send wrkr cmd-list))
             (loop idle-rest (cons (list job wrkr) inflight) count error-count))]
          [else
           (define (kill/remove-dead-worker node-worker wrkr)
             (DEBUG_COMM (printf "KILLING ~v\n" (wrkr/id wrkr)))
             (unspawn! wrkr)
             (loop (cons (spawn! (wrkr/id wrkr)) idle)
                   (remove node-worker inflight)
                   count
                   (add1 error-count))) 
           (define (gen-node-handler node-worker)
             (match node-worker
               [(list node wrkr)
                (handle-evt
                 (wrkr/out wrkr) 
                 (λ (e)
                    (let ([msg
                           (with-handlers* ([exn:fail? (lambda (e) 
                                                         (log-exn e (format "error reading from worker: ~v"
                                                                            (wrkr/id wrkr)))
                                                         (kill/remove-dead-worker node-worker wrkr))])
                             (if use-places? e (wrkr/recv wrkr)))])
                      (if (pair? msg)
                          (if (queue/work-done work-queue node wrkr msg)
                              (loop (cons wrkr idle) (remove node-worker inflight) (add1 count) error-count)
                              (loop idle inflight count error-count))
                          (begin
                            (queue/work-done work-queue node wrkr (string-append msg (wrkr/read-all wrkr)))
                            (kill/remove-dead-worker node-worker wrkr))))))]
               [else 
                (log-error (format "parallel-do-event-loop match node-worker failed trying to match: ~e" 
                                   node-worker))]))
           (DEBUG_COMM (printf "WAITING ON WORKERS TO RESPOND\n"))
           (apply sync (map gen-node-handler inflight))]))
      ;; Ask workers to stop:
      (for ([p workers]) 
        (wrkr/send p (list 'DIE)))
      ;; Finish normally:
      (set! normal-finish? #t))
    (lambda () 
      (define (break-all)
        (for ([p workers]) 
          (with-handlers ([exn? log-exn])
            (wrkr/break p))))
      (unless normal-finish?
        ;; There was an exception, so tell workers to stop:
        (break-all))
      ;; Wait for workers to complete; pass any break request on
      ;; to the worker places, in case they ignored an earlier
      ;; break for some reason:
      (let loop ()
        (with-handlers* ([exn:break? (lambda (exn)
                                       (break-all)
                                       (loop))])
          (parameterize-break
           #t
           (for ([p workers]) 
             (with-handlers ([exn:fail? log-exn])
               (wrkr/wait p)))))))))
  
(define list-queue% 
  (class* object% (work-queue<%>)
    (init-field queue create-job-thunk success-thunk failure-thunk [report-proc display])
    (field [results null])

    (define/public (work-done work workerid msg)
      (match msg
        [(list (list 'REPORT msg) stdout stderr)
         (report-proc msg)
         #f]
        [(list (list 'DONE result) stdout stderr)
         (set! results (cons (success-thunk work result stdout stderr) results))
         #t]
        [(list (list 'ERROR errmsg) stdout stderr)
         (failure-thunk work errmsg stdout stderr)
         #t]))
    (define/public (get-job workerid)
      (match queue
        [(cons h t)
          (set! queue t)
          (values h (create-job-thunk h workerid))]))
    (define/public (has-jobs?) (not (null? queue)))
    (define/public (get-results) (reverse results))
    (define/public (jobs-cnt) (length queue))
    (super-new)))

(define (list-queue list-of-work create-job-thunk job-success-thunk job-failure-thunk)
  (make-object list-queue% list-of-work create-job-thunk job-success-thunk job-failure-thunk))

(define-syntax-rule (define-parallel-keyword-error d x)
  (d x (lambda (stx) (raise-syntax-error 'x "only allowed inside parallel worker definition" stx))))
(define-syntax-rule (define-syntax-parameter-error x) (define-parallel-keyword-error define-syntax-parameter x))

(define-parallel-keyword-error define match-message-loop)
(define-syntax-parameter-error send/msg)
(define-syntax-parameter-error send/success)
(define-syntax-parameter-error send/error)
(define-syntax-parameter-error send/report)
(define-syntax-parameter-error send/log)
(define-syntax-parameter-error recv/req)
(define-syntax-parameter-error worker/die)

(define-for-syntax (gen-worker-body globals-list globals-body work-body channel)
  (with-syntax ([globals-list globals-list]
                [(globals-body ...) globals-body]
                [([work work-body ...] ...) work-body]
                [ch channel])
    #'(do-worker
       ch
       (lambda (msg per-loop-body)
         ;; single starting message:
         (match msg
           [globals-list
            globals-body ...
            ;; bind per-worker-set procedures:
            (per-loop-body
             (lambda (send/msgp recv/reqp die-k)
               (syntax-parameterize ([send/msg (make-rename-transformer #'send/msgp)]
                                     [recv/req (make-rename-transformer #'recv/reqp)]
                                     [worker/die (make-rename-transformer #'die-k)])
                 ;; message handler:
                 (lambda (msg send/successp send/errorp send/reportp send/logp)
                   (syntax-parameterize ([send/success (make-rename-transformer #'send/successp)]
                                         [send/error (make-rename-transformer #'send/errorp)]
                                         [send/report (make-rename-transformer #'send/reportp)]
                                         [send/log (make-rename-transformer #'send/logp)])
                     (match msg
                       [work work-body ...]
                       ...))))))])))))

(define (do-worker ch setup-proc)
  (define orig-err (current-error-port))
  (define orig-out (current-output-port))
  (define orig-in  (current-input-port))
  (define (raw-send msg)
    (cond 
     [ch (place-channel-put ch msg)]
     [else (write (convert-paths msg) orig-out)
           (flush-output orig-out)]))
  (define (raw-recv)
    (cond 
     [ch (place-channel-get ch)]
     [else (deconvert-paths (read orig-in))]))
  (define (pdo-send msg)
    (with-handlers ([exn:fail?
                     (lambda (x)
                       (log-error (format "WORKER SEND MESSAGE ERROR: ~a" (exn-message x)))
                       (exit 1))])
      (DEBUG_COMM (fprintf orig-err "WSENDING ~v\n" msg))
      (raw-send msg)))
  (define (pdo-recv)
    (with-handlers ([exn:fail?
                     (lambda (x)
                       (log-error (format "WORKER RECEIVE MESSAGE ERROR: ~a" (exn-message x)))
                       (exit 1))])
      (define r (raw-recv))
      (DEBUG_COMM (fprintf orig-err "WRECVEIVED ~v\n" r))
      r))
  
  (setup-proc (pdo-recv)
              (lambda (set-proc)
                (let/ec die-k
                  (define (recv/reqp) (pdo-recv))
                  (define (send/msgp msg)
                    (pdo-send msg))
                  (let ([msg-proc (set-proc send/msgp recv/reqp die-k)])
                    (let loop ([i 0])
                      (DEBUG_COMM (fprintf orig-err "WAITING ON CONTROLLER TO RESPOND  ~v ~v\n" orig-in i))
                      (let ([out-str-port (open-output-string)]
                            [err-str-port (open-output-string)])
                        (define (send/resp type)
                          (pdo-send (list type (get-output-string out-str-port) (get-output-string err-str-port))))
                        (define (send/successp result)
                          (send/resp (list 'DONE result)))
                        (define (send/errorp message)
                          (send/resp (list 'ERROR message)))
                        (define (send/reportp message)
                          (send/resp (list 'REPORT message)))
                        (define (send/logp level message data)
                          (send/resp (list 'LOG level message data)))
                        ((with-handlers* ([exn:fail? (lambda (x) 
                                                       (define sp (open-output-string))
                                                       (parameterize ([current-error-port sp])
                                                         ((error-display-handler) (exn-message x) x))
                                                       (send/errorp (get-output-string sp))
                                                       (lambda () (loop (add1 i))))])
                           (parameterize ([current-output-port out-str-port]
                                          [current-error-port err-str-port])
                             (let ([msg (pdo-recv)])
                               (match msg
                                 [(list 'DIE) void]
                                 [_ (msg-proc msg send/successp send/errorp send/reportp send/logp)
                                    (lambda () (loop (add1 i)))]))))))))))))

(define-syntax (lambda-worker stx)
  (syntax-parse stx #:literals (match-message-loop)
    [(_ (globals-list:id ...)
      globals-body:expr ...
      (match-message-loop
        [work:expr work-body:expr ...] ...))

      (with-syntax ([body (gen-worker-body #'(list globals-list ...) #'(globals-body ...) #'([work work-body ...] ...) #'ch)])
        #'(lambda (ch) body))]))

(define-syntax (parallel-do stx)
  (syntax-case stx (define-worker)
    [(_ worker-count initalmsg work-queue (define-worker (name args ...) body ...))
     (begin
       (with-syntax ([interal-def-name (syntax-local-lift-expression #'(lambda-worker (args ...) body ...))])
         (syntax-local-lift-provide #'(rename interal-def-name name)))
       #'(let ([wq work-queue])
           (define module-path (path->string (resolved-module-path-name (variable-reference->resolved-module-path (#%variable-reference)))))
           (parallel-do-event-loop module-path 'name initalmsg wq worker-count)
           (queue/results wq)))]))


(struct path-wrapper (bstr) #:prefab)

(define (convert-paths msg)
  (cond
   [(path? msg) (path-wrapper (path->bytes msg))]
   [(pair? msg) (cons (convert-paths (car msg))
                      (convert-paths (cdr msg)))]
   [else msg]))

(define (deconvert-paths msg)
  (cond
   [(path-wrapper? msg) (bytes->path (path-wrapper-bstr msg))]
   [(pair? msg) (cons (deconvert-paths (car msg))
                      (deconvert-paths (cdr msg)))]
   [else msg]))