diff options
| author | Wayne-Cole <77279425+Wacky404@users.noreply.github.com> | 2024-12-23 06:47:14 -0600 |
|---|---|---|
| committer | Wayne-Cole <77279425+Wacky404@users.noreply.github.com> | 2024-12-23 06:47:14 -0600 |
| commit | f515c567586822ee1927fba3549d75ca8c76b3fe (patch) | |
| tree | ea92fafe20c5ada6625daf3068b58343ab9e0a06 /src/funcs_worklogger.py | |
| parent | fc89c79cf88c47a19a6162aa1d311b26d8376adb (diff) | |
| download | worklogger-f515c567586822ee1927fba3549d75ca8c76b3fe.tar.xz worklogger-f515c567586822ee1927fba3549d75ca8c76b3fe.zip | |
feat: merge function finished; ironing out bugs and adding in tests
Diffstat (limited to 'src/funcs_worklogger.py')
| -rw-r--r-- | src/funcs_worklogger.py | 234 |
1 files changed, 143 insertions, 91 deletions
diff --git a/src/funcs_worklogger.py b/src/funcs_worklogger.py index b58efa4..4388ad4 100644 --- a/src/funcs_worklogger.py +++ b/src/funcs_worklogger.py @@ -6,14 +6,115 @@ author: Wacky404 email: wacky404@dev.com """ -from pprint import pprint from log_util_worklogger import logger from datetime import timezone, datetime from pathlib import Path +import paths_util_worklogger as pu +import os.path as osp import csv import os -import os.path as osp -import paths_util_worklogger as pu + + +TIME_FORMAT = "%Y-%m-%dT%H:%M:%S%z" + + +def _prep_write(format, s_log, p_settings, _dt, k_args): + dt = _dt + if format == '.txt' or format == None: + for key, val in k_args.items(): + if key == 'job' and val is not None: + s_log += f"{key}:'{val}'" + elif key == 'time' and val is not None: + s_log += f"{key}:{val}hrs " + elif key == 'message' and val is not None: + s_log += f"{'desc'}:'{val}' " + elif key == 'proj' and val is not None and p_settings is not None: + job_projects = p_settings[str(k_args['job']).upper()] + if str(val).upper() in job_projects.keys(): + s_log += f"{key}:{job_projects[str(val).upper()]} " + logger.debug( + f"Job: {k_args['job']}; Projects: {job_projects}") + else: + s_log += f"{key}:{val} " + logger.debug( + f"Couldn't get specified projects for {k_args['job']} in config.") + elif key == 'start' and val == 'now': + s_log += f"{key}:{dt.strftime("'%H:%M'")} " + elif key == 'start' and val is not None: + s_log += f"{key}:'{val}' " + elif key == 'end' and val == 'now': + s_log += f"{key}:{dt.strftime("'%H:%M'")} " + elif key == 'end' and val is not None: + s_log += f"{key}:'{val}' " + elif val is not None: + s_log += f"{key}:{val} " + + return s_log, None, None + + elif format == '.csv': + cp_kwargs = { + 'timestamp': f"{dt.strftime(TIME_FORMAT)}" + } + for key, val in k_args.items(): + if key == 'message': + cp_kwargs['desc'] = val + continue + elif key == 'proj' and val is not None and p_settings is not None: + job_projects = p_settings[str( + k_args['job']).upper()] + if str(val).upper() in job_projects.keys(): + cp_kwargs[key] = job_projects[str(val).upper()] + logger.debug( + f"Job: {k_args['job']}; Projects: {job_projects}") + else: + logger.debug( + f"Couldn't get specified projects for {k_args['job']} in config.") + continue + elif key == 'start' and val == 'now': + cp_kwargs['start'] = f"{dt.strftime("'%H:%M'")}" + continue + elif key == 'end' and val == 'now': + cp_kwargs['end'] = f"{dt.strftime("'%H:%M'")}" + continue + + cp_kwargs[key] = val + + field_names = [x for x in cp_kwargs.keys()] + + return None, cp_kwargs, field_names + + return None, None, None + + +def _write_file(p_save, p_backup, format, j_upper, len_file, _cp_kwargs, _log_str, _field_names) -> None: + for path in [p_save, p_backup]: + if path is not None: + if format != None: + chosen_job = osp.join(path, f"{j_upper}{format}") + else: + chosen_job = osp.join(path, f"{j_upper}.txt") + try: + if format == '.csv' and len_file == 0 or len_file == None: + with open(chosen_job, 'w', newline='\n') as fd: + csv_writer = csv.DictWriter( + fd, fieldnames=_field_names) + csv_writer.writeheader() + for _kwarg in _cp_kwargs: + csv_writer.writerow(_kwarg) + elif format == '.csv' and len_file >= 1: + with open(chosen_job, 'a', newline='\n') as fd: + csv_writer = csv.DictWriter( + fd, fieldnames=_field_names) + for _kwarg in _cp_kwargs: + csv_writer.writerow(_kwarg) + else: + with open(chosen_job, 'a') as fd: + for _log in _log_str: + fd.write(f"{_log}\n") + except PermissionError as e: + logger.exception(str(e)) + + logger.info(f"Written {j_upper} worklog(s) to {path}") def configure(dir_list=None): @@ -34,7 +135,7 @@ def configure(dir_list=None): f"Details: This is okay, output will save in existing {directory}.") -def parse(filepath=None): +def parse(filepath): filename, ext = osp.splitext(osp.basename(filepath)) logger.debug(f"Parsed: {filename}, {ext}") if ext == '.txt': @@ -51,12 +152,13 @@ def parse(filepath=None): for key, val in row.items(): if key == 'timestamp': csv_log += f"{val} " + elif key == 'job': + csv_log += f"{key}:'{val}' " elif val: csv_log += f"{key}:{val} " lines.append(csv_log) - print(lines) return lines # TODO: Make the parser for json, once I figure out json structure elif ext == '.json': @@ -65,7 +167,7 @@ def parse(filepath=None): def add_log(file_format=None, proj_settings=None, savepath=None, backuppath=None, **kwargs): dt = datetime.datetime.now(timezone.utc) - log_str = f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")} " + log_str = f"{dt.strftime(TIME_FORMAT)} " job_upper = str(kwargs['job']).upper() if file_format != None: @@ -97,7 +199,7 @@ def add_log(file_format=None, proj_settings=None, savepath=None, backuppath=None insert_time = input("End Time: ") with open(chosen_job, 'a') as fd: if file_format == '.txt': - if insert_time == 'now': + if insert_time.lower() == 'now': fd.write( f"{log_str}job:{kwargs['job']} end:{dt.strftime("'%H:%M'")}\n") else: @@ -108,95 +210,19 @@ def add_log(file_format=None, proj_settings=None, savepath=None, backuppath=None fd, fieldnames=['timestamp'] + [x if x != 'message' else 'desc' for x in kwargs.keys()]) if insert_time == 'now': csv_writer.writerow( - {'timestamp': f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")}", 'job': kwargs['job'], 'end': f"{dt.strftime("'%H:%M'")}"}) + {'timestamp': f"{dt.strftime(TIME_FORMAT)}", 'job': kwargs['job'], 'end': f"{dt.strftime("'%H:%M'")}"}) else: csv_writer.writerow( - {'timestamp': f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")}", 'job': kwargs['job'], 'end': f"{insert_time}"}) + {'timestamp': f"{dt.strftime(TIME_FORMAT)}", 'job': kwargs['job'], 'end': f"{insert_time}"}) break elif add_endtime == 'n': break - if file_format == '.txt' or file_format == None: - for key, val in kwargs.items(): - if key == 'time' and val is not None: - log_str += f"{key}:{val}hrs " - elif key == 'message' and val is not None: - log_str += f"{'desc'}:'{val}' " - elif key == 'proj' and val is not None and proj_settings is not None: - job_projects: dict[str, str] = proj_settings[str( - kwargs['job']).upper()] - if str(val).upper() in job_projects.keys(): - log_str += f"{key}:{job_projects[str(val).upper()]} " - logger.debug( - f"Job: {kwargs['job']}; Projects: {job_projects}") - else: - log_str += f"{key}:{val} " - logger.debug( - f"Couldn't get specified projects for {kwargs['job']} in config.") - elif key == 'start' and val == 'now': - log_str += f"{key}:{dt.strftime("'%H:%M'")} " - elif key == 'start' and val is not None: - log_str += f"{key}:'{val}' " - elif key == 'end' and val == 'now': - log_str += f"{key}:{dt.strftime("'%H:%M'")} " - elif key == 'end' and val is not None: - log_str += f"{key}:'{val}' " - elif val is not None: - log_str += f"{key}:{val} " - elif file_format == '.csv': - cp_kwargs = kwargs - cp_kwargs = { - 'timestamp': f"{dt.strftime("%Y-%m-%dT%H:%M:%S%Z")}" - } - for key, val in kwargs.items(): - if key == 'message': - cp_kwargs['desc'] = val - continue - elif key == 'proj' and val is not None and proj_settings is not None: - job_projects: dict[str, str] = proj_settings[str( - kwargs['job']).upper()] - if str(val).upper() in job_projects.keys(): - cp_kwargs[key] = job_projects[str(val).upper()] - logger.debug( - f"Job: {kwargs['job']}; Projects: {job_projects}") - else: - logger.debug( - f"Couldn't get specified projects for {kwargs['job']} in config.") - continue - elif key == 'start' and val == 'now': - cp_kwargs['start'] = f"{dt.strftime("'%H:%M'")}" - continue - elif key == 'end' and val == 'now': - cp_kwargs['end'] = f"{dt.strftime("'%H:%M'")}" - continue - - cp_kwargs[key] = val - - field_names = [x for x in cp_kwargs.keys()] - - for path in [savepath, backuppath]: - if file_format != None: - chosen_job = osp.join(path, f"{job_upper}{file_format}") - else: - chosen_job = osp.join(path, f"{job_upper}.txt") - try: - if file_format == '.csv' and len(parsed_file) == 0: - with open(chosen_job, 'w', newline='\n') as fd: - csv_writer = csv.DictWriter(fd, fieldnames=field_names) - csv_writer.writeheader() - csv_writer.writerow(cp_kwargs) - elif file_format == '.csv' and len(parsed_file) >= 1: - with open(chosen_job, 'a', newline='\n') as fd: - csv_writer = csv.DictWriter(fd, fieldnames=field_names) - csv_writer.writerow(cp_kwargs) - else: - with open(chosen_job, 'a') as fd: - fd.write(f"{log_str}\n") - except PermissionError as e: - logger.exception(str(e)) - - logger.info(f"Written {kwargs['job']} worklog to {path}") + log_str, cp_kwargs, field_names = _prep_write(format=file_format, s_log=log_str, + p_settings=proj_settings, _dt=dt) + _write_file(p_save=savepath, p_backup=backuppath, format=file_format, j_upper=job_upper, len_file=len(parsed_file), + _cp_kwargs=[cp_kwargs], _log_str=[log_str], _field_names=field_names) def combine_log(target_job, specified_ext, target_extension=None, savepath=None, backuppath=None, delete=False): @@ -224,15 +250,41 @@ def combine_log(target_job, specified_ext, target_extension=None, savepath=None, _entry = content[1].split(' ') buffer[content[0]].append(_entry[0]) - # TODO: Finished sorting logs, now just need to write to target file and delete if needed. try: buffer.sort(key=lambda x: datetime.strptime( - x[2], "%Y-%m-%dT%H:%M:%S%Z")) + x[2], TIME_FORMAT)) except ValueError as e: logger.exception(f"An exception of type {type(e).__name__} occurred. " f"Details: {str(e)}") return None + csv_logs = [] + if specified_ext == '.csv': + for log in buffer: + _log = {} + split_log = log[1].split(' ') + for index, param in enumerate(split_log): + if index == 0: + _log['timestamp'] = param + continue + + split_param = param.split(':') + var, val = split_param[0], split_param[1] if len( + split_param) == 2 else None + if val is not None: + _log[var] = val + + csv_logs.append(_log) + + _write_file(p_save=savepath, p_backup=None, format=specified_ext, j_upper=target_job.upper(), + len_file=None, _cp_kwargs=csv_logs, _log_str=[x[1] for x in buffer], + _field_names=['timestamp', 'job', 'proj', 'loc', 'time', 'start', 'end', 'desc']) + + if delete: + for file in files: + if file.suffix != specified_ext: + os.remove(file) + def send_email(sender=None, to=None, subject=None, files=None): pass |
