summaryrefslogtreecommitdiff
path: root/src/funcs_worklogger.py
diff options
context:
space:
mode:
authorWayne-Cole <77279425+Wacky404@users.noreply.github.com>2024-12-23 06:47:14 -0600
committerWayne-Cole <77279425+Wacky404@users.noreply.github.com>2024-12-23 06:47:14 -0600
commitf515c567586822ee1927fba3549d75ca8c76b3fe (patch)
treeea92fafe20c5ada6625daf3068b58343ab9e0a06 /src/funcs_worklogger.py
parentfc89c79cf88c47a19a6162aa1d311b26d8376adb (diff)
downloadworklogger-f515c567586822ee1927fba3549d75ca8c76b3fe.tar.xz
worklogger-f515c567586822ee1927fba3549d75ca8c76b3fe.zip
feat: merge function finished; ironing out bugs and adding in tests
Diffstat (limited to 'src/funcs_worklogger.py')
-rw-r--r--src/funcs_worklogger.py234
1 files changed, 143 insertions, 91 deletions
diff --git a/src/funcs_worklogger.py b/src/funcs_worklogger.py
index b58efa4..4388ad4 100644
--- a/src/funcs_worklogger.py
+++ b/src/funcs_worklogger.py
@@ -6,14 +6,115 @@ author: Wacky404
email: wacky404@dev.com
"""
-from pprint import pprint
from log_util_worklogger import logger
from datetime import timezone, datetime
from pathlib import Path
+import paths_util_worklogger as pu
+import os.path as osp
import csv
import os
-import os.path as osp
-import paths_util_worklogger as pu
+
+
+TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z"
+
+
+def _prep_write(format, s_log, p_settings, _dt, k_args):
+ dt = _dt
+ if format == '.txt' or format == None:
+ for key, val in k_args.items():
+ if key == 'job' and val is not None:
+ s_log += f"{key}:'{val}'"
+ elif key == 'time' and val is not None:
+ s_log += f"{key}:{val}hrs "
+ elif key == 'message' and val is not None:
+ s_log += f"{'desc'}:'{val}' "
+ elif key == 'proj' and val is not None and p_settings is not None:
+ job_projects = p_settings[str(k_args['job']).upper()]
+ if str(val).upper() in job_projects.keys():
+ s_log += f"{key}:{job_projects[str(val).upper()]} "
+ logger.debug(
+ f"Job: {k_args['job']}; Projects: {job_projects}")
+ else:
+ s_log += f"{key}:{val} "
+ logger.debug(
+ f"Couldn't get specified projects for {k_args['job']} in config.")
+ elif key == 'start' and val == 'now':
+ s_log += f"{key}:{dt.strftime("'%H:%M'")} "
+ elif key == 'start' and val is not None:
+ s_log += f"{key}:'{val}' "
+ elif key == 'end' and val == 'now':
+ s_log += f"{key}:{dt.strftime("'%H:%M'")} "
+ elif key == 'end' and val is not None:
+ s_log += f"{key}:'{val}' "
+ elif val is not None:
+ s_log += f"{key}:{val} "
+
+ return s_log, None, None
+
+ elif format == '.csv':
+ cp_kwargs = {
+ 'timestamp': f"{dt.strftime(TIME_FORMAT)}"
+ }
+ for key, val in k_args.items():
+ if key == 'message':
+ cp_kwargs['desc'] = val
+ continue
+ elif key == 'proj' and val is not None and p_settings is not None:
+ job_projects = p_settings[str(
+ k_args['job']).upper()]
+ if str(val).upper() in job_projects.keys():
+ cp_kwargs[key] = job_projects[str(val).upper()]
+ logger.debug(
+ f"Job: {k_args['job']}; Projects: {job_projects}")
+ else:
+ logger.debug(
+ f"Couldn't get specified projects for {k_args['job']} in config.")
+ continue
+ elif key == 'start' and val == 'now':
+ cp_kwargs['start'] = f"{dt.strftime("'%H:%M'")}"
+ continue
+ elif key == 'end' and val == 'now':
+ cp_kwargs['end'] = f"{dt.strftime("'%H:%M'")}"
+ continue
+
+ cp_kwargs[key] = val
+
+ field_names = [x for x in cp_kwargs.keys()]
+
+ return None, cp_kwargs, field_names
+
+ return None, None, None
+
+
+def _write_file(p_save, p_backup, format, j_upper, len_file, _cp_kwargs, _log_str, _field_names) -> None:
+ for path in [p_save, p_backup]:
+ if path is not None:
+ if format != None:
+ chosen_job = osp.join(path, f"{j_upper}{format}")
+ else:
+ chosen_job = osp.join(path, f"{j_upper}.txt")
+ try:
+ if format == '.csv' and len_file == 0 or len_file == None:
+ with open(chosen_job, 'w', newline='\n') as fd:
+ csv_writer = csv.DictWriter(
+ fd, fieldnames=_field_names)
+ csv_writer.writeheader()
+ for _kwarg in _cp_kwargs:
+ csv_writer.writerow(_kwarg)
+ elif format == '.csv' and len_file >= 1:
+ with open(chosen_job, 'a', newline='\n') as fd:
+ csv_writer = csv.DictWriter(
+ fd, fieldnames=_field_names)
+ for _kwarg in _cp_kwargs:
+ csv_writer.writerow(_kwarg)
+ else:
+ with open(chosen_job, 'a') as fd:
+ for _log in _log_str:
+ fd.write(f"{_log}\n")
+ except PermissionError as e:
+ logger.exception(str(e))
+
+ logger.info(f"Written {j_upper} worklog(s) to {path}")
def configure(dir_list=None):
@@ -34,7 +135,7 @@ def configure(dir_list=None):
f"Details: This is okay, output will save in existing {directory}.")
-def parse(filepath=None):
+def parse(filepath):
filename, ext = osp.splitext(osp.basename(filepath))
logger.debug(f"Parsed: {filename}, {ext}")
if ext == '.txt':
@@ -51,12 +152,13 @@ def parse(filepath=None):
for key, val in row.items():
if key == 'timestamp':
csv_log += f"{val} "
+ elif key == 'job':
+ csv_log += f"{key}:'{val}' "
elif val:
csv_log += f"{key}:{val} "
lines.append(csv_log)
- print(lines)
return lines
# TODO: Make the parser for json, once I figure out json structure
elif ext == '.json':
@@ -65,7 +167,7 @@ def parse(filepath=None):
def add_log(file_format=None, proj_settings=None, savepath=None, backuppath=None, **kwargs):
dt = datetime.datetime.now(timezone.utc)
- log_str = f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")} "
+ log_str = f"{dt.strftime(TIME_FORMAT)} "
job_upper = str(kwargs['job']).upper()
if file_format != None:
@@ -97,7 +199,7 @@ def add_log(file_format=None, proj_settings=None, savepath=None, backuppath=None
insert_time = input("End Time: ")
with open(chosen_job, 'a') as fd:
if file_format == '.txt':
- if insert_time == 'now':
+ if insert_time.lower() == 'now':
fd.write(
f"{log_str}job:{kwargs['job']} end:{dt.strftime("'%H:%M'")}\n")
else:
@@ -108,95 +210,19 @@ def add_log(file_format=None, proj_settings=None, savepath=None, backuppath=None
fd, fieldnames=['timestamp'] + [x if x != 'message' else 'desc' for x in kwargs.keys()])
if insert_time == 'now':
csv_writer.writerow(
- {'timestamp': f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")}", 'job': kwargs['job'], 'end': f"{dt.strftime("'%H:%M'")}"})
+ {'timestamp': f"{dt.strftime(TIME_FORMAT)}", 'job': kwargs['job'], 'end': f"{dt.strftime("'%H:%M'")}"})
else:
csv_writer.writerow(
- {'timestamp': f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")}", 'job': kwargs['job'], 'end': f"{insert_time}"})
+ {'timestamp': f"{dt.strftime(TIME_FORMAT)}", 'job': kwargs['job'], 'end': f"{insert_time}"})
break
elif add_endtime == 'n':
break
- if file_format == '.txt' or file_format == None:
- for key, val in kwargs.items():
- if key == 'time' and val is not None:
- log_str += f"{key}:{val}hrs "
- elif key == 'message' and val is not None:
- log_str += f"{'desc'}:'{val}' "
- elif key == 'proj' and val is not None and proj_settings is not None:
- job_projects: dict[str, str] = proj_settings[str(
- kwargs['job']).upper()]
- if str(val).upper() in job_projects.keys():
- log_str += f"{key}:{job_projects[str(val).upper()]} "
- logger.debug(
- f"Job: {kwargs['job']}; Projects: {job_projects}")
- else:
- log_str += f"{key}:{val} "
- logger.debug(
- f"Couldn't get specified projects for {kwargs['job']} in config.")
- elif key == 'start' and val == 'now':
- log_str += f"{key}:{dt.strftime("'%H:%M'")} "
- elif key == 'start' and val is not None:
- log_str += f"{key}:'{val}' "
- elif key == 'end' and val == 'now':
- log_str += f"{key}:{dt.strftime("'%H:%M'")} "
- elif key == 'end' and val is not None:
- log_str += f"{key}:'{val}' "
- elif val is not None:
- log_str += f"{key}:{val} "
- elif file_format == '.csv':
- cp_kwargs = kwargs
- cp_kwargs = {
- 'timestamp': f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")}"
- }
- for key, val in kwargs.items():
- if key == 'message':
- cp_kwargs['desc'] = val
- continue
- elif key == 'proj' and val is not None and proj_settings is not None:
- job_projects: dict[str, str] = proj_settings[str(
- kwargs['job']).upper()]
- if str(val).upper() in job_projects.keys():
- cp_kwargs[key] = job_projects[str(val).upper()]
- logger.debug(
- f"Job: {kwargs['job']}; Projects: {job_projects}")
- else:
- logger.debug(
- f"Couldn't get specified projects for {kwargs['job']} in config.")
- continue
- elif key == 'start' and val == 'now':
- cp_kwargs['start'] = f"{dt.strftime("'%H:%M'")}"
- continue
- elif key == 'end' and val == 'now':
- cp_kwargs['end'] = f"{dt.strftime("'%H:%M'")}"
- continue
-
- cp_kwargs[key] = val
-
- field_names = [x for x in cp_kwargs.keys()]
-
- for path in [savepath, backuppath]:
- if file_format != None:
- chosen_job = osp.join(path, f"{job_upper}{file_format}")
- else:
- chosen_job = osp.join(path, f"{job_upper}.txt")
- try:
- if file_format == '.csv' and len(parsed_file) == 0:
- with open(chosen_job, 'w', newline='\n') as fd:
- csv_writer = csv.DictWriter(fd, fieldnames=field_names)
- csv_writer.writeheader()
- csv_writer.writerow(cp_kwargs)
- elif file_format == '.csv' and len(parsed_file) >= 1:
- with open(chosen_job, 'a', newline='\n') as fd:
- csv_writer = csv.DictWriter(fd, fieldnames=field_names)
- csv_writer.writerow(cp_kwargs)
- else:
- with open(chosen_job, 'a') as fd:
- fd.write(f"{log_str}\n")
- except PermissionError as e:
- logger.exception(str(e))
-
- logger.info(f"Written {kwargs['job']} worklog to {path}")
+ log_str, cp_kwargs, field_names = _prep_write(format=file_format, s_log=log_str,
+ p_settings=proj_settings, _dt=dt)
+ _write_file(p_save=savepath, p_backup=backuppath, format=file_format, j_upper=job_upper, len_file=len(parsed_file),
+ _cp_kwargs=[cp_kwargs], _log_str=[log_str], _field_names=field_names)
def combine_log(target_job, specified_ext, target_extension=None, savepath=None, backuppath=None, delete=False):
@@ -224,15 +250,41 @@ def combine_log(target_job, specified_ext, target_extension=None, savepath=None,
_entry = content[1].split(' ')
buffer[content[0]].append(_entry[0])
- # TODO: Finished sorting logs, now just need to write to target file and delete if needed.
try:
buffer.sort(key=lambda x: datetime.strptime(
- x[2], "%Y-%m-%dT%H:%M:%S%Z"))
+ x[2], TIME_FORMAT))
except ValueError as e:
logger.exception(f"An exception of type {type(e).__name__} occurred. "
f"Details: {str(e)}")
return None
+ csv_logs = []
+ if specified_ext == '.csv':
+ for log in buffer:
+ _log = {}
+ split_log = log[1].split(' ')
+ for index, param in enumerate(split_log):
+ if index == 0:
+ _log['timestamp'] = param
+ continue
+
+ split_param = param.split(':')
+ var, val = split_param[0], split_param[1] if len(
+ split_param) == 2 else None
+ if val is not None:
+ _log[var] = val
+
+ csv_logs.append(_log)
+
+ _write_file(p_save=savepath, p_backup=None, format=specified_ext, j_upper=target_job.upper(),
+ len_file=None, _cp_kwargs=csv_logs, _log_str=[x[1] for x in buffer],
+ _field_names=['timestamp', 'job', 'proj', 'loc', 'time', 'start', 'end', 'desc'])
+
+ if delete:
+ for file in files:
+ if file.suffix != specified_ext:
+ os.remove(file)
+
def send_email(sender=None, to=None, subject=None, files=None):
pass