1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
|
(** A non-time-based job queue service
Build with: ocamlfind ocamlopt -package lwt,lwt.unix,lwt_ppx,logs,logs.lwt,containers -linkpkg -o uncron ./uncron.ml
*)
(* Turn on backtrace recording so that Printexc.get_backtrace has something
   to return when an exception is reported later in this file. *)
let () = Printexc.record_backtrace true
open Lwt
(* Binding operator: [let* x = p in e] is [Lwt.bind p (fun x -> e)]. *)
let (let*) = Lwt.bind
(* Queue *)
module Queue = struct
  (** A persistent FIFO queue made of two lists: the first list receives
      newly added items (newest first), the second list is the part that is
      ready to be taken from (oldest first). *)
  type 'a queue = Queue of 'a list * 'a list

  (** The queue with no items. *)
  let empty = Queue ([], [])

  (** [add q item] returns a queue with [item] appended after everything
      already in [q]. *)
  let add (Queue (incoming, outgoing)) item = Queue (item :: incoming, outgoing)

  (** [take q] returns the oldest item (if any) together with the remaining
      queue. An empty queue yields [(None, empty)]. *)
  let take = function
    | Queue (incoming, next :: outgoing) ->
      (Some next, Queue (incoming, outgoing))
    | Queue (incoming, []) ->
      (* Nothing ready to take: flip the incoming list so that the oldest
         item comes first. *)
      (match List.rev incoming with
       | [] -> (None, empty)
       | next :: outgoing -> (Some next, Queue ([], outgoing)))
end
(* Shared job queue *)
(* Mutable cell holding the jobs that have been accepted but not yet run.
   It is updated by handle_message (producer) and fetch_job (consumer). *)
let queue = ref Queue.empty
(* Filesystem path of the UNIX domain socket; main_loop passes it to
   create_socket. *)
let sock_path = "/tmp/uncron.sock"
(* Backlog size, presumably meant for the listening socket -- verify against
   create_socket. *)
let backlog = 100
(* Communication functions *)
(** [handle_message msg] appends the job [msg] to the shared [queue] and
    returns the acknowledgement text to send back to the client.

    The log line is written with the synchronous [Logs.info] rather than by
    discarding a [Logs_lwt.info] promise: this function is not in the Lwt
    monad, and ignoring the promise would silently drop any failure of the
    log operation. *)
let handle_message msg =
  let () = Logs.info (fun m -> m "Job \"%s\" was added to the queue" msg) in
  queue := Queue.add !queue msg;
  Printf.sprintf "Job \"%s\" accepted" msg
(** [handle_connection ic oc ()] serves one client: it reads a single line
    from [ic], enqueues it through [handle_message], and writes the
    acknowledgement to [oc]. If the peer closes the connection without
    sending a line, that fact is logged instead.

    [oc] is closed in every case -- after a successful reply, on end of
    input, and when reading or writing fails -- so the connection is never
    left open. Only [oc] is closed: in this file accept_connection builds
    both channels over the same descriptor, so closing one of them is what
    releases it. *)
let handle_connection ic oc () =
  Lwt.finalize
    (fun () ->
      let* msg = Lwt_io.read_line_opt ic in
      match msg with
      | Some msg ->
        let reply = handle_message msg in
        let* () = Lwt_io.write_line oc reply in
        Lwt_io.flush oc
      | None -> Logs_lwt.info (fun m -> m "Connection closed"))
    (fun () -> Lwt_io.close oc)
(** [accept_connection conn] starts serving a freshly accepted connection.
    [conn] is the (descriptor, address) pair; the address is not used.
    The handler runs in the background and any failure it ends with is
    logged as an error. The returned promise only covers the
    "New connection" log line. *)
let accept_connection conn =
  let client_fd = fst conn in
  let reader = Lwt_io.of_fd ~mode:Lwt_io.Input client_fd in
  let writer = Lwt_io.of_fd ~mode:Lwt_io.Output client_fd in
  let report_failure exn =
    Logs.err (fun m -> m "%s" (Printexc.to_string exn))
  in
  let () =
    Lwt.on_failure (handle_connection reader writer ()) report_failure
  in
  Logs_lwt.info (fun m -> m "New connection")
(** [delete_socket_if_exists sockfile] removes [sockfile] if it is present
    and does nothing when it is already absent.

    The file is unlinked directly instead of being [stat]ed first: this
    avoids a check-then-act race and also removes a dangling symlink, which
    [stat] would report as missing.

    @raise Failure if the file exists but cannot be removed; the message
    includes the system error description. *)
let delete_socket_if_exists sockfile =
  try Unix.unlink sockfile with
  | Unix.Unix_error (Unix.ENOENT, _, _) -> ()
  | Unix.Unix_error (err, _, _) ->
    failwith
      (Printf.sprintf "Could not delete old socket file, exiting: %s"
         (Unix.error_message err))
(** Bind to a UNIX socket.

    [create_socket sockfile] removes any stale file at [sockfile], creates a
    stream socket, binds it to that path and puts it in listening mode using
    the module-level [backlog] value. The promise resolves to the listening
    socket.

    If binding fails, the socket is closed before the failure is propagated
    so the descriptor is not leaked.

    @raise Failure (synchronously) if a stale socket file cannot be removed. *)
let create_socket sockfile =
  let open Lwt_unix in
  let () = delete_socket_if_exists sockfile in
  let sock = socket PF_UNIX SOCK_STREAM 0 in
  let* () =
    Lwt.catch
      (fun () -> Lwt_unix.bind sock (ADDR_UNIX sockfile))
      (fun e ->
        let* () = Lwt_unix.close sock in
        Lwt.fail e)
  in
  listen sock backlog;
  Lwt.return sock
(* Job handling functions *)
(** [log_result (res, out, err)] writes one info-level log entry describing
    how a job ended ([res]) followed by its captured standard output [out]
    and standard error [err]. *)
let log_result (res, out, err) =
  let describe = function
    | Error reason ->
      Printf.sprintf "Job execution caused an exception: %s" reason
    | Ok (Unix.WEXITED code) -> Printf.sprintf "Job exited with code %d" code
    | Ok (Unix.WSIGNALED signal) ->
      Printf.sprintf "Job was killed by signal %d" signal
    | Ok (Unix.WSTOPPED _) -> "Job stopped"
  in
  let summary = describe res in
  Logs_lwt.info (fun m -> m "%s\nStdout: %s\nStderr: %s\n" summary out err)
(** [get_program_output ?input command env_array] runs [command] through
    [Unix.open_process_full] with the current environment extended by
    [env_array], optionally feeds [input] to its standard input, and
    collects everything it writes to standard output and standard error.

    Returns [(Ok status, stdout, stderr)] when the process could be run and
    reaped, or [(Error message, "", "")] when an exception occurred.

    NOTE(review): [input] is handed to [Printf.fprintf] as the format
    itself, so it is a format value rather than a plain string -- confirm
    this is intended before passing anything that is not a literal. *)
let get_program_output ?(input=None) command env_array =
  (* open_process_full does not automatically pass the existing environment
     to the child process, so we need to add it to our custom environment. *)
  let env_array = Array.append (Unix.environment ()) env_array in
  try
    let std_out, std_in, std_err = Unix.open_process_full command env_array in
    let () =
      match input with
      | None -> ()
      | Some i -> Printf.fprintf std_in i; flush std_in
    in
    (* Always close stdin before reading, even when there is no input:
       otherwise a job that reads its standard input never sees end of file
       and we would wait for its output forever. *)
    let () = close_out std_in in
    let output = CCIO.read_all std_out in
    let err = CCIO.read_all std_err in
    let res = Unix.close_process_full (std_out, std_in, std_err) in
    (Ok res, output, err)
  with
  | Sys_error msg -> (Error (Printf.sprintf "System error: %s" msg), "", "")
  | e ->
    (* Report the exception itself as well as the backtrace; the backtrace
       alone can be empty and says nothing about what went wrong. *)
    let backtrace = Printexc.get_backtrace () in
    (Error (Printf.sprintf "%s\n%s" (Printexc.to_string e) backtrace), "", "")
(** [run_job job] executes the command carried by [job], if any, with no
    extra environment variables, and logs its outcome. [None] means there
    was nothing to run and resolves immediately. *)
let run_job = function
  | None -> return_unit
  | Some command ->
    let* () = Logs_lwt.info (fun m -> m "Running job \"%s\"" command) in
    let outcome = get_program_output command [||] in
    log_result outcome
(** [fetch_job ()] removes the oldest job from the shared [queue] and
    resolves to it, or resolves to [None] (leaving the queue untouched) when
    no job is waiting. *)
let fetch_job () =
  match Queue.take !queue with
  | None, _ -> return None
  | (Some _ as job), remaining ->
    queue := remaining;
    return job
(** [run_jobs ()] loops forever: take the next job (if any), run it, wait
    one second, repeat. *)
let rec run_jobs () =
  let* job = fetch_job () in
  let* () = run_job job in
  let* () = Lwt_unix.sleep 1. in
  run_jobs ()
(** [start_runner ()] launches the job-running loop in the background and
    resolves immediately. If the loop ever fails, the exception is logged as
    an error. *)
let start_runner () =
  let log_failure exn = Logs.err (fun m -> m "%s" (Printexc.to_string exn)) in
  let runner = run_jobs () in
  Lwt.on_failure runner log_failure;
  return_unit
(** [create_server sock] returns the accept loop for the listening socket
    [sock]: a function that, once called, repeatedly accepts a connection
    and hands it to [accept_connection]. *)
let create_server sock =
  let rec serve () =
    let* conn = Lwt_unix.accept sock in
    let* () = accept_connection conn in
    serve ()
  in
  serve
(** [main_loop ()] opens the listening socket at [sock_path], starts the
    background job runner, and then serves client connections. *)
let main_loop () =
  create_socket sock_path >>= fun listening_socket ->
  start_runner () >>= fun () ->
  create_server listening_socket ()
(* Program entry point: announce startup, send log output at Info level and
   above through the default format reporter, then run the main loop. *)
let () =
  print_endline "Starting uncron";
  Logs.set_reporter (Logs.format_reporter ());
  Logs.set_level (Some Logs.Info);
  Lwt_main.run (main_loop ())
|