# investment/onlinesource.py

# -*- coding: utf-8 -*-
# This file is part of the investment-module from m-ds for Tryton.
# The COPYRIGHT file at the top level of this repository contains the
# full copyright notices and license terms.
import logging
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
from string import Template

import html2text
import requests
from trytond.model import ModelView, ModelSQL, fields, Unique, Check
from trytond.pool import Pool
from trytond.pyson import Eval, Bool
from trytond.transaction import Transaction

# module-level logger, named after this module
logger = logging.getLogger(__name__)

# selectable decimal separators; stored value and label are the same character
sel_rgxdecimal = [(separator, separator) for separator in ('.', ',')]

# kinds of identifier a source can report
sel_rgxidtype = [
    ('isin', 'ISIN'),
    ('nsin', 'NSIN'),
    ('symbol', 'Symbol'),
]

# supported date layouts: (strptime format, label)
sel_rgxdatefmt = [
    ('%d.%m.%Y', 'dd.mm.yyyy'),
    ('%m/%d/%Y', 'mm/dd/yyyy'),
    ('%Y-%m-%d', 'yyyy-mm-dd'),
    ('%b %d %Y', 'mon dd yyyy'),
]

# field names passed to fields.depends() of the on_change test handlers
fields_check = [
    'url', 'nsin', 'isin', 'symbol', 'text', 'http_state',
    'fnddate', 'fndrate', 'fndident',
]
class OnlineSource(ModelSQL, ModelView):
    'Online Source'
    # NOTE(review): the one-line docstring above is kept verbatim; it is
    # presumably consumed by the framework as the model description.
    #
    # A record describes one website from which rates can be scraped:
    # the URL template, the regexes that locate date / rate / identifier
    # in the downloaded text, and how to convert the matched strings.
    __name__ = 'investment.source'

    name = fields.Char(string='Name', required=True)
    url = fields.Char(string='URL', required=True)
    nohtml = fields.Boolean(
        string='Remove HTML',
        help='Removes HTML tags before the text is interpreted.')
    rgxdate = fields.Char(
        string='Date', required=True,
        help='Regex code to find the date in the downloaded HTML file.')
    rgxdatefmt = fields.Selection(
        string='Date format', required=True,
        selection=sel_rgxdatefmt)
    rgxrate = fields.Char(
        string='Rate', required=True,
        help='Regex code to find the rate in the downloaded HTML file.')
    rgxdecimal = fields.Selection(
        string='Decimal Separator', required=True,
        help='Decimal separator for converting the market value '
        'into a number.',
        selection=sel_rgxdecimal)
    rgxident = fields.Char(
        string='Identifier',
        help='Regex code to find the identifier in the downloaded '
        'HTML file.')
    rgxidtype = fields.Selection(
        string='ID-Type', selection=sel_rgxidtype,
        help='Type of identifier used to validate the result.',
        states={
            'required': Bool(Eval('rgxident', '')),
        }, depends=['rgxident'])

    # fields to test requests; none of them is stored
    used_url = fields.Function(fields.Char(
        string='Used URL', readonly=True,
        help='This URL is used to retrieve the HTML file.'),
        'on_change_with_used_url')
    nsin = fields.Function(
        fields.Char(string='NSIN'),
        'on_change_with_nsin', setter='set_test_value')
    isin = fields.Function(
        fields.Char(string='ISIN'),
        'on_change_with_isin', setter='set_test_value')
    symbol = fields.Function(
        fields.Char(string='Symbol'),
        'on_change_with_symbol', setter='set_test_value')
    http_state = fields.Function(
        fields.Char(string='HTTP-State', readonly=True),
        'on_change_with_http_state')
    text = fields.Function(
        fields.Text(string='Result', readonly=True),
        'on_change_with_text')
    fnddate = fields.Function(fields.Date(
        string='Date', readonly=True,
        help='Date found during test query.'),
        'on_change_with_fnddate')
    fndrate = fields.Function(fields.Numeric(
        string='Rate', readonly=True,
        help='Rate found during test query.', digits=(16, 4)),
        'on_change_with_fndrate')
    fndident = fields.Function(fields.Char(
        string='Identifier', readonly=True,
        help='Identifier found during test query.'),
        'on_change_with_fndident')

    @classmethod
    def default_url(cls):
        """ default url: only the scheme, to be completed by the user
        """
        return 'https://'

    @classmethod
    def default_rgxdate(cls):
        """ default regex to find a date: dd.mm.yyyy
        """
        return '(\\d{2}\\.\\d{2}\\.\\d{4})'

    @classmethod
    def default_rgxdatefmt(cls):
        """ default date format: dd.mm.yyyy
        """
        return '%d.%m.%Y'

    @classmethod
    def default_rgxrate(cls):
        """ default regex to find a rate: nn,nn
        """
        return '(\\d+,\\d+)'

    @classmethod
    def default_rgxidtype(cls):
        """ default identifier type: isin
        """
        return 'isin'

    @classmethod
    def default_rgxdecimal(cls):
        """ default decimal separator: comma
        """
        return ','

    @classmethod
    def default_nohtml(cls):
        """ default: True (strip html before applying the regexes)
        """
        return True

    @fields.depends(*fields_check)
    def on_change_nsin(self):
        """ nsin was edited: run a test request
        """
        self.call_online_source()

    @fields.depends(*fields_check)
    def on_change_isin(self):
        """ isin was edited: run a test request
        """
        self.call_online_source()

    @fields.depends(*fields_check)
    def on_change_symbol(self):
        """ symbol was edited: run a test request
        """
        self.call_online_source()

    # The getters below belong to the unstored test fields; they always
    # return an empty value, the real content is assigned by
    # call_online_source() during an on_change.

    def on_change_with_fnddate(self, name=None):
        """ getter of 'fnddate': always empty
        """
        return None

    def on_change_with_fndrate(self, name=None):
        """ getter of 'fndrate': always empty
        """
        return None

    def on_change_with_fndident(self, name=None):
        """ getter of 'fndident': always empty
        """
        return ''

    def on_change_with_http_state(self, name=None):
        """ getter of 'http_state': always empty
            ('name' is unused; its default now matches the sibling getters)
        """
        return ''

    def on_change_with_text(self, name=None):
        """ getter of 'text': always empty
        """
        return ''

    def on_change_with_nsin(self, name=None):
        """ getter of 'nsin': always empty
        """
        return ''

    def on_change_with_isin(self, name=None):
        """ getter of 'isin': always empty
        """
        return ''

    def on_change_with_symbol(self, name=None):
        """ getter of 'symbol': always empty
        """
        return ''

    @fields.depends('url', 'isin', 'nsin', 'symbol')
    def on_change_with_used_url(self, name=None):
        """ get the url used for testing,
            None if no url is set or the url template is malformed
        """
        if not self.url:
            return None
        try:
            return self.get_url_with_parameter(
                isin=self.isin,
                nsin=self.nsin,
                symbol=self.symbol,
                )
        except (KeyError, ValueError):
            # Template.substitute() rejects unknown or broken
            # '$' placeholders, e.g. while the url is still being typed
            return None

    @classmethod
    def set_test_value(cls, record, name, value):
        """ setter of the test fields: values are deliberately not stored
        """
        pass

    def call_online_source(self):
        """ use the currently entered values to call the online source
            and copy the result into the test fields
        """
        OSourc = Pool().get('investment.source')

        result = OSourc.read_from_website(
            self,
            isin=self.isin,
            nsin=self.nsin,
            symbol=self.symbol,
            debug=True,
            )
        self.text = result.get('text', None)
        self.http_state = result.get('http_state', None)
        self.fnddate = result.get('date', None)
        self.fndrate = result.get('rate', None)
        self.fndident = result.get('code', None)

    def get_url_with_parameter(self, isin=None, nsin=None, symbol=None):
        """ generate the url: fill the placeholders $isin, $nsin and
            $symbol of 'url'; a missing value is replaced by ''

            returns None if no url is set;
            raises KeyError/ValueError (from string.Template) if the
            url contains unknown or malformed placeholders
        """
        if not self.url:
            return None
        return Template(self.url).substitute({
            'isin': isin if isin is not None else '',
            'nsin': nsin if nsin is not None else '',
            'symbol': symbol if symbol is not None else '',
            })

    @classmethod
    def update_rate(cls, asset):
        """ read data from inet, write result to rates of asset

            The sources of the asset are tried in order.
            returns True if a new rate was created, False if not,
            None if the asset has no sources
        """
        pool = Pool()
        Rate = pool.get('investment.rate')
        IrDate = pool.get('ir.date')

        if not asset.updtsources:
            return

        # attribute of the asset holding the value for each identifier type
        ident_attrs = {
            'isin': 'isin',
            'nsin': 'wkn',
            'symbol': 'secsymb',
            }

        for updtsource in asset.updtsources:
            rate_data = cls.read_from_website(
                updtsource,
                isin=asset.isin,
                nsin=asset.wkn,
                symbol=asset.secsymb,
                )

            if updtsource.rgxident:
                # check result - same code?
                code = rate_data.get('code', None)
                if code:
                    asset_code = getattr(
                        asset, ident_attrs[updtsource.rgxidtype])
                    if (asset_code or '').lower() != code.lower():
                        # the page belongs to another asset: skip source
                        logger.warning(
                            'update_rate: got wrong code "%(wrong)s"'
                            ' - expected "%(exp)s"', {
                                'exp': asset_code,
                                'wrong': code,
                            })
                        continue

            to_create = {
                'date': rate_data.get('date', None),
                'rate': rate_data.get('rate', None),
                'asset': asset.id,
                }
            if (to_create['date'] is None) or (to_create['rate'] is None):
                # incomplete result: try next source
                continue

            # check if a rate for this date exists
            if Rate.search_count([
                    ('asset.id', '=', asset.id),
                    ('date', '=', to_create['date']),
                    ]) == 0:
                Rate.create([to_create])
                return True

            # if we got a record for today - stop
            # otherwise try next source
            if to_create['date'] == IrDate.today():
                break
        return False

    def get_regex_result(self, html_text, field_name):
        """ run the regex stored in 'field_name' on html-text,
            convert the result

            'rgxrate' gives a Decimal, 'rgxdate' a date, any other field
            the matched string. The first capture group is used, the
            whole match if the regex has no group.
            returns None if the regex is empty or invalid, nothing was
            found, or the conversion failed
        """
        rgxcode = getattr(self, field_name) or ''
        if not rgxcode:
            return None

        try:
            search_result = re.compile(rgxcode).search(html_text)
        except re.error as err:
            # user-supplied pattern: do not abort the whole update
            logger.warning(
                'get_regex_result: invalid regex in "%s": %s',
                field_name, err)
            return None
        if search_result is None:
            return None

        try:
            result = search_result.group(1)
        except IndexError:
            result = search_result.group(0)
        if result is None:
            # group 1 exists but did not take part in the match
            return None

        if field_name == 'rgxrate':
            # drop the other separator (used for thousands),
            # then normalise the decimal separator to '.'
            thousands_sep = ',' if self.rgxdecimal == '.' else '.'
            number = result.replace(thousands_sep, '')
            number = number.replace(self.rgxdecimal, '.')
            try:
                return Decimal(number)
            except InvalidOperation:
                return None
        if field_name == 'rgxdate':
            try:
                return datetime.strptime(result, self.rgxdatefmt).date()
            except (ValueError, TypeError):
                # text does not fit the format / no format selected
                return None
        return result

    @classmethod
    def read_from_website(
            cls, updtsource, isin=None, nsin=None, symbol=None,
            debug=False):
        """ read from url, extract values

            returns a dict which may contain:
            'http_state' (status line of the response),
            'rate', 'date', 'code' (on http 200/204),
            'text' (downloaded text or error, mainly if 'debug' is set)
        """
        result = {}

        if (not updtsource.url) or (updtsource.url == 'https://'):
            result['text'] = 'invalid url'
            return result

        try:
            url = updtsource.get_url_with_parameter(
                isin=isin,
                nsin=nsin,
                symbol=symbol,
                )
        except (KeyError, ValueError) as err:
            # unknown or malformed '$' placeholder in the url template
            logger.warning(
                'read_from_website: cannot build url from "%s": %r',
                updtsource.url, err)
            result['text'] = 'invalid url'
            return result

        try:
            res1 = requests.get(url, allow_redirects=True, timeout=5.0)
        except requests.RequestException as err:
            # timeout, dns or connection failure, malformed url, ...:
            # report it and let the caller continue with other sources
            logger.error(
                'read_from_website: request failed, url: %s, error: %s',
                url, err)
            if debug:
                result['text'] = str(err)
            return result

        result['http_state'] = '%(code)d: %(msg)s' % {
            'code': res1.status_code,
            'msg': res1.reason,
            }

        if res1.status_code in [200, 204]:
            html = res1.text

            # remove html-tags
            if updtsource.nohtml:
                converter = html2text.HTML2Text()
                converter.ignore_links = True
                converter.ignore_tables = True
                converter.bypass_tables = False
                converter.single_line_break = True
                converter.body_width = 0
                html = converter.handle(html)

            if debug:
                result['text'] = html

            result['rate'] = updtsource.get_regex_result(html, 'rgxrate')
            result['date'] = updtsource.get_regex_result(html, 'rgxdate')
            result['code'] = updtsource.get_regex_result(html, 'rgxident')
        else:
            logger.error(
                'read_from_website: %(code)s, url: %(url)s, '
                'redirects: [%(redirects)s]', {
                    'code': res1.status_code,
                    'url': res1.url,
                    'redirects': ', '.join([x.url for x in res1.history]),
                })
            if debug:
                result['text'] = res1.text
        return result

# end OnlineSource