Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions docs/config_reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -843,6 +843,43 @@ The following example concatenates fc_ofc_no + fc_no + "2025"
columns: [ 'fc_ofc_no', 'fc_no' ]
suffix: '2025'

**Conditional Formatting with `constant_condition`**

The `compare` function allows for flexible conditional formatting with the following operators:
- `==`, `!=`, `<`, `<=`, `>`, `>=`, `in`
The comparison can be performed for column `match_col` and applied values can be defined with `value`.

- src_col_name: 'MATCH_COL'
formatters:
- type: 'constant_condition'
format:
match_col: col2
compare: ['==', 'x2']
values: ['Yes', 'No']
Additionally, this can be combined commandline argument `--override_params` using `match_override` to decide when to skip formatting with `constant_condition`

formatters:
- type: 'constant'
format: 'Original'
- type: 'constant_condition'
format:
match_col: col2
compare: ['!=', 'x2']
values: ['True', 'False']
match_override: skip_col
pattern: "^Y$"
Here, `constant_condition` will be skipped if `skip_col` is present in `--override_params` with value `Y`.

**override**

The purpose of this formatter is to create new column with a constant value passed via commandline argument.

- src_col_name: 'CONST_OVR_COL'
formatters:
- type: 'override'
format: 'col_value'
Here, `col_value` is passed in `--override_params` commandline argument.

**Validations**

Field Name Type Description
Expand Down Expand Up @@ -1443,3 +1480,29 @@ $function_name(args)
uuid NA Generates random UUID using uuid4() inbuilt function $uuid()
*only available in file path names


**Command-line Overrides**

For passing value to override in config using interpolator, inject value to API url_params and perform conditional column operations, commandline overrides can be used.

- `--infile`: Can be either a single file path or key-value pairs in the format `source_id=file_path`
- Example: `--infile source1=data1.json source2=data2.json`
- When using with the `infile` interpolator, the source_id is used to look up the corresponding file path

- `--override_params`: Key-value pairs that can be used by interpolators and formatters
- Example: `--override_params env=prod region=us-west-1`
- These parameters can be accessed using the `override` interpolator in the config file

**Interpolators with command-line overrides**

New interpolators have been added to support dynamic values in the configuration:

1. `infile`: Retrieves a filename from the `--infile` argument
- If `--infile` is a single value: Returns the basename of the file
- If `--infile` contains key-value pairs: Uses the provided key to look up the corresponding file path
- Example: `{infile:source1}` would return `data1.json` when run with `--infile source1=data1.json`
- Infile is supported for sourceId, url_params->Ids and output keys.

2. `override`: Retrieves values from the `--override_params` argument
- Example: `{override:env}` would return `prod` when run with `--override_params env=prod`
- Returns an empty string if the key is not found
24 changes: 19 additions & 5 deletions ingen/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
from datetime import date

from ingen.metadata.metadata_parser import MetaDataParser
from ingen.utils.utils import KeyValue
from ingen.utils.utils import KeyValue, KeyValueOrString
from ingen.logger import init_logging

logger = logging.getLogger()


def main(
config_path, query_params, run_date, interfaces, infile=None, dynamic_data=None
config_path, query_params, run_date, interfaces, infile=None, dynamic_data=None, override_params=None
):
parser = MetaDataParser(
config_path, query_params, run_date, interfaces, infile, dynamic_data
config_path, query_params, run_date, interfaces, infile, dynamic_data, override_params
)
metadata_list = parser.parse_metadata()
run_config = parser.run_config
Expand Down Expand Up @@ -79,7 +79,16 @@ def create_arg_parser():
"all interfaces listed in the config file will be generated",
)
parser.add_argument(
"--infile", help="filepath to the JSON file to be used in JSON source"
"--infile",
nargs="*",
action=KeyValueOrString,
help="Either a single filepath override, or key=value pairs mapping source_id to filepath"
)
parser.add_argument(
"--override_params",
nargs="*",
action=KeyValue,
help="Key value pairs used by runtime overrides (interpolators/formatters)",
)
return parser

Expand All @@ -100,5 +109,10 @@ def process_json(config_path, dynamic_data):
f"Use poller $infile when source use_infile is turned on, overwrite source file_path: {args.infile}"
)
main(
args.config_path, args.query_params, args.run_date, args.interfaces, args.infile
args.config_path,
args.query_params,
args.run_date,
args.interfaces,
args.infile,
override_params=args.override_params
)
16 changes: 9 additions & 7 deletions ingen/data_source/api_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ class APISource(DataSource):
This class represents a API source
"""

def __init__(self, source, params_map=None, interpolator=Interpolator()):
def __init__(self, source, params_map=None):
"""
Loads a API source

Expand All @@ -30,7 +30,8 @@ def __init__(self, source, params_map=None, interpolator=Interpolator()):
super().__init__(source.get('id'))
if params_map is None:
params_map = {}
self._interpolator = interpolator
self.params_map = params_map
self._interpolator = Interpolator(params_map) if params_map else Interpolator()
self._url = source.get('url')
self._url_params = source.get('url_params')
self._batch = source.get('batch')
Expand Down Expand Up @@ -65,13 +66,14 @@ def fetch(self):

:return: A DataFrame created using the result of the request made to the API
"""
url_constructor = UrlConstructor(self._url, self._url_params, self._batch, self._run_date)
interpolated_url = self._interpolator.interpolate(self._url)
url_constructor = UrlConstructor(interpolated_url, self._url_params, self._batch, self._run_date, self.params_map)
urls = url_constructor.get_urls()
requests = [HTTPRequest(url=url,
method=self._method,
headers=self._headers,
auth=self._auth,
data=self._req_data) for url in urls]
method=self._method,
headers=self._headers,
auth=self._auth,
data=self._req_data) for url in urls]
url_reader = APIReader(requests, self.reader_params)
return self.fetch_data(url_reader)

Expand Down
9 changes: 7 additions & 2 deletions ingen/data_source/file_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,13 @@ def __init__(self, source, params_map, interpolator=Interpolator()):
"""
super().__init__(source.get('id'))
self.interpolator = interpolator
if params_map.get('infile') and source['use_infile']:
source['file_path'] = params_map['infile']
infile = params_map.get('infile') if params_map else None
if infile and source.get('use_infile'):
if isinstance(infile, dict):
if self.id in infile:
source['file_path'] = infile[self.id]
else:
source['file_path'] = infile
self._src = source
else:
self._src = self.format_file_path(source, params_map)
Expand Down
41 changes: 38 additions & 3 deletions ingen/formatters/common_formatters.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

pd.options.mode.chained_assignment = None
from ingen.utils.properties import Properties
from ingen.utils.utils import get_business_day
from ingen.utils.utils import get_business_day, compare

log = logging.getLogger()

Expand Down Expand Up @@ -614,6 +614,40 @@ def suffix_string_formatter(dataframe, col_name, config, runtime_params):
return dataframe


def constant_condition_formatter(dataframe, col_name, config, runtime_params):
if 'match_override' in config and 'pattern' in config:
override_params = runtime_params.get('override_params') if runtime_params else None
skip_key = config.get('match_override')
skip_value = None
if isinstance(override_params, dict):
skip_value = override_params.get(skip_key)
match = bool(re.match(str(config.get('pattern')), str(skip_value)))
if match:
return dataframe

compare_lst = config.get('compare')
match_col = config.get('match_col')
vals = config.get('values')
if not vals or len(vals) > 2:
raise ValueError("Values does not exist or have too many elements")

cond_results = compare(dataframe, match_col, compare_lst)
if len(vals) == 1:
if col_name in dataframe:
match_col = col_name
dataframe[col_name] = dataframe[match_col].mask(cond_results, vals[0])
if len(vals) == 2:
dataframe[col_name] = cond_results.map({True: vals[0], False: vals[1]})
return dataframe


def override_formatter(dataframe, col, ovr_key, runtime_params):
override_params = runtime_params.get('override_params') if runtime_params else None
if isinstance(override_params, dict):
dataframe[col] = override_params.get(ovr_key)
return dataframe


formatter_map = {
'date': date_formatter,
'float': float_formatter,
Expand Down Expand Up @@ -647,8 +681,9 @@ def suffix_string_formatter(dataframe, col_name, config, runtime_params):
'get_running_environment': get_running_environment,
'drop_duplicates': drop_duplicates,
'prefix_string': prefix_string_formatter,
'suffix_string': suffix_string_formatter

'suffix_string': suffix_string_formatter,
'constant_condition': constant_condition_formatter,
'override': override_formatter
}


Expand Down
2 changes: 2 additions & 0 deletions ingen/metadata/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ def _initialize_sources(self):
return sources

def _validate_path(self, props):
if self._infile and 'output' in self._infile:
props['path'] = self._infile['output']
path = props.get("path")
if path is None:
return
Expand Down
4 changes: 4 additions & 0 deletions ingen/metadata/metadata_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ def __init__(
selected_interfaces,
infile=None,
dynamic_data=None,
override_params=None,
):
"""Initializes a metadata parser

Expand All @@ -34,6 +35,7 @@ def __init__(
selected_interfaces: List of interface names. This should be a subset of names declared in the metadata file
infile: A file path passed from command line to load a File Source
dynamic_data: JSON String passed from command line to load a JSON Source
override_params: Key Value pairs passed from CLI that is used to replace keys with values in config
"""
self._filepath = filepath
self._run_date = run_date
Expand All @@ -42,6 +44,7 @@ def __init__(
self._run_config = None
self._infile = infile
self._dynamic_data = dynamic_data
self._override_params = override_params

@property
def run_config(self):
Expand Down Expand Up @@ -77,6 +80,7 @@ def parse_metadata(self):
"query_params": self._query_params,
"run_date": self._run_date,
"infile": self._infile,
"override_params": self._override_params,
}

interface_configs = [
Expand Down
6 changes: 5 additions & 1 deletion ingen/reader/file_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,16 @@ class CSVFileReader(Reader):
def read(self, src):
config = get_config(src)
dtype = src.get('dtype')
encoding = src.get('encoding', 'utf-8')
try:
result = pd.read_csv(src['file_path'],
sep=src.get('delimiter'),
index_col=False,
skiprows=config['header_size'],
skipfooter=config['trailer_size'],
names=config['all_cols'],
dtype=dtype)
dtype=dtype,
encoding=encoding)
except TypeError:
logging.error(self.DTYPE_LOG_MSG)
raise
Expand Down Expand Up @@ -80,11 +82,13 @@ def read(self, src):
dtype = src.get('dtype')
file_path = src.get('file_path')
colspecs = src.get('col_specification')
encoding = src.get('encoding', 'utf-8')
try:
result = pd.read_fwf(file_path,
index_col=False,
colspecs=colspecs,
dtype=dtype,
encoding=encoding,
skiprows=config['header_size'],
skipfooter=config['trailer_size'],
names=config['all_cols'])
Expand Down
3 changes: 2 additions & 1 deletion ingen/reader/json_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@
class JSONFileReader:

def read(self, src):
with open(src.get('file_path'), 'r') as res:
encoding = src.get('encoding', 'utf-8')
with open(src.get('file_path'), 'r', encoding=encoding) as res:
data = json.load(res)
df = pd.json_normalize(data, src.get('record_path'), src.get('meta'), src.get('meta_prefix'))
return df
3 changes: 2 additions & 1 deletion ingen/reader/xml_file_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
class XMLFileReader:

def read(self, src):
xml_file = open(src['file_path'], 'r')
encoding = src.get('encoding', 'utf-8')
xml_file = open(src['file_path'], 'r', encoding=encoding)
try:
data = xmltodict.parse(xml_file.read())
tree = et.parse(src['file_path'])
Expand Down
22 changes: 18 additions & 4 deletions ingen/utils/interpolators/common_interpolators.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,32 @@ def uuid_func(args=None, params=None):

def get_infile(args, params):
"""
return file name from command line parameters
return file path from command line parameters
"""
infile = params.get('infile')
if not infile:
return ''
if isinstance(infile, dict):
if args and args in infile:
return os.path.basename(infile[args])
return ''
return os.path.basename(infile)

file_name = os.path.basename(params['infile'])
return file_name

def get_overrides(args, params):
"""
return overrides from command line parameters
"""
override_params = params.get('override_params') if params else None
return override_params.get(args, '') if override_params else ''


COMMON_INTERPOLATORS = {
'token': Properties.get_property,
'token_secret': token_secret,
'timestamp': timestamp,
'uuid': uuid_func,
'infile': get_infile
'infile': get_infile,
'override': get_overrides

}
Loading