370 lines
12 KiB
Python
370 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
# This file is part of the investment-module from m-ds for Tryton.
|
|
# The COPYRIGHT file at the top level of this repository contains the
|
|
# full copyright notices and license terms.
|
|
|
|
from string import Template
|
|
import requests, logging, html2text, re
|
|
from datetime import datetime
|
|
from decimal import Decimal
|
|
from trytond.model import ModelView, ModelSQL, fields, Unique, Check
|
|
from trytond.transaction import Transaction
|
|
from trytond.pool import Pool
|
|
from trytond.pyson import Eval, Bool
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# Selection entries (value, label) for the decimal separator of a scraped rate.
sel_rgxdecimal = [('.', '.'), (',', ',')]
|
|
|
|
|
|
# Selection entries (value, label) naming the kind of identifier a source
# reports: ISIN, NSIN or ticker symbol.
sel_rgxidtype = [
    ('isin', 'ISIN'),
    ('nsin', 'NSIN'),
    ('symbol', 'Symbol'),
]
|
|
|
|
# Selection entries for the date format of a source: the value is a
# ``strptime`` format string, the label is its human-readable pattern.
sel_rgxdatefmt = [
    ('%d.%m.%Y', 'dd.mm.yyyy'),
    ('%d.%m.%y', 'dd.mm.yy'),
    ('%m/%d/%Y', 'mm/dd/yyyy'),
    ('%m/%d/%y', 'mm/dd/yy'),
    ('%Y-%m-%d', 'yyyy-mm-dd'),
    ('%b %d %Y', 'mon dd yyyy'),
]
|
|
|
|
# Field names passed to ``fields.depends`` on the on_change handlers of the
# test inputs (nsin / isin / symbol).
fields_check = [
    'url', 'nsin', 'isin', 'symbol', 'text', 'http_state',
    'fnddate', 'fndrate', 'fndident',
]
|
|
|
|
|
|
class OnlineSource(ModelSQL, ModelView):
    'Online Source'
    # NOTE(review): the one-line docstring above is kept verbatim; Tryton
    # presumably uses it as the model description - confirm before extending.
    #
    # A source describes where to download a page (``url``, a
    # ``string.Template`` with the placeholders isin/nsin/symbol) and which
    # regular expressions extract date, rate and identifier from it.
    __name__ = 'investment.source'

    # --- stored configuration -------------------------------------------
    name = fields.Char(string='Name', required=True)
    url = fields.Char(string='URL', required=True)
    nohtml = fields.Boolean(string='Remove HTML',
        help='Removes HTML tags before the text is interpreted.')
    rgxdate = fields.Char(string='Date', required=True,
        help='Regex code to find the date in the downloaded HTML file.')
    rgxdatefmt = fields.Selection(string='Date format', required=True,
        selection=sel_rgxdatefmt)
    rgxrate = fields.Char(string='Rate', required=True,
        help='Regex code to find the rate in the downloaded HTML file.')
    rgxdecimal = fields.Selection(string='Decimal Separator', required=True,
        help='Decimal separator for converting the market value into a number.',
        selection=sel_rgxdecimal)
    rgxident = fields.Char(string='Identifier',
        help='Regex code to find the identifier in the downloaded HTML file.')
    # the identifier type becomes mandatory as soon as an identifier regex
    # is entered
    rgxidtype = fields.Selection(string='ID-Type', selection=sel_rgxidtype,
        help='Type of identifier used to validate the result.',
        states={
            'required': Bool(Eval('rgxident', '')),
            }, depends=['rgxident'])

    # --- fields to test requests (function fields, never stored) ---------
    used_url = fields.Function(fields.Char(string='Used URL', readonly=True,
        help='This URL is used to retrieve the HTML file.'),
        'on_change_with_used_url')
    nsin = fields.Function(fields.Char(string='NSIN'),
        'on_change_with_nsin', setter='set_test_value')
    isin = fields.Function(fields.Char(string='ISIN'),
        'on_change_with_isin', setter='set_test_value')
    symbol = fields.Function(fields.Char(string='Symbol'),
        'on_change_with_symbol', setter='set_test_value')
    http_state = fields.Function(fields.Char(string='HTTP-State',
        readonly=True), 'on_change_with_http_state')
    text = fields.Function(fields.Text(string='Result',
        readonly=True), 'on_change_with_text')
    fnddate = fields.Function(fields.Date(string='Date', readonly=True,
        help='Date found during test query.'),
        'on_change_with_fnddate')
    fndrate = fields.Function(fields.Numeric(string='Rate', readonly=True,
        help='Rate found during test query.', digits=(16,4)),
        'on_change_with_fndrate')
    fndident = fields.Function(fields.Char(string='Identifier', readonly=True,
        help='Identifier found during test query.'),
        'on_change_with_fndident')

    @classmethod
    def __setup__(cls):
        """ run the inherited setup, then put ('name', 'DESC') at the
            front of the default order
        """
        super(OnlineSource, cls).__setup__()
        cls._order.insert(0, ('name', 'DESC'))

    @classmethod
    def default_url(cls):
        """ default-url: the bare scheme; read_from_website() treats this
            exact value as 'invalid url'
        """
        return 'https://'

    @classmethod
    def default_rgxdate(cls):
        """ code to find date: dd.mm.yyyy
        """
        return '(\\d{2}\\.\\d{2}\\.\\d{4})'

    @classmethod
    def default_rgxdatefmt(cls):
        """ default date format: dd.mm.yyyy
        """
        return '%d.%m.%Y'

    @classmethod
    def default_rgxrate(cls):
        """ code to find rate: nn,nn
        """
        return '(\\d+,\\d+)'

    @classmethod
    def default_rgxidtype(cls):
        """ default identifier type: isin
        """
        return 'isin'

    @classmethod
    def default_rgxdecimal(cls):
        """ default decimal separator: comma
        """
        return ','

    @classmethod
    def default_nohtml(cls):
        """ default: True - strip HTML before the regexes run
        """
        return True

    @fields.depends(*fields_check)
    def on_change_nsin(self):
        """ test value 'nsin' changed: run request
        """
        self.call_online_source()

    @fields.depends(*fields_check)
    def on_change_isin(self):
        """ test value 'isin' changed: run request
        """
        self.call_online_source()

    @fields.depends(*fields_check)
    def on_change_symbol(self):
        """ test value 'symbol' changed: run request
        """
        self.call_online_source()

    # The getters below only supply empty initial values; the test fields
    # are filled by call_online_source().

    def on_change_with_fnddate(self, name=None):
        """ no date until a test query ran
        """
        return None

    def on_change_with_fndrate(self, name=None):
        """ no rate until a test query ran
        """
        return None

    def on_change_with_fndident(self, name=None):
        """ empty identifier until a test query ran
        """
        return ''

    def on_change_with_http_state(self, name=None):
        """ empty http-state until a test query ran
            ('name' now defaults to None like all sibling getters)
        """
        return ''

    def on_change_with_text(self, name=None):
        """ empty result text until a test query ran
        """
        return ''

    def on_change_with_nsin(self, name=None):
        """ test input 'nsin' starts empty
        """
        return ''

    def on_change_with_isin(self, name=None):
        """ test input 'isin' starts empty
        """
        return ''

    def on_change_with_symbol(self, name=None):
        """ test input 'symbol' starts empty
        """
        return ''

    @fields.depends('url', 'isin', 'nsin', 'symbol')
    def on_change_with_used_url(self, name=None):
        """ get url for testing: the url template filled with the
            current test values, None if no url is entered
        """
        if not self.url:
            return None
        return self.get_url_with_parameter(
            isin=self.isin,
            nsin=self.nsin,
            symbol=self.symbol,
            )

    @classmethod
    def set_test_value(cls, record, name, value):
        """ setter of the test fields: dont store it
        """
        pass

    def call_online_source(self):
        """ use updated values to call online-source,
            for testing parameters; the outcome is written to the
            fields text, http_state, fnddate, fndrate and fndident
        """
        OSourc = Pool().get('investment.source')

        result = OSourc.read_from_website(
            self,
            isin=self.isin,
            nsin=self.nsin,
            symbol=self.symbol,
            debug=True,
            )
        # keys missing in the result reset the corresponding field to None
        self.text = result.get('text', None)
        self.http_state = result.get('http_state', None)
        self.fnddate = result.get('date', None)
        self.fndrate = result.get('rate', None)
        self.fndident = result.get('code', None)

    def get_url_with_parameter(self, isin=None, nsin=None, symbol=None):
        """ generate url: fill the placeholders ${isin}, ${nsin} and
            ${symbol} of the url template, None-values become ''

            returns the resulting url or None if no url is set
        """
        if not self.url:
            return None

        values = {
            'isin': isin if isin is not None else '',
            'nsin': nsin if nsin is not None else '',
            'symbol': symbol if symbol is not None else '',
            }
        # safe_substitute leaves unknown or malformed '$' placeholders
        # untouched, substitute() would raise KeyError/ValueError while
        # the user is still typing the url
        return Template(self.url).safe_substitute(values)

    @classmethod
    def update_rate(cls, asset):
        """ read data from inet, write result to rates of asset

            The sources of the asset are tried in order until one yields
            a rate for a date not yet stored.

            returns True if a rate was created, False if not,
            None if the asset has no sources
        """
        pool = Pool()
        Rate = pool.get('investment.rate')
        IrDate = pool.get('ir.date')

        if not asset.updtsources:
            return

        # attribute of the asset holding the code of each identifier type
        code_attribute = {
            'isin': 'isin',
            'nsin': 'wkn',
            'symbol': 'secsymb',
            }

        for updtsource in asset.updtsources:
            rate_data = cls.read_from_website(
                updtsource,
                isin=asset.isin,
                nsin=asset.wkn,
                symbol=asset.secsymb,
                )

            if updtsource.rgxident:
                # check result - same code?
                # (a page on which no code was found is not rejected)
                code = rate_data.get('code', None)
                if code:
                    asset_code = getattr(
                        asset, code_attribute[updtsource.rgxidtype])
                    if (asset_code or '').lower() != code.lower():
                        # fail - try next source
                        logger.warning(
                            'update_rate: got wrong code "%s" - expected "%s"',
                            code, asset_code)
                        continue

            to_create = {
                'date': rate_data.get('date', None),
                'rate': rate_data.get('rate', None),
                'asset': asset.id,
                }
            if (to_create['date'] is None) or (to_create['rate'] is None):
                # incomplete result - try next source
                continue

            # check if exists
            if Rate.search_count([
                    ('asset.id', '=', asset.id),
                    ('date', '=', to_create['date']),
                    ]) == 0:
                Rate.create([to_create])
                return True

            # if we got a record for today - stop
            # otherwise try next source
            if to_create['date'] == IrDate.today():
                break
        return False

    def get_regex_result(self, html_text, field_name):
        """ run regex on html-text, convert result

            html_text: text to search in
            field_name: name of the field holding the regex
                ('rgxrate', 'rgxdate' or 'rgxident')

            returns Decimal for 'rgxrate', date for 'rgxdate', otherwise
            the matched text; None if the regex is empty, nothing was
            found or the conversion failed
        """
        rgxcode = getattr(self, field_name) or ''
        if not rgxcode:
            return None

        search_result = re.compile(rgxcode).search(html_text)
        if search_result is None:
            return None

        # prefer the first capture group, use the whole match
        # if the regex has no group
        try:
            result = search_result.group(1)
        except IndexError:
            result = search_result.group(0)

        # group 1 exists but did not take part in the match
        if result is None:
            return None

        if field_name == 'rgxrate':
            # the separator not selected as decimal separator
            # is the thousands separator - drop it
            dec_sep = [',', '.']
            dec_sep.remove(self.rgxdecimal)

            result = result.replace(dec_sep[0], '').replace(self.rgxdecimal, '.')
            try:
                result = Decimal(result)
            except ArithmeticError:
                # decimal.InvalidOperation (an ArithmeticError): no number
                result = None
        elif field_name == 'rgxdate':
            try:
                result = datetime.strptime(result, self.rgxdatefmt).date()
            except (ValueError, TypeError):
                # text does not fit the format / no format selected
                result = None
        return result

    @classmethod
    def read_from_website(cls, updtsource, isin=None, nsin=None, symbol=None, debug=False):
        """ read from url, extract values

            updtsource: source to use (url, regexes, nohtml)
            isin, nsin, symbol: values for the placeholders of the url
            debug: if True, add the downloaded text as 'text'

            returns dict with some of the keys:
                'http_state': '<status code>: <reason>' or the request-error
                'rate', 'date', 'code': regex results (status 200/204)
                'text': downloaded text (debug) or 'invalid url'
        """
        result = {}

        # no url or still the untouched default
        if (not updtsource.url) or (updtsource.url == 'https://'):
            result['text'] = 'invalid url'
            return result

        url = updtsource.get_url_with_parameter(
            isin=isin,
            nsin=nsin,
            symbol=symbol,
            )
        try:
            res1 = requests.get(url, allow_redirects=True, timeout=5.0)
        except requests.exceptions.RequestException as exc:
            # timeout, dns, connection, malformed url, ...: report it
            # instead of aborting, the caller can try the next source
            logger.error(
                'read_from_website: request failed, url: %s, error: %s',
                url, exc)
            result['http_state'] = 'request failed: %s' % exc
            return result

        result['http_state'] = '%d: %s' % (res1.status_code, res1.reason)

        if res1.status_code in [200, 204]:
            html = res1.text

            # remove html-tags
            if updtsource.nohtml:
                o1 = html2text.HTML2Text()
                o1.ignore_links = True
                o1.ignore_tables = True
                o1.bypass_tables = False
                o1.single_line_break = True
                o1.body_width = 0
                html = o1.handle(html)

            if debug:
                result['text'] = html

            result['rate'] = updtsource.get_regex_result(html, 'rgxrate')
            result['date'] = updtsource.get_regex_result(html, 'rgxdate')
            result['code'] = updtsource.get_regex_result(html, 'rgxident')
        else:
            logger.error(
                'read_from_website: %s, url: %s, redirects: [%s]',
                res1.status_code,
                res1.url,
                ', '.join([x.url for x in res1.history]))
            if debug:
                result['text'] = res1.text
        return result

# end OnlineSource
|