1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382
(************************************************************************)
(*         *   The Coq Proof Assistant / The Coq Development Team       *)
(*  v      *   INRIA, CNRS and contributors - Copyright 1999-2019       *)
(* <O___,, *       (see CREDITS file for the list of authors)           *)
(*   \VV/  **************************************************************)
(*    //   *    This file is distributed under the terms of the         *)
(*         *     GNU Lesser General Public License Version 2.1          *)
(*         *     (see LICENSE file for the text of the license)         *)
(************************************************************************)

open CErrors
open Pp
open Util

(* Print [pp] on stderr, prefixed by the id of the current process followed
   by a closing bracket, then flush. *)
let stm_pr_err pp =
  Format.eprintf "%s] @[%a@]\n%!" (Spawned.process_id ()) Pp.pp_with pp

(* Same as [stm_pr_err] on a plain string, but only when [Flags.debug] is
   set; otherwise does nothing. *)
let stm_prerr_endline s =
  if !Flags.debug then begin stm_pr_err (str s) end else ()

(* Mutable flag attached to every queued task; it is read by the workers'
   picking predicate and by the cleaner thread (see [Make.create]). *)
type cancel_switch = bool ref

(* Extra command line flags prepended to the arguments of every spawned
   worker (see [Make.Model.spawn]). *)
let async_proofs_flags_for_workers = ref []

(* What a client must provide in order to get a task queue and the matching
   worker main loop out of [Make]. *)
module type Task = sig

  type task
  type competence

  type worker_status = Fresh | Old of competence

  (* Marshallable *)
  type request
  type response

  val name : string ref (* UID of the task kind, for -toploop *)
  val extra_env : unit -> string array

  (* run by the master, on a thread *)
  val request_of_task : worker_status -> task -> request option
  val task_match : worker_status -> task -> bool
  val use_response :
    worker_status -> task -> response ->
      [ `Stay of competence * task list | `End ]
  val on_marshal_error : string -> task -> unit
  val on_task_cancellation_or_expiration_or_slave_death : task option -> unit
  val forward_feedback : Feedback.feedback -> unit

  (* run by the worker *)
  val perform : request -> response

  (* debugging *)
  val name_of_task : task -> string
  val name_of_request : request -> string

end

(* Master side (queue, pool of worker managers) and worker side
   ([init_stdout], [main_loop]) of the protocol, for one kind of task. *)
module Make(T : Task) () = struct

  (* Raised inside a worker manager to make it give up on its worker. *)
  exception Die

  (* Messages travelling from a worker to the master. *)
  type response =
    | Response of T.response
    | RespFeedback of Feedback.feedback
    | RespGetCounterNewUnivLevel

  (* Messages travelling from the master to a worker. *)
  type request = Request of T.request

  (* Master's answer to [RespGetCounterNewUnivLevel]. *)
  type more_data =
    | MoreDataUnivLevel of UnivGen.univ_unique_id list

  (* Worker side: run the task carried by the request and wrap its result. *)
  let slave_respond (Request r) =
    let res = T.perform r in
    Response res

  (* Any [Failure], [Invalid_argument] or [Sys_error] raised while reading or
     writing a message is turned into this exception; the payload names the
     failing operation followed by the original message. *)
  exception MarshalError of string

  (* Marshal [data] on [oc] (no flags) and flush the channel. *)
  let marshal_to_channel oc data =
    Marshal.to_channel oc data [];
    flush oc

  let marshal_err s = raise (MarshalError s)

  let marshal_request oc (req : request) =
    try marshal_to_channel oc req
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("marshal_request: "^s)

  let unmarshal_request ic =
    try (CThread.thread_friendly_input_value ic : request)
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("unmarshal_request: "^s)

  let marshal_response oc (res : response) =
    try marshal_to_channel oc res
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("marshal_response: "^s)

  let unmarshal_response ic =
    try (CThread.thread_friendly_input_value ic : response)
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("unmarshal_response: "^s)

  let marshal_more_data oc (res : more_data) =
    try marshal_to_channel oc res
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("marshal_more_data: "^s)

  let unmarshal_more_data ic =
    try (CThread.thread_friendly_input_value ic : more_data)
    with Failure s | Invalid_argument s | Sys_error s ->
      marshal_err ("unmarshal_more_data: "^s)

  (* Emit a [WorkerStatus (id, s)] feedback attached to [Stateid.initial].
     [id] defaults to the current value of [Flags.async_proofs_worker_id]. *)
  let report_status ?(id = !Flags.async_proofs_worker_id) s =
    let open Feedback in
    feedback ~id:Stateid.initial (WorkerStatus(id, s))

  module Worker = Spawn.Sync ()

  (* Argument handed to [WorkerPool.Make]: how to spawn a worker process and
     how to drive it. *)
  module Model = struct

    type process = Worker.process
    type extra = (T.task * cancel_switch) TQueue.t

    (* Spawn worker number [id]. Returns the worker name ([!T.name:id]), the
       process handle, the input channel (prepared for thread friendly io)
       and the output channel. The command line of the worker is derived
       from the one of the current process, see [set_slave_opt]. *)
    let spawn id priority =
      let name = Printf.sprintf "%s:%d" !T.name id in
      let proc, ic, oc =
        (* Filter arguments for slaves. *)
        let rec set_slave_opt = function
          | [] -> !async_proofs_flags_for_workers @
                  ["-worker-id"; name;
                   "-async-proofs-worker-priority";
                   CoqworkmgrApi.(string_of_priority priority)]
          (* Options to discard: 0 arguments *)
          | "-emacs"::tl ->
            set_slave_opt tl
          (* Options to discard: 1 argument *)
          | ( "-async-proofs" | "-vio2vo" | "-o"
            | "-load-vernac-source" | "-l" | "-load-vernac-source-verbose" | "-lv"
            | "-compile" | "-compile-verbose"
            | "-async-proofs-cache" | "-async-proofs-j" | "-async-proofs-tac-j"
            | "-async-proofs-private-flags" | "-async-proofs-tactic-error-resilience"
            | "-async-proofs-command-error-resilience" | "-async-proofs-delegation-threshold"
            | "-async-proofs-worker-priority" | "-worker-id") :: _ :: tl ->
            set_slave_opt tl
          (* We need to pass some options with one argument *)
          | ( "-I" | "-include" | "-top" | "-topfile" | "-coqlib" | "-exclude-dir" | "-compat"
            | "-load-ml-object" | "-load-ml-source" | "-require" | "-w" | "-color" | "-init-file"
            | "-profile-ltac-cutoff" | "-main-channel" | "-control-channel" | "-mangle-names" | "-set" | "-unset"
            | "-diffs" | "-mangle-name" | "-dump-glob" | "-bytecode-compiler" | "-native-compiler" as x) :: a :: tl ->
            x :: a :: set_slave_opt tl
          (* We need to pass some options with two arguments *)
          | ( "-R" | "-Q" as x) :: a1 :: a2 :: tl ->
            x :: a1 :: a2 :: set_slave_opt tl
          (* Finally we pass all options starting in '-'; check this is safe w.r.t the weird vio* option set *)
          (* The length test matters: indexing an empty argument would raise
             Invalid_argument; such an argument is dropped by the next case. *)
          | x :: tl when String.length x > 0 && x.[0] = '-' ->
            x :: set_slave_opt tl
          (* We assume this is a file, filter out *)
          | _ :: tl ->
            set_slave_opt tl
        in
        (* [List.tl] skips the first element of [Sys.argv]. *)
        let args =
          Array.of_list (set_slave_opt (List.tl (Array.to_list Sys.argv))) in
        let env = Array.append (T.extra_env ()) (Unix.environment ()) in
        let worker_name = System.get_toplevel_path ("coq" ^ !T.name) in
        Worker.spawn ~env worker_name args in
      name, proc, CThread.prepare_in_channel_for_thread_friendly_io ic, oc

    (* Drive one worker: repeatedly pick a matching task from the queue, send
       the corresponding request, serve the auxiliary messages of the worker
       until the final response arrives, and hand that response to
       [T.use_response]. Leaves through the [exit] function of [cpanel] after
       killing the worker. *)
    let manager cpanel (id, proc, ic, oc) =
      let { WorkerPool.extra = queue; exit; cancelled } = cpanel in
      (* Report the worker as dead before calling the pool provided [exit]. *)
      let exit () = report_status ~id "Dead"; exit () in
      (* Task currently assigned to the worker, if any; reported to
         [T.on_task_cancellation_or_expiration_or_slave_death] when the
         connection breaks. *)
      let last_task = ref None in
      let worker_age = ref T.Fresh in
      (* Whether an execution token obtained via [CoqworkmgrApi.get] is
         currently held and has to be given back. *)
      let got_token = ref false in
      let giveback_exec_token () =
        if !got_token then (CoqworkmgrApi.giveback 1; got_token := false) in
      (* Passed as [~destroy] to [TQueue.pop]; set by the watchdog below. *)
      let stop_waiting = ref false in
      (* Cancel switch of the task being processed. *)
      let expiration_date = ref (ref false) in
      let pick_task () =
        stm_prerr_endline "waiting for a task";
        (* Only tasks that are not cancelled and that suit this worker. *)
        let pick age (t, c) = not !c && T.task_match age t in
        let task, task_expiration =
          TQueue.pop ~picky:(pick !worker_age) ~destroy:stop_waiting queue in
        expiration_date := task_expiration;
        last_task := Some task;
        stm_prerr_endline ("got task: " ^ T.name_of_task task);
        task in
      (* Follow-up tasks share the cancel switch of the task that produced
         them. *)
      let add_tasks l =
        List.iter (fun t -> TQueue.push queue (t,!expiration_date)) l in
      let get_exec_token () =
        ignore(CoqworkmgrApi.get 1);
        got_token := true;
        stm_prerr_endline ("got execution token") in
      (* Kill the worker, wait for it and log how it terminated (the log is
         only printed in debug mode). *)
      let kill proc =
        Worker.kill proc;
        stm_prerr_endline ("Worker exited: " ^
          match Worker.wait proc with
          (* NOTE(review): 0x400 looks like a sentinel returned by
             [Worker.wait] when no exit status is known; confirm in Spawn. *)
          | Unix.WEXITED 0x400 -> "exit code unavailable"
          | Unix.WEXITED i -> Printf.sprintf "exit(%d)" i
          | Unix.WSIGNALED sno -> Printf.sprintf "signalled(%d)" sno
          | Unix.WSTOPPED sno -> Printf.sprintf "stopped(%d)" sno) in
      let more_univs n =
        CList.init n (fun _ -> UnivGen.new_univ_id ()) in

      (* Watchdog, run on its own thread: poll once per second; stop when the
         worker is no longer alive; if the worker got cancelled or the
         current task expired, wake up whoever waits on the queue and kill
         the worker. *)
      let rec kill_if () =
        if not (Worker.is_alive proc) then ()
        else if cancelled () || !(!expiration_date) then
          let () = stop_waiting := true in
          let () = TQueue.broadcast queue in
          Worker.kill proc
        else
          let () = Unix.sleep 1 in
          kill_if ()
      in
      (* An interruption of the watchdog is handled like a cancellation. *)
      let kill_if () =
        try kill_if ()
        with Sys.Break ->
          let () = stop_waiting := true in
          let () = TQueue.broadcast queue in
          Worker.kill proc
      in
      let _ = CThread.create kill_if () in

      try while true do
        report_status ~id "Idle";
        let task = pick_task () in
        match T.request_of_task !worker_age task with
        | None -> stm_prerr_endline ("Task expired: " ^ T.name_of_task task)
        | Some req ->
        try
          get_exec_token ();
          marshal_request oc (Request req);
          (* Serve the worker until it sends the final [Response]. *)
          let rec continue () =
            match unmarshal_response ic with
            | RespGetCounterNewUnivLevel ->
                (* Universe identifiers are handed out in batches of 10. *)
                marshal_more_data oc (MoreDataUnivLevel (more_univs 10));
                continue ()
            | RespFeedback fbk -> T.forward_feedback fbk; continue ()
            | Response resp ->
                match T.use_response !worker_age task resp with
                | `End -> raise Die
                | `Stay(competence, new_tasks) ->
                     last_task := None;
                     giveback_exec_token ();
                     worker_age := T.Old competence;
                     add_tasks new_tasks
          in
            continue ()
        with
        | (Sys_error _|Invalid_argument _|End_of_file|Die) as e ->
            raise e (* we pass the exception to the external handler *)
        | MarshalError s -> T.on_marshal_error s task; raise Die
        | e ->
            stm_pr_err Pp.(seq [str "Uncaught exception in worker manager: "; print e]);
            flush_all (); raise Die
      done with
      | (Die | TQueue.BeingDestroyed) ->
          giveback_exec_token (); kill proc; exit ()
      | Sys_error _ | Invalid_argument _ | End_of_file ->
          (* Broken connection: tell the client which task, if any, was in
             flight before cleaning up. *)
          T.on_task_cancellation_or_expiration_or_slave_death !last_task;
          giveback_exec_token (); kill proc; exit ()
  end

  module Pool = WorkerPool.Make(Model)

  (* A task queue together with the pool of workers serving it and the
     optional thread discarding cancelled tasks. *)
  type queue = {
    active : Pool.pool;
    queue : (T.task * cancel_switch) TQueue.t;
    cleaner : Thread.t option;
  }

  (* Create a queue served by a pool created with [~size] and [priority].
     When [size > 0] a cleaner thread is started too: it keeps popping (and
     dropping) the entries whose cancel switch is set, and exits when
     [TQueue.pop] raises [TQueue.BeingDestroyed]. *)
  let create size priority =
    let cleaner queue =
      while true do
        try ignore(TQueue.pop ~picky:(fun (_,cancelled) -> !cancelled) queue)
        with TQueue.BeingDestroyed -> Thread.exit ()
      done in
    let queue = TQueue.create () in
    {
      active = Pool.create queue ~size priority;
      queue;
      cleaner = if size > 0 then Some (CThread.create cleaner queue) else None;
    }

  (* Destroy the pool first, then the queue. *)
  let destroy { active; queue } =
    Pool.destroy active;
    TQueue.destroy queue

  let broadcast { queue } = TQueue.broadcast queue

  (* Push [t] on the queue, paired with its [cancel_switch]. *)
  let enqueue_task { queue } t ~cancel_switch =
    stm_prerr_endline ("Enqueue task "^T.name_of_task t);
    TQueue.push queue (t, cancel_switch)

  let cancel_worker { active } n = Pool.cancel n active

  let name_of_request (Request r) = T.name_of_request r

  (* Order the queue by comparing tasks with [cmp]; cancel switches are
     ignored by the comparison. *)
  let set_order { queue } cmp =
    TQueue.set_order queue (fun (t1,_) (t2,_) -> cmp t1 t2)

  (* Does nothing when the pool is empty. Otherwise blocks in
     [TQueue.wait_until_n_are_waiting_and_queue_empty], counting all the
     workers plus the cleaner thread. *)
  let join { queue; active } =
    if not (Pool.is_empty active) then
      TQueue.wait_until_n_are_waiting_and_queue_empty
        (Pool.n_workers active + 1(*cleaner*)) queue

  (* Empty the queue, then cancel every worker of the pool. *)
  let cancel_all { queue; active } =
    TQueue.clear queue;
    Pool.cancel_all active

  (* Worker side: channels connecting the worker to the master; [None] until
     [init_stdout] is called. *)
  let slave_ic = ref None
  let slave_oc = ref None

  (* Worker side: fetch the channels from [Spawned.get_channels] and store
     them in [slave_ic] and [slave_oc]. *)
  let init_stdout () =
    let ic, oc = Spawned.get_channels () in
    slave_oc := Some oc; slave_ic := Some ic

  (* Turn a producer of batches into a producer of single items: [f] is only
     called when the previous batch is exhausted. An empty batch makes the
     returned function raise [Failure] (through [List.tl]). *)
  let bufferize f =
    let l = ref [] in
    fun () ->
      match !l with
      | [] -> let data = f () in l := List.tl data; List.hd data
      | x::tl -> l := tl; x

  let slave_handshake () =
    Pool.worker_handshake (Option.get !slave_ic) (Option.get !slave_oc)

  (* Prefix [pp] with the id of the current process and a space. *)
  let pp_pid pp = Pp.(str (Spawned.process_id () ^ " ") ++ pp)

  (* Apply [pp_pid] to the payload of [Debug] messages; every other feedback
     is returned unchanged. *)
  let debug_with_pid = Feedback.(function
    | { contents = Message(Debug, loc, pp) } as fb ->
       { fb with contents = Message(Debug,loc, pp_pid pp) }
    | x -> x)

  (* Worker side entry point; [init_stdout] must have been called before,
     since the channels are read with [Option.get]. Never returns normally:
     the process exits with code 2 on a marshalling error or when the
     connection is lost, and with code 1 on any other exception. *)
  let main_loop () =
    (* We pass feedback to master *)
    let slave_feeder oc fb =
      Control.protect_sigalrm (fun () ->
        Marshal.to_channel oc (RespFeedback (debug_with_pid fb)) []; flush oc) () in
    ignore (Feedback.add_feeder (fun x -> slave_feeder (Option.get !slave_oc) x));
    (* We ask master to allocate universe identifiers *)
    UnivGen.set_remote_new_univ_id (bufferize @@ Control.protect_sigalrm (fun () ->
      marshal_response (Option.get !slave_oc) RespGetCounterNewUnivLevel;
      match unmarshal_more_data (Option.get !slave_ic) with
      | MoreDataUnivLevel l -> l));
    (* NOTE(review): [working] is written below but never read in this
       function. *)
    let working = ref false in
    slave_handshake ();
    while true do
      try
        working := false;
        let request = unmarshal_request (Option.get !slave_ic) in
        working := true;
        report_status (name_of_request request);
        let response = slave_respond request in
        report_status "Idle";
        marshal_response (Option.get !slave_oc) response;
        CEphemeron.clean ()
      with
      | MarshalError s ->
        stm_pr_err Pp.(prlist str ["Fatal marshal error: "; s]); flush_all (); exit 2
      | End_of_file ->
        stm_prerr_endline "connection lost"; flush_all (); exit 2
      | e ->
        stm_pr_err Pp.(seq [str "Slave: critical exception: "; print e]);
        flush_all (); exit 1
    done

  (* Empty the queue; asserts that the pool is empty. *)
  let clear { queue; active } =
    assert(Pool.is_empty active); (* We allow that only if no slaves *)
    TQueue.clear queue

  (* Tasks returned by [TQueue.wait_until_n_are_waiting_then_snapshot] called
     with the number of workers, stripped of their cancel switches. *)
  let snapshot { queue; active } =
    List.map fst
      (TQueue.wait_until_n_are_waiting_then_snapshot
        (Pool.n_workers active) queue)

  (* Run [f] on a queue created with [create n priority]; the queue is
     destroyed whether [f] returns or raises, and in the latter case the
     exception is re-raised with [iraise]. *)
  let with_n_workers n priority f =
    let q = create n priority in
    try let rc = f q in destroy q; rc
    with e -> let e = CErrors.push e in destroy q; iraise e

  let n_workers { active } = Pool.n_workers active

end

(* Both instantiations below are plain aliases of [Make]. *)
module MakeQueue(T : Task) () = struct include Make(T) () end
module MakeWorker(T : Task) () = struct include Make(T) () end