abnosql.table

  1from abc import ABCMeta  # type: ignore
  2from abc import abstractmethod
  3import os
  4import re
  5import typing as t
  6
  7import pluggy  # type: ignore
  8
  9import abnosql.exceptions as ex
 10from abnosql import plugin
 11
# pluggy markers created for the 'abnosql.table' project name.
# `hookspec` decorates the hook specifications declared on TableSpecs below;
# `hookimpl` is not used in this module - presumably plugin modules import it
# to mark their hook implementations (verify against the plugins).
hookimpl = pluggy.HookimplMarker('abnosql.table')
hookspec = pluggy.HookspecMarker('abnosql.table')
 14
 15
class TableSpecs(plugin.PluginSpec):
    """Hook specifications for the 'abnosql.table' pluggy project

    Every method here is only a specification declared with the
    module-level ``hookspec`` marker, so the bodies are intentionally
    empty.  Hooks declared with ``firstresult=True`` follow pluggy's
    first-result semantics: the call returns the first non-None result
    instead of a list of all results.
    """

    @hookspec(firstresult=True)
    def set_config(self, table: str) -> t.Dict:  # type: ignore[empty-body] # noqa E501
        """Hook to set config

        Args:
            table: table name

        Returns:
            dictionary containing config

        """
        pass

    @hookspec(firstresult=True)
    def get_item_post(self, table: str, item: t.Dict) -> t.Dict:  # type: ignore[empty-body] # noqa E501
        """Hook invoked after get_item()

        Args:
            table: table name
            item: dictionary item retrieved from get_item call

        Returns:
            dictionary containing updated item

        """
        pass

    @hookspec
    def put_item_post(self, table: str, item: t.Dict) -> None:  # type: ignore[empty-body] # noqa E501
        """Hook invoked after put_item()

        Args:
            table: table name
            item: dictionary containing partition and range/sort key

        """
        pass

    @hookspec
    def put_items_post(self, table: str, items: t.List[t.Dict]) -> None:  # type: ignore[empty-body] # noqa E501
        """Hook invoked after put_items()

        Args:
            table: table name
            items: list of dictionary items written to table

        """
        pass

    @hookspec
    def delete_item_post(self, table: str, key: t.Dict) -> None:  # type: ignore[empty-body] # noqa E501
        """Hook invoked after delete_item()

        Args:
            table: table name
            key: key dictionary of the deleted item

        """
        pass
 77
 78
class TableBase(metaclass=ABCMeta):
    """Abstract interface for table objects

    Every method is abstract.  The module-level table() factory returns
    the ``Table`` object of the selected plugin annotated as this type.
    """

    @abstractmethod
    def __init__(
        self, pm: plugin.PM, name: str, config: t.Optional[dict] = None
    ) -> None:
        """Instantiate table object

        Args:
            pm: pluggy plugin manager
            name: table name
            config: optional config dict
        """
        pass

    @abstractmethod
    def get_item(self, **kwargs) -> t.Optional[t.Dict]:
        """Get table/collection item

        Args:
            partition key and range/sort key (if used)

        Returns:
            item dictionary or None if not found

        """
        pass

    @abstractmethod
    def put_item(self, item: t.Dict):
        """Puts table/collection item

        Args:
            item: dictionary

        """
        pass

    @abstractmethod
    def put_items(self, items: t.Iterable[t.Dict]):
        """Puts multiple table/collection items

        Args:
            items: iterable of item dictionaries

        """
        pass

    @abstractmethod
    def delete_item(self, **kwargs):
        """Deletes table/collection item

        Args:
            partition key and range/sort key (if used)

        """
        pass

    @abstractmethod
    def query(
        self,
        key: t.Dict[str, t.Any],
        filters: t.Optional[t.Dict[str, t.Any]] = None,
        limit: t.Optional[int] = None,
        next: t.Optional[str] = None
    ) -> t.Dict[str, t.Any]:
        """Perform key based query with optional exact match filters

        Args:
            key: dictionary containing partition key and range/sort key
            filters: optional dictionary of key=value to query and filter on
            limit: query limit
            next: pagination token

        Returns:
            dictionary containing 'items' and 'next' pagination token

        """
        pass

    @abstractmethod
    def query_sql(
        self,
        statement: str,
        parameters: t.Optional[t.Dict[str, t.Any]] = None,
        limit: t.Optional[int] = None,
        next: t.Optional[str] = None
    ) -> t.Dict[str, t.Any]:
        """Perform query using a SQL statement with optional parameters

        Args:
            statement: SQL statement to query table
            parameters: optional dictionary containing @key = value placeholders
            limit: query limit
            next: pagination token

        Returns:
            dictionary containing 'items' and 'next' pagination token

        """
        pass
179
180
def get_sql_params(
    statement: str,
    parameters: t.Dict[str, t.Any],
    param_val: t.Callable,
    replace: t.Optional[str] = None
) -> t.Tuple[str, t.List]:
    """Collect values for the @variable placeholders used in a statement

    The statement is first checked with validate_statement().  Every
    ``@name`` token found in the statement must be a key of ``parameters``
    and every key of ``parameters`` must appear in the statement.

    Args:
        statement: SQL statement containing @variable placeholders
        parameters: dictionary keyed by placeholder (including the
            leading '@'); None is treated as an empty dictionary
        param_val: callable invoked as param_val(var, value) for each
            placeholder occurrence; its results form the returned list
        replace: when a string, each placeholder in the statement is
            replaced with this text (eg '?')

    Returns:
        tuple of (statement, params) where params holds one entry per
        placeholder occurrence, in statement order

    Raises:
        ValidationException: if the statement is rejected by
            validate_statement(), or if placeholders and parameter keys
            do not match (reported as 'missing parameters' in both
            directions)

    """
    validate_statement(statement)
    parameters = parameters or {}
    var_pat = re.compile(r'\@[a-zA-Z0-9_.-]+')
    found = var_pat.findall(statement)
    found_names = set(found)
    params = [
        param_val(var, parameters[var])
        for var in found if var in parameters
    ]
    mismatched = {var for var in found if var not in parameters}
    mismatched.update(var for var in parameters if var not in found_names)
    if mismatched:
        raise ex.ValidationException(
            'missing parameters: ' + ', '.join(sorted(mismatched))
        )
    if isinstance(replace, str):
        # substitute whole placeholder tokens in a single pass: replacing
        # each key with str.replace() corrupts the statement when one
        # placeholder is a prefix of another (eg @id and @id2).  A function
        # is used so backslashes in `replace` are inserted literally
        statement = var_pat.sub(lambda _match: replace, statement)
    return (statement, params)
210
211
def quote_str(str):
    """Wrap a string in single quotes, escaping embedded single quotes

    Args:
        str: string to quote (the parameter name shadows the builtin but
            is kept because it is part of the call signature)

    Returns:
        the string surrounded by single quotes, with each embedded single
        quote prefixed by a backslash

    """
    # NOTE(review): backslashes in the input are passed through unescaped;
    # confirm this is safe for every SQL dialect the result is used with
    escaped = str.replace("'", "\\'")
    return f"'{escaped}'"
218
219
def validate_query_attrs(key: t.Dict, filters: t.Dict):
    """Validate the attribute names used in a query key and filters

    A name is valid when it consists only of ASCII letters, digits,
    underscores and hyphens.

    Args:
        key: dictionary whose keys are the key attribute names
        filters: dictionary whose keys are the filter attribute names;
            None is treated as no filters

    Raises:
        ValidationException: listing (sorted, de-duplicated) every
            invalid name found in key or filters

    """
    # the original class was written '09', which only allowed the digits
    # 0 and 9; fullmatch also rejects a trailing newline that '$' accepts
    name_pat = re.compile(r'[a-zA-Z0-9_-]+')
    names = list(key or {}) + list(filters or {})
    invalid = sorted({
        name for name in names if not name_pat.fullmatch(name)
    })
    if invalid:
        raise ex.ValidationException(
            'invalid key or filter keys: ' + ', '.join(invalid)
        )
233
234
def validate_statement(statement: str):
    """Check that a SQL statement starts with SELECT

    Only the first whitespace-delimited token is inspected (case
    insensitively); the rest of the statement is not parsed.

    Args:
        statement: SQL statement

    Raises:
        ValidationException: if the statement is empty or its first
            token is not SELECT

    """
    # sqlglot can do this (and sqlparse), but lets keep it simple.
    # split() with no argument splits on any whitespace run, so statements
    # using newlines or tabs after SELECT are accepted too
    tokens = statement.split()
    if not tokens or tokens[0].upper() != 'SELECT':
        raise ex.ValidationException('statement must start with SELECT')
240
241
def table(
    name: str,
    config: t.Optional[dict] = None,
    database: t.Optional[str] = None
) -> TableBase:
    """Create a table object using the plugin for the selected database

    Args:
        name: table name
        config: optional config dict passed through to the plugin's Table
        database: plugin name to look up; when None the ABNOSQL_DB
            environment variable is used instead

    Returns:
        the object built by the plugin module's Table(pm, name, config)

    Raises:
        PluginException: if no plugin is found for the database name

    """
    if database is None:
        backend = os.environ.get('ABNOSQL_DB')
    else:
        backend = database
    manager = plugin.get_pm('table')
    module = manager.get_plugin(backend)
    if module is not None:
        return module.Table(manager, name, config)
    raise ex.PluginException(f'table.{backend} plugin not found')
hookimpl = <pluggy._hooks.HookimplMarker object>
hookspec = <pluggy._hooks.HookspecMarker object>
class TableSpecs(abnosql.plugin.PluginSpec):
17class TableSpecs(plugin.PluginSpec):
18
19    @hookspec(firstresult=True)
20    def set_config(self, table: str) -> t.Dict:  # type: ignore[empty-body] # noqa E501
21        """Hook to set config
22
23        Args:
24            table: table name
25
26        Returns:
27            dictionary containing config
28
29        """
30        pass
31
32    @hookspec(firstresult=True)
33    def get_item_post(self, table: str, item: t.Dict) -> t.Dict:  # type: ignore[empty-body] # noqa E501
34        """Hook invoked after get_item()
35
36        Args:
37            table: table name
38            item: dictionary item retrieved from get_item call
39
40        Returns:
41            dictionary containing updated item
42
43        """
44        pass
45
46    @hookspec
47    def put_item_post(self, table: str, item: t.Dict) -> None:  # type: ignore[empty-body] # noqa E501
48        """Hook invoked after put_item()
49
50        Args:
51            table: table name
52            item: dictionary containing partition and range/sort key
53
54        """
55        pass
56
57    @hookspec
58    def put_items_post(self, table: str, items: t.List[t.Dict]) -> None:  # type: ignore[empty-body] # noqa E501
59        """Hook invoked after put_items()
60
61        Args:
62            table: table name
63            item: list of dictionary items written to table
64
65        """
66        pass
67
68    @hookspec
69    def delete_item_post(self, table: str, key: t.Dict) -> None:  # type: ignore[empty-body] # noqa E501
70        """Hook invoked after delete_item()
71
72        Args:
73            table: table name
74            key: dictionary of item written to table
75
76        """
77        pass
@hookspec(firstresult=True)
def set_config(self, table: str) -> Dict:
19    @hookspec(firstresult=True)
20    def set_config(self, table: str) -> t.Dict:  # type: ignore[empty-body] # noqa E501
21        """Hook to set config
22
23        Args:
24            table: table name
25
26        Returns:
27            dictionary containing config
28
29        """
30        pass

Hook to set config

Args: table: table name

Returns: dictionary containing config

@hookspec(firstresult=True)
def get_item_post(self, table: str, item: Dict) -> Dict:
32    @hookspec(firstresult=True)
33    def get_item_post(self, table: str, item: t.Dict) -> t.Dict:  # type: ignore[empty-body] # noqa E501
34        """Hook invoked after get_item()
35
36        Args:
37            table: table name
38            item: dictionary item retrieved from get_item call
39
40        Returns:
41            dictionary containing updated item
42
43        """
44        pass

Hook invoked after get_item()

Args: table: table name item: dictionary item retrieved from get_item call

Returns: dictionary containing updated item

@hookspec
def put_item_post(self, table: str, item: Dict) -> None:
46    @hookspec
47    def put_item_post(self, table: str, item: t.Dict) -> None:  # type: ignore[empty-body] # noqa E501
48        """Hook invoked after put_item()
49
50        Args:
51            table: table name
52            item: dictionary containing partition and range/sort key
53
54        """
55        pass

Hook invoked after put_item()

Args: table: table name item: dictionary containing partition and range/sort key

@hookspec
def put_items_post(self, table: str, items: List[Dict]) -> None:
57    @hookspec
58    def put_items_post(self, table: str, items: t.List[t.Dict]) -> None:  # type: ignore[empty-body] # noqa E501
59        """Hook invoked after put_items()
60
61        Args:
62            table: table name
63            item: list of dictionary items written to table
64
65        """
66        pass

Hook invoked after put_items()

Args: table: table name items: list of dictionary items written to table

@hookspec
def delete_item_post(self, table: str, key: Dict) -> None:
68    @hookspec
69    def delete_item_post(self, table: str, key: t.Dict) -> None:  # type: ignore[empty-body] # noqa E501
70        """Hook invoked after delete_item()
71
72        Args:
73            table: table name
74            key: dictionary of item written to table
75
76        """
77        pass

Hook invoked after delete_item()

Args: table: table name key: key dictionary of the item deleted from the table

class TableBase:
 80class TableBase(metaclass=ABCMeta):
 81    @abstractmethod
 82    def __init__(
 83        self, pm: plugin.PM, name: str, config: t.Optional[dict] = None
 84    ) -> None:
 85        """Instantiate table object
 86
 87        Args:
 88            pm: pluggy plugin manager
 89            name: table name
 90            config: optional config dict dict
 91        """
 92        pass
 93
 94    @abstractmethod
 95    def get_item(self, **kwargs) -> t.Dict:
 96        """Get table/collection item
 97
 98        Args:
 99            partition key and range/sort key (if used)
100
101        Returns:
102            item dictionary or None if not found
103
104        """
105        pass
106
107    @abstractmethod
108    def put_item(self, item: t.Dict):
109        """Puts table/collection item
110
111        Args:
112            item: dictionary
113
114        """
115        pass
116
117    @abstractmethod
118    def put_items(self, items: t.Iterable[t.Dict]):
119        """Puts multiple table/collection items
120
121        Args:
122            items: list of item dictionaries
123
124        """
125        pass
126
127    @abstractmethod
128    def delete_item(self, **kwargs):
129        """Deletes table/collection item
130
131        Args:
132            partition key and range/sort key (if used)
133
134        """
135        pass
136
137    @abstractmethod
138    def query(
139        self,
140        key: t.Dict[str, t.Any],
141        filters: t.Optional[t.Dict[str, t.Any]] = None,
142        limit: t.Optional[int] = None,
143        next: t.Optional[str] = None
144    ) -> t.Dict[str, t.Any]:
145        """Perform key based query with optional exact match filters
146
147        Args:
148            key: dictionary containing partition key and range/sort key
149            filters: optional dictionary of key=value to query and filter on
150            limit: query limit
151            next: pagination token
152
153        Returns:
154            dictionary containing 'items' and 'next' pagination token
155
156        """
157        pass
158
159    @abstractmethod
160    def query_sql(
161        self,
162        statement: str,
163        parameters: t.Optional[t.Dict[str, t.Any]] = None,
164        limit: t.Optional[int] = None,
165        next: t.Optional[str] = None
166    ) -> t.Dict[str, t.Any]:
167        """Perform key based query with optional exact match filters
168
169        Args:
170            statement: SQL statement to query table
171            parameters: optional dictionary containing @key = value placeholders
172            limit: query limit
173            next: pagination token
174
175        Returns:
176            dictionary containing 'items' and 'next' pagination token
177
178        """
179        pass
@abstractmethod
TableBase( pm: abnosql.plugin.PM, name: str, config: Union[dict, NoneType] = None)
81    @abstractmethod
82    def __init__(
83        self, pm: plugin.PM, name: str, config: t.Optional[dict] = None
84    ) -> None:
85        """Instantiate table object
86
87        Args:
88            pm: pluggy plugin manager
89            name: table name
90            config: optional config dict dict
91        """
92        pass

Instantiate table object

Args: pm: pluggy plugin manager name: table name config: optional config dict

@abstractmethod
def get_item(self, **kwargs) -> Dict:
 94    @abstractmethod
 95    def get_item(self, **kwargs) -> t.Dict:
 96        """Get table/collection item
 97
 98        Args:
 99            partition key and range/sort key (if used)
100
101        Returns:
102            item dictionary or None if not found
103
104        """
105        pass

Get table/collection item

Args: partition key and range/sort key (if used)

Returns: item dictionary or None if not found

@abstractmethod
def put_item(self, item: Dict):
107    @abstractmethod
108    def put_item(self, item: t.Dict):
109        """Puts table/collection item
110
111        Args:
112            item: dictionary
113
114        """
115        pass

Puts table/collection item

Args: item: dictionary

@abstractmethod
def put_items(self, items: Iterable[Dict]):
117    @abstractmethod
118    def put_items(self, items: t.Iterable[t.Dict]):
119        """Puts multiple table/collection items
120
121        Args:
122            items: list of item dictionaries
123
124        """
125        pass

Puts multiple table/collection items

Args: items: list of item dictionaries

@abstractmethod
def delete_item(self, **kwargs):
127    @abstractmethod
128    def delete_item(self, **kwargs):
129        """Deletes table/collection item
130
131        Args:
132            partition key and range/sort key (if used)
133
134        """
135        pass

Deletes table/collection item

Args: partition key and range/sort key (if used)

@abstractmethod
def query( self, key: Dict[str, Any], filters: Union[Dict[str, Any], NoneType] = None, limit: Union[int, NoneType] = None, next: Union[str, NoneType] = None) -> Dict[str, Any]:
137    @abstractmethod
138    def query(
139        self,
140        key: t.Dict[str, t.Any],
141        filters: t.Optional[t.Dict[str, t.Any]] = None,
142        limit: t.Optional[int] = None,
143        next: t.Optional[str] = None
144    ) -> t.Dict[str, t.Any]:
145        """Perform key based query with optional exact match filters
146
147        Args:
148            key: dictionary containing partition key and range/sort key
149            filters: optional dictionary of key=value to query and filter on
150            limit: query limit
151            next: pagination token
152
153        Returns:
154            dictionary containing 'items' and 'next' pagination token
155
156        """
157        pass

Perform key based query with optional exact match filters

Args: key: dictionary containing partition key and range/sort key filters: optional dictionary of key=value to query and filter on limit: query limit next: pagination token

Returns: dictionary containing 'items' and 'next' pagination token

@abstractmethod
def query_sql( self, statement: str, parameters: Union[Dict[str, Any], NoneType] = None, limit: Union[int, NoneType] = None, next: Union[str, NoneType] = None) -> Dict[str, Any]:
159    @abstractmethod
160    def query_sql(
161        self,
162        statement: str,
163        parameters: t.Optional[t.Dict[str, t.Any]] = None,
164        limit: t.Optional[int] = None,
165        next: t.Optional[str] = None
166    ) -> t.Dict[str, t.Any]:
167        """Perform key based query with optional exact match filters
168
169        Args:
170            statement: SQL statement to query table
171            parameters: optional dictionary containing @key = value placeholders
172            limit: query limit
173            next: pagination token
174
175        Returns:
176            dictionary containing 'items' and 'next' pagination token
177
178        """
179        pass

Perform query using a SQL statement with optional @parameter placeholders

Args: statement: SQL statement to query table parameters: optional dictionary containing @key = value placeholders limit: query limit next: pagination token

Returns: dictionary containing 'items' and 'next' pagination token

def get_sql_params( statement: str, parameters: Dict[str, Any], param_val: Callable, replace: Union[str, NoneType] = None) -> Tuple[str, List]:
182def get_sql_params(
183    statement: str,
184    parameters: t.Dict[str, t.Any],
185    param_val: t.Callable,
186    replace: t.Optional[str] = None
187) -> t.Tuple[str, t.List]:
188    # convert @variable to dynamodb ? placeholders
189    validate_statement(statement)
190    vars = list(re.findall(r'\@[a-zA-Z0-9_.-]+', statement))
191    params = []
192    _missing = {}
193    for var in vars:
194        if var not in parameters:
195            _missing[var] = True
196        else:
197            val = parameters[var]
198            params.append(param_val(var, val))
199    for var in parameters.keys():
200        if var not in vars:
201            _missing[var] = True
202    missing = sorted(_missing.keys())
203    if len(missing):
204        raise ex.ValidationException(
205            'missing parameters: ' + ', '.join(missing)
206        )
207    if isinstance(replace, str):
208        for var in parameters.keys():
209            statement = statement.replace(var, replace)
210    return (statement, params)
def quote_str(str):
213def quote_str(str):
214    return "'" + str.translate(
215        str.maketrans({
216            "'": "\\'"
217        })
218    ) + "'"
def validate_query_attrs(key: Dict, filters: Dict):
221def validate_query_attrs(key: t.Dict, filters: t.Dict):
222    _name_pat = re.compile(r'^[a-zA-Z09_-]+$')
223
224    def _validate_key_names(obj):
225        return [_ for _ in obj.keys() if not _name_pat.match(_)]
226
227    invalid = sorted(set(
228        _validate_key_names(key) + _validate_key_names(filters)
229    ))
230    if len(invalid):
231        raise ex.ValidationException(
232            'invalid key or filter keys: ' + ', '.join(invalid)
233        )
def validate_statement(statement: str):
236def validate_statement(statement: str):
237    # sqlglot can do this (and sqlparse), but lets keep it simple
238    tokens = [_.strip() for _ in statement.split(' ') if _.strip() != '']
239    if len(tokens) == 0 or tokens[0].upper() != 'SELECT':
240        raise ex.ValidationException('statement must start with SELECT')
def table( name: str, config: Union[dict, NoneType] = None, database: Union[str, NoneType] = None) -> abnosql.table.TableBase:
243def table(
244    name: str,
245    config: t.Optional[dict] = None,
246    database: t.Optional[str] = None
247) -> TableBase:
248    if database is None:
249        database = os.environ.get('ABNOSQL_DB')
250    pm = plugin.get_pm('table')
251    module = pm.get_plugin(database)
252    if module is None:
253        raise ex.PluginException(f'table.{database} plugin not found')
254    return module.Table(pm, name, config)