document_incoming_invoice_xml/document.py
2025-01-07 16:08:44 +01:00

427 lines
16 KiB
Python

# -*- coding: utf-8 -*-
# This file is part of the document-incoming-invoice-xml-module
# from m-ds for Tryton. The COPYRIGHT file at the top level of
# this repository contains the full copyright notices and license terms.
# https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/021.htm?https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/025.htm
import os.path
from lxml import etree
from datetime import datetime
from trytond.pool import PoolMeta, Pool
from trytond.transaction import Transaction
from trytond.exceptions import UserError
from trytond.i18n import gettext
from trytond.model import fields
from trytond.pyson import Eval
# Known XML invoice schemas, in the order in which
# Incoming._facturx_detect_content() tries them. Each entry is a tuple of:
#   - path segments of the XSD file, relative to the directory of this module
#     (one segment per item, without path separators, so that os.path.join()
#     never discards the module directory and stays portable),
#   - label stored in the field 'xsd_type',
#   - suffix of the reader method '_readxml_<suffix>'; an empty suffix marks
#     a schema for which this file provides no reader.
xml_types = [
    (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
        'Factur-X extended', 'facturx_extended'),
    (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
        'Factur-X EN16931', ''),
    (['xsd', 'Factur-X_1.07.2_BASIC', 'Factur-X_1.07.2_BASIC.xsd'],
        'Factur-X basic', ''),
    (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
        'Factur-X basic-wl', ''),
    (['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
        'Factur-X minimum', ''),
    (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
        'CrossIndustryInvoice D22', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-Invoice-2.1.xsd'],
        'XRechnung - Invoice', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-CreditNote-2.1.xsd'],
        'XRechnung - Credit Note', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-DebitNote-2.1.xsd'],
        'XRechnung - Debit Note', ''),
    ]
# Extension of the model 'document.incoming' (registered through PoolMeta):
# detects the XML schema of an incoming document and reads invoice data
# from it.
class Incoming(metaclass=PoolMeta):
__name__ = 'document.incoming'
# Label of the detected XML schema (one of the labels in 'xml_types');
# written by _process_supplier_invoice(), read-only for the user and
# flagged invisible unless 'mime_type' is 'application/xml'.
xsd_type = fields.Char(
string='XML Content', readonly=True,
states={'invisible': Eval('mime_type', '') != 'application/xml'})
@classmethod
def default_company(cls):
    """ default value for 'company'

    Returns:
        various: value of the key 'company' in the context of the
            current transaction, None if the key is not set
    """
    context = Transaction().context
    return context.get('company')
def _process_supplier_invoice(self):
    """ create the supplier invoice by the parent implementation,
        then detect the schema of xml-documents and fill the invoice
        with the values read from the xml

    Raises:
        UserError: xml-schema was detected but there is no
            reader-method '_readxml_<funcname>' for it

    Returns:
        record: invoice, as returned by the parent implementation or
            by the xml-reader
    """
    invoice = super()._process_supplier_invoice()
    self.xsd_type = None

    # only xml-documents are inspected
    if self.mime_type != 'application/xml':
        return invoice

    detected = self._facturx_detect_content()
    if not detected:
        return invoice

    xsd_type, funcname, xmltree = detected
    self.xsd_type = xsd_type

    # select the reader by the function-name stored in 'xml_types'
    reader = getattr(self, '_readxml_%s' % funcname, None)
    if not reader:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_not_implemented',
            xmltype=xsd_type))
    return reader(invoice, xmltree)
def _readxml_getvalue(
self, xmltree, tags, vtype=None, allow_list=False,
childs=[]):
""" read 'text'-part from xml-xpath,
convert to 'vtype'
Args:
tags (list): list of tags to build xpath
vtype (type-class, optional): to convert value of text-part
(if not childs). Defaults to None.
allow_list (boolean, optional): get result as list of values,
Defaults to False.
childs (list): read child-items of selected node
[('ns', 'tag', <type>),...],
if emtpy: read text-part of selected node, Defaults to []
Returns:
various: converted value or None
"""
result = []
xpath = '/' + '/'.join(tags)
nodes = xmltree.xpath(xpath, namespaces=xmltree.nsmap)
# dict to find children of selected node
childs_dict = {
'{%(ns)s}%(tag)s' % {'ns': xmltree.nsmap[x[0]], 'tag': x[1]}: {
'type': x[2], 'tag': x[1]}
for x in childs}
if nodes:
for node1 in nodes:
if not childs_dict:
result.append(
node1.text if vtype is None else vtype(node1.text))
else:
values = {}
for x in node1.getchildren():
if x.tag in childs_dict.keys():
values[childs_dict[x.tag]['tag']] = (
x.text if childs_dict[x.tag]['type'] is None
else childs_dict[x.tag]['type'](x.text))
result.append(values)
if not allow_list:
break
if not allow_list:
if result:
return result[0]
return None
return result
def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
""" read attribute from xml-xpath,
convert to 'vtype'
Args:
tags (list): list fo tags to build xpath
attrib (str): name of attribute to read
vtype (type-class, optional): to convert value. Defaults to None.
Returns:
various: converted value or None
"""
result = None
xpath = '/' + '/'.join(tags)
nodes = xmltree.xpath(xpath, namespaces=xmltree.nsmap)
if nodes:
result = nodes[0].attrib.get(attrib, None)
if result and vtype:
result = vtype(result)
return result
def _readxml_find_party(self, party_data):
    """ search for a party matching the data read from the xml-file:
        the name must be contained in the party-name, if postal code,
        street and city are known, the party must also own a matching
        address

    Args:
        party_data (dict): data of party read from xml

    Returns:
        int: id of party if exactly one party matches, otherwise None
    """
    pool = Pool()
    Party = pool.get('party.party')
    Address = pool.get('party.address')

    if not party_data['name']:
        return None

    domain = [(
        'name', 'ilike',
        '%%%(name)s%%' % {'name': party_data['name']})]

    # narrow the search by address if it is completely known
    if {'postal_code', 'street', 'city'}.issubset(party_data.keys()):
        first_street_line = party_data['street'].split('\n')[0]
        # 'ilike' to ignore capitalization
        address_query = Address.search([
            ('city', 'ilike', party_data['city']),
            ('postal_code', '=', party_data['postal_code']),
            ('street', 'ilike', first_street_line + '%')],
            query=True)
        domain.append(('addresses', 'in', address_query))

    matches = Party.search(domain)
    # an ambiguous result is treated like 'not found'
    if len(matches) == 1:
        return matches[0].id
    return None
def _readxml_create_party(self, partydata):
    """ create a party (with one address if address data is
        available) from the data read from xml

    Args:
        partydata (dict): data of party, read from xml

    Returns:
        int: id of created party, None if the data has no name
    """
    pool = Pool()
    Party = pool.get('party.party')
    Invoice = pool.get('account.invoice')
    Company = pool.get('company.company')

    name = partydata['name']
    if not name:
        return None

    # take over the address fields present in the xml data
    address = {}
    for key in ('street', 'postal_code', 'city', 'country', 'subdivision'):
        if key in partydata:
            address[key] = partydata[key]

    # without country in the xml: fall back to the country of the
    # address of the company
    if 'country' not in address:
        company_id = Invoice.default_company()
        if company_id:
            company_address = Company(company_id).party.address_get()
            if company_address and company_address.country:
                address['country'] = company_address.country.id

    values = {'name': name}
    if address:
        values['addresses'] = [('create', [address])]

    (party,) = Party.create([values])
    return party.id
def _readxml_party_data(self, xmltree, tags, create_party=False):
    """ read name and postal address of a trade party from the xml
        and connect it to a party record

    Args:
        xmltree (xmldata): XML-tree
        tags (list): tags to build xpath of the trade party
        create_party (boolean, optional): create party if not found

    Returns:
        dict: data of party, always with key 'name'; the keys
            'postal_code', 'street', 'city', 'country', 'subdivision'
            and 'party' exist only if a value was found
    """
    pool = Pool()
    Country = pool.get('country.country')
    SubDivision = pool.get('country.subdivision')

    address_tags = tags + ['ram:PostalTradeAddress']

    def read_address(tag):
        return self._readxml_getvalue(xmltree, address_tags + [tag])

    result = {
        'name': self._readxml_getvalue(xmltree, tags + ['ram:Name'])}

    postal_code = read_address('ram:PostcodeCode')
    if postal_code:
        result['postal_code'] = postal_code

    # street: up to three lines, empty lines are dropped
    street_lines = [
        read_address(tag)
        for tag in ('ram:LineOne', 'ram:LineTwo', 'ram:LineThree')]
    street = '\n'.join(line for line in street_lines if line)
    if street:
        result['street'] = street

    city = read_address('ram:CityName')
    if city:
        result['city'] = city

    # country: looked up by its upper-cased code
    country_code = read_address('ram:CountryID')
    if country_code:
        countries = Country.search([('code', '=', country_code.upper())])
        if countries:
            result['country'] = countries[0].id

    # subdivision: only searched within a known country
    subdivision_name = read_address('ram:CountrySubDivisionName')
    if subdivision_name and 'country' in result:
        subdivisions = SubDivision.search([
            ('name', '=', subdivision_name),
            ('country', '=', result['country'])])
        if subdivisions:
            result['subdivision'] = subdivisions[0].id

    # connect to an existing party, optionally create a new one
    party_id = self._readxml_find_party(result)
    if not party_id and create_party:
        party_id = self._readxml_create_party(result)
    if party_id:
        result['party'] = party_id
    return result
def _readxml_facturx_extended(self, invoice, xmltree):
    """ read factur-x extended: type-code, number, date, notes,
        supplier and buyer; lines of the invoice are not read yet

    Args:
        invoice (record): model account.invoice
        xmltree (xmldata): XML-tree of the factur-x document

    Raises:
        UserError: type-code is not 380/381, date-format is not 102,
            the date cannot be converted, the supplier was not
            found/created, or the buyer is not the company of this
            document while other companies are not accepted

    Returns:
        record: model account.invoice
    """
    Configuration = Pool().get('document.incoming.configuration')
    config = Configuration.get_singleton()

    def party_text(party_data):
        # party data mixes strings, record-ids (int) and None,
        # so convert each value before building the message
        return ', '.join(
            str(value).replace('\n', '; ')
            for value in party_data.values()
            if value is not None)

    document_path = ['rsm:CrossIndustryInvoice', 'rsm:ExchangedDocument']
    agreement_path = [
        'rsm:CrossIndustryInvoice', 'rsm:SupplyChainTradeTransaction',
        'ram:ApplicableHeaderTradeAgreement']

    # check invoice-type:
    # allowed codes 380 invoice, 381 credit note
    type_code = self._readxml_getvalue(
        xmltree, document_path + ['ram:TypeCode'])
    try:
        inv_code = int(type_code)
    except (TypeError, ValueError):
        # missing or non-numeric code is reported as invalid below
        inv_code = None
    if inv_code not in (380, 381):
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invalid type-code: %(code)s (expect: 380, 381)' % {
                'code': str(type_code)}))

    # number of the document, target field depends on configuration
    invoice_number = self._readxml_getvalue(
        xmltree, document_path + ['ram:ID'])
    if config and config.number_target == 'number':
        invoice.number = invoice_number
    else:
        invoice.reference = invoice_number

    # invoice-date, only format 102 (YYYYMMDD) is supported
    date_path = document_path + [
        'ram:IssueDateTime', 'udt:DateTimeString']
    date_format = self._readxml_getattrib(xmltree, date_path, 'format')
    if date_format != '102':
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invalid date-format: %(code)s (expect: 102)' % {
                'code': str(date_format)}))
    invoice.invoice_date = self._readxml_convertdate(
        self._readxml_getvalue(xmltree, date_path))

    # IncludedNote, 1st line --> 'description', 2nd ff. --> 'comment'
    note_list = self._readxml_getvalue(
        xmltree, document_path + ['ram:IncludedNote'],
        allow_list=True,
        childs=[('ram', 'Content', str), ('ram', 'ContentCode', str),
                ('ram', 'SubjectCode', str)])
    if note_list:
        invoice.description = note_list[0].get('Content', None)
        invoice.comment = '\n'.join([
            '%(code)s%(subj)s%(msg)s' % {
                'code': ('Code=%s, ' % x.get('ContentCode', ''))
                if x.get('ContentCode', '') else '',
                'subj': ('Subject=%s, ' % x.get('SubjectCode', ''))
                if x.get('SubjectCode', '') else '',
                'msg': x.get('Content', ''),
            } for x in note_list[1:]])

    # supplier party, created on demand if the configuration allows it
    add_party = bool(config and config.create_supplier)
    seller_party = self._readxml_party_data(
        xmltree, agreement_path + ['ram:SellerTradeParty'],
        create_party=add_party)
    if 'party' not in seller_party:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_no_supplierparty',
            partytxt=party_text(seller_party)))
    invoice.party = seller_party['party']
    invoice.on_change_party()

    # company party, is never created
    buyer_party = self._readxml_party_data(
        xmltree, agreement_path + ['ram:BuyerTradeParty'],
        create_party=False)

    # check if we found our company
    if config and not config.accept_other_company:
        # '_readxml_party_data' already stored the search result
        company_party_id = buyer_party.get('party')
        if not (company_party_id and
                (company_party_id == self.company.party.id)):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_not_our_company',
                partytxt=party_text(buyer_party)))

    # lines of invoice: not implemented yet
    return invoice
def _readxml_convertdate(self, date_string):
""" convert date-string to python-date
Args:
date_string (_type_): _description_
"""
if not date_string:
return None
try:
return datetime.strptime(date_string, '%Y%m%d').date()
except ValueError:
raise UserError(gettext(
'document_incoming_invoice_xml.msg_convert_error',
msg='cannot convert %(val)s' % {
'val': date_string}))
def _facturx_detect_content(self):
    """ check xml-data against xsd of XRechnung and FacturX,
        in the order of 'xml_types': begin with extended,
        goto minimal, then xrechnung

    Returns:
        tuple: ('xsd type', '<function to read xml>', <xml etree>)
            of the first schema the document is valid for,
            defaults to None if not detected
    """
    xml_data = etree.fromstring(self.data)
    base_dir = os.path.dirname(__file__)

    for xsdpath, xsdtype, funcname in xml_types:
        schema = etree.XMLSchema(
            etree.parse(os.path.join(base_dir, *xsdpath)))
        # validate() returns a boolean; only a valid document
        # ends the search, otherwise the next schema is tried
        if schema.validate(xml_data):
            return (xsdtype, funcname, xml_data)
    return None
# end Incoming