#!/usr/bin/env python3
"""
XML Generator 2.0 CLI Tool

Processes XLSX or CSV files to generate XML with pack_content sections.
Includes validation against set dictionary rules.
"""

import csv
import math
import os
import re
import uuid
import xml.etree.ElementTree as ET  # noqa: F401  (kept: other code may rely on it)
from collections import Counter, defaultdict
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple
from xml.sax.saxutils import escape as _xml_escape

import click

try:
    import openpyxl  # noqa: F401
except ImportError:
    pass  # Handled with an explicit error inside DataReader._read_xlsx if needed


# NOTE(review): in the reviewed source the XML markup inside these string
# literals was missing (only `''` and `f' '` were left, which made the
# template processing drop every line). The tag names and indentation below
# are reconstructed from the docstrings ("pack_content sections") and from
# the `unit_pack_` / `document_id` / `operation_date_time` naming -- confirm
# them against a real template before relying on the output.
_PACK_CONTENT_OPEN = '<pack_content>'
_PACK_CONTENT_CLOSE = '</pack_content>'
_INSERT_BEFORE_TAG = '</Document>'  # generated sections go right before this
_PACK_INDENT = ' ' * 8
_ITEM_INDENT = ' ' * 12


def _safe_str(val: Any) -> str:
    """Convert an Excel cell value to a stripped string.

    ``None`` becomes ``""``. Floats holding a whole number are rendered
    without the trailing ``.0`` so counts and GTINs that Excel stored as
    floats keep their integer look.
    """
    if val is None:
        return ""
    if isinstance(val, float) and val.is_integer():
        return str(int(val))
    return str(val).strip()


class DataReader:
    """Handles XLSX and CSV file reading and data processing."""

    @staticmethod
    def _read_csv(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Read a ``;``-delimited CSV file and return ``(headers, rows)``.

        Rows are dicts keyed by header. Short rows carry ``None`` for the
        missing fields (``csv.DictReader`` default), so callers must treat
        ``None`` like an empty value.
        """
        # newline='' lets the csv module handle embedded line breaks itself.
        with open(file_path, 'r', encoding='utf-8-sig', newline='') as file:
            reader = csv.DictReader(file, delimiter=';')
            headers = [field.lstrip('\ufeff') for field in (reader.fieldnames or [])]
            rows = [
                {
                    (key.lstrip('\ufeff') if isinstance(key, str) else key): value
                    for key, value in row.items()
                }
                for row in reader
            ]
        return headers, rows

    @staticmethod
    def _read_xlsx(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Read the active sheet of an XLSX file and return ``(headers, rows)``.

        The first sheet row is the header; completely empty rows are skipped.
        Every cell is converted with ``_safe_str``.

        Raises:
            ImportError: if ``openpyxl`` is not installed.
        """
        try:
            import openpyxl
        except ImportError as exc:
            raise ImportError(
                "Please install openpyxl to read .xlsx files (pip install openpyxl)"
            ) from exc

        wb = openpyxl.load_workbook(file_path, data_only=True)
        try:
            sheet = wb.active
            headers = [_safe_str(cell.value) for cell in sheet[1]]
            rows = []
            for row in sheet.iter_rows(min_row=2, values_only=True):
                if any(cell is not None for cell in row):
                    rows.append(dict(zip(headers, (_safe_str(cell) for cell in row))))
        finally:
            wb.close()
        return headers, rows

    @staticmethod
    def _read_file(file_path: str) -> Tuple[List[str], List[Dict[str, Any]]]:
        """Route to the XLSX reader for ``.xlsx`` paths, otherwise to the CSV reader."""
        if str(file_path).lower().endswith('.xlsx'):
            return DataReader._read_xlsx(file_path)
        return DataReader._read_csv(file_path)

    @staticmethod
    def read_data_with_gtins(file_path: str, cis_column: str = "Код") -> Dict[str, List[Dict[str, str]]]:
        """Group rows by SET CIS, keeping the CIS code and GTIN information.

        Returns a mapping ``set_cis -> [{'cis_code', 'set_gtin', 'gtin'}, ...]``.
        Rows lacking either the set code or the item code are ignored.
        """
        pack_data = defaultdict(list)
        _, rows = DataReader._read_file(file_path)

        for row in rows:
            # Support both the new and the legacy column names.
            set_cis = row.get('SET_CODE', row.get('SET CIS', ''))
            cis_code = row.get('ITEM_CODE', row.get(cis_column, ''))
            set_gtin = row.get('SET_GTIN', row.get('SET GTIN', ''))
            gtin = row.get('ITEM_GTIN', row.get('GTIN', ''))

            if set_cis and cis_code:
                pack_data[set_cis].append({
                    'cis_code': cis_code,
                    'set_gtin': set_gtin,
                    'gtin': gtin,
                })

        return pack_data

    @staticmethod
    def read_data_simple(file_path: str, cis_column: str = "CIS") -> Dict[str, List[str]]:
        """Group CIS codes by SET CIS.

        Returns a mapping ``set_cis -> [cis_code, ...]``. Rows lacking either
        the set code or the item code are ignored.
        """
        pack_data = defaultdict(list)
        _, rows = DataReader._read_file(file_path)

        for row in rows:
            # Support both the new and the legacy column names.
            set_cis = row.get('SET_CODE', row.get('SET CIS', ''))
            cis_code = row.get('ITEM_CODE', row.get(cis_column, ''))

            if set_cis and cis_code:
                pack_data[set_cis].append(cis_code)

        return pack_data


class SetDictionary:
    """Handles set dictionary loading and management."""

    def __init__(self, dict_file_path: str):
        self.rules = self._load_set_dict(dict_file_path)

    def _load_set_dict(self, dict_file_path: str) -> Dict[str, List[Dict[str, Any]]]:
        """Load set dictionary rules from an XLSX or CSV file.

        Returns a mapping ``set GTIN -> [{'gtin_item', 'count', 'set_name'}, ...]``.
        Rows with a missing GTIN, or with a count that is missing, not a
        number, or not finite, are skipped.
        """
        set_rules = defaultdict(list)
        _, rows = DataReader._read_file(dict_file_path)

        for row in rows:
            # Support both the new XLSX format and the old CSV format.
            gtin_set = row.get('GTIN_FULL', row.get('GTIN SET', ''))
            gtin_item = row.get('GTIN_ITEM_FULL', row.get('GTIN ITEM', ''))
            count_val = row.get('Количество штук в упаковке', row.get('COUNT', ''))
            set_name = row.get('Полное наименование товара', row.get('SET NAME', ''))

            if not (gtin_set and gtin_item) or count_val in ('', None):
                continue

            try:
                count_num = float(count_val)
            except (TypeError, ValueError):
                # TypeError covers non-string leftovers such as None/list
                # values produced by csv.DictReader for ragged rows.
                continue

            # NaN/inf cannot be turned into an int count by the validator.
            if not math.isfinite(count_num):
                continue

            set_rules[gtin_set].append({
                'gtin_item': gtin_item,
                'count': count_num,
                'set_name': set_name,
            })

        return set_rules

    def get_rules(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get all loaded rules."""
        return self.rules

    def get_rule_count(self) -> int:
        """Get the number of set GTINs that have at least one rule."""
        return len(self.rules)


class PackValidator:
    """Handles pack composition validation."""

    def __init__(self, set_dictionary: SetDictionary):
        self.set_dict = set_dictionary

    def validate_composition(self, pack_data: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, str]]:
        """Validate every non-empty pack against the set dictionary rules.

        Returns one result dict per pack (see ``_validate_single_pack``).
        """
        return [
            self._validate_single_pack(set_cis, items)
            for set_cis, items in pack_data.items()
            if items
        ]

    def _validate_single_pack(self, set_cis: str, items: List[Dict[str, str]]) -> Dict[str, str]:
        """Validate a single pack composition.

        Returns a dict with the keys ``set_cis``, ``set_gtin``, ``set_name``,
        ``status`` (``'OK'``, ``'WARNING'`` or ``'ERROR'``) and ``message``.
        """
        # The SET GTIN is taken from the first item (should be the same for
        # all items in the pack).
        set_gtin = items[0]['set_gtin']

        expected_items = self.set_dict.get_rules().get(set_gtin, [])
        if not expected_items:
            return {
                'set_cis': set_cis,
                'set_gtin': set_gtin,
                'set_name': '',
                'status': 'WARNING',
                'message': f'No rules found for SET GTIN: {set_gtin}',
            }

        # Count actual GTINs, ignoring rows without one.
        actual_counts = Counter(item['gtin'] for item in items if item['gtin'])

        # Build expected counts; the set name is the first non-empty one.
        expected_counts = {}
        set_name = ''
        for rule in expected_items:
            expected_counts[rule['gtin_item']] = int(rule['count'])
            if not set_name:
                set_name = rule['set_name']

        errors = self._check_composition_errors(actual_counts, expected_counts)
        warnings = self._check_composition_warnings(actual_counts, expected_counts)

        if errors:
            status, message = 'ERROR', '; '.join(errors)
        elif warnings:
            status, message = 'WARNING', '; '.join(warnings)
        else:
            status, message = 'OK', 'Composition is valid'

        return {
            'set_cis': set_cis,
            'set_gtin': set_gtin,
            'set_name': set_name,
            'status': status,
            'message': message,
        }

    def _check_composition_errors(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Return messages for expected GTINs that are missing or miscounted."""
        errors = []
        for expected_gtin, expected_count in expected_counts.items():
            actual_count = actual_counts.get(expected_gtin, 0)
            if actual_count == 0:
                errors.append(f'Missing GTIN {expected_gtin} (expected {expected_count})')
            elif actual_count != expected_count:
                errors.append(
                    f'Wrong count for GTIN {expected_gtin}: got {actual_count}, expected {expected_count}'
                )
        return errors

    def _check_composition_warnings(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Return messages for GTINs present in the pack but absent from the rules."""
        return [
            f'Unexpected GTIN {actual_gtin} (count: {actual_count})'
            for actual_gtin, actual_count in actual_counts.items()
            if actual_gtin not in expected_counts
        ]


class XMLGenerator:
    """Handles XML generation and template processing."""

    @staticmethod
    def escape_xml_content(text: str) -> str:
        """Make *text* safe to place inside a CDATA section.

        CDATA needs no entity escaping, but the sequence ``]]>`` would end
        the section early, so it is split across two adjacent sections.
        """
        return text.replace(']]>', ']]]]><![CDATA[>')

    @staticmethod
    def generate_pack_content_xml(pack_data: Dict[str, List[str]]) -> List[str]:
        """Generate one pack_content XML fragment per SET CIS.

        Args:
            pack_data: mapping ``set_cis -> [cis_code, ...]``.

        Returns:
            A list of multi-line strings, one per pack, in mapping order.
        """
        escape = XMLGenerator.escape_xml_content
        pack_contents = []

        for set_cis, cis_codes in pack_data.items():
            # NOTE(review): element names and indentation are reconstructed
            # (see the module-level note) -- confirm against the target schema.
            lines = [
                f'{_PACK_INDENT}{_PACK_CONTENT_OPEN}',
                f'{_ITEM_INDENT}<pack_code><![CDATA[{escape(set_cis)}]]></pack_code>',
            ]
            lines.extend(
                f'{_ITEM_INDENT}<cis><![CDATA[{escape(cis_code)}]]></cis>'
                for cis_code in cis_codes
            )
            lines.append(f'{_PACK_INDENT}{_PACK_CONTENT_CLOSE}')
            pack_contents.append('\n'.join(lines))

        return pack_contents

    @staticmethod
    def process_xml_template(template_path: str, pack_contents: List[str],
                             document_id: Optional[str] = None,
                             document_number: Optional[str] = None,
                             operation_time: Optional[str] = None) -> str:
        """Read the template, substitute parameters and insert pack_content sections.

        Raises:
            ValueError: if the template has no place to insert the sections.
        """
        with open(template_path, 'r', encoding='utf-8') as file:
            template_content = file.read()

        template_content = XMLGenerator._replace_template_parameters(
            template_content, document_id, document_number, operation_time
        )
        return XMLGenerator._insert_pack_content(template_content, pack_contents)

    @staticmethod
    def _replace_template_parameters(template_content: str, document_id: Optional[str],
                                     document_number: Optional[str],
                                     operation_time: Optional[str]) -> str:
        """Replace the values of known attributes in the template text.

        Every ``document_id="..."``, ``document_number="..."`` and
        ``operation_date_time="..."`` occurrence is rewritten when the
        corresponding value is provided (non-empty). Values are escaped for
        use inside a double-quoted XML attribute.
        """
        substitutions = (
            ('document_id', document_id),
            ('document_number', document_number),
            ('operation_date_time', operation_time),
        )
        for attribute, value in substitutions:
            if not value:
                continue
            escaped = _xml_escape(value, {'"': '&quot;'})
            replacement = f'{attribute}="{escaped}"'
            # A function replacement is used so that backslashes or group
            # references in user-supplied values are inserted literally.
            template_content = re.sub(
                rf'{attribute}="[^"]*"',
                lambda _match, text=replacement: text,
                template_content,
            )
        return template_content

    @staticmethod
    def _insert_pack_content(template_content: str, pack_contents: List[str]) -> str:
        """Insert the generated pack_content sections into the template.

        pack_content blocks already present in the template (before the
        insertion point) are dropped, and the generated sections are placed
        immediately before the first line containing the closing anchor tag.

        Raises:
            ValueError: if the anchor tag is never found, because the output
                would otherwise silently contain no pack content.
        """
        result_lines = []
        inserted = False
        inside_existing = False

        for line in template_content.split('\n'):
            if not inserted:
                if _PACK_CONTENT_OPEN in line:
                    # A block opened and closed on one line must not leave
                    # the parser stuck "inside" it.
                    inside_existing = _PACK_CONTENT_CLOSE not in line
                    continue
                if _PACK_CONTENT_CLOSE in line:
                    inside_existing = False
                    continue
                if inside_existing:
                    continue
                if _INSERT_BEFORE_TAG in line:
                    result_lines.extend(pack_contents)
                    inserted = True
            result_lines.append(line)

        if not inserted:
            raise ValueError(
                f"Template has no '{_INSERT_BEFORE_TAG}' line to insert pack_content sections before"
            )

        return '\n'.join(result_lines)


class ParameterGenerator:
    """Handles parameter generation and validation."""

    @staticmethod
    def generate_document_id() -> str:
        """Generate a unique document ID of the form ``unit_pack_<32 hex chars>``."""
        return f"unit_pack_{uuid.uuid4().hex.upper()}"

    @staticmethod
    def generate_operation_time() -> str:
        """Return the current local time in ISO format (naive, no UTC offset)."""
        return datetime.now().isoformat()

    @staticmethod
    def validate_operation_time(operation_time: str) -> bool:
        """Return True if *operation_time* parses as ISO 8601 (a trailing ``Z`` is accepted)."""
        try:
            datetime.fromisoformat(operation_time.replace('Z', '+00:00'))
        except ValueError:
            return False
        return True


class ValidationReporter:
    """Handles validation result reporting."""

    @staticmethod
    def report_validation_summary(validation_results: List[Dict[str, str]]) -> Tuple[int, int, int]:
        """Print a one-line summary and return ``(ok, warnings, errors)`` counts."""
        status_counts = Counter(result['status'] for result in validation_results)
        ok = status_counts['OK']
        warnings = status_counts['WARNING']
        errors = status_counts['ERROR']

        click.echo(f"Validation results: {ok} OK, {warnings} warnings, {errors} errors")
        return ok, warnings, errors

    @staticmethod
    def report_validation_details(validation_results: List[Dict[str, str]], show_ok: bool = False):
        """Print one line per result; errors go to stderr, OK lines only if *show_ok*."""
        for result in validation_results:
            status = result['status']
            if status == 'ERROR':
                click.echo(f"❌ ERROR: {result['set_cis']} - {result['message']}", err=True)
            elif status == 'WARNING':
                click.echo(f"⚠️ WARNING: {result['set_cis']} - {result['message']}")
            elif show_ok:
                click.echo(f"✅ OK: {result['set_cis']} - {result['message']}")


class DryRunReporter:
    """Handles dry run reporting."""

    @staticmethod
    def report_pack_data_preview(pack_data: Dict[str, List], max_items: int = 3):
        """Print the first *max_items* packs with up to two CIS codes each."""
        click.echo("\nDry run - would process:")

        for set_cis, items in list(pack_data.items())[:max_items]:
            click.echo(f" SET CIS: {set_cis}")
            click.echo(f" CIS codes: {len(items)} items")
            for item in items[:2]:  # Show the first 2 CIS codes
                shown = item['cis_code'] if isinstance(item, dict) else item
                click.echo(f" - {shown}")
            if len(items) > 2:
                click.echo(f" ... and {len(items) - 2} more")

        if len(pack_data) > max_items:
            click.echo(f" ... and {len(pack_data) - max_items} more SET CIS codes")

    @staticmethod
    def report_parameters(document_id: str, document_number: Optional[str], operation_time: str):
        """Print the parameters that would be substituted into the template."""
        click.echo("\nWould use parameters:")
        click.echo(f" Document ID: {document_id}")
        click.echo(f" Document Number: {document_number or 'Not specified'}")
        click.echo(f" Operation Time: {operation_time}")


class XMLGeneratorApp:
    """Main application class for XML generation."""

    def __init__(self, input_file: str, template_file: str, cis_column: str = "Код",
                 set_dict_file: Optional[str] = None):
        self.input_file = input_file
        self.template_file = template_file
        self.cis_column = cis_column
        self.set_dict_file = set_dict_file

        # Initialize components
        self.data_reader = DataReader()
        self.xml_generator = XMLGenerator()
        self.param_generator = ParameterGenerator()
        self.validation_reporter = ValidationReporter()
        self.dry_run_reporter = DryRunReporter()

        # Optional components: only available when a set dictionary is given.
        self.set_dictionary = None
        self.validator = None
        if set_dict_file:
            self.set_dictionary = SetDictionary(set_dict_file)
            self.validator = PackValidator(self.set_dictionary)

    def load_data(self) -> Dict[str, List]:
        """Load data from XLSX/CSV with or without GTIN information.

        Always returns ``set_cis -> [item dict, ...]``; each item dict has a
        ``'cis_code'`` key and, when a set dictionary is configured, also
        ``'set_gtin'`` and ``'gtin'``.
        """
        click.echo(f"Reading file: {self.input_file}")
        click.echo(f"Using CIS column: {self.cis_column}")

        if self.set_dict_file:
            return self.data_reader.read_data_with_gtins(self.input_file, self.cis_column)

        pack_data_simple = self.data_reader.read_data_simple(self.input_file, self.cis_column)
        # Convert to the dict-per-item format used by the rest of the app.
        return {
            set_cis: [{'cis_code': cis} for cis in cis_codes]
            for set_cis, cis_codes in pack_data_simple.items()
        }

    def load_validation_rules(self) -> Optional[SetDictionary]:
        """Report and return the set dictionary, or None if none was provided.

        The dictionary itself is loaded in ``__init__``; this only prints
        the progress messages.
        """
        if not self.set_dict_file:
            return None

        click.echo(f"Loading set dictionary: {self.set_dict_file}")
        click.echo(f"Loaded {self.set_dictionary.get_rule_count()} set rules")
        return self.set_dictionary

    def validate_data(self, pack_data: Dict[str, List[Dict[str, str]]]) -> Tuple[List[Dict[str, str]], bool]:
        """Validate pack composition.

        Returns ``(results, has_errors)``; ``([], False)`` when no set
        dictionary is configured.
        """
        if not self.validator:
            return [], False

        click.echo("Validating pack composition...")
        validation_results = self.validator.validate_composition(pack_data)

        _, _, errors = self.validation_reporter.report_validation_summary(validation_results)
        return validation_results, errors > 0

    def generate_parameters(self, document_id: Optional[str], document_number: Optional[str],
                            operation_time: Optional[str]) -> Tuple[str, Optional[str], str]:
        """Fill in missing parameters and warn about a malformed operation time.

        A provided but invalid operation time is still returned unchanged;
        only a warning is printed to stderr.
        """
        if not document_id:
            document_id = self.param_generator.generate_document_id()

        if not operation_time:
            operation_time = self.param_generator.generate_operation_time()
        elif not self.param_generator.validate_operation_time(operation_time):
            click.echo(f"Warning: Invalid operation time format: {operation_time}", err=True)

        return document_id, document_number, operation_time

    def process_dry_run(self, pack_data: Dict[str, List], validation_results: List[Dict[str, str]],
                        document_id: str, document_number: Optional[str], operation_time: str):
        """Print validation details, a data preview and the parameters; write nothing."""
        if validation_results:
            self.validation_reporter.report_validation_details(validation_results, show_ok=True)

        self.dry_run_reporter.report_pack_data_preview(pack_data)
        self.dry_run_reporter.report_parameters(document_id, document_number, operation_time)

    def generate_xml_output(self, pack_data: Dict[str, List], document_id: str,
                            document_number: Optional[str], operation_time: str) -> str:
        """Generate the XML document text from pack data.

        Raises:
            ValueError: if the template has no place to insert the sections.
        """
        # load_data() always yields dict items (with or without a set
        # dictionary), so flatten unconditionally; plain strings are accepted
        # too for callers that already pass the simple format.
        simple_pack_data = {
            set_cis: [item['cis_code'] if isinstance(item, dict) else item for item in items]
            for set_cis, items in pack_data.items()
        }

        click.echo("Generating pack_content sections...")
        pack_contents = self.xml_generator.generate_pack_content_xml(simple_pack_data)

        click.echo(f"Processing template: {self.template_file}")
        return self.xml_generator.process_xml_template(
            self.template_file, pack_contents, document_id, document_number, operation_time
        )

    def save_or_print_output(self, xml_content: str, output_file: Optional[str]):
        """Write the XML to *output_file*.

        When no output file is given, the name is derived from the input
        file: ``<input path without extension>_output.xml``. (Despite the
        method name, nothing is printed to stdout except the status line.)
        """
        if not output_file:
            base_name, _ = os.path.splitext(self.input_file)
            output_file = f"{base_name}_output.xml"

        with open(output_file, 'w', encoding='utf-8') as f:
            f.write(xml_content)
        click.echo(f"XML generated successfully: {output_file}")


@click.command()
@click.argument('input_file', type=click.Path(exists=True, dir_okay=False))
@click.argument('template_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--output', '-o', type=click.Path(dir_okay=False),
              help='Output XML file path. If not specified, generates [input_filename]_output.xml in the same directory.')
@click.option('--cis-column', '-c', default='CIS',
              help='Legacy column name for CIS codes in data file if ITEM_CODE is not found (default: "CIS")')
@click.option('--dry-run', is_flag=True,
              help='Show what would be processed without generating output')
@click.option('--set-dict', type=click.Path(exists=True, dir_okay=False),
              help='Path to set dictionary (XLSX/CSV) file for validation')
@click.option('--document-id', type=str,
              help='Document ID to use in XML (auto-generated if not provided)')
@click.option('--document-number', type=str,
              help='Document number to use in XML')
@click.option('--operation-time', type=str,
              help='Operation time in ISO format (auto-generated if not provided)')
@click.option('--validate-only', is_flag=True,
              help='Only validate composition without generating XML')
def generate_xml(input_file: str, template_file: str, output: Optional[str], cis_column: str,
                 dry_run: bool, set_dict: Optional[str], document_id: Optional[str],
                 document_number: Optional[str], operation_time: Optional[str],
                 validate_only: bool):
    """
    Generate XML file from XLSX/CSV data and template with validation.

    INPUT_FILE: Path to XLSX or CSV file containing SET CIS and CIS codes
    TEMPLATE_FILE: Path to XML template file

    The INPUT_FILE should contain columns:
    - 'SET_CODE' (or 'SET CIS'): Pack codes that will become <pack_code> elements
    - 'ITEM_CODE' (or column specified by --cis-column): Individual CIS codes that will become <cis> elements
    - 'SET_GTIN' (or 'SET GTIN'): SET GTIN codes for validation
    - 'ITEM_GTIN' (or 'GTIN'): Individual GTIN codes for validation
    """
    try:
        app = XMLGeneratorApp(input_file, template_file, cis_column, set_dict)

        # Load validation rules if provided
        app.load_validation_rules()

        pack_data = app.load_data()
        if not pack_data:
            click.echo("No data found in file or no matching columns.", err=True)
            return

        click.echo(f"Found {len(pack_data)} unique SET CIS codes")

        # Validate composition if a dictionary is provided
        validation_results, has_errors = app.validate_data(pack_data)

        if validation_results:
            app.validation_reporter.report_validation_details(validation_results, show_ok=dry_run)

        if has_errors and not dry_run and not validate_only:
            click.echo("Errors found. Use --dry-run to see all issues or fix them before generating XML.", err=True)
            return

        if validate_only:
            click.echo("Validation complete. Use without --validate-only to generate XML.")
            return

        document_id, document_number, operation_time = app.generate_parameters(
            document_id, document_number, operation_time
        )

        if dry_run:
            app.process_dry_run(pack_data, validation_results, document_id, document_number, operation_time)
            return

        xml_content = app.generate_xml_output(pack_data, document_id, document_number, operation_time)
        app.save_or_print_output(xml_content, output)

    except Exception as e:
        # Top-level boundary: report any failure and abort with a non-zero exit.
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort() from e


if __name__ == '__main__':
    generate_xml()