optimize detection of xml-content

This commit is contained in:
Frederik Jaeckel 2025-01-20 16:38:12 +01:00
parent 4da7a26f83
commit 4278d99170

View file

@ -8,6 +8,7 @@
import os.path import os.path
import json import json
import copy import copy
import facturx
from lxml import etree from lxml import etree
from datetime import datetime, date from datetime import datetime, date
from decimal import Decimal from decimal import Decimal
@ -22,6 +23,8 @@ from trytond.protocols.jsonrpc import JSONEncoder
xml_types = [ xml_types = [
(['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
'Factur-X minimum', 'facturx_extended'),
(['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'], (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
'Factur-X extended', 'facturx_extended'), 'Factur-X extended', 'facturx_extended'),
(['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'], (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
@ -30,8 +33,6 @@ xml_types = [
'Factur-X basic', ''), 'Factur-X basic', ''),
(['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'], (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
'Factur-X basic-wl', ''), 'Factur-X basic-wl', ''),
(['xsd', '/Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
'Factur-X minimum', ''),
(['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'], (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
'CrossIndustryInvoice D22', ''), 'CrossIndustryInvoice D22', ''),
(['xsd', 'os-UBL-2.1', 'xsd/maindoc', 'UBL-Invoice-2.1.xsd'], (['xsd', 'os-UBL-2.1', 'xsd/maindoc', 'UBL-Invoice-2.1.xsd'],
@ -53,19 +54,29 @@ class Incoming(metaclass=PoolMeta):
def default_company(cls): def default_company(cls):
return Transaction().context.get('company') return Transaction().context.get('company')
@classmethod
def process(cls, documents, with_children=False):
""" check if content can be converted
Args:
documents (list): records of model document.incoming
with_children (bool, optional): convert sub-documents.
Defaults to False.
"""
for document in documents:
cls._readxml_check_content(document)
super().process(documents, with_children)
def _process_supplier_invoice(self): def _process_supplier_invoice(self):
""" try to detect content of 'data', read values """ try to detect content of 'data', read values
""" """
invoice = super()._process_supplier_invoice() invoice = super()._process_supplier_invoice()
self.xsd_type = None
if self.mime_type == 'application/xml': if self.mime_type == 'application/xml':
# detect xml-content # detect xml-content
xml_info = self._facturx_detect_content() xml_info = self._facturx_detect_content()
if xml_info: if xml_info:
(xsd_type, funcname, xmltree) = xml_info (xsd_type, funcname, xmltree) = xml_info
self.xsd_type = xsd_type
xml_read_func = getattr(self, '_readxml_%s' % funcname, None) xml_read_func = getattr(self, '_readxml_%s' % funcname, None)
if not xml_read_func: if not xml_read_func:
raise UserError(gettext( raise UserError(gettext(
@ -74,7 +85,9 @@ class Incoming(metaclass=PoolMeta):
# read xml data, write to 'self.parsed_data' # read xml data, write to 'self.parsed_data'
xml_read_func(xmltree) xml_read_func(xmltree)
# update invoice with imported data # update invoice with imported data
if self.parsed_data:
invoice = self._readxml_update_invoice(invoice) invoice = self._readxml_update_invoice(invoice)
# raise ValueError('stop')
return invoice return invoice
def _readxml_xpath(self, tags): def _readxml_xpath(self, tags):
@ -797,7 +810,7 @@ class Incoming(metaclass=PoolMeta):
('ram:BatchID', 'lot'), ('ram:BatchID', 'lot'),
('ram:BrandName', 'brand_name'), ('ram:BrandName', 'brand_name'),
('ram:ModelName', 'model_name'), ('ram:ModelName', 'model_name'),
('ram:OriginTradeCountry', 'trade_country') (['ram:OriginTradeCountry', 'ram:ID'], 'trade_country')
]: ]:
result[key] = self._readxml_getvalue(xmldata, xpath_line + [ result[key] = self._readxml_getvalue(xmldata, xpath_line + [
'ram:SpecifiedTradeProduct', xkey]) 'ram:SpecifiedTradeProduct', xkey])
@ -809,8 +822,8 @@ class Incoming(metaclass=PoolMeta):
'ram:ApplicableProductCharacteristic'], [ 'ram:ApplicableProductCharacteristic'], [
('ram:TypeCode', 'code'), ('ram:TypeCode', 'code'),
('ram:Description', 'description'), ('ram:Description', 'description'),
('ram:ValueMeasure', 'uom'), ('ram:ValueMeasure', 'value', Decimal),
('ram:ValueY', 'value')]) ('ram:Value', 'uom')])
# classification of product # classification of product
result['classification'] = self._readxml_read_listdata( result['classification'] = self._readxml_read_listdata(
@ -958,7 +971,29 @@ class Incoming(metaclass=PoolMeta):
msg='cannot convert %(val)s' % { msg='cannot convert %(val)s' % {
'val': date_string})) 'val': date_string}))
def _facturx_detect_content(self): @classmethod
def _readxml_check_content(cls, document):
""" try to detect content, fire exception if fail
Args:
document (record): model document.incoming
"""
xml_data = etree.fromstring(document.data)
try:
fx_flavour = facturx.get_flavor(xml_data)
fx_level = facturx.get_level(xml_data)
document._facturx_detect_content(fx_flavour, fx_level)
document.save()
except Exception as e1:
raise UserError(gettext(
'document_incoming_invoice_xml.msg_convert_error',
msg='%(name)s [%(flavour)s|%(level)s] %(msg)s' % {
'name': document.name or '-',
'flavour': fx_flavour,
'level': fx_level,
'msg': str(e1)}))
def _facturx_detect_content(self, flavour=None, level=None):
""" check xml-data against xsd of XRechnung and FacturX, """ check xml-data against xsd of XRechnung and FacturX,
begin with extended, goto minimal, then xrechnung begin with extended, goto minimal, then xrechnung
@ -969,13 +1004,23 @@ class Incoming(metaclass=PoolMeta):
xml_data = etree.fromstring(self.data) xml_data = etree.fromstring(self.data)
for xsdpath, xsdtype, funcname in xml_types: for xsdpath, xsdtype, funcname in xml_types:
if flavour == 'factur-x' and level:
if not (xsdtype.lower().startswith('factur-x') and
xsdtype.lower().endswith(level)):
# skip check if xml-content is already known
continue
fname = os.path.join(*[os.path.split(__file__)[0]] + xsdpath) fname = os.path.join(*[os.path.split(__file__)[0]] + xsdpath)
schema = etree.XMLSchema(etree.parse(fname)) schema = etree.XMLSchema(etree.parse(fname))
try: try:
schema.assertValid(xml_data) schema.assertValid(xml_data)
except etree.DocumentInvalid: self.xsd_type = xsdtype
pass
return (xsdtype, funcname, xml_data) return (xsdtype, funcname, xml_data)
except etree.DocumentInvalid as e1:
# fire exception for knows xml-content
if flavour == 'factur-x' and level:
raise e1
pass
return None return None
# end Incoming # end Incoming