1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130
(************************************************************************)
(*         *   The Coq Proof Assistant / The Coq Development Team       *)
(*  v      *   INRIA, CNRS and contributors - Copyright 1999-2019       *)
(* <O___,, *       (see CREDITS file for the list of authors)           *)
(*   \VV/  **************************************************************)
(*    //   *    This file is distributed under the terms of the         *)
(*         *     GNU Lesser General Public License Version 2.1          *)
(*         *     (see LICENSE file for the text of the license)         *)
(************************************************************************)

(* Name under which a worker is known to the pool. *)
type worker_id = string

(* Control panel handed to the manager of a worker. *)
type 'a cpanel = {
  exit : unit -> unit; (* called by manager to exit instead of Thread.exit *)
  cancelled : unit -> bool; (* manager checks for a request of termination *)
  extra : 'a; (* extra stuff to pass to the manager *)
}

module type PoolModel = sig
  (* this shall come from a Spawn.* model *)
  type process
  val spawn : int -> CoqworkmgrApi.priority ->
    worker_id * process * CThread.thread_ic * out_channel

  (* this defines the main loop of the manager *)
  type extra
  val manager :
    extra cpanel -> worker_id * process * CThread.thread_ic * out_channel -> unit
end

(* Pool of workers spawned through [Model.spawn], each one driven by a
   manager thread running [Model.manager]. *)
module Make(Model : PoolModel) = struct

  (* One worker: its name, its cancellation flag, the thread running its
     manager and the process returned by [Model.spawn]. *)
  type worker = {
    name : worker_id;
    cancel : bool ref;
    manager : Thread.t;
    process : Model.process;
  }

  (* Mutable state of the pool; only accessed through [locking] below. *)
  type pre_pool = {
    workers : worker list ref;
    count : int ref; (* number passed to [Model.spawn] for the next respawn *)
    extra_arg : Model.extra;
  }

  type pool = { lock : Mutex.t; pool : pre_pool }

  (* Value exchanged by both sides during the handshake. *)
  let magic_no = 17

  (* Master side of the handshake: marshal [magic_no] to [oc], flush, then
     read an integer back from [ic].  A different value, or any non critical
     exception, is reported on stderr and terminates the process with exit
     code 1. *)
  let master_handshake worker_id ic oc =
    try
      Marshal.to_channel oc magic_no [];
      flush oc;
      let n = (CThread.thread_friendly_input_value ic : int) in
      if n <> magic_no then begin
        Printf.eprintf "Handshake with %s failed: protocol mismatch\n" worker_id;
        exit 1
      end
    with e when CErrors.noncritical e ->
      Printf.eprintf "Handshake with %s failed: %s\n"
        worker_id (Printexc.to_string e);
      exit 1

  (* Worker side of the handshake: read an integer from [slave_ic], check it
     against [magic_no] and echo it on [slave_oc].  A mismatch, or any non
     critical exception, is reported on stderr and terminates the process
     with exit code 1. *)
  let worker_handshake slave_ic slave_oc =
    try
      let v = (CThread.thread_friendly_input_value slave_ic : int) in
      if v <> magic_no then begin
        prerr_endline "Handshake failed: protocol mismatch\n";
        exit 1
      end;
      Marshal.to_channel slave_oc v [];
      flush slave_oc
    with e when CErrors.noncritical e ->
      prerr_endline ("Handshake failed: " ^ Printexc.to_string e);
      exit 1

  (* [locking pool f] runs [f] on the inner state of [pool] while holding the
     pool mutex, and returns its result.  The mutex is taken before entering
     the protected region, so a failure to lock never triggers an unlock, and
     it is released exactly once whether [f] returns or raises (the exception
     is then re-raised). *)
  let locking { lock; pool = p } f =
    Mutex.lock lock;
    match f p with
    | x -> Mutex.unlock lock; x
    | exception e -> Mutex.unlock lock; raise e

  (* [create_worker extra pool priority id] spawns a process with
     [Model.spawn id priority], performs the master handshake with it and
     starts a thread running [Model.manager] on it. *)
  let rec create_worker extra pool priority id =
    let cancel = ref false in
    let name, process, ic, oc as worker = Model.spawn id priority in
    master_handshake name ic oc;
    (* From here on [exit] is the control panel callback, not Stdlib.exit:
       it flags the worker as cancelled, lets the pool replace it and ends
       the calling thread. *)
    let exit () = cancel := true; cleanup pool priority; Thread.exit () in
    let cancelled () = !cancel in
    let cpanel = { exit; cancelled; extra } in
    let manager = CThread.create (Model.manager cpanel) worker in
    { name; cancel; manager; process }

  (* [cleanup x priority] replaces, under the pool lock, every worker whose
     cancellation flag is set by a freshly created one.  Each replacement is
     numbered with the current value of [count], which is then incremented. *)
  and cleanup x priority =
    locking x (fun { workers; count; extra_arg } ->
      let respawn_if_cancelled ({ cancel; _ } as w) =
        if not !cancel then w
        else begin
          let n = !count in
          incr count;
          create_worker extra_arg x priority n
        end
      in
      workers := List.map respawn_if_cancelled !workers)

  (* Number of workers currently registered in the pool. *)
  let n_workers x =
    locking x (fun { workers; _ } -> List.length !workers)

  (* Whether the pool currently holds no worker. *)
  let is_empty x =
    locking x (fun { workers; _ } ->
      match !workers with
      | [] -> true
      | _ :: _ -> false)

  (* [create extra_arg ~size priority] builds a pool and fills it with the
     workers produced by [CList.init size] (presumably numbered from 0 to
     [size - 1], as with List.init -- to be confirmed against CList).  The
     respawn counter starts at [size]. *)
  let create extra_arg ~size priority =
    let x = {
      lock = Mutex.create ();
      pool = { extra_arg; workers = ref []; count = ref size };
    } in
    locking x (fun { workers; _ } ->
      workers := CList.init size (create_worker extra_arg x priority));
    x

  (* [cancel n x] sets the cancellation flag of every worker named [n]. *)
  let cancel n x =
    locking x (fun { workers; _ } ->
      List.iter
        (fun { name; cancel; _ } -> if String.equal n name then cancel := true)
        !workers)

  (* Sets the cancellation flag of every worker; the workers stay in the
     pool. *)
  let cancel_all x =
    locking x (fun { workers; _ } ->
      List.iter (fun { cancel; _ } -> cancel := true) !workers)

  (* Sets the cancellation flag of every worker and empties the pool. *)
  let destroy x =
    locking x (fun { workers; _ } ->
      List.iter (fun { cancel; _ } -> cancel := true) !workers;
      workers := [])

end