#!/usr/bin/env python3
"""
XML Generator CLI Tool

Processes semicolon-separated CSV files to generate XML with pack_content sections.
Includes validation against set dictionary rules.
"""

import click
import csv
import xml.etree.ElementTree as ET
from collections import defaultdict, Counter
from pathlib import Path
import html
import re
import uuid
from datetime import datetime
from typing import Dict, Iterator, List, Optional, Tuple, Any


class CSVReader:
    """Handles CSV file reading and data processing."""

    # 'utf-8-sig' reads plain UTF-8 as well and drops a leading BOM if present.
    DEFAULT_ENCODING = 'utf-8-sig'

    @staticmethod
    def clean_csv_columns(fieldnames: List[str]) -> List[str]:
        """Clean CSV column names by removing BOM characters."""
        return [field.lstrip('\ufeff') for field in fieldnames]

    @staticmethod
    def _bom_safe_encoding(encoding: Optional[str]) -> str:
        """Return the codec name to open a CSV with.

        A missing value and any spelling of plain UTF-8 map to 'utf-8-sig' so
        that files are read exactly as before the encoding became selectable.
        Any other codec name is passed through unchanged.
        """
        if not encoding:
            return CSVReader.DEFAULT_ENCODING
        if encoding.lower().replace('_', '-') in ('utf-8', 'utf8'):
            return CSVReader.DEFAULT_ENCODING
        return encoding

    @staticmethod
    def _cell(row: Dict[str, Any], column: str) -> str:
        """Return the stripped value of *column*, or '' when it is absent.

        csv.DictReader stores None for cells missing from a short row, so the
        value is normalised before calling strip().
        """
        return (row.get(column) or '').strip()

    @staticmethod
    def _iter_rows(file_path: str, encoding: Optional[str] = None) -> Iterator[Dict[str, Any]]:
        """Yield the rows of a semicolon-separated CSV file as dictionaries.

        Header names are passed through clean_csv_columns() first. The file is
        opened with newline='' as the csv module documentation requires.
        """
        codec = CSVReader._bom_safe_encoding(encoding)
        with open(file_path, 'r', encoding=codec, newline='') as file:
            reader = csv.DictReader(file, delimiter=';')
            if reader.fieldnames:
                reader.fieldnames = CSVReader.clean_csv_columns(reader.fieldnames)
            yield from reader

    @staticmethod
    def read_csv_with_gtins(file_path: str, cis_column: str = "Код",
                            encoding: str = 'utf-8-sig') -> Dict[str, List[Dict[str, str]]]:
        """Read CSV file and group data by SET CIS with GTIN information.

        Args:
            file_path: Path of the CSV file.
            cis_column: Name of the column holding the individual CIS code.
            encoding: Text encoding of the file.

        Returns:
            Mapping of SET CIS to a list of dicts with the keys 'cis_code',
            'set_gtin' and 'gtin'. Rows with an empty SET CIS or an empty CIS
            code are skipped.
        """
        pack_data = defaultdict(list)
        for row in CSVReader._iter_rows(file_path, encoding):
            set_cis = CSVReader._cell(row, 'SET CIS')
            cis_code = CSVReader._cell(row, cis_column)
            if not (set_cis and cis_code):
                continue
            pack_data[set_cis].append({
                'cis_code': cis_code,
                'set_gtin': CSVReader._cell(row, 'SET GTIN'),
                'gtin': CSVReader._cell(row, 'GTIN'),
            })
        return pack_data

    @staticmethod
    def read_csv_simple(file_path: str, cis_column: str = "CIS",
                        encoding: str = 'utf-8-sig') -> Dict[str, List[str]]:
        """Read CSV file and group CIS codes by SET CIS.

        Args:
            file_path: Path of the CSV file.
            cis_column: Name of the column holding the individual CIS code.
            encoding: Text encoding of the file.

        Returns:
            Mapping of SET CIS to the list of its CIS codes. Rows with an empty
            SET CIS or an empty CIS code are skipped.
        """
        pack_data = defaultdict(list)
        for row in CSVReader._iter_rows(file_path, encoding):
            set_cis = CSVReader._cell(row, 'SET CIS')
            cis_code = CSVReader._cell(row, cis_column)
            if set_cis and cis_code:
                pack_data[set_cis].append(cis_code)
        return pack_data


class SetDictionary:
    """Handles set dictionary loading and management."""

    def __init__(self, dict_file_path: str):
        """Load the rules from *dict_file_path* immediately."""
        self.rules = self._load_set_dict(dict_file_path)

    def _load_set_dict(self, dict_file_path: str) -> Dict[str, List[Dict[str, Any]]]:
        """Load set dictionary rules from CSV file.

        Reads the columns 'GTIN SET', 'GTIN ITEM', 'COUNT' and 'SET NAME'.
        Rows missing any of the first three, or whose COUNT is not a number,
        are skipped.

        Returns:
            Mapping of set GTIN to a list of dicts with the keys 'gtin_item',
            'count' (float) and 'set_name'.
        """
        set_rules = defaultdict(list)
        for row in CSVReader._iter_rows(dict_file_path, CSVReader.DEFAULT_ENCODING):
            gtin_set = CSVReader._cell(row, 'GTIN SET')
            gtin_item = CSVReader._cell(row, 'GTIN ITEM')
            count = CSVReader._cell(row, 'COUNT')
            if not (gtin_set and gtin_item and count):
                continue
            try:
                count_num = float(count)
            except ValueError:
                # Best effort: a row with an unparsable count is ignored.
                continue
            set_rules[gtin_set].append({
                'gtin_item': gtin_item,
                'count': count_num,
                'set_name': CSVReader._cell(row, 'SET NAME'),
            })
        return set_rules

    def get_rules(self) -> Dict[str, List[Dict[str, Any]]]:
        """Get all loaded rules."""
        return self.rules

    def get_rule_count(self) -> int:
        """Get the number of loaded rules (distinct set GTINs)."""
        return len(self.rules)


class PackValidator:
    """Handles pack composition validation."""

    def __init__(self, set_dictionary: SetDictionary):
        """Keep a reference to the dictionary whose rules are checked."""
        self.set_dict = set_dictionary

    def validate_composition(self, pack_data: Dict[str, List[Dict[str, str]]]) -> List[Dict[str, str]]:
        """Validate pack composition against set dictionary rules.

        Returns:
            One result dict per non-empty pack, in the iteration order of
            *pack_data*.
        """
        return [
            self._validate_single_pack(set_cis, items)
            for set_cis, items in pack_data.items()
            if items
        ]

    def _validate_single_pack(self, set_cis: str, items: List[Dict[str, str]]) -> Dict[str, str]:
        """Validate a single pack composition.

        Returns:
            Dict with the keys 'set_cis', 'set_gtin', 'set_name', 'status'
            ('OK', 'WARNING' or 'ERROR') and 'message'.
        """
        # The SET GTIN is taken from the first item (should be the same for
        # all items in the pack).
        set_gtin = items[0]['set_gtin']

        expected_items = self.set_dict.get_rules().get(set_gtin, [])
        if not expected_items:
            return {
                'set_cis': set_cis,
                'set_gtin': set_gtin,
                # Present for consistency with the other results; no rule
                # means no name is known.
                'set_name': '',
                'status': 'WARNING',
                'message': f'No rules found for SET GTIN: {set_gtin}'
            }

        # Items with an empty GTIN are not counted.
        actual_counts = Counter(item['gtin'] for item in items if item['gtin'])

        expected_counts = {}
        set_name = ''
        for rule in expected_items:
            expected_counts[rule['gtin_item']] = int(rule['count'])
            # First non-empty set name among the rules wins.
            if not set_name:
                set_name = rule['set_name']

        errors = self._check_composition_errors(actual_counts, expected_counts)
        warnings = self._check_composition_warnings(actual_counts, expected_counts)

        # Errors take precedence over warnings.
        if errors:
            status, message = 'ERROR', '; '.join(errors)
        elif warnings:
            status, message = 'WARNING', '; '.join(warnings)
        else:
            status, message = 'OK', 'Composition is valid'

        return {
            'set_cis': set_cis,
            'set_gtin': set_gtin,
            'set_name': set_name,
            'status': status,
            'message': message
        }

    def _check_composition_errors(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Check for missing items or wrong counts."""
        errors = []
        for expected_gtin, expected_count in expected_counts.items():
            actual_count = actual_counts.get(expected_gtin, 0)
            if actual_count == 0:
                errors.append(f'Missing GTIN {expected_gtin} (expected {expected_count})')
            elif actual_count != expected_count:
                errors.append(f'Wrong count for GTIN {expected_gtin}: got {actual_count}, expected {expected_count}')
        return errors

    def _check_composition_warnings(self, actual_counts: Counter, expected_counts: Dict[str, int]) -> List[str]:
        """Check for unexpected items."""
        return [
            f'Unexpected GTIN {actual_gtin} (count: {actual_count})'
            for actual_gtin, actual_count in actual_counts.items()
            if actual_gtin not in expected_counts
        ]


class XMLGenerator:
    """Handles XML generation and template processing."""

    @staticmethod
    def escape_xml_content(text: str) -> str:
        """Properly escape XML content for CDATA sections.

        Currently returns *text* unchanged.
        """
        # Escaping is intentionally disabled: the original author noted that
        # no escaping appears to be needed inside a CDATA section, and left
        # the replacements for '&', '<' and '>' switched off.
        # NOTE(review): if the value really is emitted inside CDATA, a literal
        # ']]>' in it would end the section early - confirm inputs cannot
        # contain it.
        return text

    @staticmethod
    def generate_pack_content_xml(pack_data: Dict[str, List[str]]) -> List[str]:
        """Generate pack_content XML elements from grouped data.

        Returns:
            One newline-joined string per SET CIS.
        """
        # NOTE(review): the f-string literals below contain only a space and
        # use neither set_cis nor escaped_cis. The XML markup they presumably
        # carried appears to have been lost from this copy of the file -
        # restore it from the original source before relying on the output.
        pack_contents = []
        for set_cis, cis_codes in pack_data.items():
            pack_content_lines = []
            pack_content_lines.append(f' ')
            pack_content_lines.append(f' ')
            for cis_code in cis_codes:
                escaped_cis = XMLGenerator.escape_xml_content(cis_code)
                pack_content_lines.append(f' ')
            pack_content_lines.append(f' ')
            pack_contents.append('\n'.join(pack_content_lines))
        return pack_contents

    @staticmethod
    def process_xml_template(template_path: str, pack_contents: List[str],
                             document_id: Optional[str] = None,
                             document_number: Optional[str] = None,
                             operation_time: Optional[str] = None) -> str:
        """Process XML template and insert pack_content sections with parameter substitution.

        The template is read as UTF-8. Parameters that are None or empty are
        left as they appear in the template.
        """
        with open(template_path, 'r', encoding='utf-8') as file:
            template_content = file.read()

        template_content = XMLGenerator._replace_template_parameters(
            template_content, document_id, document_number, operation_time
        )
        return XMLGenerator._insert_pack_content(template_content, pack_contents)

    @staticmethod
    def _replace_template_parameters(template_content: str, document_id: Optional[str],
                                     document_number: Optional[str],
                                     operation_time: Optional[str]) -> str:
        """Replace template parameters in XML content.

        Every occurrence of document_id="...", document_number="..." and
        operation_date_time="..." is rewritten with the supplied value. Values
        are inserted verbatim; no XML escaping is applied.
        """
        substitutions = (
            ('document_id', document_id),
            ('document_number', document_number),
            ('operation_date_time', operation_time),
        )
        for attribute, value in substitutions:
            if not value:
                continue
            replacement = f'{attribute}="{value}"'
            # A callable replacement keeps backslashes in the value literal;
            # a replacement *string* would have them parsed as regex escapes
            # or group references.
            template_content = re.sub(
                rf'{attribute}="[^"]*"',
                lambda _match, text=replacement: text,
                template_content,
            )
        return template_content

    @staticmethod
    def _insert_pack_content(template_content: str, pack_contents: List[str]) -> str:
        """Insert pack_content sections into template."""
        # NOTE(review): every marker literal tested below is the empty string,
        # which is contained in any line, so the first branch always fires and
        # every line is dropped. The markers presumably were XML tags that
        # were lost from this copy of the file - restore them from the
        # original source.
        lines = template_content.split('\n')
        result_lines = []
        pack_content_inserted = False
        inside_pack_content = False

        for line in lines:
            if '' in line and not pack_content_inserted:
                inside_pack_content = True
                continue
            elif '' in line and not pack_content_inserted:
                inside_pack_content = False
                continue
            elif inside_pack_content and not pack_content_inserted:
                continue
            elif '' in line and not pack_content_inserted:
                for pack_content in pack_contents:
                    result_lines.append(pack_content)
                pack_content_inserted = True
                result_lines.append(line)
            else:
                result_lines.append(line)

        return '\n'.join(result_lines)


class ParameterGenerator:
    """Handles parameter generation and validation."""

    @staticmethod
    def generate_document_id() -> str:
        """Generate a unique document ID ('unit_pack_' plus an upper-case UUID4 hex)."""
        return f"unit_pack_{uuid.uuid4().hex.upper()}"

    @staticmethod
    def generate_operation_time() -> str:
        """Generate current operation time in ISO format (local time, no UTC offset)."""
        return datetime.now().isoformat()

    @staticmethod
    def validate_operation_time(operation_time: str) -> bool:
        """Validate operation time format.

        Returns True when datetime.fromisoformat() accepts the value; 'Z' is
        rewritten to '+00:00' before parsing.
        """
        try:
            datetime.fromisoformat(operation_time.replace('Z', '+00:00'))
        except ValueError:
            return False
        return True


class ValidationReporter:
    """Handles validation result reporting."""

    @staticmethod
    def report_validation_summary(validation_results: List[Dict[str, str]]) -> Tuple[int, int, int]:
        """Report validation summary and return counts.

        Returns:
            Tuple of (ok, warnings, errors).
        """
        tally = Counter(result['status'] for result in validation_results)
        ok, warnings, errors = tally['OK'], tally['WARNING'], tally['ERROR']
        click.echo(f"Validation results: {ok} OK, {warnings} warnings, {errors} errors")
        return ok, warnings, errors

    @staticmethod
    def report_validation_details(validation_results: List[Dict[str, str]], show_ok: bool = False):
        """Report detailed validation results.

        Errors go to stderr and warnings to stdout; OK results are printed
        only when *show_ok* is true.
        """
        for result in validation_results:
            status = result['status']
            if status == 'ERROR':
                click.echo(f"❌ ERROR: {result['set_cis']} - {result['message']}", err=True)
            elif status == 'WARNING':
                click.echo(f"⚠️ WARNING: {result['set_cis']} - {result['message']}")
            elif show_ok:
                click.echo(f"✅ OK: {result['set_cis']} - {result['message']}")


class DryRunReporter:
    """Handles dry run reporting."""

    @staticmethod
    def report_pack_data_preview(pack_data: Dict[str, List], max_items: int = 3):
        """Report pack data preview for dry run.

        Prints the first *max_items* packs and, for each, its first two CIS
        codes. Items may be plain strings or dicts with a 'cis_code' key.
        """
        click.echo("\nDry run - would process:")
        for set_cis, items in list(pack_data.items())[:max_items]:
            click.echo(f" SET CIS: {set_cis}")
            click.echo(f" CIS codes: {len(items)} items")
            for item in items[:2]:  # Show first 2 CIS codes
                if isinstance(item, dict):
                    click.echo(f" - {item['cis_code']}")
                else:
                    click.echo(f" - {item}")
            if len(items) > 2:
                click.echo(f" ... and {len(items) - 2} more")

        if len(pack_data) > max_items:
            click.echo(f" ... and {len(pack_data) - max_items} more SET CIS codes")

    @staticmethod
    def report_parameters(document_id: str, document_number: Optional[str], operation_time: str):
        """Report parameters that would be used."""
        click.echo("\nWould use parameters:")
        click.echo(f" Document ID: {document_id}")
        click.echo(f" Document Number: {document_number or 'Not specified'}")
        click.echo(f" Operation Time: {operation_time}")


# Keep the old function names for backward compatibility
def read_csv_file(file_path: str, cis_column: str = "Код") -> Dict[str, List[str]]:
    """Legacy function for backward compatibility; see CSVReader.read_csv_simple."""
    return CSVReader.read_csv_simple(file_path, cis_column)


def read_csv_file_with_gtins(file_path: str, cis_column: str = "Код") -> Dict[str, List[Dict[str, str]]]:
    """Legacy function for backward compatibility; see CSVReader.read_csv_with_gtins."""
    return CSVReader.read_csv_with_gtins(file_path, cis_column)


def load_set_dict(dict_file_path: str) -> Dict[str, List[Dict[str, Any]]]:
    """Legacy function for backward compatibility; returns the rules of a SetDictionary."""
    return SetDictionary(dict_file_path).get_rules()


def validate_pack_composition(pack_data: Dict[str, List[Dict[str, str]]],
                              set_rules: Dict[str, List[Dict[str, Any]]]) -> List[Dict[str, str]]:
    """Legacy function for backward compatibility; validates against ready-made rules."""
    # Build a SetDictionary without running __init__, which would try to read
    # a file, and attach the supplied rules directly.
    temp_dict = SetDictionary.__new__(SetDictionary)
    temp_dict.rules = set_rules
    return PackValidator(temp_dict).validate_composition(pack_data)


def escape_xml_content(text: str) -> str:
    """Legacy function for backward compatibility; see XMLGenerator.escape_xml_content."""
    return XMLGenerator.escape_xml_content(text)


def generate_pack_content_xml(pack_data: Dict[str, List[str]]) -> List[str]:
    """Legacy function for backward compatibility; see XMLGenerator.generate_pack_content_xml."""
    return XMLGenerator.generate_pack_content_xml(pack_data)


def process_xml_template(template_path: str, pack_contents: List[str],
                         document_id: Optional[str] = None,
                         document_number: Optional[str] = None,
                         operation_time: Optional[str] = None) -> str:
    """Legacy function for backward compatibility; see XMLGenerator.process_xml_template."""
    return XMLGenerator.process_xml_template(template_path, pack_contents,
                                             document_id, document_number, operation_time)


class XMLGeneratorApp:
    """Main application class for XML generation."""

    def __init__(self, csv_file: str, template_file: str, cis_column: str = "Код",
                 set_dict_file: Optional[str] = None, encoding: str = 'utf-8-sig'):
        """Store the configuration and build the helper components.

        Args:
            csv_file: Path of the data CSV file.
            template_file: Path of the XML template.
            cis_column: Name of the CSV column holding the CIS codes.
            set_dict_file: Optional set dictionary CSV. When given it is loaded
                here and validation becomes available.
            encoding: Text encoding of *csv_file*.
        """
        self.csv_file = csv_file
        self.template_file = template_file
        self.cis_column = cis_column
        self.set_dict_file = set_dict_file
        self.encoding = encoding

        # Initialize components
        self.csv_reader = CSVReader()
        self.xml_generator = XMLGenerator()
        self.param_generator = ParameterGenerator()
        self.validation_reporter = ValidationReporter()
        self.dry_run_reporter = DryRunReporter()

        # Initialize optional components
        self.set_dictionary = None
        self.validator = None
        if set_dict_file:
            self.set_dictionary = SetDictionary(set_dict_file)
            self.validator = PackValidator(self.set_dictionary)

    def load_data(self) -> Dict[str, List]:
        """Load CSV data with or without GTIN information.

        Returns:
            Mapping of SET CIS to a list of dicts. Each dict has a 'cis_code'
            key, plus 'set_gtin' and 'gtin' when a set dictionary is in use.
        """
        click.echo(f"Reading CSV file: {self.csv_file}")
        click.echo(f"Using CIS column: {self.cis_column}")

        if self.set_dict_file:
            return self.csv_reader.read_csv_with_gtins(self.csv_file, self.cis_column, self.encoding)

        pack_data_simple = self.csv_reader.read_csv_simple(self.csv_file, self.cis_column, self.encoding)
        # Wrap the codes in dicts so both paths return the same item shape.
        return {
            set_cis: [{'cis_code': cis} for cis in cis_codes]
            for set_cis, cis_codes in pack_data_simple.items()
        }

    def load_validation_rules(self) -> Optional[SetDictionary]:
        """Report the validation rules if a dictionary file was provided.

        The dictionary itself was already loaded in __init__; this method only
        prints the messages and returns it (None when no file was provided).
        """
        if not self.set_dict_file:
            return None

        click.echo(f"Loading set dictionary: {self.set_dict_file}")
        click.echo(f"Loaded {self.set_dictionary.get_rule_count()} set rules")
        return self.set_dictionary

    def validate_data(self, pack_data: Dict[str, List[Dict[str, str]]]) -> Tuple[List[Dict[str, str]], bool]:
        """Validate pack composition and return results with error flag.

        Returns:
            (results, has_errors); ([], False) when no validator is configured.
        """
        if not self.validator:
            return [], False

        click.echo("Validating pack composition...")
        validation_results = self.validator.validate_composition(pack_data)

        # Report validation summary
        _, _, errors = self.validation_reporter.report_validation_summary(validation_results)
        return validation_results, errors > 0

    def generate_parameters(self, document_id: Optional[str], document_number: Optional[str],
                            operation_time: Optional[str]) -> Tuple[str, Optional[str], str]:
        """Generate or validate parameters.

        A missing document ID or operation time is generated. A supplied
        operation time that does not parse only produces a warning on stderr
        and is still returned.
        """
        if not document_id:
            document_id = self.param_generator.generate_document_id()

        if not operation_time:
            operation_time = self.param_generator.generate_operation_time()
        elif not self.param_generator.validate_operation_time(operation_time):
            click.echo(f"Warning: Invalid operation time format: {operation_time}", err=True)

        return document_id, document_number, operation_time

    def process_dry_run(self, pack_data: Dict[str, List], validation_results: List[Dict[str, str]],
                        document_id: str, document_number: Optional[str], operation_time: str):
        """Process dry run mode: print validation details, a data preview and the parameters."""
        # Show validation details if available
        if validation_results:
            self.validation_reporter.report_validation_details(validation_results, show_ok=True)

        # Show data preview
        self.dry_run_reporter.report_pack_data_preview(pack_data)

        # Show parameters
        self.dry_run_reporter.report_parameters(document_id, document_number, operation_time)

    def generate_xml_output(self, pack_data: Dict[str, List], document_id: str,
                            document_number: Optional[str], operation_time: str) -> str:
        """Generate XML output from pack data.

        Accepts items either as plain CIS strings or as dicts with a
        'cis_code' key.
        """
        # load_data() wraps every code in a dict on both paths, so unwrap
        # regardless of whether a set dictionary is configured; otherwise the
        # dicts themselves would reach the XML generator.
        simple_pack_data = {
            set_cis: [item['cis_code'] if isinstance(item, dict) else item for item in items]
            for set_cis, items in pack_data.items()
        }

        # Generate pack_content XML sections
        click.echo("Generating pack_content sections...")
        pack_contents = self.xml_generator.generate_pack_content_xml(simple_pack_data)

        # Process template
        click.echo(f"Processing template: {self.template_file}")
        return self.xml_generator.process_xml_template(
            self.template_file, pack_contents, document_id, document_number, operation_time
        )

    def save_or_print_output(self, xml_content: str, output_file: Optional[str]):
        """Save XML to file (UTF-8) or print to stdout."""
        if output_file:
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(xml_content)
            click.echo(f"XML generated successfully: {output_file}")
        else:
            click.echo(xml_content)


# CLI entry point. The docstring below is the text click shows for --help, so
# maintainer notes live in comments instead:
# - Any exception is reported on stderr and turned into click.Abort.
# - Validation errors stop XML generation unless --dry-run or --validate-only
#   is given.
@click.command()
@click.argument('csv_file', type=click.Path(exists=True, dir_okay=False))
@click.argument('template_file', type=click.Path(exists=True, dir_okay=False))
@click.option('--output', '-o', type=click.Path(dir_okay=False),
              help='Output XML file path. If not specified, prints to stdout.')
@click.option('--cis-column', '-c', default='CIS',
              help='Column name for CIS codes in CSV file (default: "CIS")')
@click.option('--encoding', '-e', default='utf-8',
              help='CSV file encoding (default: utf-8)')
@click.option('--dry-run', is_flag=True,
              help='Show what would be processed without generating output')
@click.option('--set-dict', type=click.Path(exists=True, dir_okay=False),
              help='Path to set dictionary CSV file for validation')
@click.option('--document-id', type=str,
              help='Document ID to use in XML (auto-generated if not provided)')
@click.option('--document-number', type=str,
              help='Document number to use in XML')
@click.option('--operation-time', type=str,
              help='Operation time in ISO format (auto-generated if not provided)')
@click.option('--validate-only', is_flag=True,
              help='Only validate composition without generating XML')
def generate_xml(csv_file: str, template_file: str, output: Optional[str], cis_column: str,
                 encoding: str, dry_run: bool, set_dict: Optional[str],
                 document_id: Optional[str], document_number: Optional[str],
                 operation_time: Optional[str], validate_only: bool):
    """
    Generate XML file from CSV data and template with validation.

    CSV_FILE: Path to semicolon-separated CSV file containing SET CIS and CIS codes
    TEMPLATE_FILE: Path to XML template file

    The CSV file should contain columns:
    - 'SET CIS': Pack codes that will become elements
    - Column specified by --cis-column: Individual CIS codes that will become elements
    - 'SET GTIN': SET GTIN codes for validation
    - 'GTIN': Individual GTIN codes for validation
    """
    try:
        # Initialize application; --encoding is honoured when reading CSV_FILE.
        app = XMLGeneratorApp(csv_file, template_file, cis_column, set_dict, encoding)

        # Load validation rules if provided
        app.load_validation_rules()

        # Load CSV data
        pack_data = app.load_data()

        if not pack_data:
            click.echo("No data found in CSV file or no matching columns.", err=True)
            return

        click.echo(f"Found {len(pack_data)} unique SET CIS codes")

        # Validate composition if dictionary is provided
        validation_results, has_errors = app.validate_data(pack_data)

        # Show detailed validation results
        if validation_results:
            app.validation_reporter.report_validation_details(validation_results, show_ok=dry_run)

        if has_errors and not dry_run and not validate_only:
            click.echo("Errors found. Use --dry-run to see all issues or fix them before generating XML.", err=True)
            return

        if validate_only:
            click.echo("Validation complete. Use without --validate-only to generate XML.")
            return

        # Generate parameters
        document_id, document_number, operation_time = app.generate_parameters(
            document_id, document_number, operation_time
        )

        if dry_run:
            app.process_dry_run(pack_data, validation_results, document_id, document_number, operation_time)
            return

        # Generate XML output
        xml_content = app.generate_xml_output(pack_data, document_id, document_number, operation_time)

        # Save or print output
        app.save_or_print_output(xml_content, output)

    except Exception as e:
        click.echo(f"Error: {str(e)}", err=True)
        raise click.Abort() from e


if __name__ == '__main__':
    generate_xml()