summaryrefslogtreecommitdiff
path: root/src/uncron.ml
blob: 70407741e7df97f76ea27d02f2f3e9e984981cc6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
(** A non-time-based job queue service

    Build with: ocamlfind ocamlopt -package lwt,lwt.unix,lwt_ppx,logs,logs.lwt -linkpkg -o uncron ./uncron.ml
 *)

open Lwt

(* Queue *)
(** Minimal persistent FIFO queue backed by two lists.

    New items are consed onto the first list; items are removed from the head
    of the second list. When the second list is exhausted, the first list is
    reversed into it so that the oldest item comes out first. *)
module Queue = struct
    type 'a queue = Queue of 'a list * 'a list

    (** The queue holding no items. *)
    let empty = Queue ([], [])

    (** [add q item] returns [q] extended with [item] as its newest element. *)
    let add (Queue (incoming, outgoing)) item = Queue (item :: incoming, outgoing)

    (** [take q] returns [(Some oldest, rest)], or [(None, empty)] when [q]
        holds no items. *)
    let take = function
        | Queue (incoming, oldest :: outgoing) ->
            (Some oldest, Queue (incoming, outgoing))
        | Queue (incoming, []) ->
            (* Nothing ready to hand out: flip the incoming items so the
               oldest one is at the head. *)
            (match List.rev incoming with
            | [] -> (None, empty)
            | oldest :: outgoing -> (Some oldest, Queue ([], outgoing)))
end

(* Shared job queue *)
(* Mutable cell holding the pending jobs. handle_message stores the extended
   queue here after adding a job, and fetch_job stores the remainder after
   taking one out. *)
let queue = ref Queue.empty

(* Filesystem path of the UNIX socket; main_loop passes it to create_socket. *)
let sock_path = "/tmp/uncron.sock"
(* Intended backlog (pending-connection limit) for listen on the server socket. *)
let backlog = 100

(* Communication functions *)
(** [handle_message msg] appends [msg] to the shared job queue and returns the
    acknowledgement text to send back to the client.

    Logging uses the synchronous [Logs.info]: this function returns a plain
    string rather than a promise, so an Lwt logging promise could not be
    awaited here and would simply be dropped. The log line is emitted only
    after the queue has actually been updated. *)
let handle_message msg =
    queue := Queue.add !queue msg;
    Logs.info (fun m -> m "Job \"%s\" was added to the queue" msg);
    Printf.sprintf "Job \"%s\" accepted" msg

(** [handle_connection ic oc ()] serves one client: it reads a single line
    from [ic], enqueues it as a job via [handle_message] and writes the
    acknowledgement to [oc].

    [oc] is closed on every path: after the reply has been flushed, when the
    peer disconnects without sending a line, and when reading or writing
    fails. Previously only the successful path closed it, so the other cases
    left the connection open. *)
let handle_connection ic oc () =
    let serve_request () =
        Lwt_io.read_line_opt ic >>= function
        | Some msg ->
            let reply = handle_message msg in
            let%lwt () = Lwt_io.write_line oc reply in
            Lwt_io.flush oc
        | None -> Logs_lwt.info (fun m -> m "Connection closed")
    in
    (* finalize runs the cleanup whether serve_request resolves or fails. *)
    Lwt.finalize serve_request (fun () -> Lwt_io.close oc)

(** [accept_connection conn] takes the [(descriptor, address)] pair produced
    by accept, wraps the descriptor in an input and an output channel and
    starts [handle_connection] on them without waiting for it to finish.
    A failure of the handler is reported through [Logs.err]. The returned
    promise resolves once the "New connection" message has been logged. *)
let accept_connection conn =
    let report_failure exn =
        Logs.err (fun m -> m "%s" (Printexc.to_string exn))
    in
    let descr = fst conn in
    let input = Lwt_io.of_fd Lwt_io.Input descr in
    let output = Lwt_io.of_fd Lwt_io.Output descr in
    let () = Lwt.on_failure (handle_connection input output ()) report_failure in
    Logs_lwt.info (fun m -> m "New connection")
 
(** [delete_socket_if_exists sockfile] removes a stale file left at
    [sockfile] and does nothing when no such file exists.

    The path is unlinked directly instead of being checked with [Unix.stat]
    first. This removes the window between check and removal, and it also
    clears a dangling symlink, which [stat] would have reported as missing.

    @raise Failure if the file exists but cannot be removed; the message
    carries the system error text. *)
let delete_socket_if_exists sockfile =
    match Unix.unlink sockfile with
    | () -> ()
    | exception Unix.Unix_error (Unix.ENOENT, _, _) -> ()
    | exception Unix.Unix_error (err, _, _) ->
        failwith
            (Printf.sprintf "Could not delete old socket file, exiting: %s"
                (Unix.error_message err))

(** Bind to a UNIX socket

    [create_socket sockfile] removes any stale file at [sockfile], creates a
    stream socket in the UNIX domain, binds it to that path and puts it into
    listening mode using the file-level [backlog] value. The resulting
    promise resolves to the listening socket.

    If binding or listening fails, the socket is closed before the error is
    propagated so that its descriptor is not leaked. Errors from removing the
    old file or from creating the socket are raised immediately. *)
let create_socket sockfile =
    let () = delete_socket_if_exists sockfile in
    let sock = Lwt_unix.socket Unix.PF_UNIX Unix.SOCK_STREAM 0 in
    Lwt.catch
        (fun () ->
            let%lwt () = Lwt_unix.bind sock (Unix.ADDR_UNIX sockfile) in
            Lwt_unix.listen sock backlog;
            Lwt.return sock)
        (fun exn ->
            let%lwt () = Lwt_unix.close sock in
            Lwt.fail exn)

(* Job handling functions *)
(** [log_result res] logs, at info level, a one-line description of the
    process status [res]: its exit code, the signal that killed it, or the
    fact that it was stopped. *)
let log_result res =
    let describe = function
        | Lwt_unix.WEXITED code -> Printf.sprintf "Job exited with code %d" code
        | Lwt_unix.WSIGNALED signal ->
            Printf.sprintf "Job was killed by signal %d" signal
        | Lwt_unix.WSTOPPED _ -> "Job stopped"
    in
    let text = describe res in
    Logs_lwt.info (fun m -> m "%s" text)

(** [run_job job] does nothing for [None]. For [Some command] it logs that
    the job is starting, runs [command] with [Lwt_unix.system], and logs the
    resulting process status with [log_result]. *)
let run_job = function
    | None -> Lwt.return_unit
    | Some command ->
        Logs_lwt.info (fun m -> m "Running job \"%s\"" command) >>= fun _ ->
        Lwt_unix.system command >>= fun status ->
        log_result status

(** [fetch_job ()] removes the oldest job from the shared queue and resolves
    to it, or resolves to [None] (leaving the queue untouched) when there is
    nothing to run. *)
let fetch_job () =
    let next, remaining = Queue.take !queue in
    (match next with
    | Some _ -> queue := remaining
    | None -> ());
    Lwt.return next

(** [run_jobs ()] is the job-processing loop: take at most one job from the
    queue, run it, sleep for one second, repeat. It never resolves.

    A failure while fetching or running a job is logged and the loop carries
    on with the next iteration. Without this, a single failing iteration
    would end the loop for good while the server kept accepting new jobs
    that would then never run. *)
let rec run_jobs () =
    let%lwt () =
        Lwt.catch
            (fun () -> fetch_job () >>= run_job)
            (fun exn -> Logs_lwt.err (fun m -> m "%s" (Printexc.to_string exn)))
    in
    let%lwt () = Lwt_unix.sleep 1. in
    run_jobs ()

(** [start_runner ()] launches the job-processing loop without waiting for it
    and resolves immediately. If the loop fails, the exception is logged
    through [Logs.err]. *)
let start_runner () =
    let log_failure exn =
        Logs.err (fun m -> m "%s" (Printexc.to_string exn) )
    in
    let runner = run_jobs () in
    Lwt.on_failure runner log_failure;
    Lwt.return_unit

(** [create_server sock] returns the accept loop for [sock]: a function that,
    once called, repeatedly accepts a connection and hands it to
    [accept_connection]. The loop only ends if one of those steps fails. *)
let create_server sock =
    let rec serve () =
        let%lwt conn = Lwt_unix.accept sock in
        let%lwt () = accept_connection conn in
        serve ()
    in
    serve

(** [main_loop ()] sets the service up and runs it: create the listening
    socket at [sock_path], start the job runner, then enter the accept loop. *)
let main_loop () =
    create_socket sock_path >>= fun sock ->
    start_runner () >>= fun () ->
    create_server sock ()

(* Program entry point: announce start-up, install the default formatting
   log reporter at info level, then run the main loop under Lwt. *)
let () =
    print_endline "Starting uncron";
    Logs.set_reporter (Logs.format_reporter ());
    Logs.set_level (Some Logs.Info);
    Lwt_main.run (main_loop ())