1058 lines
42 KiB
Python
1058 lines
42 KiB
Python
# -*- coding: utf-8 -*-
|
|
# This file is part of the document-incoming-invoice-xml-module
|
|
# from m-ds for Tryton. The COPYRIGHT file at the top level of
|
|
# this repository contains the full copyright notices and license terms.
|
|
|
|
# https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/021.htm?https://portal3.gefeg.com/projectdata/invoice/deliverables/installed/publishingproject/xrechnung%20%28german%20cius%29/xrechnung_crossindustryinvoice%3B%202017-10-18.scm/html/025.htm
|
|
|
|
import os.path
|
|
import json
|
|
import copy
|
|
import facturx
|
|
from lxml import etree
|
|
from datetime import datetime, date
|
|
from decimal import Decimal
|
|
from trytond.pool import PoolMeta, Pool
|
|
from trytond.report import Report
|
|
from trytond.transaction import Transaction
|
|
from trytond.exceptions import UserError
|
|
from trytond.i18n import gettext
|
|
from trytond.model import fields
|
|
from trytond.pyson import Eval
|
|
from trytond.protocols.jsonrpc import JSONEncoder
|
|
|
|
|
|
# Supported xml-types, checked in this order when the content of a
# document is detected. Each entry is a tuple of:
#   - path of the xsd-file as list of components, relative to the
#     folder of this module
#   - name of the xml-type, stored in the field 'xsd_type'
#   - suffix of the reader-method '_readxml_<suffix>',
#     empty string if there is no reader for this type
xml_types = [
    (['xsd', 'Factur-X_1.07.2_MINIMUM', 'Factur-X_1.07.2_MINIMUM.xsd'],
        'Factur-X minimum', 'facturx_extended'),
    (['xsd', 'Factur-X_1.07.2_EXTENDED', 'Factur-X_1.07.2_EXTENDED.xsd'],
        'Factur-X extended', 'facturx_extended'),
    (['xsd', 'Factur-X_1.07.2_EN16931', 'Factur-X_1.07.2_EN16931.xsd'],
        'Factur-X EN16931', ''),
    (['xsd', 'Factur-X_1.07.2_BASIC', 'Factur-X_1.07.2_BASIC.xsd'],
        'Factur-X basic', ''),
    (['xsd', 'Factur-X_1.07.2_BASICWL', 'Factur-X_1.07.2_BASICWL.xsd'],
        'Factur-X basic-wl', ''),
    (['xsd', 'CII D22B XSD', 'CrossIndustryInvoice_100pD22B.xsd'],
        'CrossIndustryInvoice D22', ''),
    # path components are kept separate (no embedded '/') so that
    # os.path.join() builds the path with the separator of the platform
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-Invoice-2.1.xsd'],
        'XRechnung - Invoice', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-CreditNote-2.1.xsd'],
        'XRechnung - Credit Note', ''),
    (['xsd', 'os-UBL-2.1', 'xsd', 'maindoc', 'UBL-DebitNote-2.1.xsd'],
        'XRechnung - Debit Note', '')]
|
|
|
|
|
|
class Incoming(metaclass=PoolMeta):
    # extends the model 'document.incoming' by the import of xml-invoices
    __name__ = 'document.incoming'

    # name of the detected xml-type (set by '_facturx_detect_content'),
    # hidden unless the mime-type of the document is 'application/xml'
    xsd_type = fields.Char(
        string='XML Content',
        readonly=True,
        states={
            'invisible': Eval('mime_type', '') != 'application/xml',
        })
|
|
|
|
@classmethod
def default_company(cls):
    """ get the company from the context of the current transaction

    Returns:
        various: value of 'company' in the context,
            None if it is not set
    """
    context = Transaction().context
    return context.get('company')
|
|
|
|
@classmethod
def process(cls, documents, with_children=False):
    """ run the content check on each document, then hand over
        to the processing of the parent class

    Args:
        documents (list): records of model document.incoming
        with_children (bool, optional): convert sub-documents.
            Defaults to False.
    """
    # the check raises if the content of a document cannot be
    # detected, in this case the parent processing is not started
    for record in documents:
        cls._readxml_check_content(record)

    super().process(documents, with_children)
|
|
|
|
def _process_supplier_invoice(self):
    """ let the parent class create the invoice, then fill it
        with the values read from the xml-content of the document

    Raises:
        UserError: if there is no reader for the detected xml-type

    Returns:
        record: invoice from the parent class, updated and saved
            if values were read from xml
    """
    invoice = super()._process_supplier_invoice()

    # nothing to do for non-xml documents
    if self.mime_type != 'application/xml':
        return invoice

    # nothing to do if the xml-type was not detected
    detected = self._facturx_detect_content()
    if not detected:
        return invoice

    xsd_type, funcname, xmltree = detected
    reader = getattr(self, '_readxml_{}'.format(funcname), None)
    if not reader:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_not_implemented',
            xmltype=xsd_type))

    # the reader stores its result in 'self.parsed_data'
    reader(xmltree)
    if not self.parsed_data:
        return invoice

    # update invoice with imported data, verify the totals
    invoice = self._readxml_update_invoice(invoice)
    invoice.save()
    self._readxml_check_invoice(invoice)
    return invoice
|
|
|
|
def _readxml_check_invoice(self, invoice):
|
|
""" check if calculated totals match with data from xml
|
|
|
|
Raises:
|
|
UserError: if calculated values dont match
|
|
with xml-values
|
|
"""
|
|
totals = self.parsed_data.get('total', None)
|
|
if not totals:
|
|
raise UserError(gettext(
|
|
'msg_convert_error.msg_convert_error',
|
|
msg='no totals-section in xml-data'))
|
|
|
|
for xfield, inv_field in [
|
|
('taxbase', 'untaxed_amount'),
|
|
('taxtotal', 'tax_amount'),
|
|
('grand', 'total_amount')]:
|
|
xml_val = totals.get('taxbase', Decimal('0.0'))
|
|
inv_val = getattr(invoice, inv_field)
|
|
if xml_val != inv_val:
|
|
raise UserError(gettext(
|
|
'document_incoming_invoice_xml.msg_convert_error',
|
|
msg=' '.join([
|
|
inv_field + ' mismatch',
|
|
'from-xml=' + Report.format_currency(
|
|
xml_val, None, invoice.currency),
|
|
'calculated=' + Report.format_currency(
|
|
inv_val, None, invoice.currency)
|
|
])))
|
|
|
|
def _readxml_xpath(self, tags):
|
|
""" generate xpath
|
|
|
|
Args:
|
|
tags (list): list of string or integer to build path
|
|
"""
|
|
parts = []
|
|
for x in tags:
|
|
if isinstance(x, str):
|
|
parts.append(x)
|
|
elif isinstance(x, int):
|
|
if parts[-1].endswith(']'):
|
|
raise ValueError('multiple list selector')
|
|
parts[-1] += '[%d]' % x
|
|
result = '/' + '/'.join(parts)
|
|
return result
|
|
|
|
def _readxml_getvalue(
        self, xmltree, tags, vtype=None, allow_list=False):
    """ read 'text'-part from xml-xpath,
        convert to 'vtype'

    Args:
        xmltree (xmldata): XML-tree
        tags (list): list of tags to build xpath
        vtype (type-class, optional): to convert value of text-part,
            'date' is converted by '_readxml_convertdate'.
            Defaults to None.
        allow_list (boolean, optional): get result as list of values,
            Defaults to False.

    Returns:
        various: converted value of the first matching node or None,
            list of converted values if 'allow_list' is set
    """
    def convert(text):
        """ convert text of a node to 'vtype' """
        if vtype is None:
            return text
        if vtype == date:
            return self._readxml_convertdate(text)
        # an element without content cannot be converted,
        # handle it like a missing value
        if text is None or not text.strip():
            return None
        return vtype(text)

    xpath = self._readxml_xpath(tags)
    nodes = xmltree.xpath(xpath, namespaces=xmltree.nsmap)

    if allow_list:
        return [convert(node.text) for node in nodes]
    if nodes:
        return convert(nodes[0].text)
    return None
|
|
|
|
def _readxml_getattrib(self, xmltree, tags, attrib, vtype=None):
    """ get the value of an attribute of the first node at
        the xml-xpath, optionally converted to 'vtype'

    Args:
        xmltree (xmldata): XML-tree
        tags (list): list of tags to build xpath
        attrib (str): name of attribute to read
        vtype (type-class, optional): to convert value. Defaults to None.

    Returns:
        various: converted value or None
    """
    matches = xmltree.xpath(
        self._readxml_xpath(tags), namespaces=xmltree.nsmap)
    value = matches[0].attrib.get(attrib, None) if matches else None

    # empty values are returned unconverted
    if not (value and vtype):
        return value
    return vtype(value)
|
|
|
|
def _readxml_find_party(self, party_data):
    """ search for the party described by the data from xml-file

    The name must be contained in the name of the party
    (case-insensitive). If postal code, street and city are known,
    the party must also have a matching address.

    Args:
        party_data (dict): data of party read from xml

    Returns:
        int: id of party if exactly one party was found, otherwise None
    """
    pool = Pool()
    Party = pool.get('party.party')
    Address = pool.get('party.address')

    name = party_data['name']
    if not name:
        return None

    domain = [('name', 'ilike', '%{}%'.format(name))]

    if {'postal_code', 'street', 'city'}.issubset(party_data.keys()):
        # only the first line of the street is compared,
        # 'ilike' ignores capitalization
        first_line = party_data['street'].split('\n')[0]
        address_sql = Address.search([
            ('city', 'ilike', party_data['city']),
            ('postal_code', '=', party_data['postal_code']),
            ('street', 'ilike', first_line + '%')],
            query=True)
        domain.append(('addresses', 'in', address_sql))

    found = Party.search(domain)
    # the party is used only if the match is unambiguous
    return found[0].id if len(found) == 1 else None
|
|
|
|
def _readxml_create_party(self, partydata):
    """ store a new party with the values read from xml

    Args:
        partydata (dict): data of party, read from xml

    Returns:
        int: id of created party, None if the party has no name
    """
    pool = Pool()
    Party = pool.get('party.party')
    Invoice = pool.get('account.invoice')
    Company = pool.get('company.company')

    name = partydata['name']
    if not name:
        return None

    address_fields = (
        'street', 'postal_code', 'city', 'country', 'subdivision')
    address = {
        key: partydata[key]
        for key in address_fields
        if key in partydata}

    # without country in xml: take it from the address of the company
    if 'country' not in address:
        company_id = Invoice.default_company()
        if company_id:
            company_address = Company(company_id).party.address_get()
            if company_address and company_address.country:
                address['country'] = company_address.country.id

    values = {'name': name}
    if address:
        values['addresses'] = [('create', [address])]

    (party,) = Party.create([values])
    return party.id
|
|
|
|
def _readxml_party_data(self, xmltree, tags, create_party=False):
    """ read name and postal address of a party from xml,
        find the matching party

    Args:
        xmltree (xmldata): XML-tree
        tags (list): tags to build xpath
        create_party (boolean, optional): create party if not found

    Returns:
        dict: data of party; always contains 'name', the keys
            'postal_code', 'street', 'city', 'country', 'subdivision'
            and 'party' are set only if a value was found
    """
    pool = Pool()
    Country = pool.get('country.country')
    SubDivision = pool.get('country.subdivision')

    address_tags = tags + ['ram:PostalTradeAddress']

    def read(path):
        """ text of first node at 'path' """
        return self._readxml_getvalue(xmltree, path)

    result = {'name': read(tags + ['ram:Name'])}

    postal_code = read(address_tags + ['ram:PostcodeCode'])
    if postal_code:
        result['postal_code'] = postal_code

    # street: up to three lines, empty lines are skipped
    street_lines = [
        read(address_tags + ['ram:' + tag])
        for tag in ('LineOne', 'LineTwo', 'LineThree')]
    street = '\n'.join(filter(None, street_lines))
    if street:
        result['street'] = street

    city = read(address_tags + ['ram:CityName'])
    if city:
        result['city'] = city

    # country is stored as id of the record with the matching code
    country_code = read(address_tags + ['ram:CountryID'])
    if country_code:
        countries = Country.search([('code', '=', country_code.upper())])
        if countries:
            result['country'] = countries[0].id

    # subdivision can be searched only within a known country
    subdivision_name = read(address_tags + ['ram:CountrySubDivisionName'])
    if subdivision_name and 'country' in result:
        subdivisions = SubDivision.search([
            ('name', '=', subdivision_name),
            ('country', '=', result['country'])])
        if subdivisions:
            result['subdivision'] = subdivisions[0].id

    party_id = self._readxml_find_party(result)
    if not party_id and create_party:
        party_id = self._readxml_create_party(result)
    if party_id:
        result['party'] = party_id
    return result
|
|
|
|
def _readxml_update_invoice(self, invoice):
    """ update invoice with parsed_data

    Sets number or reference (depending on the configuration),
    invoice date, description, comment, party and lines of
    the invoice from the values in 'self.parsed_data'.

    Args:
        invoice (record): model account.invoice

    Raises:
        UserError: if no party was found for the seller or if the
            buyer is not the company of the document (unless the
            configuration accepts other companies)

    Returns:
        record: model account.invoice
    """
    Configuration = Pool().get('document.incoming.configuration')

    config = Configuration.get_singleton()

    def party_text(party_values):
        """ text of a party for error messages

        Only string values are used, the dict also contains ids
        of records (country, subdivision, party) and the name
        may be None.
        """
        return ', '.join(
            value.replace('\n', '; ')
            for value in party_values.values()
            if isinstance(value, str))

    if config and config.number_target == 'number':
        invoice.number = self.parsed_data.get('invoice_number', None)
    else:
        invoice.reference = self.parsed_data.get('invoice_number', None)
    invoice.invoice_date = self.parsed_data.get('invoice_date', None)

    # 1st note --> 'description', 2nd ff. --> 'comment'
    note_list = self.parsed_data.get('note_list', None)
    if note_list:
        invoice.description = note_list[0].get('Content', None)
        invoice.comment = '\n'.join([
            '%(code)s%(subj)s%(msg)s' % {
                'code': ('Code=%s, ' % x.get('ContentCode', ''))
                if x.get('ContentCode', '') else '',
                'subj': ('Subject=%s, ' % x.get('SubjectCode', ''))
                if x.get('SubjectCode', '') else '',
                'msg': x.get('Content', ''),
            } for x in note_list[1:]])

    seller_party = self.parsed_data.get('seller_party', None)
    if seller_party:
        if 'party' in seller_party:
            invoice.party = seller_party['party']
            invoice.on_change_party()
        else:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_no_supplierparty',
                partytxt=party_text(seller_party)))

    # check if we found our company
    buyer_party = self.parsed_data.get('buyer_party', None)
    if buyer_party and config and not config.accept_other_company:
        company_party_id = self._readxml_find_party(buyer_party)
        if not (company_party_id and
                (company_party_id == self.company.party.id)):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_not_our_company',
                partytxt=party_text(buyer_party)))

    # work on a copy: creating the lines removes the used values
    lines_data = copy.deepcopy(self.parsed_data.get('lines_data', None))
    if lines_data:
        lines = [
            self._readxml_getinvoiceline(invoice, x)
            for x in lines_data]
        # keep the order of the lines in the xml
        for position, line in enumerate(lines, start=1):
            line.sequence = position
        invoice.lines = lines
        invoice.on_change_lines()
    return invoice
|
|
|
|
def _readxml_facturx_extended(self, xmltree):
    """ read values of a factur-x document from the xml-tree,
        store them in 'self.parsed_data'

    Stored keys: 'invoice_number', 'invoice_date', 'payment' and,
    if values were found, 'note_list', 'seller_party',
    'buyer_party', 'lines_data', 'total'.

    Args:
        xmltree (xmldata): XML-tree

    Raises:
        UserError: if the type-code is not 380/381 or the format
            of the invoice date is not '102'
    """
    Configuration = Pool().get('document.incoming.configuration')
    config = Configuration.get_singleton()

    # values are collected in a dict
    if not isinstance(getattr(self, 'parsed_data', None), dict):
        self.parsed_data = {}

    def read(path, vtype=None):
        """ value of first node at 'path' """
        return self._readxml_getvalue(xmltree, path, vtype)

    def count_nodes(path):
        """ number of nodes at 'path' """
        return len(xmltree.xpath(
            self._readxml_xpath(path), namespaces=xmltree.nsmap))

    root_path = ['rsm:CrossIndustryInvoice']
    document_path = root_path + ['rsm:ExchangedDocument']

    # document type: 380 invoice, 381 credit note
    type_code = read(document_path + ['ram:TypeCode'], int)
    if type_code not in [380, 381]:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invalid type-code: %(code)s (expect: 380, 381)' % {
                'code': str(type_code)}))

    self.parsed_data['invoice_number'] = read(document_path + ['ram:ID'])

    # invoice date, accepted only in format '102'
    date_path = document_path + [
        'ram:IssueDateTime', 'udt:DateTimeString']
    date_format = self._readxml_getattrib(xmltree, date_path, 'format')
    if date_format != '102':
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='invalid date-format: %(code)s (expect: 102)' % {
                'code': str(date_format)}))
    self.parsed_data['invoice_date'] = self._readxml_convertdate(
        read(date_path))

    # notes of the document, one dict per note
    notes_path = document_path + ['ram:IncludedNote']
    note_keys = ['Content', 'ContentCode', 'SubjectCode']
    notes = [
        {key: read(notes_path + [pos, 'ram:' + key]) for key in note_keys}
        for pos in range(1, count_nodes(notes_path) + 1)]
    if notes:
        self.parsed_data['note_list'] = notes

    transaction_path = root_path + ['rsm:SupplyChainTradeTransaction']
    agreement_path = transaction_path + [
        'ram:ApplicableHeaderTradeAgreement']

    # seller: created if missing and allowed by configuration
    seller = self._readxml_party_data(
        xmltree, agreement_path + ['ram:SellerTradeParty'],
        create_party=config and config.create_supplier)
    if seller:
        self.parsed_data['seller_party'] = seller

    # buyer: never created
    buyer = self._readxml_party_data(
        xmltree, agreement_path + ['ram:BuyerTradeParty'],
        create_party=False)
    if buyer:
        self.parsed_data['buyer_party'] = buyer

    # lines of the invoice
    line_path = transaction_path + [
        'ram:IncludedSupplyChainTradeLineItem']
    invoice_lines = [
        self._readxml_invoice_line(xmltree, line_path, pos)
        for pos in range(1, count_nodes(line_path) + 1)]
    if invoice_lines:
        self.parsed_data['lines_data'] = invoice_lines

    # payment data
    settlement_path = transaction_path + [
        'ram:ApplicableHeaderTradeSettlement']
    payment = self.parsed_data['payment'] = {}
    payment['reference'] = read(
        settlement_path + ['ram:PaymentReference'])
    payment['currency'] = read(
        settlement_path + ['ram:InvoiceCurrencyCode'])

    # bank accounts: key in result --> path below the payment means
    means_path = settlement_path + [
        'ram:SpecifiedTradeSettlementPaymentMeans']
    bank_fields = [
        ('info', ['ram:Information']),
        ('type', ['ram:TypeCode']),
        ('debitor_iban', [
            'ram:PayerPartyDebtorFinancialAccount', 'ram:IBANID']),
        ('creditor_iban', [
            'ram:PayeePartyCreditorFinancialAccount', 'ram:IBANID']),
        ('creditor_name', [
            'ram:PayeePartyCreditorFinancialAccount',
            'ram:AccountName']),
        ('card_id', [
            'ram:ApplicableTradeSettlementFinancialCard', 'ram:ID']),
        ('card_holder_name', [
            'ram:ApplicableTradeSettlementFinancialCard',
            'ram:CardholderName']),
        ('institution', [
            'ram:PayeeSpecifiedCreditorFinancialInstitution',
            'ram:BICID']),
        ]
    bank_accounts = []
    for pos in range(1, count_nodes(means_path) + 1):
        account = {}
        for key, sub_path in bank_fields:
            value = read(means_path + [pos] + sub_path)
            # empty values are skipped
            if value:
                account[key] = value
        bank_accounts.append(account)
    if bank_accounts:
        payment['bank'] = bank_accounts

    # taxes of the invoice
    taxes = self._readxml_read_listdata(
        xmltree, settlement_path + ['ram:ApplicableTradeTax'], [], [
            ('ram:CalculatedAmount', 'amount', Decimal),
            ('ram:TypeCode', 'type'),
            ('ram:ExemptionReason', 'reason'),
            ('ram:BasisAmount', 'base', Decimal),
            ('ram:LineTotalBasisAmount', 'basetotal', Decimal),
            ('ram:AllowanceChargeBasisAmount', 'allowancebase', Decimal),
            ('ram:CategoryCode', 'category_code'),
            ('ram:ExemptionReasonCode', 'reason_code'),
            ('ram:TaxPointDate', 'taxdate', date),
            ('ram:DueDateTypeCode', 'duecode'),
            ('ram:RateApplicablePercent', 'percent', Decimal)])
    if taxes:
        payment['taxes'] = taxes

    # payment terms with their discount terms
    discount_path = ['ram:ApplicableTradePaymentDiscountTerms']
    terms = self._readxml_read_listdata(
        xmltree, settlement_path + ['ram:SpecifiedTradePaymentTerms'],
        [], [
            ('ram:Description', 'description'),
            (['ram:DueDateDateTime', 'udt:DateTimeString'],
                'duedate', date),
            ('ram:DirectDebitMandateID', 'mandat_id'),
            ('ram:PartialPaymentAmount', 'amount', Decimal),
            (discount_path + ['ram:BasisDateTime', 'udt:DateTimeString'],
                'discount_date', date),
            (discount_path + ['ram:BasisPeriodMeasure'],
                'discount_measure', Decimal),
            (discount_path + ['ram:BasisAmount'],
                'discount_base', Decimal),
            (discount_path + ['ram:CalculationPercent'],
                'discount_perc', Decimal),
            (discount_path + ['ram:ActualDiscountAmount'],
                'discount_amount', Decimal)
            ])
    if terms:
        payment['terms'] = terms

    # totals of the invoice, first summation is used
    summation = self._readxml_read_listdata(
        xmltree, settlement_path + [
            'ram:SpecifiedTradeSettlementHeaderMonetarySummation'],
        [], [
            ('ram:LineTotalAmount', 'amount', Decimal),
            ('ram:ChargeTotalAmount', 'charge', Decimal),
            ('ram:AllowanceTotalAmount', 'allowance', Decimal),
            ('ram:TaxBasisTotalAmount', 'taxbase', Decimal),
            ('ram:TaxTotalAmount', 'taxtotal', Decimal),
            ('ram:RoundingAmount', 'rounding', Decimal),
            ('ram:GrandTotalAmount', 'grand', Decimal),
            ('ram:TotalPrepaidAmount', 'prepaid', Decimal),
            ('ram:DuePayableAmount', 'duepayable', Decimal),
            ])
    if summation:
        self.parsed_data['total'] = summation[0]
|
|
|
|
def _readxml_getinvoiceline(self, invoice, line_data):
    """ create invoice line in memory

    Values used to build the line are removed from 'line_data',
    the remaining values are added to the note of the line.

    Args:
        invoice (record): model account.invoice
        line_data (dict): values from xml to create
            invoice line, is modified

    Raises:
        UserError: if no product category/tax matches a tax of the
            line, if not every tax of the line could be assigned or
            if the calculated amount differs from the xml-amount

    Returns:
        record: model account.invoice.line, not saved
    """
    pool = Pool()
    Line = pool.get('account.invoice.line')
    Tax = pool.get('account.tax')
    Uom = pool.get('product.uom')
    Configuration = pool.get('document.incoming.configuration')
    ModelData = pool.get('ir.model.data')

    cfg1 = Configuration.get_singleton()

    def get_tax_by_percent(percent):
        """ search for tax by percent of supplier tax

        Args:
            percent (Decimal): tax percent, 7% --> 0.07

        Raises:
            UserError: if no product category is configured or
                no matching tax was found

        Returns:
            tuple: (model 'account.tax, model 'product.category')
        """
        if not (cfg1 and cfg1.product_category):
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_no_prodcat_configured'))

        for pcat in cfg1.product_category:
            for s_tax in pcat.supplier_taxes:
                # get current taxes of supplier-tax at
                # invoice-date
                current_taxes = Tax.compute(
                    [s_tax], Decimal('1'), 1.0, invoice.invoice_date)

                # skip result of multiple or none taxes
                if len(current_taxes) != 1:
                    continue

                if (current_taxes[0]['tax'].rate == percent) and (
                        current_taxes[0]['tax'].type == 'percentage'):
                    # found it
                    return (s_tax, pcat)

        # no product category found
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_no_prodcat_found',
            percent=percent * Decimal('100.0')))

    # uom: default is 'unit', replaced by the unit whose symbol
    # matches the 'uom' of a product attribute
    xml_uom = Uom(ModelData.get_id('product', 'uom_unit'))
    for x_attr in line_data.get('attributes', []):
        x_uom = x_attr.get('uom', None)
        if not x_uom:
            continue
        units = Uom.search([('symbol', '=', x_uom)])
        if units:
            xml_uom = units[0]

    line = Line(
        invoice=invoice,
        type='line',
        unit=xml_uom,
        quantity=line_data.get('quantity', {}).pop('billed', None),
        unit_price=line_data.get('unit_net_price', {}).pop('amount', None))
    line_no = line_data.pop('line_no', None)

    # description
    descr = [
        '[%s]' % line_no if line_no else None,
        line_data.pop('name', None),
        line_data.pop('description', None)]
    line.description = '; '.join([x for x in descr if x])

    # get taxes and product-category from settings
    tax_and_category = []
    line_taxes = line_data.pop('taxes', [])
    for x in line_taxes:
        percent = x.get('percent', None)
        if (x.get('type', '') == 'VAT') and (percent is not None):
            percent = percent / Decimal('100')
            tax_and_category.append(get_tax_by_percent(percent))

    # check result: every tax of the xml-line must be assigned
    if len(line_taxes) != len(tax_and_category):
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_numtaxes_invalid',
            descr=line.description,
            taxin='|'.join([
                str(x.get('percent', '-')) for x in line_taxes]),
            taxout='|'.join([
                str(x[0].rate * Decimal('100.0'))
                for x in tax_and_category])))

    # set taxes and account for product
    line.taxes = [x[0] for x in tax_and_category]
    expense_accounts = [
        x[1].account_expense
        for x in tax_and_category if x[1].account_expense]
    if expense_accounts:
        line.account = expense_accounts[0]

    # check if calculated 'amount' matches with amount from xml-data
    assert invoice.currency is not None

    xml_amount = line_data.get('total', {}).pop('amount', None)
    if xml_amount is not None:
        calc_amount = line.get_amount(None)
        if xml_amount != calc_amount:
            raise UserError(gettext(
                'document_incoming_invoice_xml.msg_line_amount_invalid',
                linetxt=line.description,
                calcsum=Report.format_currency(
                    calc_amount, None, invoice.currency),
                xmlsum=Report.format_currency(
                    xml_amount, None, invoice.currency)))

    # cleanup used values, the keys are optional in 'line_data'
    for x in ['quantity', 'unit_net_price', 'total']:
        if not line_data.get(x):
            line_data.pop(x, None)

    # note of line
    notes = [line_data.pop('line_note', None)]
    # skipped xml-data
    convert_notes = line_data.pop('convert_note', None)
    if convert_notes:
        if not isinstance(convert_notes, list):
            convert_notes = [str(convert_notes)]
        notes.extend(
            [' '] +
            [gettext('document_incoming_invoice_xml.msg_convert_note')] +
            convert_notes)
    # values not used to create invoice-line
    if line_data:
        notes.extend([
            ' ',
            gettext(
                'document_incoming_invoice_xml.msg_unused_linevalues'),
            json.dumps(line_data, cls=JSONEncoder, indent=3)])
    line.note = '\n'.join(x for x in notes if x)
    line.on_change_invoice()
    return line
|
|
|
|
def _readxml_read_listdata(self, xmldata, xpth, xtag, key_list):
    """ read the values of each item of a repeated xml-element

    Args:
        xmldata (xtree): xml-tree
        xpth (list): list of xml-tags in front of 'xtag'
        xtag (str or list): xml-tag(s) of the element to read as list
        key_list (list): [('<xtag>', '<key>')] or
            [('<xtag>', '<key>', <type>)], '<xtag>' can be
            a list of tags

    Returns:
        list: list of dict, one per item; values of None and
            items without values are skipped
    """
    tag_path = [xtag] if isinstance(xtag, str) else xtag
    item_path = xpth + tag_path
    num_items = len(xmldata.xpath(
        self._readxml_xpath(item_path), namespaces=xmldata.nsmap))

    rows = []
    for pos in range(1, num_items + 1):
        row = {}
        for entry in key_list:
            # the type is optional, default is None
            if len(entry) == 3:
                xkey, key, xtype = entry
            else:
                xkey, key, xtype = list(entry) + [None]

            sub_path = xkey if isinstance(xkey, list) else [xkey]
            value = self._readxml_getvalue(
                xmldata, item_path + [pos] + sub_path, vtype=xtype)
            if value is not None:
                row[key] = value
        if row:
            rows.append(row)
    return rows
|
|
|
|
def _readxml_invoice_line(self, xmldata, xpth, pos):
    """ read invoice-line from xml

    Elements of the line which are not converted are listed
    in 'convert_note'.

    Args:
        xmldata (): xml-tree object
        xpth (list): xpath to invoice-line
        pos (int): number of line to read

    Returns:
        dict: invoice line data, empty values are skipped
    """
    xpath_line = xpth + [pos]
    result = {'convert_note': []}

    def node_exists(tags):
        """ check if the xml contains an element at 'tags',
            independent of its text
        """
        return bool(xmldata.xpath(
            self._readxml_xpath(tags), namespaces=xmldata.nsmap))

    def first_item(tags, key_list):
        """ first dict of the list-data below the line,
            empty dict if there is none
        """
        items = self._readxml_read_listdata(
            xmldata, xpath_line, tags, key_list)
        return items[0] if items else {}

    def note_skipped(tags):
        """ add note about an existing but ignored element """
        if node_exists(tags):
            result['convert_note'].append(
                'skip: ' + self._readxml_xpath(tags))

    result['line_no'] = self._readxml_getvalue(xmldata, xpath_line + [
        'ram:AssociatedDocumentLineDocument', 'ram:LineID'])

    # notes without text are skipped
    result['line_note'] = '\n'.join(
        x for x in self._readxml_getvalue(
            xmldata, xpath_line + [
                'ram:AssociatedDocumentLineDocument', 'ram:IncludedNote',
                'ram:Content'], allow_list=True)
        if x)

    for xkey, key in [
            ('ram:ID', 'prod_id'),
            ('ram:GlobalID', 'glob_id'),
            ('ram:SellerAssignedID', 'seller_id'),
            ('ram:BuyerAssignedID', 'buyer_id'),
            ('ram:IndustryAssignedID', 'industy_id'),
            ('ram:ModelID', 'model_id'),
            ('ram:Name', 'name'),
            ('ram:Description', 'description'),
            ('ram:BatchID', 'lot'),
            ('ram:BrandName', 'brand_name'),
            ('ram:ModelName', 'model_name'),
            (['ram:OriginTradeCountry', 'ram:ID'], 'trade_country')
            ]:
        # a list of tags must be joined to the path, a nested list
        # would be ignored when the xpath is built
        result[key] = self._readxml_getvalue(
            xmldata,
            xpath_line + ['ram:SpecifiedTradeProduct'] +
            (xkey if isinstance(xkey, list) else [xkey]))

    # attributes of product
    result['attributes'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:ApplicableProductCharacteristic'], [
            ('ram:TypeCode', 'code'),
            ('ram:Description', 'description'),
            ('ram:ValueMeasure', 'value', Decimal),
            ('ram:Value', 'uom')])

    # classification of product
    result['classification'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:DesignatedProductClassification'], [
            ('ram:ClassCode', 'code'),
            ('ram:ClassName', 'name')])

    # serial-numbers of product
    result['serialno'] = self._readxml_read_listdata(
        xmldata, xpath_line, [
            'ram:SpecifiedTradeProduct',
            'ram:IndividualTradeProductInstance'], [
            ('ram:BatchID', 'lot'),
            ('ram:SupplierAssignedSerialID', 'serial')])

    # referenced product
    result['refprod'] = self._readxml_read_listdata(
        xmldata, xpath_line,
        ['ram:SpecifiedTradeProduct', 'ram:IncludedReferencedProduct'], [
            ('ram:ID', 'id'),
            ('ram:GlobalID', 'global_id'),
            ('ram:SellerAssignedID', 'seller_id'),
            ('ram:BuyerAssignedID', 'buyer_id'),
            ('ram:Name', 'name'),
            ('ram:Description', 'description'),
            ('ram:UnitQuantity', 'quantity', Decimal),
            ])

    # net price and gross price, read the same way
    price_keys = [
        ('ram:ChargeAmount', 'amount', Decimal),
        ('ram:BasisQuantity', 'basequantity', Decimal)]
    for price_tag, price_key in [
            ('ram:NetPriceProductTradePrice', 'unit_net_price'),
            ('ram:GrossPriceProductTradePrice', 'unit_gross_price')]:
        price_tags = ['ram:SpecifiedLineTradeAgreement', price_tag]
        result[price_key] = first_item(price_tags, price_keys)
        # notice ignored field
        note_skipped(
            xpath_line + price_tags + ['ram:AppliedTradeAllowanceCharge'])

    # quantity
    xpath_quantity = xpath_line + ['ram:SpecifiedLineTradeDelivery']
    result['quantity'] = first_item(
        ['ram:SpecifiedLineTradeDelivery'], [
            ('ram:BilledQuantity', 'billed', Decimal),
            ('ram:ChargeFreeQuantity', 'chargefree', Decimal),
            ('ram:PackageQuantity', 'package', Decimal),
            ])
    # notice ignored fields
    for x in [
            'ShipToTradeParty', 'UltimateShipToTradeParty',
            'ActualDeliverySupplyChainEvent',
            'DespatchAdviceReferencedDocument',
            'ReceivingAdviceReferencedDocument',
            'DeliveryNoteReferencedDocument']:
        note_skipped(xpath_quantity + ['ram:' + x])

    # taxes
    xpath_trade = xpath_line + ['ram:SpecifiedLineTradeSettlement']
    result['taxes'] = self._readxml_read_listdata(xmldata, xpath_line, [
        'ram:SpecifiedLineTradeSettlement',
        'ram:ApplicableTradeTax'], [
        ('ram:CalculatedAmount', 'amount', Decimal),
        ('ram:TypeCode', 'type'),
        ('ram:ExemptionReason', 'reason'),
        ('ram:BasisAmount', 'base', Decimal),
        ('ram:LineTotalBasisAmount', 'basetotal', Decimal),
        ('ram:AllowanceChargeBasisAmount', 'allowancebase', Decimal),
        ('ram:CategoryCode', 'category_code'),
        ('ram:ExemptionReasonCode', 'reason_code'),
        ('ram:TaxPointDate', 'taxdate', date),
        ('ram:DueDateTypeCode', 'duecode'),
        ('ram:RateApplicablePercent', 'percent', Decimal)])

    # total amounts
    result['total'] = first_item([
        'ram:SpecifiedLineTradeSettlement',
        'ram:SpecifiedTradeSettlementLineMonetarySummation'], [
        ('ram:LineTotalAmount', 'amount', Decimal),
        ('ram:ChargeTotalAmount', 'charge', Decimal),
        ('ram:AllowanceTotalAmount', 'fee', Decimal),
        ('ram:TaxTotalAmount', 'tax', Decimal),
        ('ram:GrandTotalAmount', 'grand', Decimal),
        ('ram:TotalAllowanceChargeAmount', 'feecharge', Decimal),
        ])

    # notice ignored fields
    for x in [
            'BillingSpecifiedPeriod', 'SpecifiedTradeAllowanceCharge',
            'ActualDeliverySupplyChainEvent',
            'InvoiceReferencedDocument',
            'AdditionalReferencedDocument',
            'ReceivableSpecifiedTradeAccountingAccount']:
        note_skipped(xpath_trade + ['ram:' + x])

    # skip None values and empty containers
    return {x: result[x] for x in result if result[x]}
|
|
|
|
def _readxml_convertdate(self, date_string):
|
|
""" convert date-string to python-date
|
|
|
|
Args:
|
|
date_string (_type_): _description_
|
|
"""
|
|
if not date_string:
|
|
return None
|
|
try:
|
|
return datetime.strptime(date_string, '%Y%m%d').date()
|
|
except ValueError:
|
|
raise UserError(gettext(
|
|
'document_incoming_invoice_xml.msg_convert_error',
|
|
msg='cannot convert %(val)s' % {
|
|
'val': date_string}))
|
|
|
|
@classmethod
def _readxml_check_content(cls, document):
    """ try to detect content, fire exception if fail

    Detects flavour and level of the xml-data with 'facturx',
    validates the data against the matching xsd and saves
    the document.

    Args:
        document (record): model document.incoming

    Raises:
        UserError: if detection, validation or save fails
    """
    xml_data = etree.fromstring(document.data)

    # defaults for the error message, in case the detection of
    # flavour or level itself fails
    fx_flavour = None
    fx_level = None
    try:
        fx_flavour = facturx.get_flavor(xml_data)
        fx_level = facturx.get_level(xml_data)
        document._facturx_detect_content(fx_flavour, fx_level)
        document.save()
    except Exception as e1:
        raise UserError(gettext(
            'document_incoming_invoice_xml.msg_convert_error',
            msg='%(name)s [%(flavour)s|%(level)s] %(msg)s' % {
                'name': document.name or '-',
                'flavour': fx_flavour,
                'level': fx_level,
                'msg': str(e1)})) from e1
|
|
|
|
def _facturx_detect_content(self, flavour=None, level=None):
    """ check xml-data against the xsd-files listed in 'xml_types',
        store the name of the first matching type in 'xsd_type'

    If 'flavour' is 'factur-x' and 'level' is given, only the xsd
    of this level is checked and a validation error is raised.
    Otherwise all types are tried in the order of 'xml_types' and
    validation errors are ignored.

    Args:
        flavour (str, optional): flavour of the xml-data.
            Defaults to None.
        level (str, optional): level of the factur-x data.
            Defaults to None.

    Raises:
        etree.DocumentInvalid: if the xml-data of a known factur-x
            level is not valid

    Returns:
        tuple: ('xsd type', '<function to read xml>', <xml etree>)
            defaults to None if not detected
    """
    xml_data = etree.fromstring(self.data)
    known_level = bool(flavour == 'factur-x' and level)

    # type names contain a hyphen ('Factur-X basic-wl'); compare
    # without hyphens so that a level given as 'basicwl' matches too
    # NOTE(review): confirm the level names reported by 'facturx'
    level_key = level.lower().replace('-', '') if known_level else None
    base_dir = os.path.dirname(__file__)

    for xsdpath, xsdtype, funcname in xml_types:
        if known_level:
            type_key = xsdtype.lower().replace('-', '')
            if not (type_key.startswith('facturx') and
                    type_key.endswith(level_key)):
                # skip check if xml-content is already known
                continue

        fname = os.path.join(base_dir, *xsdpath)
        schema = etree.XMLSchema(etree.parse(fname))
        try:
            schema.assertValid(xml_data)
        except etree.DocumentInvalid:
            # fire exception for known xml-content,
            # otherwise try the next type
            if known_level:
                raise
            continue
        self.xsd_type = xsdtype
        return (xsdtype, funcname, xml_data)
    return None
|
|
|
|
# end Incoming
|