diff options
| author | Daniil Baturin <daniil@vyos.io> | 2020-08-12 23:38:06 +0200 |
|---|---|---|
| committer | Daniil Baturin <daniil@vyos.io> | 2020-08-12 23:38:06 +0200 |
| commit | 51d2cf5c9158b03b0a7e83a7377436a0cbaf1025 (patch) | |
| tree | f9738fe17567be60e7d709dbcbd55b54b4e22146 /src | |
| download | uncron-51d2cf5c9158b03b0a7e83a7377436a0cbaf1025.tar.gz uncron-51d2cf5c9158b03b0a7e83a7377436a0cbaf1025.zip | |
Initial import of the old, ad hoc code.
Diffstat (limited to 'src')
| -rw-r--r-- | src/Makefile | 4 | ||||
| -rw-r--r-- | src/uncron-add | 4 | ||||
| -rw-r--r-- | src/uncron.ml | 121 |
3 files changed, 129 insertions, 0 deletions
diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..87e1491 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,4 @@ +all: uncron + +uncron: uncron.ml + ocamlfind ocamlopt -package lwt,lwt.unix,lwt_ppx,logs,logs.lwt -linkpkg -o uncron -g ./uncron.ml diff --git a/src/uncron-add b/src/uncron-add new file mode 100644 index 0000000..eca6c26 --- /dev/null +++ b/src/uncron-add @@ -0,0 +1,4 @@ +#!/bin/sh + +echo $1 | socat - UNIX-CONNECT:/tmp/uncron.sock + diff --git a/src/uncron.ml b/src/uncron.ml new file mode 100644 index 0000000..870fc7e --- /dev/null +++ b/src/uncron.ml @@ -0,0 +1,121 @@ +(** A non-time based job queue service + + Build with: ocamlfind ocamlopt -package lwt,lwt.unix,lwt_ppx,logs,logs.lwt -linkpkg -o uncron ./uncron.ml + *) + +open Lwt + +(* Queue *) +module Queue = struct + type 'a queue = Queue of 'a list * 'a list + + let empty = Queue ([], []) + + let add q item = + match q with + | Queue (front, back) -> Queue (item :: front, back) + + let take q = + match q with + | Queue ([], []) -> None, empty + | Queue (front, b :: bs) -> Some b, (Queue (front, bs)) + | Queue (front, []) -> + let back = List.rev front + in (Some (List.hd back), Queue ([], List.tl back)) +end + +(* Shared job queue *) +let queue = ref Queue.empty + +let sock_path = "/tmp/uncron.sock" +let backlog = 100 + +(* Communication functions *) +let handle_message msg = + let jobs = !queue in + let jobs = Queue.add jobs msg in + Logs_lwt.info (fun m -> m "Job \"%s\" was added to the queue" msg); + queue := jobs; Printf.sprintf "Job \"%s\" accepted" msg + +let rec handle_connection ic oc () = + Lwt_io.read_line_opt ic >>= + (fun msg -> + match msg with + | Some msg -> + let reply = handle_message msg in + Lwt_io.write_line oc reply >>= handle_connection ic oc + | None -> Logs_lwt.info (fun m -> m "Connection closed") >>= return) + +let accept_connection conn = + let fd, _ = conn in + let ic = Lwt_io.of_fd Lwt_io.Input fd in + let oc = Lwt_io.of_fd Lwt_io.Output fd in + Lwt.on_failure (handle_connection ic oc ()) (fun e -> Logs.err (fun m -> m "%s" (Printexc.to_string e) )); + Logs_lwt.info (fun m -> m "New connection") >>= return + +let delete_socket_if_exists sockfile = + try + let _ = Unix.stat sockfile in + Unix.unlink sockfile + with + | Unix.Unix_error (Unix.ENOENT, _, _) -> () + | _ -> failwith "Could not delete old socket file, exiting" + +(** Bind to a UNIX socket *) +let create_socket sockfile = + let open Lwt_unix in + let () = delete_socket_if_exists sockfile in + let backlog = 100 in + let%lwt sock = socket PF_UNIX SOCK_STREAM 0 |> Lwt.return in + let%lwt () = Lwt_unix.bind sock @@ ADDR_UNIX(sockfile) in + listen sock backlog; + Lwt.return sock + +(* Job handling functions *) +let log_result res = + let msg = + (match res with + | Lwt_unix.WEXITED n -> Printf.sprintf "Job exited with code %d" n + | Lwt_unix.WSIGNALED n -> Printf.sprintf "Job was killed by signal %d" n + | Lwt_unix.WSTOPPED _ -> "Job stopped") + in Logs_lwt.info (fun m -> m "%s" msg) + +let run_job job = + match job with + | Some j -> + let%lwt _ = Logs_lwt.info (fun m -> m "Running job \"%s\"" j) in + let%lwt res = Lwt_unix.system j in + let%lwt () = log_result res in + return () + | None -> return_unit + +let fetch_job () = + let jobs = !queue in + let item, rest = Queue.take jobs in + match item with + | None -> return None + | Some i -> queue := rest; return @@ Some i + +let rec run_jobs () = + fetch_job () >>= run_job >>= (fun () -> Lwt_unix.sleep 1.) >>= run_jobs + +let start_runner () = + Lwt.on_failure (run_jobs ()) (fun e -> Logs.err (fun m -> m "%s" (Printexc.to_string e) )); + return_unit + +let create_server sock = + let rec serve () = + Lwt_unix.accept sock >>= accept_connection >>= serve + in serve + +let main_loop () = + let%lwt sock = create_socket sock_path in + let%lwt () = start_runner () in + let serve = create_server sock in + serve () + +let () = + let () = print_endline "Starting uncron" in + let () = Logs.set_reporter (Logs.format_reporter ()) in + let () = Logs.set_level (Some Logs.Info) in + Lwt_main.run @@ main_loop () |
