abnosql.table
1from abc import ABCMeta # type: ignore 2from abc import abstractmethod 3import os 4import re 5import typing as t 6 7import pluggy # type: ignore 8 9import abnosql.exceptions as ex 10from abnosql import plugin 11 12hookimpl = pluggy.HookimplMarker('abnosql.table') 13hookspec = pluggy.HookspecMarker('abnosql.table') 14 15 16class TableSpecs(plugin.PluginSpec): 17 18 @hookspec(firstresult=True) 19 def set_config(self, table: str) -> t.Dict: # type: ignore[empty-body] # noqa E501 20 """Hook to set config 21 22 Args: 23 table: table name 24 25 Returns: 26 dictionary containing config 27 28 """ 29 pass 30 31 @hookspec(firstresult=True) 32 def get_item_post(self, table: str, item: t.Dict) -> t.Dict: # type: ignore[empty-body] # noqa E501 33 """Hook invoked after get_item() 34 35 Args: 36 table: table name 37 item: dictionary item retrieved from get_item call 38 39 Returns: 40 dictionary containing updated item 41 42 """ 43 pass 44 45 @hookspec 46 def put_item_post(self, table: str, item: t.Dict) -> None: # type: ignore[empty-body] # noqa E501 47 """Hook invoked after put_item() 48 49 Args: 50 table: table name 51 item: dictionary containing partition and range/sort key 52 53 """ 54 pass 55 56 @hookspec 57 def put_items_post(self, table: str, items: t.List[t.Dict]) -> None: # type: ignore[empty-body] # noqa E501 58 """Hook invoked after put_items() 59 60 Args: 61 table: table name 62 item: list of dictionary items written to table 63 64 """ 65 pass 66 67 @hookspec 68 def delete_item_post(self, table: str, key: t.Dict) -> None: # type: ignore[empty-body] # noqa E501 69 """Hook invoked after delete_item() 70 71 Args: 72 table: table name 73 key: dictionary of item written to table 74 75 """ 76 pass 77 78 79class TableBase(metaclass=ABCMeta): 80 @abstractmethod 81 def __init__( 82 self, pm: plugin.PM, name: str, config: t.Optional[dict] = None 83 ) -> None: 84 """Instantiate table object 85 86 Args: 87 pm: pluggy plugin manager 88 name: table name 89 config: optional config dict dict 90 """ 91 pass 92 93 @abstractmethod 94 def get_item(self, **kwargs) -> t.Dict: 95 """Get table/collection item 96 97 Args: 98 partition key and range/sort key (if used) 99 100 Returns: 101 item dictionary or None if not found 102 103 """ 104 pass 105 106 @abstractmethod 107 def put_item(self, item: t.Dict): 108 """Puts table/collection item 109 110 Args: 111 item: dictionary 112 113 """ 114 pass 115 116 @abstractmethod 117 def put_items(self, items: t.Iterable[t.Dict]): 118 """Puts multiple table/collection items 119 120 Args: 121 items: list of item dictionaries 122 123 """ 124 pass 125 126 @abstractmethod 127 def delete_item(self, **kwargs): 128 """Deletes table/collection item 129 130 Args: 131 partition key and range/sort key (if used) 132 133 """ 134 pass 135 136 @abstractmethod 137 def query( 138 self, 139 key: t.Dict[str, t.Any], 140 filters: t.Optional[t.Dict[str, t.Any]] = None, 141 limit: t.Optional[int] = None, 142 next: t.Optional[str] = None 143 ) -> t.Dict[str, t.Any]: 144 """Perform key based query with optional exact match filters 145 146 Args: 147 key: dictionary containing partition key and range/sort key 148 filters: optional dictionary of key=value to query and filter on 149 limit: query limit 150 next: pagination token 151 152 Returns: 153 dictionary containing 'items' and 'next' pagination token 154 155 """ 156 pass 157 158 @abstractmethod 159 def query_sql( 160 self, 161 statement: str, 162 parameters: t.Optional[t.Dict[str, t.Any]] = None, 163 limit: t.Optional[int] = None, 164 next: t.Optional[str] = None 165 ) -> t.Dict[str, t.Any]: 166 """Perform key based query with optional exact match filters 167 168 Args: 169 statement: SQL statement to query table 170 parameters: optional dictionary containing @key = value placeholders 171 limit: query limit 172 next: pagination token 173 174 Returns: 175 dictionary containing 'items' and 'next' pagination token 176 177 """ 178 pass 179 180 181def get_sql_params( 182 statement: str, 183 parameters: t.Dict[str, t.Any], 184 param_val: t.Callable, 185 replace: t.Optional[str] = None 186) -> t.Tuple[str, t.List]: 187 # convert @variable to dynamodb ? placeholders 188 validate_statement(statement) 189 vars = list(re.findall(r'\@[a-zA-Z0-9_.-]+', statement)) 190 params = [] 191 _missing = {} 192 for var in vars: 193 if var not in parameters: 194 _missing[var] = True 195 else: 196 val = parameters[var] 197 params.append(param_val(var, val)) 198 for var in parameters.keys(): 199 if var not in vars: 200 _missing[var] = True 201 missing = sorted(_missing.keys()) 202 if len(missing): 203 raise ex.ValidationException( 204 'missing parameters: ' + ', '.join(missing) 205 ) 206 if isinstance(replace, str): 207 for var in parameters.keys(): 208 statement = statement.replace(var, replace) 209 return (statement, params) 210 211 212def quote_str(str): 213 return "'" + str.translate( 214 str.maketrans({ 215 "'": "\\'" 216 }) 217 ) + "'" 218 219 220def validate_query_attrs(key: t.Dict, filters: t.Dict): 221 _name_pat = re.compile(r'^[a-zA-Z09_-]+$') 222 223 def _validate_key_names(obj): 224 return [_ for _ in obj.keys() if not _name_pat.match(_)] 225 226 invalid = sorted(set( 227 _validate_key_names(key) + _validate_key_names(filters) 228 )) 229 if len(invalid): 230 raise ex.ValidationException( 231 'invalid key or filter keys: ' + ', '.join(invalid) 232 ) 233 234 235def validate_statement(statement: str): 236 # sqlglot can do this (and sqlparse), but lets keep it simple 237 tokens = [_.strip() for _ in statement.split(' ') if _.strip() != ''] 238 if len(tokens) == 0 or tokens[0].upper() != 'SELECT': 239 raise ex.ValidationException('statement must start with SELECT') 240 241 242def table( 243 name: str, 244 config: t.Optional[dict] = None, 245 database: t.Optional[str] = None 246) -> TableBase: 247 if database is None: 248 database = os.environ.get('ABNOSQL_DB') 249 pm = plugin.get_pm('table') 250 module = pm.get_plugin(database) 251 if module is None: 252 raise ex.PluginException(f'table.{database} plugin not found') 253 return module.Table(pm, name, config)
17class TableSpecs(plugin.PluginSpec): 18 19 @hookspec(firstresult=True) 20 def set_config(self, table: str) -> t.Dict: # type: ignore[empty-body] # noqa E501 21 """Hook to set config 22 23 Args: 24 table: table name 25 26 Returns: 27 dictionary containing config 28 29 """ 30 pass 31 32 @hookspec(firstresult=True) 33 def get_item_post(self, table: str, item: t.Dict) -> t.Dict: # type: ignore[empty-body] # noqa E501 34 """Hook invoked after get_item() 35 36 Args: 37 table: table name 38 item: dictionary item retrieved from get_item call 39 40 Returns: 41 dictionary containing updated item 42 43 """ 44 pass 45 46 @hookspec 47 def put_item_post(self, table: str, item: t.Dict) -> None: # type: ignore[empty-body] # noqa E501 48 """Hook invoked after put_item() 49 50 Args: 51 table: table name 52 item: dictionary containing partition and range/sort key 53 54 """ 55 pass 56 57 @hookspec 58 def put_items_post(self, table: str, items: t.List[t.Dict]) -> None: # type: ignore[empty-body] # noqa E501 59 """Hook invoked after put_items() 60 61 Args: 62 table: table name 63 item: list of dictionary items written to table 64 65 """ 66 pass 67 68 @hookspec 69 def delete_item_post(self, table: str, key: t.Dict) -> None: # type: ignore[empty-body] # noqa E501 70 """Hook invoked after delete_item() 71 72 Args: 73 table: table name 74 key: dictionary of item written to table 75 76 """ 77 pass
19 @hookspec(firstresult=True) 20 def set_config(self, table: str) -> t.Dict: # type: ignore[empty-body] # noqa E501 21 """Hook to set config 22 23 Args: 24 table: table name 25 26 Returns: 27 dictionary containing config 28 29 """ 30 pass
Hook to set config
Args: table: table name
Returns: dictionary containing config
32 @hookspec(firstresult=True) 33 def get_item_post(self, table: str, item: t.Dict) -> t.Dict: # type: ignore[empty-body] # noqa E501 34 """Hook invoked after get_item() 35 36 Args: 37 table: table name 38 item: dictionary item retrieved from get_item call 39 40 Returns: 41 dictionary containing updated item 42 43 """ 44 pass
Hook invoked after get_item()
Args: table: table name item: dictionary item retrieved from get_item call
Returns: dictionary containing updated item
46 @hookspec 47 def put_item_post(self, table: str, item: t.Dict) -> None: # type: ignore[empty-body] # noqa E501 48 """Hook invoked after put_item() 49 50 Args: 51 table: table name 52 item: dictionary containing partition and range/sort key 53 54 """ 55 pass
Hook invoked after put_item()
Args: table: table name item: dictionary containing partition and range/sort key
57 @hookspec 58 def put_items_post(self, table: str, items: t.List[t.Dict]) -> None: # type: ignore[empty-body] # noqa E501 59 """Hook invoked after put_items() 60 61 Args: 62 table: table name 63 item: list of dictionary items written to table 64 65 """ 66 pass
Hook invoked after put_items()
Args: table: table name item: list of dictionary items written to table
68 @hookspec 69 def delete_item_post(self, table: str, key: t.Dict) -> None: # type: ignore[empty-body] # noqa E501 70 """Hook invoked after delete_item() 71 72 Args: 73 table: table name 74 key: dictionary of item written to table 75 76 """ 77 pass
Hook invoked after delete_item()
Args: table: table name key: dictionary of item written to table
80class TableBase(metaclass=ABCMeta): 81 @abstractmethod 82 def __init__( 83 self, pm: plugin.PM, name: str, config: t.Optional[dict] = None 84 ) -> None: 85 """Instantiate table object 86 87 Args: 88 pm: pluggy plugin manager 89 name: table name 90 config: optional config dict dict 91 """ 92 pass 93 94 @abstractmethod 95 def get_item(self, **kwargs) -> t.Dict: 96 """Get table/collection item 97 98 Args: 99 partition key and range/sort key (if used) 100 101 Returns: 102 item dictionary or None if not found 103 104 """ 105 pass 106 107 @abstractmethod 108 def put_item(self, item: t.Dict): 109 """Puts table/collection item 110 111 Args: 112 item: dictionary 113 114 """ 115 pass 116 117 @abstractmethod 118 def put_items(self, items: t.Iterable[t.Dict]): 119 """Puts multiple table/collection items 120 121 Args: 122 items: list of item dictionaries 123 124 """ 125 pass 126 127 @abstractmethod 128 def delete_item(self, **kwargs): 129 """Deletes table/collection item 130 131 Args: 132 partition key and range/sort key (if used) 133 134 """ 135 pass 136 137 @abstractmethod 138 def query( 139 self, 140 key: t.Dict[str, t.Any], 141 filters: t.Optional[t.Dict[str, t.Any]] = None, 142 limit: t.Optional[int] = None, 143 next: t.Optional[str] = None 144 ) -> t.Dict[str, t.Any]: 145 """Perform key based query with optional exact match filters 146 147 Args: 148 key: dictionary containing partition key and range/sort key 149 filters: optional dictionary of key=value to query and filter on 150 limit: query limit 151 next: pagination token 152 153 Returns: 154 dictionary containing 'items' and 'next' pagination token 155 156 """ 157 pass 158 159 @abstractmethod 160 def query_sql( 161 self, 162 statement: str, 163 parameters: t.Optional[t.Dict[str, t.Any]] = None, 164 limit: t.Optional[int] = None, 165 next: t.Optional[str] = None 166 ) -> t.Dict[str, t.Any]: 167 """Perform key based query with optional exact match filters 168 169 Args: 170 statement: SQL statement to query table 171 parameters: optional dictionary containing @key = value placeholders 172 limit: query limit 173 next: pagination token 174 175 Returns: 176 dictionary containing 'items' and 'next' pagination token 177 178 """ 179 pass
81 @abstractmethod 82 def __init__( 83 self, pm: plugin.PM, name: str, config: t.Optional[dict] = None 84 ) -> None: 85 """Instantiate table object 86 87 Args: 88 pm: pluggy plugin manager 89 name: table name 90 config: optional config dict dict 91 """ 92 pass
Instantiate table object
Args: pm: pluggy plugin manager name: table name config: optional config dict dict
94 @abstractmethod 95 def get_item(self, **kwargs) -> t.Dict: 96 """Get table/collection item 97 98 Args: 99 partition key and range/sort key (if used) 100 101 Returns: 102 item dictionary or None if not found 103 104 """ 105 pass
Get table/collection item
Args: partition key and range/sort key (if used)
Returns: item dictionary or None if not found
107 @abstractmethod 108 def put_item(self, item: t.Dict): 109 """Puts table/collection item 110 111 Args: 112 item: dictionary 113 114 """ 115 pass
Puts table/collection item
Args: item: dictionary
117 @abstractmethod 118 def put_items(self, items: t.Iterable[t.Dict]): 119 """Puts multiple table/collection items 120 121 Args: 122 items: list of item dictionaries 123 124 """ 125 pass
Puts multiple table/collection items
Args: items: list of item dictionaries
127 @abstractmethod 128 def delete_item(self, **kwargs): 129 """Deletes table/collection item 130 131 Args: 132 partition key and range/sort key (if used) 133 134 """ 135 pass
Deletes table/collection item
Args: partition key and range/sort key (if used)
137 @abstractmethod 138 def query( 139 self, 140 key: t.Dict[str, t.Any], 141 filters: t.Optional[t.Dict[str, t.Any]] = None, 142 limit: t.Optional[int] = None, 143 next: t.Optional[str] = None 144 ) -> t.Dict[str, t.Any]: 145 """Perform key based query with optional exact match filters 146 147 Args: 148 key: dictionary containing partition key and range/sort key 149 filters: optional dictionary of key=value to query and filter on 150 limit: query limit 151 next: pagination token 152 153 Returns: 154 dictionary containing 'items' and 'next' pagination token 155 156 """ 157 pass
Perform key based query with optional exact match filters
Args: key: dictionary containing partition key and range/sort key filters: optional dictionary of key=value to query and filter on limit: query limit next: pagination token
Returns: dictionary containing 'items' and 'next' pagination token
159 @abstractmethod 160 def query_sql( 161 self, 162 statement: str, 163 parameters: t.Optional[t.Dict[str, t.Any]] = None, 164 limit: t.Optional[int] = None, 165 next: t.Optional[str] = None 166 ) -> t.Dict[str, t.Any]: 167 """Perform key based query with optional exact match filters 168 169 Args: 170 statement: SQL statement to query table 171 parameters: optional dictionary containing @key = value placeholders 172 limit: query limit 173 next: pagination token 174 175 Returns: 176 dictionary containing 'items' and 'next' pagination token 177 178 """ 179 pass
Perform key based query with optional exact match filters
Args: statement: SQL statement to query table parameters: optional dictionary containing @key = value placeholders limit: query limit next: pagination token
Returns: dictionary containing 'items' and 'next' pagination token
182def get_sql_params( 183 statement: str, 184 parameters: t.Dict[str, t.Any], 185 param_val: t.Callable, 186 replace: t.Optional[str] = None 187) -> t.Tuple[str, t.List]: 188 # convert @variable to dynamodb ? placeholders 189 validate_statement(statement) 190 vars = list(re.findall(r'\@[a-zA-Z0-9_.-]+', statement)) 191 params = [] 192 _missing = {} 193 for var in vars: 194 if var not in parameters: 195 _missing[var] = True 196 else: 197 val = parameters[var] 198 params.append(param_val(var, val)) 199 for var in parameters.keys(): 200 if var not in vars: 201 _missing[var] = True 202 missing = sorted(_missing.keys()) 203 if len(missing): 204 raise ex.ValidationException( 205 'missing parameters: ' + ', '.join(missing) 206 ) 207 if isinstance(replace, str): 208 for var in parameters.keys(): 209 statement = statement.replace(var, replace) 210 return (statement, params)
221def validate_query_attrs(key: t.Dict, filters: t.Dict): 222 _name_pat = re.compile(r'^[a-zA-Z09_-]+$') 223 224 def _validate_key_names(obj): 225 return [_ for _ in obj.keys() if not _name_pat.match(_)] 226 227 invalid = sorted(set( 228 _validate_key_names(key) + _validate_key_names(filters) 229 )) 230 if len(invalid): 231 raise ex.ValidationException( 232 'invalid key or filter keys: ' + ', '.join(invalid) 233 )
236def validate_statement(statement: str): 237 # sqlglot can do this (and sqlparse), but lets keep it simple 238 tokens = [_.strip() for _ in statement.split(' ') if _.strip() != ''] 239 if len(tokens) == 0 or tokens[0].upper() != 'SELECT': 240 raise ex.ValidationException('statement must start with SELECT')
243def table( 244 name: str, 245 config: t.Optional[dict] = None, 246 database: t.Optional[str] = None 247) -> TableBase: 248 if database is None: 249 database = os.environ.get('ABNOSQL_DB') 250 pm = plugin.get_pm('table') 251 module = pm.get_plugin(database) 252 if module is None: 253 raise ex.PluginException(f'table.{database} plugin not found') 254 return module.Table(pm, name, config)