Skip to content
Snippets Groups Projects
Commit 438f68b8 authored by Bert Palm's avatar Bert Palm 🎇
Browse files

improved signatureMaker to be more readable

parent e44e2db7
No related branches found
No related tags found
3 merge requests!271Static expansion of regular expressions,!260Follow-Up Translations,!237Flagger Translations
import re
from typing import List
from saqc.funcs import *
from saqc.core.register import FUNC_MAP
import os
......@@ -7,7 +10,24 @@ def start_with_exactly_N_spaces(line: str, N: int):
return line.startswith(' ' * N) and not line.startswith(' ' * (N + 1))
def find_original_signature(fh, fname):
def find_original_signature(fh, func_name):
"""
Extract the signature code from a file.
Parameters
----------
fh : file
file descriptor, to read the code from
func_name : str
function name, of the signature in question
Returns
-------
lines: list or None
list of lines of code if found, otherwise None.
"""
sig = []
start = end = False
for line in fh.readlines():
......@@ -15,7 +35,7 @@ def find_original_signature(fh, fname):
# find start of signature
if not start:
if line.startswith(f'def {fname}'):
if line.startswith(f'def {func_name}'):
sig.append(line)
start = True
continue
......@@ -42,34 +62,52 @@ def find_original_signature(fh, fname):
return sig
def replace_core_signatures(readlines, writelines, fname, fh):
def replace_core_signatures(core_code: List[str], sig_code: List[str], func_name: str, target_file):
"""
Replace a signature in the core code with a signature from a module.
Parameters
----------
core_code : list
lines of code, one by one
sig_code : list
lines of code, one by one (only the signature in question)
func_name : str
function name in question
target_file : file
file descriptor to write the modified code to
"""
start = end = False
for line in readlines:
for line in core_code:
# append the rest of the file, the loop ends here
if end is True:
fh.write(line)
target_file.write(line)
continue
# find start of signature, loop starts here
if not start:
if line.startswith(f' def {fname}'):
if line.startswith(f' def {func_name}'):
start = True
# insert the replacement
for rline in writelines:
fh.write(' ')
fh.write(rline)
for rline in sig_code:
target_file.write(' ')
target_file.write(rline)
# start of sig, not found yet
else:
fh.write(line)
target_file.write(line)
continue
# found line after end of signature
if '"""' in line or start_with_exactly_N_spaces(line, 8):
end = True
fh.write(line)
target_file.write(line)
continue
# found last line of signature
......@@ -79,33 +117,31 @@ def replace_core_signatures(readlines, writelines, fname, fh):
def replace_datafieldflagger(lines):
import re
"""
Remove 'data' and 'flagger' from signature, and insert 'self' instead.
"""
empty = re.compile(' *\n')
data = re.compile('.*(data[=: ][^,]*, ?)') # eg. 'data: DictOfSeries,'
flagger = re.compile('.*(flagger[=: ][^,]*, ?)') # eg. 'flagger: Flagger,'
pattern = [data, flagger]
pattern_list = [data, flagger]
i = 0
replaced = []
for line in lines:
if 'copy' in line:
i = i
if i < len(pattern):
found = pattern[i].match(line)
if found:
match = found[1] # regex group
replacement = ''
if i == 0:
replacement = 'self, '
line = line.replace(match, replacement, 1)
i += 1
# find in same line
for j in range(i, len(pattern)):
found = pattern[i].match(line)
if found:
line = line.replace(found[1], '', 1)
i += 1
if i < len(pattern_list):
# search for one patter after the other in the current line,
# if any is NOT found, we stop and continued from there in the
# next line (next loop outer integration)
for j in range(i, len(pattern_list)):
found = pattern_list[i].match(line)
if found:
# we replace the first match ('data') with 'self'
line = line.replace(found[1], 'self, ' if i == 0 else '', 1)
i += 1 # next pattern please
else:
break
empty_line = empty.match(line)
if empty_line:
......@@ -116,11 +152,16 @@ def replace_datafieldflagger(lines):
return replaced
def autoreplace_signature():
def autoreplace_signatures():
"""
Replaces core-signatures with the module-signatures, one-by-one.
"""
postfix = '_autosignature'
saqc_path = 'saqc/core/modules/'
touched_modules = []
# one-by-one: we only process one signature at a time, this means
# that we see most files multiple times.
for name in FUNC_MAP:
module, fname = name.split('.')
......@@ -131,9 +172,13 @@ def autoreplace_signature():
warnings.warn(f"end of signature of '{fname}' not found - ignoring")
continue
# modify original function signature
lines = replace_datafieldflagger(lines)
print(''.join(lines))
# find the right file. If we already processed a signature
# of the same module, we already have a modified file, so we
# need to read and write the same.
readfile = f'{saqc_path}{module}.py'
writefile = f'{saqc_path}{module}{postfix}.py'
if module in touched_modules:
......@@ -141,12 +186,16 @@ def autoreplace_signature():
else:
touched_modules.append(module)
# READ
with open(readfile, 'r') as fh:
readlines = fh.readlines()
# WRITE
# replace sig's in Saqc.module to a temporary file
with open(writefile, 'w') as fh:
replace_core_signatures(readlines, lines, fname, fh)
# replace all original files with the temporary ones
files = os.listdir('saqc/core/modules/')
for new in files:
if postfix in new:
......@@ -154,6 +203,5 @@ def autoreplace_signature():
new = saqc_path + new
os.replace(new, old)
if __name__ == '__main__':
autoreplace_signature()
autoreplace_signatures()
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment