online source: formatting

This commit is contained in:
Frederik Jaeckel 2023-04-21 15:40:07 +02:00
parent 26192ea831
commit edfa0424e5

View file

@ -4,7 +4,10 @@
# full copyright notices and license terms. # full copyright notices and license terms.
from string import Template from string import Template
import requests, logging, html2text, re import requests
import logging
import html2text
import re
from datetime import datetime from datetime import datetime
from decimal import Decimal from decimal import Decimal
from trytond.model import ModelView, ModelSQL, fields from trytond.model import ModelView, ModelSQL, fields
@ -35,7 +38,8 @@ sel_rgxdatefmt = [
('%b %d %Y', 'mon dd yyyy'), ('%b %d %Y', 'mon dd yyyy'),
] ]
fields_check = ['url', 'nsin', 'isin', 'symbol', 'text', 'http_state', \ fields_check = [
'url', 'nsin', 'isin', 'symbol', 'text', 'http_state',
'fnddate', 'fndrate', 'fndident'] 'fnddate', 'fndrate', 'fndident']
@ -51,32 +55,40 @@ class OnlineSource(ModelSQL, ModelView):
__name__ = 'investment.source' __name__ = 'investment.source'
name = fields.Char(string='Name', required=True) name = fields.Char(string='Name', required=True)
query_method = fields.Selection(string='Method', required=True, query_method = fields.Selection(
string='Method', required=True,
help='Select the method to retrieve the data.', help='Select the method to retrieve the data.',
selection='get_query_methods') selection='get_query_methods')
url = fields.Char(string='URL', states=STATES_WEB, depends=DEPENDS_WEB) url = fields.Char(string='URL', states=STATES_WEB, depends=DEPENDS_WEB)
nohtml = fields.Boolean(string='Remove HTML', nohtml = fields.Boolean(
string='Remove HTML',
help='Removes HTML tags before the text is interpreted.', help='Removes HTML tags before the text is interpreted.',
states={ states={
'invisible': STATES_WEB['invisible'], 'invisible': STATES_WEB['invisible'],
}, depends=DEPENDS_WEB) }, depends=DEPENDS_WEB)
rgxdate = fields.Char(string='Date', rgxdate = fields.Char(
string='Date',
help='Regex code to find the date in the downloaded HTML file.', help='Regex code to find the date in the downloaded HTML file.',
states=STATES_WEB, depends=DEPENDS_WEB) states=STATES_WEB, depends=DEPENDS_WEB)
rgxdatefmt = fields.Selection(string='Date format', selection=sel_rgxdatefmt, rgxdatefmt = fields.Selection(
string='Date format', selection=sel_rgxdatefmt,
states=STATES_WEB, depends=DEPENDS_WEB) states=STATES_WEB, depends=DEPENDS_WEB)
rgxrate = fields.Char(string='Rate', rgxrate = fields.Char(
string='Rate',
help='Regex code to find the rate in the downloaded HTML file.', help='Regex code to find the rate in the downloaded HTML file.',
states=STATES_WEB, depends=DEPENDS_WEB) states=STATES_WEB, depends=DEPENDS_WEB)
rgxdecimal = fields.Selection(string='Decimal Separator', rgxdecimal = fields.Selection(
string='Decimal Separator',
help='Decimal separator for converting the market value into a number.', help='Decimal separator for converting the market value into a number.',
selection=sel_rgxdecimal, states=STATES_WEB, depends=DEPENDS_WEB) selection=sel_rgxdecimal, states=STATES_WEB, depends=DEPENDS_WEB)
rgxident = fields.Char(string='Identifier', rgxident = fields.Char(
string='Identifier',
help='Regex code to find the identifier in the downloaded HTML file.', help='Regex code to find the identifier in the downloaded HTML file.',
states={ states={
'invisible': STATES_WEB['invisible'], 'invisible': STATES_WEB['invisible'],
}, depends=DEPENDS_WEB) }, depends=DEPENDS_WEB)
rgxidtype = fields.Selection(string='ID-Type', selection=sel_rgxidtype, rgxidtype = fields.Selection(
string='ID-Type', selection=sel_rgxidtype,
help='Type of identifier used to validate the result.', help='Type of identifier used to validate the result.',
states={ states={
'required': Bool(Eval('rgxident', '')), 'required': Bool(Eval('rgxident', '')),
@ -84,27 +96,32 @@ class OnlineSource(ModelSQL, ModelView):
}, depends=DEPENDS_WEB+['rgxident']) }, depends=DEPENDS_WEB+['rgxident'])
# field to test requests # field to test requests
used_url = fields.Function(fields.Char(string='Used URL', readonly=True, used_url = fields.Function(fields.Char(
string='Used URL', readonly=True,
help='This URL is used to retrieve the HTML file.', help='This URL is used to retrieve the HTML file.',
states={'invisible': STATES_WEB['invisible']}, depends=DEPENDS_WEB), states={'invisible': STATES_WEB['invisible']}, depends=DEPENDS_WEB),
'on_change_with_used_url') 'on_change_with_used_url')
nsin = fields.Function(fields.Char(string='NSIN'), nsin = fields.Function(fields.Char(
'on_change_with_nsin', setter='set_test_value') string='NSIN'), 'on_change_with_nsin', setter='set_test_value')
isin = fields.Function(fields.Char(string='ISIN'), isin = fields.Function(fields.Char(
'on_change_with_isin', setter='set_test_value') string='ISIN'), 'on_change_with_isin', setter='set_test_value')
symbol = fields.Function(fields.Char(string='Symbol'), symbol = fields.Function(fields.Char(
'on_change_with_symbol', setter='set_test_value') string='Symbol'), 'on_change_with_symbol', setter='set_test_value')
http_state = fields.Function(fields.Char(string='HTTP-State', http_state = fields.Function(fields.Char(
string='HTTP-State',
readonly=True), 'on_change_with_http_state') readonly=True), 'on_change_with_http_state')
text = fields.Function(fields.Text(string='Result', text = fields.Function(fields.Text(
readonly=True), 'on_change_with_text') string='Result', readonly=True), 'on_change_with_text')
fnddate = fields.Function(fields.Date(string='Date', readonly=True, fnddate = fields.Function(fields.Date(
string='Date', readonly=True,
help='Date found during test query.'), help='Date found during test query.'),
'on_change_with_fnddate') 'on_change_with_fnddate')
fndrate = fields.Function(fields.Numeric(string='Rate', readonly=True, fndrate = fields.Function(fields.Numeric(
help='Rate found during test query.', digits=(16,4)), string='Rate', readonly=True,
help='Rate found during test query.', digits=(16, 4)),
'on_change_with_fndrate') 'on_change_with_fndrate')
fndident = fields.Function(fields.Char(string='Identifier', readonly=True, fndident = fields.Function(fields.Char(
string='Identifier', readonly=True,
help='Identifier found during test query.'), help='Identifier found during test query.'),
'on_change_with_fndident') 'on_change_with_fndident')
@ -209,9 +226,9 @@ class OnlineSource(ModelSQL, ModelView):
""" """
if self.url: if self.url:
return self.get_url_with_parameter( return self.get_url_with_parameter(
isin = self.isin, isin=self.isin,
nsin = self.nsin, nsin=self.nsin,
symbol = self.symbol, symbol=self.symbol,
) )
@classmethod @classmethod
@ -244,10 +261,10 @@ class OnlineSource(ModelSQL, ModelView):
if getattr(osource, 'query_method', None) == 'web': if getattr(osource, 'query_method', None) == 'web':
return OSourc.read_from_website( return OSourc.read_from_website(
osource, osource,
isin = isin, isin=isin,
nsin = nsin, nsin=nsin,
symbol = symbol, symbol=symbol,
debug = debug, debug=debug,
) )
def call_online_source(self): def call_online_source(self):
@ -256,7 +273,8 @@ class OnlineSource(ModelSQL, ModelView):
""" """
OSourc = Pool().get('investment.source') OSourc = Pool().get('investment.source')
result = OSourc.run_query_method(self, self.isin, self.nsin, result = OSourc.run_query_method(
self, self.isin, self.nsin,
self.symbol, debug=True) self.symbol, debug=True)
if result is not None: if result is not None:
self.text = result.get('text', None) self.text = result.get('text', None)
@ -289,9 +307,9 @@ class OnlineSource(ModelSQL, ModelView):
for updtsource in asset.updtsources: for updtsource in asset.updtsources:
rate_data = cls.run_query_method( rate_data = cls.run_query_method(
updtsource, updtsource,
isin = asset.isin, isin=asset.isin,
nsin = asset.wkn, nsin=asset.wkn,
symbol = asset.secsymb, symbol=asset.secsymb,
) )
if len(updtsource.rgxident or '') > 0: if len(updtsource.rgxident or '') > 0:
@ -306,7 +324,8 @@ class OnlineSource(ModelSQL, ModelView):
if (asset_code or '').lower() != code.lower(): if (asset_code or '').lower() != code.lower():
# fail # fail
logger.warning( logger.warning(
'update_rate: got wrong code "%(wrong)s" - expected "%(exp)s"' % { 'update_rate: got wrong code ' +
'"%(wrong)s" - expected "%(exp)s"' % {
'exp': asset_code, 'exp': asset_code,
'wrong': code, 'wrong': code,
}) })
@ -322,11 +341,10 @@ class OnlineSource(ModelSQL, ModelView):
# check if exists # check if exists
if Rate.search_count([ if Rate.search_count([
('asset.id', '=', asset.id), ('asset.id', '=', asset.id),
('date', '=', to_create['date']), ('date', '=', to_create['date'])]) == 0:
]) == 0:
Rate.create([to_create]) Rate.create([to_create])
return True return True
else : else:
# if we got a record for today - stop # if we got a record for today - stop
# otherwise try next source # otherwise try next source
if to_create['date'] == IrDate.today(): if to_create['date'] == IrDate.today():
@ -344,7 +362,7 @@ class OnlineSource(ModelSQL, ModelView):
if search_result is None: if search_result is None:
return None return None
try : try:
result = search_result.group(1) result = search_result.group(1)
except IndexError: except IndexError:
result = search_result.group(0) result = search_result.group(0)
@ -353,20 +371,23 @@ class OnlineSource(ModelSQL, ModelView):
dec_sep = [',', '.'] dec_sep = [',', '.']
dec_sep.remove(self.rgxdecimal) dec_sep.remove(self.rgxdecimal)
result = result.replace(dec_sep[0], '').replace(self.rgxdecimal, '.') result = result.replace(
try : dec_sep[0], '').replace(self.rgxdecimal, '.')
try:
result = Decimal(result) result = Decimal(result)
except : except Exception:
result = None result = None
elif field_name == 'rgxdate': elif field_name == 'rgxdate':
try : try:
result = datetime.strptime(result, self.rgxdatefmt).date() result = datetime.strptime(result, self.rgxdatefmt).date()
except : except Exception:
result = None result = None
return result return result
@classmethod @classmethod
def read_from_website(cls, updtsource, isin=None, nsin=None, symbol=None, debug=False): def read_from_website(
cls, updtsource, isin=None, nsin=None,
symbol=None, debug=False):
""" read from url, extract values """ read from url, extract values
""" """
result = {} result = {}
@ -377,9 +398,9 @@ class OnlineSource(ModelSQL, ModelView):
res1 = requests.get( res1 = requests.get(
updtsource.get_url_with_parameter( updtsource.get_url_with_parameter(
isin = isin, isin=isin,
nsin = nsin, nsin=nsin,
symbol = symbol, symbol=symbol,
), ),
allow_redirects=True, allow_redirects=True,
timeout=5.0) timeout=5.0)
@ -409,8 +430,10 @@ class OnlineSource(ModelSQL, ModelView):
result['rate'] = updtsource.get_regex_result(html, 'rgxrate') result['rate'] = updtsource.get_regex_result(html, 'rgxrate')
result['date'] = updtsource.get_regex_result(html, 'rgxdate') result['date'] = updtsource.get_regex_result(html, 'rgxdate')
result['code'] = updtsource.get_regex_result(html, 'rgxident') result['code'] = updtsource.get_regex_result(html, 'rgxident')
else : else:
logger.error('read_from_website: %(code)s, url: %(url)s, redirects: [%(redirects)s]' % { logger.error(
'read_from_website: ' +
'%(code)s, url: %(url)s, redirects: [%(redirects)s]' % {
'code': res1.status_code, 'code': res1.status_code,
'url': res1.url, 'url': res1.url,
'redirects': ', '.join([x.url for x in res1.history]), 'redirects': ', '.join([x.url for x in res1.history]),