228 lines
8.6 KiB
Python
228 lines
8.6 KiB
Python
![]() |
# -*- coding: utf-8 -*-
|
||
|
# This file is part of the document-incoming-invoice-xml-module
|
||
|
# from m-ds for Tryton. The COPYRIGHT file at the top level of
|
||
|
# this repository contains the full copyright notices and license terms.
|
||
|
|
||
|
# https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/021.htm?https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/025.htm
|
||
|
|
||
|
import os.path
|
||
|
from lxml import etree
|
||
|
from datetime import datetime
|
||
|
from trytond.pool import PoolMeta
|
||
|
from trytond.transaction import Transaction
|
||
|
from trytond.exceptions import UserError
|
||
|
from trytond.i18n import gettext
|
||
|
from trytond.model import fields
|
||
|
from trytond.pyson import Eval
|
||
|
|
||
|
|
||
|
# Known XML invoice formats, checked in this order by the content detection.
# Each entry is a tuple of:
#   - path components of the XSD file, relative to this module's directory
#     (every component must be a plain name: a leading '/' would make
#     os.path.join() drop the module directory),
#   - label stored in the 'xsd_type' field of the document,
#   - suffix of the '_readxml_<suffix>' reader method; an empty string means
#     no reader is implemented for this format yet.
xml_types = [
    (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
        'Factur-X extended', 'facturx_extended'),
    (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
        'Factur-X EN16931', ''),
    (['xsd', 'Factur-X_1.07.2_BASIC', 'Factur-X_1.07.2_BASIC.xsd'],
        'Factur-X basic', ''),
    (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
        'Factur-X basic-wl', ''),
    (['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
        'Factur-X minimum', ''),
    (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
        'CrossIndustryInvoice D22', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-Invoice-2.1.xsd'],
        'XRechnung - Invoice', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-CreditNote-2.1.xsd'],
        'XRechnung - Credit Note', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-DebitNote-2.1.xsd'],
        'XRechnung - Debit Note', '')]
|
||
|
|
||
|
|
||
|
class Incoming(metaclass=PoolMeta):
    """ Extension of 'document.incoming': detect the format of XML
        documents by validating them against the schemas listed in
        'xml_types' and read the invoice values from the XML
    """
    __name__ = 'document.incoming'

    # label of the schema the XML content was validated against,
    # hidden for documents which are not 'application/xml'
    xsd_type = fields.Char(
        string='XML Content', readonly=True,
        states={'invisible': Eval('mime_type', '') != 'application/xml'})

    @classmethod
    def default_company(cls):
        """ get id of company from context

        Returns:
            various: value of 'company' in context or None
        """
        return Transaction().context.get('company')

    def _process_supplier_invoice(self):
        """ try to detect content of 'data', read values

        Raises:
            UserError: the xml-type was detected but there is
                no '_readxml_<funcname>' method to read it

        Returns:
            record: result of the parent implementation, updated by the
                xml reader if the document contains a known xml-type
        """
        invoice = super()._process_supplier_invoice()

        self.xsd_type = None
        if self.mime_type != 'application/xml':
            return invoice

        # detect xml-content
        xml_info = self._facturx_detect_content()
        if not xml_info:
            return invoice

        (xsd_type, funcname, xmltree) = xml_info
        self.xsd_type = xsd_type

        xml_read_func = getattr(self, '_readxml_%s' % funcname, None)
        if not xml_read_func:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_not_implemented',
                xmltype=xsd_type))
        # read xml data
        return xml_read_func(invoice, xmltree)

    def _readxml_xpath(self, xmltree, tags):
        """ run xpath-query built from 'tags' on 'xmltree'

        Args:
            xmltree (element): lxml element to run the query on
            tags (list): list of tags to build xpath

        Returns:
            list: matching nodes
        """
        # lxml evaluates an absolute xpath against the document root even
        # if it is called on a sub-node, therefore the path must be
        # relative if 'xmltree' is not the root element
        prefix = '/' if xmltree.getparent() is None else './'
        # xpath has no default namespace, lxml rejects the prefix 'None'
        namespaces = {
            ns_prefix: ns_uri
            for ns_prefix, ns_uri in xmltree.nsmap.items()
            if ns_prefix}
        return xmltree.xpath(prefix + '/'.join(tags), namespaces=namespaces)

    def _readxml_getvalue(
            self, xmltree, tags, vtype=None, allow_list=False,
            text_field=True):
        """ read 'text'-part from xml-xpath,
            convert to 'vtype'

        Args:
            xmltree (element): root element of the document or a
                sub-node, 'tags' are relative to a sub-node
            tags (list): list of tags to build xpath
            vtype (type-class, optional): to convert value. Defaults to None.
            allow_list (boolean): get result as list of values
            text_field (boolean): return content of 'text'-part of node,
                if False: return the nodes itself

        Returns:
            various: converted value or None, with 'allow_list' a
                (possibly empty) list of values
        """
        nodes = self._readxml_xpath(xmltree, tags)
        if not text_field:
            result = list(nodes)
        else:
            # nodes without text are skipped
            result = [
                node1.text if vtype is None else vtype(node1.text)
                for node1 in nodes if node1.text]

        if allow_list:
            return result
        return result[0] if result else None

    def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
        """ read attribute from xml-xpath,
            convert to 'vtype'

        Args:
            xmltree (element): root element of the document or a
                sub-node, 'tags' are relative to a sub-node
            tags (list): list of tags to build xpath
            attrib (str): name of attribute to read
            vtype (type-class, optional): to convert value. Defaults to None.

        Returns:
            various: converted value of the first matching node or None
        """
        nodes = self._readxml_xpath(xmltree, tags)
        if not nodes:
            return None

        result = nodes[0].attrib.get(attrib, None)
        if result and vtype:
            result = vtype(result)
        return result

    def _readxml_facturx_extended(self, invoice, xmltree):
        """ read factur-x extended

        Args:
            invoice (record): model account.invoice
            xmltree (element): root element of the xml-document

        Raises:
            UserError: type-code of document is not 380/381 or
                the invoice-date is not in format '102'

        Returns:
            record: model account.invoice
        """
        path_document = ['rsm:CrossIndustryInvoice', 'rsm:ExchangedDocument']

        # check invoice-type:
        # allowed codes 380 invoice, 381 credit note
        inv_code = self._readxml_getvalue(
            xmltree, path_document + ['ram:TypeCode'], int)
        if inv_code not in [380, 381]:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='invalid type-code: %(code)s (expect: 380, 381)' % {
                    'code': str(inv_code)}))

        invoice.number = self._readxml_getvalue(
            xmltree, path_document + ['ram:ID'])

        # invoice-date
        date_path = path_document + [
            'ram:IssueDateTime', 'udt:DateTimeString']
        date_format = self._readxml_getattrib(xmltree, date_path, 'format')
        if date_format != '102':
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='invalid date-format: %(code)s (expect: 102)' % {
                    'code': str(date_format)}))
        invoice.invoice_date = self._readxml_convertdate(
            self._readxml_getvalue(xmltree, date_path))

        # IncludedNote, 1st line --> 'description', 2nd ff. --> 'comment'
        note_list = self._readxml_getvalue(
            xmltree, path_document + ['ram:IncludedNote'],
            allow_list=True, text_field=False)
        if note_list:
            invoice.description = self._readxml_getvalue(
                note_list[0], ['ram:Content'], allow_list=False)

            # notes without content are skipped
            comment_lines = [
                content for content in (
                    self._readxml_getvalue(
                        note, ['ram:Content'], allow_list=False)
                    for note in note_list[1:])
                if content]
            if comment_lines:
                invoice.comment = '\n'.join(comment_lines)
        return invoice

    def _readxml_convertdate(self, date_string):
        """ convert date-string to python-date

        Args:
            date_string (str): date in format 'YYYYMMDD'

        Raises:
            UserError: 'date_string' does not match the format

        Returns:
            date: converted value, None if 'date_string' is empty
        """
        if not date_string:
            return None
        try:
            return datetime.strptime(date_string, '%Y%m%d').date()
        except ValueError:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_convert_error',
                msg='cannot convert %(val)s' % {
                    'val': date_string}))

    def _facturx_detect_content(self):
        """ check xml-data against xsd of XRechnung and FacturX,
            begin with extended, goto minimal, then xrechnung

        Returns:
            tuple: ('xsd type', '<function to read xml>', <xml etree>)
            defaults to None if not detected
        """
        xml_data = etree.fromstring(self.data)
        base_path = os.path.dirname(__file__)

        for xsdpath, xsdtype, funcname in xml_types:
            fname = os.path.join(base_path, *xsdpath)
            schema = etree.XMLSchema(etree.parse(fname))
            try:
                schema.assertValid(xml_data)
            except etree.DocumentInvalid:
                # document does not match this schema, try the next one
                continue
            return (xsdtype, funcname, xml_data)
        return None

# end Incoming
|