Package pyx12 :: Module map_if
[hide private]

Source Code for Module pyx12.map_if

   1  ###################################################################### 
   2  # Copyright (c) 2001-2005 Kalamazoo Community Mental Health Services, 
   3  #   John Holland <jholland@kazoocmh.org> <john@zoner.org> 
   4  # All rights reserved. 
   5  # 
   6  # This software is licensed as described in the file LICENSE.txt, which 
   7  # you should have received as part of this distribution. 
   8  # 
   9  ###################################################################### 
  10   
  11  #    $Id: map_if.py 1149 2007-08-29 15:10:30Z johnholland $ 
  12   
  13  """ 
  14  Interface to a X12N IG Map 
  15  """ 
  16  import cPickle 
  17  import libxml2 
  18  import libxslt 
  19  import logging 
  20  import os.path 
  21  #import pdb 
  22  import string 
  23  import sys 
  24  import re 
  25  from types import * 
  26  from stat import ST_MTIME 
  27  from stat import ST_SIZE 
  28   
  29  # Intrapackage imports 
  30  import errors 
  31  from errors import IsValidError 
  32  import codes 
  33  import dataele 
  34   
  35  #Global Variables 
  36  NodeType = {'element_start': 1, 'element_end': 15, 'attrib': 2, 'text': 3,  
  37      'CData': 4, 'entity_ref': 5, 'entity_decl':6, 'pi': 7, 'comment': 8,  
  38      'doc': 9, 'dtd': 10, 'doc_frag': 11, 'notation': 12} 
  39   
  40  MAXINT = 2147483647 
  41   
  42  ############################################################ 
  43  # X12 Node Superclass 
  44  ############################################################ 
class x12_node(object):
    """
    X12 Node Superclass

    Common base for the nodes of the map tree.  It stores the node
    identity (id, name, path), a reference to the parent node and a plain
    list of children, and supplies default ``is_*`` predicates that all
    return False; subclasses override the predicate matching their kind.
    """

    def __init__(self):
        self.id = None
        self.name = None
        self.parent = None
        self.children = []
        self.path = ''

    def __len__(self):
        """
        @return: number of direct children
        @rtype: int
        """
        return len(self.children)

    def __repr__(self):
        """
        @return: the node name, or an empty string while no name is set
        @rtype: string
        """
        # __repr__ must return a string; returning the initial None would
        # make repr(node) raise TypeError.
        if self.name is None:
            return ''
        return self.name

    def getnodebypath(self, path):
        """
        Find a descendant by a '/' separated path of node ids.  Ids are
        compared case-insensitively.

        @param path: remaining path to match, relative to this node
        @type path: string
        @return: the matching node
        @raise errors.EngineError: if no node matches the path
        """
        pathl = path.split('/')
        wanted = pathl[0].lower()
        for child in self.children:
            if child.id.lower() != wanted:
                continue
            if len(pathl) == 1:
                return child
            if child.is_loop():
                return child.getnodebypath('/'.join(pathl[1:]))
            # The id matched a non-loop node but more path components
            # remain: the path cannot be resolved, stop searching.
            break
        raise errors.EngineError(
            'getnodebypath failed. Path "%s" not found' % path)

    def get_child_count(self):
        """
        @return: number of direct children
        @rtype: int
        """
        return len(self.children)

    def get_child_node_by_idx(self, idx):
        """
        @param idx: zero based
        @return: the child at idx, or None if idx is past the last child
        """
        if idx >= len(self.children):
            return None
        return self.children[idx]

    def get_path(self):
        """
        @return: path - XPath style
        @rtype: string
        """
        parent_path = self.parent.get_path()
        if parent_path == '/':
            # Avoid a doubled slash directly below the root.
            return '/' + self.path
        return parent_path + '/' + self.path

    def is_first_seg_in_loop(self):
        """
        @rtype: boolean
        """
        return False

    def is_map_root(self):
        """
        @rtype: boolean
        """
        return False

    def is_loop(self):
        """
        @rtype: boolean
        """
        return False

    def is_segment(self):
        """
        @rtype: boolean
        """
        return False

    def is_element(self):
        """
        @rtype: boolean
        """
        return False

    def is_composite(self):
        """
        @rtype: boolean
        """
        return False
157 158 159 # def debug_print(self): 160 # sys.stdout.write('%s%s %s %s %s\n' % (str(' '*self.base_level), \ 161 # self.base_name, self.base_level, self.id, self.name)) 162 # for node in self.children: 163 # node.debug_print() 164 165 166 ############################################################ 167 # Map file interface 168 ############################################################
class map_if(x12_node):
    """
    Map file interface

    Root node of the map tree.  The constructor consumes the supplied
    reader and builds segment_if / loop_if children, which are stored in
    ``pos_map`` (position -> list of nodes) instead of ``children``.
    """

    def __init__(self, reader, param):
        """
        @param reader: libxml2 TextReader
        @param param: map of parameters
        """
        x12_node.__init__(self)
        # Children live in pos_map, keyed by position; the list is unused.
        self.children = None
        self.pos_map = {}
        index = 0
        cur_name = ''
        self.cur_path = '/transaction'
        self.path = '/'
        self.cur_level = -1
        self.base_level = 0
        self.base_name = ''
        self.index = 0
        self.src_version = '$Revision: 1149 $'

        self.id = None
        self.name = None

        self.cur_iter_node = self

        self.reader = reader
        self.param = param
        self.ext_codes = codes.ExternalCodes(param.get('map_path'),
            param.get('exclude_external_codes'))
        self.data_elements = dataele.DataElements(param.get('map_path'))
        try:
            ret = self._read_next(self.reader)
            while ret == 1:
                node_type = self.reader.NodeType()
                if node_type == NodeType['element_start']:
                    cur_name = self.reader.Name()
                    if cur_name == 'transaction':
                        self.base_level = self.reader.Depth()
                        self.base_name = 'transaction'
                        while reader.MoveToNextAttribute():
                            if reader.Name() == 'xid':
                                self.id = reader.Value()
                    elif cur_name == 'segment':
                        # The child constructor reads its own subtree from
                        # the shared reader (self.reader).
                        self._add_child(segment_if(self, self, index))
                        index += 1
                    elif cur_name == 'loop':
                        self._add_child(loop_if(self, self, index))
                        index += 1
                    self.cur_level = self.reader.Depth()
                elif node_type == NodeType['element_end']:
                    if self.reader.Depth() <= self.base_level:
                        # Closing tag of the transaction: step past it and
                        # stop parsing.
                        self._read_next(self.reader)
                        break
                    cur_name = ''
                elif node_type == NodeType['text'] \
                        and self.base_level + 2 == self.reader.Depth():
                    if cur_name == 'name' and self.base_name == 'transaction':
                        self.name = self.reader.Value()

                ret = self._read_next(self.reader)
        except errors.XML_Reader_Error:
            # Raised by this class or a child constructor when the reader
            # is exhausted or fails; parsing simply stops here.
            pass

        del self.reader

    @staticmethod
    def _read_next(reader):
        """
        Advance the reader by one node.

        @return: the reader's return code (1 when a node was read)
        @raise errors.XML_Reader_Error: on a read error (-1) or at the
            end of the map file (0)
        """
        ret = reader.Read()
        if ret == -1:
            raise errors.XML_Reader_Error('Read Error')
        elif ret == 0:
            raise errors.XML_Reader_Error('End of Map File')
        return ret

    def _add_child(self, node):
        """
        File a child node under its position in pos_map.
        """
        self.pos_map.setdefault(node.pos, []).append(node)

    def _ordered_children(self):
        """
        @return: all direct children, ordered by position key
        @rtype: list
        """
        ordered = []
        for pos in sorted(self.pos_map):
            ordered.extend(self.pos_map[pos])
        return ordered

    def debug_print(self):
        """
        Write this node and, recursively, its children to stdout.
        """
        sys.stdout.write(self.__repr__())
        for node in self._ordered_children():
            node.debug_print()

    def __len__(self):
        """
        @return: number of direct children over all positions
        @rtype: int
        """
        return sum(len(nodes) for nodes in self.pos_map.values())

    def get_child_count(self):
        """
        @return: number of direct children
        @rtype: int
        """
        return self.__len__()

    def get_first_seg(self):
        """
        @return: the first child in position order if it is a segment,
            otherwise None
        """
        pos_keys = sorted(self.pos_map)
        if len(pos_keys) > 0 and self.pos_map[pos_keys[0]][0].is_segment():
            return self.pos_map[pos_keys[0]][0]
        return None

    def __repr__(self):
        """
        @rtype: string
        """
        return '%s\n' % (self.id)

    def _path_parent(self):
        """
        @return: next-to-last component of cur_path
        @rtype: string
        """
        return os.path.basename(os.path.dirname(self.cur_path))

    def get_path(self):
        """
        @rtype: string
        """
        return self.path

    def get_child_node_by_idx(self, idx):
        """
        Not supported on the map root; always raises.

        @param idx: zero based
        @raise errors.EngineError: always
        """
        raise errors.EngineError(
            'map_if.get_child_node_by_idx is not a valid call')

    def getnodebypath(self, path):
        """
        @param path: Path string; /1000/2000/2000A/NM102-3
        @type path: string
        @return: the matching node, or None when path contains no '/'
        @raise errors.EngineError: if no child matches the path
        """
        # Everything before the first '/' is discarded.
        pathl = path.split('/')[1:]
        if not pathl:
            return None
        wanted = pathl[0].lower()
        for child in self._ordered_children():
            if child.id.lower() == wanted:
                if len(pathl) == 1:
                    return child
                return child.getnodebypath('/'.join(pathl[1:]))
        raise errors.EngineError(
            'getnodebypath failed. Path "%s" not found' % path)

    def is_map_root(self):
        """
        @rtype: boolean
        """
        return True

    def reset_child_count(self):
        """
        Set cur_count of child nodes to zero
        """
        for child in self._ordered_children():
            child.reset_cur_count()

    def reset_cur_count(self):
        """
        Set cur_count of child nodes to zero
        """
        self.reset_child_count()

    def __iter__(self):
        # NOTE(review): no next() method is defined in the visible code,
        # so iterating over a map_if is not expected to work - confirm
        # before relying on it.
        return self
394 395 # def next(self): 396 # #if self.cur_iter_node.id == 'GS06': 397 # if self.cur_iter_node.id == 'IEA': 398 # raise StopIteration 399 # #first, get first child 400 # if self.cur_iter_node.get_child_count() > 0: 401 # self.cur_iter_node = self.cur_iter_node.children[0] 402 # return self.cur_iter_node 403 # # Get original index of starting node 404 # #node_idx = self.cur_iter_node.index 405 # cur_node = self.cur_iter_node 406 # #node = self._pop_to_parent(cur_node) 407 # while 1: 408 # #second, get next sibling 409 # if cur_node is None: 410 # raise StopIteration 411 # if cur_node.next_node != None: 412 # self.cur_iter_node = cur_node.next_node 413 # return self.cur_iter_node 414 # #last, get siblings of parent 415 # cur_node = cur_node.parent 416 # return None 417 418 419 ############################################################ 420 # Loop Interface 421 ############################################################
class loop_if(x12_node):
    """
    Loop Interface

    A loop node of the map tree.  Child loops and segments are stored in
    ``pos_map`` (position -> list of nodes) instead of ``children``.
    """

    def __init__(self, root, parent, my_index):
        """
        @requires: Must be entered with a libxml2 loop node current
        @param root: map root; its ``reader`` attribute is consumed here
        @param parent: parent node
        @param my_index: index of this node, stored as self.index
        @raise errors.XML_Reader_Error: on a read error or when the map
            file ends
        """
        x12_node.__init__(self)
        self.root = root
        self.parent = parent
        self.index = my_index
        # Children live in pos_map, keyed by position; the list is unused.
        self.children = None
        self.pos_map = {}
        self.path = ''
        self.base_name = 'loop'
        self.type = 'implicit'
        self.cur_count = 0

        self.id = None
        self.name = None
        self.usage = None
        self.pos = None
        self.repeat = None

        reader = self.root.reader

        index = 0
        self.base_level = reader.Depth()
        self.cur_level = reader.Depth()

        while reader.MoveToNextAttribute():
            if reader.Name() == 'xid':
                self.id = reader.Value()
                self.path = self.id
            elif reader.Name() == 'type':
                self.type = reader.Value()

        # Initialised so the text branch below can never read an unbound
        # name.
        cur_name = ''
        ret = 1
        while ret == 1:
            node_type = reader.NodeType()
            if node_type == NodeType['element_start']:
                cur_name = reader.Name()
                if cur_name == 'loop' and self.base_level < reader.Depth():
                    self._add_child(loop_if(self.root, self, index))
                    index += 1
                elif cur_name == 'segment':
                    self._add_child(segment_if(self.root, self, index))
                    index += 1
                elif cur_name == 'element':
                    raise errors.EngineError('This should not happen')
                self.cur_level = reader.Depth()
            elif node_type == NodeType['element_end']:
                if reader.Depth() <= self.base_level:
                    # Closing tag of this loop: step past it and finish.
                    self._read_next(reader)
                    break
                cur_name = ''
            elif node_type == NodeType['text'] \
                    and self.base_level + 2 == reader.Depth():
                if self.base_name == 'loop':
                    if cur_name == 'name':
                        self.name = reader.Value()
                    elif cur_name == 'usage':
                        self.usage = reader.Value()
                    elif cur_name == 'pos':
                        self.pos = int(reader.Value())
                    elif cur_name == 'repeat':
                        self.repeat = reader.Value()

            ret = self._read_next(reader)

    @staticmethod
    def _read_next(reader):
        """
        Advance the reader by one node.

        @return: the reader's return code (1 when a node was read)
        @raise errors.XML_Reader_Error: on a read error (-1) or at the
            end of the map file (0)
        """
        ret = reader.Read()
        if ret == -1:
            raise errors.XML_Reader_Error('Read Error')
        elif ret == 0:
            raise errors.XML_Reader_Error('End of Map File')
        return ret

    def _add_child(self, node):
        """
        File a child node under its position in pos_map.
        """
        self.pos_map.setdefault(node.pos, []).append(node)

    def _ordered_children(self):
        """
        @return: all direct children, ordered by position key
        @rtype: list
        """
        ordered = []
        for pos in sorted(self.pos_map):
            ordered.extend(self.pos_map[pos])
        return ordered

    def debug_print(self):
        """
        Write this node and, recursively, its children to stdout.
        """
        sys.stdout.write(self.__repr__())
        for node in self._ordered_children():
            node.debug_print()

    def __len__(self):
        """
        @return: number of direct children over all positions
        @rtype: int
        """
        return sum(len(nodes) for nodes in self.pos_map.values())

    def __repr__(self):
        """
        @rtype: string
        """
        out = ''
        if self.id:
            out += '%sLOOP %s' % (' ' * (self.base_level + 1), self.id)
        if self.name:
            out += ' "%s"' % (self.name)
        if self.usage:
            out += ' usage: %s' % (self.usage)
        if self.pos:
            out += ' pos: %s' % (self.pos)
        if self.repeat:
            out += ' repeat: %s' % (self.repeat)
        out += '\n'
        return out

    def get_max_repeat(self):
        """
        @return: maximum repeat count; MAXINT when repeat is unset or '>1'
        @rtype: int
        """
        if self.repeat is None:
            return MAXINT
        if self.repeat == '&gt;1' or self.repeat == '>1':
            return MAXINT
        return int(self.repeat)

    def get_parent(self):
        """
        @return: ref to parent node
        """
        return self.parent

    def get_first_seg(self):
        """
        @return: the first child in position order if it is a segment,
            otherwise None
        """
        pos_keys = sorted(self.pos_map)
        if len(pos_keys) > 0 and self.pos_map[pos_keys[0]][0].is_segment():
            return self.pos_map[pos_keys[0]][0]
        return None

    @staticmethod
    def _seg_qualifier_matches(seg_node, seg_id, id_val):
        """
        Does id_val appear in the valid codes of the element that
        qualifies seg_node?  The checks run in a fixed order: first
        element, second element of ENT (special case for 820), first
        sub-element of a leading composite, third element of HL.

        @rtype: boolean
        """
        first = seg_node.children[0]
        if first.is_element() \
                and first.get_data_type() == 'ID' \
                and len(first.valid_codes) > 0 \
                and id_val in first.valid_codes:
            return True
        # Special Case for 820
        if seg_id == 'ENT' and seg_node.children[1].is_element() \
                and seg_node.children[1].get_data_type() == 'ID' \
                and len(seg_node.children[1].valid_codes) > 0 \
                and id_val in seg_node.children[1].valid_codes:
            return True
        if first.is_composite() \
                and first.children[0].get_data_type() == 'ID' \
                and len(first.children[0].valid_codes) > 0 \
                and id_val in first.children[0].valid_codes:
            return True
        if seg_id == 'HL' and seg_node.children[2].is_element() \
                and len(seg_node.children[2].valid_codes) > 0 \
                and id_val in seg_node.children[2].valid_codes:
            return True
        return False

    def getnodebypath(self, path):
        """
        Loop ids are compared case-insensitively.  A final segment
        component may carry a qualifier in brackets, SEG[code], which
        selects the segment whose qualifying element accepts that code.

        @param path: remaining path to match
        @type path: string
        @return: matching node
        @raise errors.EngineError: if no node matches the path
        """
        pathl = path.split('/')
        head = pathl[0]
        bracket = head.find('[')
        for child in self._ordered_children():
            if child.is_loop():
                if child.id.upper() == head.upper():
                    if len(pathl) == 1:
                        return child
                    return child.getnodebypath('/'.join(pathl[1:]))
            elif child.is_segment() and len(pathl) == 1:
                if bracket == -1:  # No id to match
                    if head == child.id:
                        return child
                else:
                    seg_id = head[0:bracket]
                    id_val = head[bracket + 1:head.find(']')]
                    if seg_id == child.id and \
                            self._seg_qualifier_matches(child, seg_id, id_val):
                        return child
        raise errors.EngineError(
            'getnodebypath failed. Path "%s" not found' % path)

    def get_child_count(self):
        """
        @return: number of direct children
        @rtype: int
        """
        return self.__len__()

    def get_child_node_by_idx(self, idx):
        """
        Not supported on loops; always raises.

        @param idx: zero based
        @raise errors.EngineError: always
        """
        raise errors.EngineError(
            'loop_if.get_child_node_by_idx is not a valid call')

    def get_seg_count(self):
        """
        @return: Number of child segments
        @rtype: integer
        """
        count = 0
        for child in self._ordered_children():
            if child.is_segment():
                count += 1
        return count

    def is_loop(self):
        """
        @rtype: boolean
        """
        return True

    def is_match(self, seg_data):
        """
        @type seg_data: L{segment<segment.Segment>}
        @return: Is the segment a match to this loop?
        @rtype: boolean
        """
        # The loop is identified by its first child in position order.
        # (The previous code subscripted the bound method pos_map.keys
        # and therefore always raised TypeError.)
        pos_keys = sorted(self.pos_map)
        if not pos_keys:
            return False
        child = self.pos_map[pos_keys[0]][0]
        if child.is_loop():
            return child.is_match(seg_data)
        if child.is_segment():
            # seg must match the first segment in the loop to be valid
            if child.is_match(seg_data):
                return True
            return False
        return False

    def get_cur_count(self):
        """
        @return: current count
        @rtype: int
        """
        return self.cur_count

    def incr_cur_count(self):
        """
        Add one to the current count.
        """
        self.cur_count += 1

    def reset_child_count(self):
        """
        Set cur_count of child nodes to zero
        """
        for child in self._ordered_children():
            child.reset_cur_count()

    def reset_cur_count(self):
        """
        Set cur_count of node and child nodes to zero
        """
        self.cur_count = 0
        self.reset_child_count()

    def set_cur_count(self, ct):
        """
        @param ct: new current count
        """
        self.cur_count = ct

    def get_counts_list(self, ct_list):
        """
        Build a list of (path, ct) of the current node and parents
        Gets the node counts to apply to another map
        @param ct_list: List to append to
        @type ct_list: list[(string, int)]
        @return: True
        """
        ct_list.append((self.get_path(), self.cur_count))
        if not self.parent.is_map_root():
            self.parent.get_counts_list(ct_list)
        return True
747 748 749 ############################################################ 750 # Segment Interface 751 ############################################################
class segment_if(x12_node):
    """
    Segment Interface

    A segment node of the map tree.  Its children are the element_if and
    composite_if nodes read from the map, in document order.
    """

    def __init__(self, root, parent, my_index):
        """
        @requires: Must be entered with a libxml2 segment node current
        @param root: map root; its ``reader`` attribute is consumed here
        @param parent: parent node
        @param my_index: index of this node, stored as self.index
        @raise errors.XML_Reader_Error: on a read error or when the map
            file ends
        """
        reader = root.reader
        x12_node.__init__(self)
        self.root = root
        self.parent = parent
        self.index = my_index
        self.children = []
        self.path = ''
        self.base_name = 'segment'
        self.base_level = reader.Depth()
        self.check_dte = '20030930'
        self.cur_count = 0

        self.id = None
        self.end_tag = None
        self.name = None
        self.usage = None
        self.pos = None
        self.max_use = None
        self.syntax = []

        self.cur_level = reader.Depth()

        while reader.MoveToNextAttribute():
            if reader.Name() == 'xid':
                self.id = reader.Value()
                self.path = self.id

        # Initialised so the text branch below can never read an unbound
        # name.
        cur_name = ''
        ret = 1
        while ret == 1:
            node_type = reader.NodeType()
            if node_type == NodeType['element_start']:
                cur_name = reader.Name()
                if cur_name == 'segment':
                    self.base_level = reader.Depth()
                    self.base_name = 'segment'
                elif cur_name == 'element':
                    self.children.append(element_if(self.root, self))
                elif cur_name == 'composite':
                    self.children.append(composite_if(self.root, self))
                self.cur_level = reader.Depth()
            elif node_type == NodeType['element_end']:
                if reader.Depth() <= self.base_level:
                    # Closing tag of this segment: step past it and finish.
                    self._read_next(reader)
                    break
                cur_name = ''
            elif node_type == NodeType['text'] \
                    and self.base_level + 2 == reader.Depth():
                if self.base_name == 'segment':
                    if cur_name == 'end_tag':
                        self.end_tag = reader.Value()
                    elif cur_name == 'name':
                        self.name = reader.Value()
                    elif cur_name == 'usage':
                        self.usage = reader.Value()
                    elif cur_name == 'pos':
                        self.pos = int(reader.Value())
                    elif cur_name == 'max_use':
                        self.max_use = reader.Value()
                    elif cur_name == 'syntax':
                        syn_list = self._split_syntax(reader.Value())
                        if syn_list is not None:
                            self.syntax.append(syn_list)

            ret = self._read_next(reader)

    @staticmethod
    def _read_next(reader):
        """
        Advance the reader by one node.

        @return: the reader's return code (1 when a node was read)
        @raise errors.XML_Reader_Error: on a read error (-1) or at the
            end of the map file (0)
        """
        ret = reader.Read()
        if ret == -1:
            raise errors.XML_Reader_Error('Read Error')
        elif ret == 0:
            raise errors.XML_Reader_Error('End of Map File')
        return ret

    def debug_print(self):
        """
        Write this node and, recursively, its children to stdout.
        """
        sys.stdout.write(self.__repr__())
        for node in self.children:
            node.debug_print()

    def __repr__(self):
        """
        @rtype: string
        """
        out = '%s%s "%s"' % (' ' * self.base_level, self.id, self.name)
        if self.usage:
            out += ' usage: %s' % (self.usage)
        if self.pos:
            out += ' pos: %i' % (self.pos)
        if self.max_use:
            out += ' max_use: %s' % (self.max_use)
        out += '\n'
        return out

    def get_max_repeat(self):
        """
        @return: maximum use count; MAXINT when max_use is unset or '>1'
        @rtype: int
        """
        if self.max_use is None or self.max_use == '>1':
            return MAXINT
        return int(self.max_use)

    def get_parent(self):
        """
        @return: ref to parent class instance
        @rtype: pyx12.x12_node
        """
        return self.parent

    def get_path(self):
        """
        @return: path - XPath style
        @rtype: string
        """
        parent_path = self.parent.get_path()
        if parent_path == '/':
            # Avoid a doubled slash directly below the root.
            return '/' + self.path
        return parent_path + '/' + self.path

    def is_first_seg_in_loop(self):
        """
        @rtype: boolean
        """
        return self is self.get_parent().get_first_seg()

    def is_match(self, seg):
        """
        Is data segment given a match to this segment node?

        The segment id must be equal.  The segment is then rejected when
        its qualifying value is not among the valid codes of the
        corresponding map element; the checks run in a fixed order.

        @param seg: data segment instance
        @return: boolean
        @rtype: boolean
        """
        if seg.get_seg_id() != self.id:
            return False
        first = self.children[0]
        if first.is_element() \
                and first.get_data_type() == 'ID' \
                and first.usage == 'R' \
                and len(first.valid_codes) > 0 \
                and seg.get_value('01') not in first.valid_codes:
            return False
        # Special Case for 820
        if seg.get_seg_id() == 'ENT' \
                and self.children[1].is_element() \
                and self.children[1].get_data_type() == 'ID' \
                and len(self.children[1].valid_codes) > 0 \
                and seg.get_value('02') not in self.children[1].valid_codes:
            return False
        if first.is_composite() \
                and first.children[0].get_data_type() == 'ID' \
                and len(first.children[0].valid_codes) > 0 \
                and seg.get_value('01-1') not in first.children[0].valid_codes:
            return False
        if seg.get_seg_id() == 'HL' and self.children[2].is_element() \
                and len(self.children[2].valid_codes) > 0 \
                and seg.get_value('03') not in self.children[2].valid_codes:
            return False
        return True

    def is_segment(self):
        """
        @rtype: boolean
        """
        return True

    def is_valid(self, seg_data, errh):
        """
        Validate a data segment against this map segment.  Problems are
        reported through errh.ele_error() and by the child nodes' own
        is_valid() calls.

        @param seg_data: data segment instance
        @type seg_data: L{segment<segment.Segment>}
        @param errh: instance of error_handler
        @return: True if no validation step failed
        @rtype: boolean
        """
        valid = True
        child_count = self.get_child_count()
        data_len = len(seg_data)
        if data_len > child_count:
            err_str = 'Too many elements in segment "%s" (%s). Has %i, should have %i' % \
                (self.name, seg_data.get_seg_id(), data_len, child_count)
            err_value = seg_data.get_value('%02i' % (child_count + 1))
            errh.ele_error('3', err_str, err_value)
            valid = False

        present = min(data_len, child_count)
        dtype = []
        type_list = []
        for i in range(present):
            child_node = self.get_child_node_by_idx(i)
            if child_node.is_composite():
                # Validate composite
                ref_des = '%02i' % (i + 1)
                comp_data = seg_data.get(ref_des)
                subele_count = child_node.get_child_count()
                if seg_data.ele_len(ref_des) > subele_count \
                        and child_node.usage != 'N':
                    subele_node = child_node.get_child_node_by_idx(
                        subele_count + 1)
                    if subele_node is None:
                        # The index is past the last sub-element, so there
                        # is no node to describe; name the composite
                        # instead of failing on None.
                        err_str = 'Too many sub-elements in composite "%s" (%s)' % \
                            (child_node.name, ref_des)
                    else:
                        err_str = 'Too many sub-elements in composite "%s" (%s)' % \
                            (subele_node.name, subele_node.refdes)
                    err_value = seg_data.get_value(ref_des)
                    errh.ele_error('3', err_str, err_value)
                valid &= child_node.is_valid(comp_data, errh, self.check_dte)
            elif child_node.is_element():
                # Validate Element
                # DTP02 carries the format qualifier that DTP03 is then
                # validated against.
                if i == 1 and seg_data.get_seg_id() == 'DTP' \
                        and seg_data.get_value('02') in \
                        ('RD8', 'D8', 'D6', 'DT', 'TM'):
                    dtype = [seg_data.get_value('02')]
                # The codes of a data element 1250 are passed to the
                # following data element 1251 in the same way.
                if child_node.data_ele == '1250':
                    type_list.extend(child_node.valid_codes)
                ele_data = seg_data.get('%02i' % (i + 1))
                if i == 2 and seg_data.get_seg_id() == 'DTP':
                    valid &= child_node.is_valid(ele_data, errh,
                        self.check_dte, dtype)
                elif child_node.data_ele == '1251' and len(type_list) > 0:
                    valid &= child_node.is_valid(ele_data, errh,
                        self.check_dte, type_list)
                else:
                    valid &= child_node.is_valid(ele_data, errh,
                        self.check_dte)

        for i in range(present, child_count):
            # missing required elements?
            child_node = self.get_child_node_by_idx(i)
            valid &= child_node.is_valid(None, errh)

        for syn in self.syntax:
            (syn_ok, err_str) = is_syntax_valid(seg_data, syn)
            if not syn_ok:
                errh.ele_error('2', err_str, None)
                valid &= False

        return valid

    def _split_syntax(self, syntax):
        """
        Split a Syntax string into a list

        The first character is the rule code and every following pair of
        digits is one integer; e.g. 'P0304' gives ['P', 3, 4].

        @param syntax: syntax string
        @type syntax: string
        @return: [code, int, int, ...], or None if the string is empty
            or does not start with one of P, R, C, L, E
        """
        if not syntax or syntax[0] not in ('P', 'R', 'C', 'L', 'E'):
            return None
        syn = [syntax[0]]
        # Floor division keeps the count an integer on Python 3 as well;
        # a trailing unpaired character is ignored.
        for i in range(len(syntax[1:]) // 2):
            syn.append(int(syntax[i * 2 + 1:i * 2 + 3]))
        return syn

    def get_cur_count(self):
        """
        @return: current count
        @rtype: int
        """
        return self.cur_count

    def incr_cur_count(self):
        """
        Add one to the current count.
        """
        self.cur_count += 1

    def reset_cur_count(self):
        """
        Set cur_count of node to zero
        """
        self.cur_count = 0

    def set_cur_count(self, ct):
        """
        @param ct: new current count
        """
        self.cur_count = ct

    def get_counts_list(self, ct_list):
        """
        Build a list of (path, ct) of the current node and parents
        Gets the node counts to apply to another map
        @param ct_list: List to append to
        @type ct_list: list[(string, int)]
        @return: True
        """
        ct_list.append((self.get_path(), self.cur_count))
        if not self.parent.is_map_root():
            self.parent.get_counts_list(ct_list)
        return True
1072 1073 1074 ############################################################ 1075 # Element Interface 1076 ############################################################
class element_if(x12_node):
    """
    Element Interface

    Holds the map definition of one data element (name, usage, sequence,
    data element number, valid codes, optional regex) and validates
    element data against it.
    """

    def __init__(self, root, parent):
        """
        Read the element definition from the map XML reader

        @requires: Must be entered with a libxml2 element node current
        @param root: map root; supplies reader, data_elements, ext_codes
            and param
        @param parent: parent node
        @raise errors.XML_Reader_Error: on a read error or if the map file
            ends before the element is closed
        """

        reader = root.reader
        x12_node.__init__(self)
        self.children = []
        self.root = root
        self.parent = parent
        self.path = ''
        self.base_name = 'element'
        self.base_level = reader.Depth()

        self.id = None
        self.name = None
        self.usage = None
        self.data_ele = None
        self.seq = None
        self.refdes = None

        self.valid_codes = []
        self.external_codes = None
        self.res = None     # regex source text, if the map gives one
        self.rec = None     # compiled regex, or None

        self.cur_level = reader.Depth()

        # The xid attribute is used as both id and reference designator
        while reader.MoveToNextAttribute():
            if reader.Name() == 'xid':
                self.id = reader.Value()
                self.refdes = self.id

        # Name of the most recent start tag; text nodes are assigned to
        # the attribute it names.  Initialised so a text node seen before
        # any start tag is ignored rather than raising a NameError.
        cur_name = ''
        ret = 1
        while ret == 1:
            tmpNodeType = reader.NodeType()
            if tmpNodeType == NodeType['element_start']:
                cur_name = reader.Name()
                if cur_name == 'element':
                    self.base_level = reader.Depth()
                    self.base_name = 'element'
                elif cur_name == 'valid_codes':
                    while reader.MoveToNextAttribute():
                        if reader.Name() == 'external':
                            self.external_codes = reader.Value()
                self.cur_level = reader.Depth()
            elif tmpNodeType == NodeType['element_end']:
                if reader.Depth() <= self.base_level:
                    # Closing tag of this element: step past it and stop
                    ret = reader.Read()
                    if ret == -1:
                        raise errors.XML_Reader_Error('Read Error')
                    elif ret == 0:
                        raise errors.XML_Reader_Error('End of Map File')
                    break
                cur_name = ''

            elif tmpNodeType == NodeType['text'] and self.base_level + 2 <= reader.Depth():
                if cur_name == 'name':
                    self.name = reader.Value()
                elif cur_name == 'data_ele':
                    self.data_ele = reader.Value()
                elif cur_name == 'usage':
                    self.usage = reader.Value()
                elif cur_name == 'seq':
                    self.seq = int(reader.Value())
                    self.path = reader.Value()
                elif cur_name == 'regex':
                    self.res = reader.Value()
                    try:
                        self.rec = re.compile(self.res, re.S)
                    except Exception:
                        # Best effort: a bad pattern disables the regex
                        # check for this element, but is now reported
                        # instead of being silently dropped.
                        self.rec = None
                        logging.getLogger('pyx12').error(
                            'Element regex "%s" failed to compile' % (self.res))
                elif cur_name == 'code':
                    self.valid_codes.append(reader.Value())

            ret = reader.Read()
            if ret == -1:
                raise errors.XML_Reader_Error('Read Error')
            elif ret == 0:
                raise errors.XML_Reader_Error('End of Map File')

    def debug_print(self):
        """
        Write the text form of this node and of its children to stdout
        """
        sys.stdout.write(self.__repr__())
        for node in self.children:
            node.debug_print()

    def __repr__(self):
        """
        @return: one line description, indented by the node depth
        @rtype: string
        """
        (data_type, min_len, max_len) = self.root.data_elements.get_by_elem_num(self.data_ele)
        out = '%s%s "%s"' % (str(' '*self.base_level), self.refdes, self.name)
        if self.data_ele:
            out += ' data_ele: %s' % (self.data_ele)
        if self.usage:
            out += ' usage: %s' % (self.usage)
        if self.seq:
            out += ' seq: %i' % (self.seq)
        out += ' %s(%i, %i)' % (data_type, min_len, max_len)
        if self.external_codes:
            out += ' external codes: %s' % (self.external_codes)
        out += '\n'
        return out

    def _error(self, errh, err_str, err_cde, elem_val):
        """
        Forward the error to an error_handler
        """
        errh.ele_error(err_cde, err_str, elem_val)

    def _valid_code(self, code):
        """
        Verify the x12 element value is in the given list of valid codes
        @return: True if found, else False
        @rtype: boolean
        """
        return code in self.valid_codes

    def get_parent(self):
        """
        @return: ref to parent class instance
        """
        return self.parent

    def is_match(self):
        """
        Not implemented; always returns None
        """
        # match also by ID
        pass

    def _check_length(self, errh, elem_val, measured, min_len, max_len):
        """
        Report an error if measured is shorter than min_len or longer
        than max_len.

        @param elem_val: original value, used in the error text
        @param measured: the string whose length is tested
        @return: True if the length is within bounds
        @rtype: boolean
        """
        in_bounds = True
        if len(measured) < min_len:
            err_str = 'Data element "%s" (%s) is too short: "%s" should be at least %i characters' % \
                (self.name, self.refdes, elem_val, min_len)
            self._error(errh, err_str, '4', elem_val)
            in_bounds = False
        if len(measured) > max_len:
            err_str = 'Element "%s" (%s) is too long: "%s" should only be %i characters' % \
                (self.name, self.refdes, elem_val, max_len)
            self._error(errh, err_str, '5', elem_val)
            in_bounds = False
        return in_bounds

    def is_valid(self, elem, errh, check_dte=None, type_list=None):
        """
        Is this a valid element?

        @param elem: element instance
        @type elem: L{element<segment.Element>}
        @param errh: instance of error_handler
        @param check_dte: date string to check against (YYYYMMDD)
        @param type_list: Optional data/time type list
        @type type_list: list[string]
        @return: True if valid
        @rtype: boolean
        """
        errh.add_ele(self)

        if elem and elem.is_composite():
            err_str = 'Data element "%s" (%s) is an invalid composite' % \
                (self.name, self.refdes)
            self._error(errh, err_str, '6', elem.__repr__())
            return False

        if elem is None or elem.get_value() == '':
            if self.usage in ('N', 'S'):
                return True
            elif self.usage == 'R':
                # The first element of a composite that is not itself
                # required may be empty
                if self.seq != 1 or not self.parent.is_composite() or self.parent.usage == 'R':
                    err_str = 'Mandatory data element "%s" (%s) is missing' % (self.name, self.refdes)
                    self._error(errh, err_str, '1', None)
                    return False
                else:
                    return True
            elif elem is None:
                # No data and no recognised usage: nothing left to check.
                # Without this guard the calls below fail on None.
                return True
        if self.usage == 'N' and elem.get_value() != '':
            err_str = 'Data element "%s" (%s) is marked as Not Used' % (self.name, self.refdes)
            self._error(errh, err_str, '10', None)
            return False

        elem_val = elem.get_value()
        (data_type, min_len, max_len) = self.root.data_elements.get_by_elem_num(self.data_ele)
        valid = True
        # Validate based on data_elem_num
        # Then, validate on more specific criteria
        if data_type is not None and (data_type == 'R' or data_type[0] == 'N'):
            # Sign and decimal point do not count toward numeric length
            measured = elem_val.replace('-', '').replace('.', '')
        else:
            measured = elem_val
        if not self._check_length(errh, elem_val, measured, min_len, max_len):
            valid = False

        if data_type in ('AN', 'ID') and elem_val.endswith(' '):
            # Trailing spaces are only an error when they are not needed
            # to reach the minimum length
            if len(elem_val.rstrip()) >= min_len:
                err_str = 'Element "%s" (%s) has unnecessary trailing spaces. (%s)' % \
                    (self.name, self.refdes, elem_val)
                self._error(errh, err_str, '6', elem_val)
                valid = False

        if not self._is_valid_code(elem_val, errh, check_dte):
            valid = False
        charset = self.root.param.get('charset')
        if not IsValidDataType(elem_val, data_type, charset):
            if data_type in ('RD8', 'DT', 'D8', 'D6'):
                err_str = 'Data element "%s" (%s) contains an invalid date (%s)' % \
                    (self.name, self.refdes, elem_val)
                self._error(errh, err_str, '8', elem_val)
            elif data_type == 'TM':
                err_str = 'Data element "%s" (%s) contains an invalid time (%s)' % \
                    (self.name, self.refdes, elem_val)
                self._error(errh, err_str, '9', elem_val)
            else:
                err_str = 'Data element "%s" (%s) is type %s, contains an invalid character(%s)' % \
                    (self.name, self.refdes, data_type, elem_val)
                self._error(errh, err_str, '6', elem_val)
            valid = False
        if type_list:
            # The value must satisfy at least one of the listed types
            valid_type = False
            for dtype in type_list:
                valid_type |= IsValidDataType(elem_val, dtype, charset)
            if not valid_type:
                if 'TM' in type_list:
                    err_str = 'Data element "%s" (%s) contains an invalid time (%s)' % \
                        (self.name, self.refdes, elem_val)
                    self._error(errh, err_str, '9', elem_val)
                elif 'RD8' in type_list or 'DT' in type_list or 'D8' in type_list or 'D6' in type_list:
                    err_str = 'Data element "%s" (%s) contains an invalid date (%s)' % \
                        (self.name, self.refdes, elem_val)
                    self._error(errh, err_str, '8', elem_val)
                valid = False
        if self.rec:
            m = self.rec.search(elem_val)
            if not m:
                err_str = 'Data element "%s" with a value of (%s)' % \
                    (self.name, elem_val)
                err_str += ' failed to match the regular expression "%s"' % (self.res)
                self._error(errh, err_str, '7', elem_val)
                valid = False
        return valid

    def _is_valid_code(self, elem_val, errh, check_dte=None):
        """
        Check the value against the element's code lists.  Passes when the
        element has no code list at all, when the value is in valid_codes,
        or when the external code set accepts it.  Reports error '7'
        otherwise.

        @rtype: boolean
        """
        bValidCode = False
        if len(self.valid_codes) == 0 and self.external_codes is None:
            bValidCode = True
        if elem_val in self.valid_codes:
            bValidCode = True
        if self.external_codes is not None and \
                self.root.ext_codes.isValid(self.external_codes, elem_val, check_dte):
            bValidCode = True
        if not bValidCode:
            err_str = '(%s) is not a valid code for %s (%s)' % (elem_val, self.name, self.refdes)
            self._error(errh, err_str, '7', elem_val)
            return False
        return True

    def get_data_type(self):
        """
        @return: data type looked up by this element's data element number
        """
        (data_type, min_len, max_len) = self.root.data_elements.get_by_elem_num(self.data_ele)
        return data_type

    def get_seg_count(self):
        """
        Not implemented; always returns None
        """
        pass

    def is_element(self):
        """
        @rtype: boolean
        """
        return True
1372 1373 1374 ############################################################ 1375 # Composite Interface 1376 ############################################################
class composite_if(x12_node):
    """
    Composite Node Interface

    Holds the map definition of a composite element and its child
    sub-elements, and validates composite data against it.
    """
    def __init__(self, root, parent):
        """
        Get the values for this composite

        Reads the composite definition from root.reader and creates an
        element_if child for each nested element tag.

        @param root: map root; supplies the XML reader
        @param parent: parent node
        @raise errors.XML_Reader_Error: on a read error or if the map file
            ends before the composite is closed
        """

        reader = root.reader
        x12_node.__init__(self)

        self.children = []
        self.root = root
        self.parent = parent
        self.path = ''
        self.base_name = 'composite'
        self.base_level = reader.Depth()
        # Fixed date string; not referenced by the methods of this class
        self.check_dte = '20030930'

        self.name = None
        self.data_ele = None
        self.usage = None
        self.seq = None
        self.refdes = None

        self.cur_level = reader.Depth()

        # Name of the most recent start tag.  Initialised so a text node
        # seen before any start tag is ignored rather than raising.
        cur_name = ''
        ret = 1
        while ret == 1:
            tmpNodeType = reader.NodeType()
            if tmpNodeType == NodeType['element_start']:
                cur_name = reader.Name()
                if cur_name == 'composite':
                    self.base_level = reader.Depth()
                    self.base_name = 'composite'
                elif cur_name == 'element':
                    # The child constructor consumes the element's XML
                    self.children.append(element_if(self.root, self))
                self.cur_level = reader.Depth()
            elif tmpNodeType == NodeType['element_end']:
                if reader.Depth() <= self.base_level:
                    # Closing tag of this composite: step past it and stop
                    ret = reader.Read()
                    if ret == -1:
                        raise errors.XML_Reader_Error('Read Error')
                    elif ret == 0:
                        raise errors.XML_Reader_Error('End of Map File')
                    break
                cur_name = ''

            elif tmpNodeType == NodeType['text'] and self.base_level + 2 == reader.Depth():
                if cur_name == 'name':
                    self.name = reader.Value()
                elif cur_name == 'data_ele':
                    self.data_ele = reader.Value()
                elif cur_name == 'usage':
                    self.usage = reader.Value()
                elif cur_name == 'seq':
                    self.seq = int(reader.Value())
                elif cur_name == 'refdes':
                    self.refdes = reader.Value()

            ret = reader.Read()
            if ret == -1:
                raise errors.XML_Reader_Error('Read Error')
            elif ret == 0:
                raise errors.XML_Reader_Error('End of Map File')

    def _error(self, errh, err_str, err_cde, elem_val):
        """
        Forward the error to an error_handler
        """
        errh.ele_error(err_cde, err_str, elem_val)

    def debug_print(self):
        """
        Write the text form of this node and of its children to stdout
        """
        sys.stdout.write(self.__repr__())
        for node in self.children:
            node.debug_print()

    def __repr__(self):
        """
        @return: one line description, indented by the node depth
        @rtype: string
        """
        out = '%s%s "%s"' % (str(' '*self.base_level), \
            self.id, self.name)
        if self.usage:
            out += ' usage %s' % (self.usage)
        if self.seq:
            out += ' seq %i' % (self.seq)
        if self.refdes:
            out += ' refdes %s' % (self.refdes)
        out += '\n'
        return out

    def xml(self):
        """
        Sends an xml representation of the composite to stdout
        """
        sys.stdout.write('<composite>\n')
        for sub_elem in self.children:
            sub_elem.xml()
        sys.stdout.write('</composite>\n')

    def is_valid(self, comp_data, errh, check_dte=None):
        """
        Validates the composite

        @param comp_data: data composite instance, has multiple values;
            None when the composite is absent from the segment
        @param errh: instance of error_handler
        @param check_dte: date string passed on to the sub-element checks
        @rtype: boolean
        """
        valid = True
        if (comp_data is None or comp_data.is_empty()) and self.usage in ('N', 'S'):
            return True

        if self.usage == 'R':
            good_flag = False
            # An absent composite (None) has no components at all
            if comp_data is not None:
                for sub_ele in comp_data:
                    if sub_ele is not None and len(sub_ele.get_value()) > 0:
                        good_flag = True
                        break
            if not good_flag:
                err_str = 'At least one component of composite "%s" (%s) is required' % \
                    (self.name, self.refdes)
                errh.ele_error('2', err_str, None)
                return False

        if comp_data is None:
            # Absent and not required: nothing to validate.  Without this
            # guard the calls below fail on None.
            return True

        if self.usage == 'N' and not comp_data.is_empty():
            # NOTE(review): the Not Used element check in element_if
            # reports code '10'; confirm '5' is intended here
            err_str = 'Composite "%s" (%s) is marked as Not Used' % (self.name, self.refdes)
            errh.ele_error('5', err_str, None)
            return False

        child_count = self.get_child_count()
        data_count = len(comp_data)
        if data_count > child_count:
            err_str = 'Too many sub-elements in composite "%s" (%s)' % (self.name, self.refdes)
            errh.ele_error('3', err_str, None)
            valid = False
        supplied = min(data_count, child_count)
        for i in range(supplied):
            valid &= self.get_child_node_by_idx(i).is_valid(comp_data[i], errh, check_dte)
        for i in range(supplied, child_count):
            # Check missing required elements
            valid &= self.get_child_node_by_idx(i).is_valid(None, errh)
        return valid

    def is_composite(self):
        """
        @rtype: boolean
        """
        return True
1561
class Pickle_Errors(Exception):
    """
    Error raised while loading a map from its pickle file
    """
1564
class Create_Map_Errors(Exception):
    """
    Error raised when a map cannot be created
    """
1567
def apply_xslt_to_map_win():
    """
    Run an XSL transform through the MSXML 4.0 COM objects.

    The stylesheet ('xsl/plainpage.xsl') and the input document
    ('website.xml') are fixed paths.

    @return: output of the XSLT processor
    """
    import win32com.client
    dispatch = win32com.client.Dispatch
    stylesheet_uri = 'xsl/plainpage.xsl'
    source_uri = 'website.xml'

    stylesheet_doc = dispatch("Msxml2.FreeThreadedDOMDocument.4.0")
    source_doc = dispatch("Msxml2.DOMDocument.4.0")
    stylesheet_doc.load(stylesheet_uri)
    source_doc.load(source_uri)

    template = dispatch("Msxml2.XSLTemplate.4.0")
    template.stylesheet = stylesheet_doc
    processor = template.createProcessor()
    processor.input = source_doc

    processor.transform()
    return processor.output
1592
def cb(ctx, str):
    """
    Write ctx immediately followed by str to stdout.  Registered as the
    libxml2 error handler when a transformed map is schema validated.

    @param ctx: context value given at registration
    @param str: message text
    """
    message = '%s%s' % (ctx, str)
    sys.stdout.write(message)
1595
def load_map_file(map_file, param, xslt_files=None):
    """
    If any XSL transforms are given, apply them and create map_if
    from transformed map.
    Else, load the map by pickle if available

    The pickle is used only when it is newer than the map file and its
    content passes the path, children and source revision checks.
    Otherwise the map is rebuilt from the XML file.

    @param map_file: map file name, joined to param's 'map_path'
    @type map_file: string
    @param param: parameter object; 'map_path' and 'pickle_path' are read
    @param xslt_files: list of absolute paths of xsl files
    @type xslt_files: list[string]
    @rtype: pyx12.map_if
    @raise Create_Map_Errors: if the transformed map cannot be built or
        does not validate against the schema
    @raise errors.EngineError: if the map cannot be loaded from XML
    """
    logger = logging.getLogger('pyx12.pickler')
    map_path = param.get('map_path')
    map_full = os.path.join(map_path, map_file)
    schema_file = os.path.join(map_path, 'map.xsd')
    imap = None
    if xslt_files:
        try:
            doc = libxml2.parseFile(map_full)
            for xslt_file in xslt_files:
                logger.debug('Apply transform to map %s' % (xslt_file))
                styledoc = libxml2.parseFile(xslt_file)
                style = libxslt.parseStylesheetDoc(styledoc)
                doc = style.applyStylesheet(doc, None)
                style.freeStylesheet()
            xsdp = libxml2.schemaNewParserCtxt(schema_file)
            xsds = xsdp.schemaParse()
            ctx = xsds.schemaNewValidCtxt()
            libxml2.registerErrorHandler(cb, ctx)
            if doc.schemaValidateDoc(ctx) != 0:
                raise Create_Map_Errors('Transformed map does not validate agains the schema %s' % (schema_file))
            reader = doc.readerWalker()
            imap = map_if(reader, param)
            doc.freeDoc()
        except Create_Map_Errors:
            # Keep the specific schema validation message
            raise
        except Exception:
            raise Create_Map_Errors('Error creating map: %s' % (map_file))
    else:
        pickle_path = param.get('pickle_path')
        pickle_file = '%s.%s' % (os.path.splitext(os.path.join(pickle_path, \
            map_file))[0], 'pkl')
        try:
            if os.stat(map_full)[ST_MTIME] < os.stat(pickle_file)[ST_MTIME]:
                # NOTE(review): unpickling can execute arbitrary code, so
                # pickle_path must only hold files this application wrote.
                pickle_fd = open(pickle_file)
                try:
                    imap = cPickle.load(pickle_fd)
                finally:
                    pickle_fd.close()
                if imap.cur_path != '/transaction' or len(imap.children) == 0 \
                        or imap.src_version != '$Revision: 1149 $':
                    raise Pickle_Errors("reload map")
                logger.debug('Map %s loaded from pickle %s' % (map_full, pickle_file))
            else:
                raise Pickle_Errors("reload map")
        except Exception:
            # Any problem with the pickle (missing, stale, unreadable)
            # falls back to building the map from the XML file
            try:
                logger.debug('Create map from %s' % (map_full))
                reader = libxml2.newTextReaderFilename(map_full)
                imap = map_if(reader, param)
            except Exception:
                raise errors.EngineError('Load of map file failed: %s' % (map_full))
    return imap
1653
def is_syntax_valid(seg_data, syn):
    """
    Check a segment against one intra-segment syntax rule

    Rule types handled: P (all or none), R (at least one), E (at most
    one), C (if first then all others), L (if first then at least one
    other).

    @param seg_data: data segment instance
    @type seg_data: L{segment<segment.Segment>}
    @param syn: list containing the syntax type, and the indices of elements
    @type syn: list[string]
    @return: (True, None) if satisfied, else (False, message)
    @rtype: tuple(boolean, error string)
    """
    if len(syn) < 3:
        return (False, 'Syntax string must have at least two comparators (%s)' \
            % (syntax_str(syn)))

    rule = syn[0]
    positions = [int(pos) for pos in syn[1:]]

    def has_value(pos):
        # Present means inside the segment and not the empty string
        return len(seg_data) >= pos and seg_data.get_value('%02i' % (pos)) != ''

    def tally(pos_list):
        return len([pos for pos in pos_list if has_value(pos)])

    if rule == 'P':
        found = tally(positions)
        if found in (0, len(positions)):
            return (True, None)
        return (False, 'Syntax Error (%s): If any of %s is present, then all are required'\
            % (syntax_str(syn), syntax_ele_id_str(seg_data.get_seg_id(), positions)))

    if rule == 'R':
        if tally(positions) > 0:
            return (True, None)
        return (False, 'Syntax Error (%s): At least one element is required' % \
            (syntax_str(syn)))

    if rule == 'E':
        if tally(positions) <= 1:
            return (True, None)
        return (False, 'Syntax Error (%s): At most one of %s may be present'\
            % (syntax_str(syn), syntax_ele_id_str(seg_data.get_seg_id(), positions)))

    if rule == 'C':
        # If the first is present, then all others are required
        others = positions[1:]
        if not has_value(positions[0]) or tally(others) == len(others):
            return (True, None)
        if len(others) > 1:
            verb = 'are'
        else:
            verb = 'is'
        return (False, 'Syntax Error (%s): If %s%02i is present, then %s %s required'\
            % (syntax_str(syn), seg_data.get_seg_id(), positions[0], \
            syntax_ele_id_str(seg_data.get_seg_id(), others), verb))

    if rule == 'L':
        # If the first is present, then at least one other is required
        others = positions[1:]
        if not has_value(positions[0]) or tally(others) != 0:
            return (True, None)
        message = 'Syntax Error (%s): If %s%02i is present, then at least one of '\
            % (syntax_str(syn), seg_data.get_seg_id(), positions[0])
        message += syntax_ele_id_str(seg_data.get_seg_id(), others)
        message += ' is required'
        return (False, message)

    return (False, 'Syntax Type %s Not Found' % (syntax_str(syn)))
1741
def syntax_str(syntax):
    """
    Rebuild the syntax string from its list form: the type code followed
    by each position as two digits.

    @param syntax: [type, pos, pos, ...]
    @rtype: string
    """
    digits = ['%02i' % (pos) for pos in syntax[1:]]
    return syntax[0] + ''.join(digits)
1750
def syntax_ele_id_str(seg_id, ele_pos_list):
    """
    List element IDs for an error message, e.g. 'REF01, REF02 or REF03'

    @param seg_id: segment ID used as the prefix of each element ID
    @param ele_pos_list: element positions; must not be empty
    @rtype: string
    """
    labels = ['%s%02i' % (seg_id, pos) for pos in ele_pos_list]
    if len(labels) < 2:
        return labels[0]
    # Commas between all but the last pair, which is joined by 'or'
    return ', '.join(labels[:-1]) + ' or ' + labels[-1]
1763
def IsValidDataType(str_val, data_type, charset = 'B'):
    """
    Is str_val a valid X12 data value

    @param str_val: data value to validate
    @type str_val: string
    @param data_type: X12 data element identifier
    @type data_type: string
    @param charset: [optional] - 'B' for Basic X12 character set, 'E' for extended
    @type charset: string
    @return: True if data_type is empty or the value is valid for it;
        False for a non-string value, an invalid value or an unknown
        data type
    @rtype: boolean
    """
    if not data_type:
        return True
    if not isinstance(str_val, str):
        return False

    try:
        if data_type[0] == 'N':
            if not match_re('N', str_val):
                raise IsValidError # not a number
        elif data_type == 'R':
            if not match_re('R', str_val):
                raise IsValidError # not a number
        elif data_type in ('ID', 'AN'):
            if not_match_re('ID', str_val, charset):
                raise IsValidError
        elif data_type == 'RD8':
            # A date range is exactly two D8 dates joined by one dash.
            # Any other number of parts is invalid, not an unpack error.
            parts = str_val.split('-')
            if len(parts) != 2:
                return False
            return IsValidDataType(parts[0], 'D8', charset) and IsValidDataType(parts[1], 'D8', charset)
        elif data_type in ('DT', 'D8', 'D6'):
            if not is_valid_date(data_type, str_val):
                raise IsValidError
        elif data_type == 'TM':
            if not is_valid_time(str_val):
                raise IsValidError
        elif data_type == 'B':
            pass
        else:
            raise IsValidError('Unknown data type %s' % data_type)
    except IsValidError:
        return False
    return True

# Pre-compiled patterns shared by match_re and not_match_re.
# rec_N / rec_R match a numeric prefix; match_re accepts a value only when
# the matched text equals the whole value.
rec_N = re.compile("^-?[0-9]+", re.S)
rec_R = re.compile("^-?[0-9]*(\.[0-9]+)?", re.S)
# The remaining patterns are negated character classes: a search hit means
# the value contains a character outside the allowed set.
# rec_ID_E is the extended set, rec_ID_B the basic set.
rec_ID_E = re.compile("[^A-Z0-9!\"&'()*+,\-\\\./:;?=\sa-z%~@\[\]_{}\\\|<>#$\s]", re.S)
rec_ID_B = re.compile("[^A-Z0-9!\"&'()*+,\-\\\./:;?=\s]", re.S)
# Dates and times: anything that is not a digit
rec_DT = re.compile("[^0-9]+", re.S)
rec_TM = re.compile("[^0-9]+", re.S)

def match_re(short_data_type, val):
    """
    Test that the whole of val matches the pattern for a numeric type

    @param short_data_type: simplified data type, 'N' or 'R'
    @type short_data_type: string
    @param val: data value to be verified
    @type val: string
    @return: True if matched, False if not
    @rtype: boolean
    @raise errors.EngineError: for any other short_data_type
    """
    if short_data_type == 'N':
        pattern = rec_N
    elif short_data_type == 'R':
        pattern = rec_R
    else:
        raise errors.EngineError('Unknown data type %s' % (short_data_type))
    found = pattern.search(val)
    # A partial match is a failure: the matched text must be the whole value
    return found is not None and found.group(0) == val
1839
def not_match_re(short_data_type, val, charset = 'B'):
    """
    Search val for characters that are not allowed for the data type

    @param short_data_type: simplified data type: 'ID', 'AN', 'DT' or 'TM'
    @type short_data_type: string
    @param val: data value to be verified
    @type val: string
    @param charset: [optional] - 'B' for Basic X12 character set, 'E' for
        extended.  Any other value is treated as 'B'.
    @type charset: string
    @return: True if found invalid characters, False if none
    @rtype: boolean
    @raise errors.EngineError: for an unknown short_data_type
    """
    if short_data_type in ('ID', 'AN'):
        if charset == 'E':  # extended charset
            rec = rec_ID_E
        else:
            # Basic charset.  Also the fallback for an unrecognised or
            # missing charset, which previously left rec unassigned.
            rec = rec_ID_B
    elif short_data_type == 'DT':
        rec = rec_DT
    elif short_data_type == 'TM':
        rec = rec_TM
    else:
        raise errors.EngineError('Unknown data type %s' % (short_data_type))
    m = rec.search(val)
    if m and m.group(0):
        return True  # Invalid char matched
    return False
1866
def is_valid_date(data_type, val):
    """
    Check that val is a calendar date of 6 (YYMMDD), 8 (CCYYMMDD) or
    12 (CCYYMMDDHHMM) digits.

    Two digit years below 50 are read as 20YY, all others as 19YY.
    Years before 1800 are rejected.

    @param data_type: Date type; 'D8' requires 8 digits, 'D6' requires 6
    @type data_type: string
    @param val: data value to be verified
    @type val: string
    @return: True if valid, False if not
    @rtype: boolean
    """
    try:
        if data_type == 'D8' and len(val) != 8:
            raise IsValidError
        if data_type == 'D6' and len(val) != 6:
            raise IsValidError
        if not_match_re('DT', val):
            raise IsValidError
        if len(val) not in (6, 8, 12):  # valid lengths for date
            raise IsValidError
        try:
            if len(val) == 6:  # if 2 digit year, add CC
                # Compare as a number.  Comparing the text to 50 never
                # selected the '20' century.
                if int(val[0:2]) < 50:
                    val = '20' + val
                else:
                    val = '19' + val
            year = int(val[0:4])
            month = int(val[4:6])
            day = int(val[6:8])
        except (TypeError, ValueError):
            raise IsValidError
        # Should not have dates before 1/1/1800
        if year < 1800:
            raise IsValidError
        if month < 1 or month > 12:
            raise IsValidError
        if month in (1, 3, 5, 7, 8, 10, 12):
            max_day = 31
        elif month in (4, 6, 9, 11):
            max_day = 30
        elif year % 4 == 0 and (year % 100 != 0 or year % 400 == 0):
            max_day = 29  # February in a leap year
        else:
            max_day = 28
        if day < 1 or day > max_day:
            raise IsValidError
        if len(val) == 12:
            if not is_valid_time(val[8:12]):
                raise IsValidError
    except IsValidError:
        return False
    return True
1922
def is_valid_time(val):
    """
    Check a time value of the form HHMM, HHMMSS or HHMMSS plus up to two
    further digits.

    Values shorter than HHMM are not rejected for their length here.

    @param val: time value to be verified
    @type val: string
    @return: True if valid, False if not
    @rtype: boolean
    """
    try:
        # Reject any non-digit.  The result of this check used to be
        # discarded, which let values such as '1a' through.
        if not_match_re('TM', val):
            raise IsValidError
        if val[0:2] > '23' or val[2:4] > '59':  # check hour, minute segment
            raise IsValidError
        elif len(val) > 4:  # time contains seconds
            if len(val) < 6:  # seconds must be two digits
                raise IsValidError
            elif val[4:6] > '59':  # check seconds
                raise IsValidError
            # check decimal seconds here in the future
            elif len(val) > 8:
                raise IsValidError
    except IsValidError:
        return False
    return True
1944