From 1b440525ac8658cbc1f8650e273dbda7d247d0e1 Mon Sep 17 00:00:00 2001 From: Frederik Jaeckel Date: Thu, 9 Jan 2025 17:01:34 +0100 Subject: [PATCH] import invoice-line --- document.py | 287 ++++++++++++++++++++++++++++++------- tests/facturx-extended.xml | 100 +++++++++++++ 2 files changed, 339 insertions(+), 48 deletions(-) diff --git a/document.py b/document.py index 6333629..16f9d30 100644 --- a/document.py +++ b/document.py @@ -8,6 +8,7 @@ import os.path from lxml import etree from datetime import datetime +from decimal import Decimal from trytond.pool import PoolMeta, Pool from trytond.transaction import Transaction from trytond.exceptions import UserError @@ -70,48 +71,46 @@ class Incoming(metaclass=PoolMeta): invoice = xml_read_func(invoice, xmltree) return invoice + def _readxml_xpath(self, tags): + """ generate xpath + + Args: + tags (list): list of string or integer to build path + """ + parts = [] + for x in tags: + if isinstance(x, str): + parts.append(x) + elif isinstance(x, int): + if parts[-1].endswith(']'): + raise ValueError('multiple list selector') + parts[-1] += '[%d]' % x + result = '/' + '/'.join(parts) + return result + def _readxml_getvalue( - self, xmltree, tags, vtype=None, allow_list=False, - childs=[]): + self, xmltree, tags, vtype=None, allow_list=False): """ read 'text'-part from xml-xpath, convert to 'vtype' Args: tags (list): list of tags to build xpath vtype (type-class, optional): to convert value of text-part - (if not childs). Defaults to None. + Defaults to None. allow_list (boolean, optional): get result as list of values, Defaults to False. - childs (list): read child-items of selected node - [('ns', 'tag', ),...], - if emtpy: read text-part of selected node, Defaults to [] Returns: various: converted value or None """ result = [] - xpath = '/' + '/'.join(tags) + xpath = self._readxml_xpath(tags) nodes = xmltree.xpath(xpath, namespaces=xmltree.nsmap) - # dict to find children of selected node - childs_dict = { - '{%(ns)s}%(tag)s' % {'ns': xmltree.nsmap[x[0]], 'tag': x[1]}: { - 'type': x[2], 'tag': x[1]} - for x in childs} - if nodes: for node1 in nodes: - if not childs_dict: - result.append( - node1.text if vtype is None else vtype(node1.text)) - else: - values = {} - for x in node1.getchildren(): - if x.tag in childs_dict.keys(): - values[childs_dict[x.tag]['tag']] = ( - x.text if childs_dict[x.tag]['type'] is None - else childs_dict[x.tag]['type'](x.text)) - result.append(values) + result.append( + node1.text if vtype is None else vtype(node1.text)) if not allow_list: break @@ -134,7 +133,7 @@ class Incoming(metaclass=PoolMeta): various: converted value or None """ result = None - xpath = '/' + '/'.join(tags) + xpath = self._readxml_xpath(tags) nodes = xmltree.xpath(xpath, namespaces=xmltree.nsmap) if nodes: result = nodes[0].attrib.get(attrib, None) @@ -300,28 +299,30 @@ class Incoming(metaclass=PoolMeta): config = Configuration.get_singleton() + # rsm:CrossIndustryInvoice + xpath_cross_ind = ['rsm:CrossIndustryInvoice'] + # rsm:CrossIndustryInvoice/rsm:ExchangedDocument + xpath_exchg_doc = xpath_cross_ind + ['rsm:ExchangedDocument'] + # check invoice-type: # allowed codes 380 incoice, 381 credit note - inv_code = self._readxml_getvalue(xmltree, [ - 'rsm:CrossIndustryInvoice', - 'rsm:ExchangedDocument', 'ram:TypeCode'], int) + inv_code = self._readxml_getvalue( + xmltree, xpath_exchg_doc + ['ram:TypeCode'], int) if inv_code not in [380, 381]: raise UserError(gettext( 'document_incoming_invoice_xml.msg_convert_error', msg='invalid type-code: %(code)s (expect: 380, 381)' % { 'code': str(inv_code)})) - invoice_number = self._readxml_getvalue(xmltree, [ - 'rsm:CrossIndustryInvoice', - 'rsm:ExchangedDocument', 'ram:ID']) + invoice_number = self._readxml_getvalue( + xmltree, xpath_exchg_doc + ['ram:ID']) if config and config.number_target == 'number': invoice.number = invoice_number else: invoice.reference = invoice_number # invoice-date - date_path = [ - 'rsm:CrossIndustryInvoice', 'rsm:ExchangedDocument', + date_path = xpath_exchg_doc + [ 'ram:IssueDateTime', 'udt:DateTimeString'] date_format = self._readxml_getattrib(xmltree, date_path, 'format') if date_format != '102': @@ -333,12 +334,19 @@ class Incoming(metaclass=PoolMeta): self._readxml_getvalue(xmltree, date_path)) # IncludedNote, 1st line --> 'description', 2nd ff. --> 'comment' - note_list = self._readxml_getvalue(xmltree, [ - 'rsm:CrossIndustryInvoice', - 'rsm:ExchangedDocument', 'ram:IncludedNote'], - allow_list=True, - childs=[('ram', 'Content', str), ('ram', 'ContentCode', str), - ('ram', 'SubjectCode', str)]) + xpath_notes = xpath_exchg_doc + ['ram:IncludedNote'] + # count number of notes + num_nodes = len(xmltree.xpath( + self._readxml_xpath(xpath_notes), namespaces=xmltree.nsmap)) + + # read notes and their fields + note_list = [] + for x in range(1, num_nodes + 1): + note_list.append({ + y: self._readxml_getvalue( + xmltree, xpath_notes + [x, 'ram:%s' % y]) + for y in ['Content', 'ContentCode', 'SubjectCode'] + }) if note_list: invoice.description = note_list[0].get('Content', None) @@ -351,12 +359,20 @@ class Incoming(metaclass=PoolMeta): 'msg': x.get('Content', ''), } for x in note_list[1:]]) + # rsm:CrossIndustryInvoice/rsm:SupplyChainTradeTransaction + xpath_suppl_chain = xpath_cross_ind + [ + 'rsm:SupplyChainTradeTransaction'] + # rsm:CrossIndustryInvoice/ + # rsm:SupplyChainTradeTransaction/ + # ram:ApplicableHeaderTradeAgreement + xpath_appl_head_agree = xpath_suppl_chain + [ + 'ram:ApplicableHeaderTradeAgreement'] + # supplier party add_party = config and config.create_supplier - seller_party = self._readxml_party_data(xmltree, [ - 'rsm:CrossIndustryInvoice', 'rsm:SupplyChainTradeTransaction', - 'ram:ApplicableHeaderTradeAgreement', - 'ram:SellerTradeParty'], create_party=add_party) + seller_party = self._readxml_party_data( + xmltree, xpath_appl_head_agree + ['ram:SellerTradeParty'], + create_party=add_party) if seller_party: if 'party' in seller_party.keys(): invoice.party = seller_party['party'] @@ -369,10 +385,9 @@ class Incoming(metaclass=PoolMeta): for x in seller_party.keys()]))) # company party - buyer_party = self._readxml_party_data(xmltree, [ - 'rsm:CrossIndustryInvoice', 'rsm:SupplyChainTradeTransaction', - 'ram:ApplicableHeaderTradeAgreement', - 'ram:BuyerTradeParty'], create_party=False) + buyer_party = self._readxml_party_data( + xmltree, xpath_appl_head_agree + ['ram:BuyerTradeParty'], + create_party=False) # check if we found our company if config and not config.accept_other_company: company_party_id = self._readxml_find_party(buyer_party) @@ -384,10 +399,186 @@ class Incoming(metaclass=PoolMeta): buyer_party[x].replace('\n', '; ') for x in buyer_party.keys()]))) - # lines of invoice + # invoice - lines + # rsm:CrossIndustryInvoice/ + # rsm:SupplyChainTradeTransaction/ + # ram:IncludedSupplyChainTradeLineItem + xpath_line_item = xpath_suppl_chain + [ + 'ram:IncludedSupplyChainTradeLineItem'] + # get number of invoice-lines + num_lines = len(xmltree.xpath( + self._readxml_xpath(xpath_line_item), namespaces=xmltree.nsmap)) + print('\n## num_lines:', num_lines) + lines_data = [] + for x in range(1, num_lines + 1): + lines_data.append( + self._readxml_invoice_line(xmltree, xpath_line_item, x)) + print('\n## lines_data:', lines_data) + raise ValueError('stop') return invoice + def _readxml_invoice_line(self, xmldata, xpth, pos): + """ read invoice-line from xml + + Args: + xmldata (): xml-tree object + xpth (list): xpath to invoice-line + pos (int): number of line to read + + Returns: + dict: invoice line data + """ + def read_listdata(xtag, key_list): + """ read list of values from xml + + Args: + xtag (str): xml-tag to read as list + key_list (list): [('', '')] + + Returns: + list: list of dict + """ + if isinstance(xtag, str): + xtag = [xtag] + xpath_list = xpth + [pos] + xtag + num_listitem = len(xmldata.xpath( + self._readxml_xpath(xpath_list), namespaces=xmldata.nsmap)) + listdata = [] + for x in range(1, num_listitem + 1): + values = {} + for list_item in key_list: + (xkey, key, xtype) = ( + list_item if len(list_item) == 3 + else tuple(list(list_item) + [None])) + + value = self._readxml_getvalue( + xmldata, xpath_list + [x, xkey], vtype=xtype) + if value is not None: + values[key] = value + if values: + listdata.append(values) + return listdata + + result = {'convert_note': []} + result['line_no'] = self._readxml_getvalue(xmldata, xpth + [pos] + [ + 'ram:AssociatedDocumentLineDocument', 'ram:LineID']) + + result['line_note'] = '\n'.join(self._readxml_getvalue( + xmldata, xpth + [pos] + [ + 'ram:AssociatedDocumentLineDocument', 'ram:IncludedNote', + 'ram:Content'], allow_list=True)) + + for xkey, key in [ + ('ram:ID', 'prod_id'), + ('ram:GlobalID', 'glob_id'), + ('ram:SellerAssignedID', 'seller_id'), + ('ram:BuyerAssignedID', 'buyer_id'), + ('ram:IndustryAssignedID', 'industy_id'), + ('ram:ModelID', 'model_id'), + ('ram:Name', 'name'), + ('ram:Description', 'description'), + ('ram:BatchID', 'lot'), + ('ram:BrandName', 'brand_name'), + ('ram:ModelName', 'model_name'), + ('ram:OriginTradeCountry', 'trade_country') + ]: + result[key] = self._readxml_getvalue(xmldata, xpth + [pos] + [ + 'ram:SpecifiedTradeProduct', xkey]) + + # attributes of product + result['attributes'] = read_listdata([ + 'ram:SpecifiedTradeProduct', + 'ram:ApplicableProductCharacteristic'], [ + ('ram:TypeCode', 'code'), + ('ram:Description', 'description'), + ('ram:ValueMeasure', 'uom'), + ('ram:ValueY', 'value')]) + + # classification of product + result['classification'] = read_listdata([ + 'ram:SpecifiedTradeProduct', + 'ram:DesignatedProductClassification'], [ + ('ram:ClassCode', 'code'), + ('ram:ClassName', 'name')]) + + # serial-numbers of product + result['serialno'] = read_listdata([ + 'ram:SpecifiedTradeProduct', + 'ram:IndividualTradeProductInstance'], [ + ('ram:BatchID', 'lot'), + ('ram:SupplierAssignedSerialID', 'serial')]) + + # referenced product + result['refprod'] = read_listdata( + ['ram:SpecifiedTradeProduct', 'ram:IncludedReferencedProduct'], [ + ('ram:ID', 'id'), + ('ram:GlobalID', 'global_id'), + ('ram:SellerAssignedID', 'seller_id'), + ('ram:BuyerAssignedID', 'buyer_id'), + ('ram:Name', 'name'), + ('ram:Description', 'description'), + ('ram:UnitQuantity', 'quantity', Decimal), + ]) + + # net price + xpath_netprice = xpth + [pos] + [ + 'ram:SpecifiedLineTradeAgreement', 'ram:NetPriceProductTradePrice'] + if self._readxml_getvalue(xmldata, xpath_netprice): + result['unit_net_price'] = read_listdata([ + 'ram:SpecifiedLineTradeAgreement', + 'ram:NetPriceProductTradePrice'], [ + ('ram:ChargeAmount', 'amount', Decimal), + ('ram:BasisQuantity', 'basequantity', Decimal)]) + # notice ignored field + if self._readxml_getvalue(xmldata, xpath_netprice + [ + 'ram:AppliedTradeAllowanceCharge']): + result['convert_note'].append( + 'skip: ' + self._readxml_xpath( + xpath_netprice + ['ram:AppliedTradeAllowanceCharge'])) + + # gross price + xpath_grossprice = xpth + [pos] + [ + 'ram:SpecifiedLineTradeAgreement', + 'ram:GrossPriceProductTradePrice'] + if self._readxml_getvalue(xmldata, xpath_grossprice): + result['unit_gross_price'] = read_listdata([ + 'ram:SpecifiedLineTradeAgreement', + 'ram:GrossPriceProductTradePrice'], [ + ('ram:ChargeAmount', 'amount', Decimal), + ('ram:BasisQuantity', 'basequantity', Decimal)]) + # notice ignored field + if self._readxml_getvalue(xmldata, xpath_grossprice + [ + 'ram:AppliedTradeAllowanceCharge']): + result['convert_note'].append( + 'skip: ' + self._readxml_xpath( + xpath_grossprice + ['ram:AppliedTradeAllowanceCharge'])) + + # quantity + xpath_quantity = xpth + [pos] + ['ram:SpecifiedLineTradeDelivery'] + result['quantity'] = read_listdata( + ['ram:SpecifiedLineTradeDelivery'], [ + ('ram:BilledQuantity', 'billed', Decimal), + ('ram:ChargeFreeQuantity', 'chargefree', Decimal), + ('ram:PackageQuantity', 'package', Decimal), + ]) + # notice ignored fields + for x in [ + 'ShipToTradeParty', 'UltimateShipToTradeParty', + 'ActualDeliverySupplyChainEvent', + 'DespatchAdviceReferencedDocument', + 'ReceivingAdviceReferencedDocument', + 'DeliveryNoteReferencedDocument']: + xp_to_check = xpath_quantity + ['ram:' + x] + if self._readxml_getvalue(xmldata, xp_to_check): + result['convert_note'].append( + 'skip: ' + self._readxml_xpath(xp_to_check)) + + # taxes + + # skip None values + return {x: result[x] for x in result.keys() if result[x]} + def _readxml_convertdate(self, date_string): """ convert date-string to python-date diff --git a/tests/facturx-extended.xml b/tests/facturx-extended.xml index 85eed53..57f2f9e 100644 --- a/tests/facturx-extended.xml +++ b/tests/facturx-extended.xml @@ -52,6 +52,106 @@ + + + 2 + + Description of Line 2 + + + Description of Line 2, line 2 + + + + 2 + 3 + 4 + 5 + 6 + 7 + Name of Product 2 + Description of Product 2 + batch23 + Brand-Name + Model-Name + + 123 + Kilogram + kg + 23 + + + 3c + product-class 1 + + + 22 + 1234 + + DE + + 1 + 2 + 3 + 4 + ref-prod-1 + description of ref-prod-1 + 1.0 + + + + + 800.00 + 1.0 + + + 950.00 + 1.0 + + + + 1.5 + 1.5 + 20240101 + + + + VAT + S + 19.00 + + + 1200.00 + + + + + + 3 + + + Name of Product 3 + Description of Product 3 + + + + 150.00 + + + + 2.0 + + + + VAT + S + 7.00 + + + 300.00 + + + Name of the Supplier