#!/usr/bin/env python3
#
# Copyright (C) 2023 VyOS maintainers and contributors
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License version 2 or later as
# published by the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
import os
import re
from argparse import ArgumentParser
from datetime import datetime, timedelta, time as type_time, date as type_date
from sys import exit
from time import time
from vyos.util import ask_yes_no, cmd, call, run, STDOUT
systemd_sched_file = "/run/systemd/shutdown/scheduled"
def utc2local(datetime):
now = time()
offs = datetime.fromtimestamp(now) - datetime.utcfromtimestamp(now)
return datetime + offs
def parse_time(s):
try:
if re.match(r'^\d{1,9999}$', s):
if (int(s) > 59) and (int(s) < 1440):
s = str(int(s)//60) + ":" + str(int(s)%60)
return datetime.strptime(s, "%H:%M").time()
if (int(s) >= 1440):
return s.split()
else:
return datetime.strptime(s, "%M").time()
else:
return datetime.strptime(s, "%H:%M").time()
except ValueError:
return None
def parse_date(s):
for fmt in ["%d%m%Y", "%d/%m/%Y", "%d.%m.%Y", "%d:%m:%Y", "%Y-%m-%d"]:
try:
return datetime.strptime(s, fmt).date()
except ValueError:
continue
# If nothing matched...
return None
def get_shutdown_status():
if os.path.exists(systemd_sched_file):
# Get scheduled from systemd file
with open(systemd_sched_file, 'r') as f:
data = f.read().rstrip('\n')
r_data = {}
for line in data.splitlines():
tmp_split = line.split("=")
if tmp_split[0] == "USEC":
# Convert USEC to human readable format
r_data['DATETIME'] = datetime.utcfromtimestamp(
int(tmp_split[1])/1000000).strftime('%Y-%m-%d %H:%M:%S')
else:
r_data[tmp_split[0]] = tmp_split[1]
return r_data
return None
def check_shutdown():
output = get_shutdown_status()
if output and 'MODE' in output:
dt = datetime.strptime(output['DATETIME'], '%Y-%m-%d %H:%M:%S')
if output['MODE'] == 'reboot':
print("Reboot is scheduled", utc2local(dt))
elif output['MODE'] == 'poweroff':
print("Poweroff is scheduled", utc2local(dt))
else:
print("Reboot or poweroff is not scheduled")
def cancel_shutdown():
output = get_shutdown_status()
if output and 'MODE' in output:
timenow = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
try:
run('/sbin/shutdown -c --no-wall')
except OSError as e:
exit(f'Could not cancel a reboot or poweroff: {e}')
mode = output['MODE']
message = f'Scheduled {mode} has been cancelled {timenow}'
run(f'wall {message} > /dev/null 2>&1')
else:
print("Reboot or poweroff is not scheduled")
def check_unsaved_config():
from vyos.config_mgmt import unsaved_commits
from vyos.utils.boot import boot_configuration_success
if unsaved_commits() and boot_configuration_success():
print("Warning: there are unsaved configuration changes!")
print("Run 'save' command if you do not want to lose those changes after reboot/shutdown.")
else:
pass
def execute_shutdown(time, reboot=True, ask=True):
check_unsaved_config()
action = "reboot" if reboot else "poweroff"
if not ask:
if not ask_yes_no(f"Are you sure you want to {action} this system?"):
exit(0)
action_cmd = "-r" if reboot else "-P"
if len(time) == 0:
# T870 legacy reboot job support
chk_vyatta_based_reboots()
###
out = cmd(f'/sbin/shutdown {action_cmd} now', stderr=STDOUT)
print(out.split(",", 1)[0])
return
elif len(time) == 1:
# Assume the argument is just time
ts = parse_time(time[0])
if ts:
cmd(f'/sbin/shutdown {action_cmd} {time[0]}', stderr=STDOUT)
# Inform all other logged in users about the reboot/shutdown
wall_msg = f'System {action} is scheduled {time[0]}'
cmd(f'/usr/bin/wall "{wall_msg}"')
else:
exit(f'Invalid time "{time[0]}". The valid format is HH:MM')
elif len(time) == 2:
# Assume it's date and time
ts = parse_time(time[0])
ds = parse_date(time[1])
if ts and ds:
t = datetime.combine(ds, ts)
td = t - datetime.now()
t2 = 1 + int(td.total_seconds())//60 # Get total minutes
cmd(f'/sbin/shutdown {action_cmd} {t2}', stderr=STDOUT)
# Inform all other logged in users about the reboot/shutdown
wall_msg = f'System {action} is scheduled {time[1]} {time[0]}'
cmd(f'/usr/bin/wall "{wall_msg}"')
else:
if not ts:
exit(f'Invalid time "{time[0]}". Uses 24 Hour Clock format')
else:
exit(f'Invalid date "{time[1]}". A valid format is YYYY-MM-DD [HH:MM]')
else:
exit('Could not decode date and time. Valids formats are HH:MM or YYYY-MM-DD HH:MM')
check_shutdown()
def chk_vyatta_based_reboots():
# T870 commit-confirm is still using the vyatta code base, once gone, the code below can be removed
# legacy scheduled reboot s are using at and store the is as /var/run/.job
# name is the node of scheduled the job, commit-confirm checks for that
f = r'/var/run/confirm.job'
if os.path.exists(f):
jid = open(f).read().strip()
if jid != 0:
call(f'sudo atrm {jid}')
os.remove(f)
def main():
parser = ArgumentParser()
parser.add_argument("--yes", "-y",
help="Do not ask for confirmation",
action="store_true",
dest="yes")
action = parser.add_mutually_exclusive_group(required=True)
action.add_argument("--reboot", "-r",
help="Reboot the system",
nargs="*",
metavar="HH:MM")
action.add_argument("--reboot_in", "-i",
help="Reboot the system",
nargs="*",
metavar="Minutes")
action.add_argument("--poweroff", "-p",
help="Poweroff the system",
nargs="*",
metavar="Minutes|HH:MM")
action.add_argument("--cancel", "-c",
help="Cancel pending shutdown",
action="store_true")
action.add_argument("--check",
help="Check pending shutdown",
action="store_true")
args = parser.parse_args()
try:
if args.reboot is not None:
for r in args.reboot:
if ':' not in r and '/' not in r and '.' not in r:
print("Incorrect format! Use HH:MM")
exit(1)
execute_shutdown(args.reboot, reboot=True, ask=args.yes)
if args.reboot_in is not None:
for i in args.reboot_in:
if ':' in i:
print("Incorrect format! Use Minutes")
exit(1)
execute_shutdown(args.reboot_in, reboot=True, ask=args.yes)
if args.poweroff is not None:
execute_shutdown(args.poweroff, reboot=False, ask=args.yes)
if args.cancel:
cancel_shutdown()
if args.check:
check_shutdown()
except KeyboardInterrupt:
exit("Interrupted")
if __name__ == "__main__":
main()