#!/usr/bin/env python3 from abc import ABC, abstractmethod import typing import sys class DataProcessor(ABC): def __init__(self, name: str) -> None: self.storage: typing.List[typing.Any] = [] self.rank: int = 0 self.counter: int = 0 self.name = name @abstractmethod def validate(self, data: typing.Any) -> bool: pass @abstractmethod def ingest(self, data: typing.Any) -> None: pass def output(self) -> tuple[int, str]: # extract the oldest piece of data # piece of data is removed position = self.rank self.rank += 1 if isinstance(self.storage[0], dict): entry = self.storage[0] output_tuple: tuple = tuple(entry.values()) to_return = (f"{output_tuple[0]}: {output_tuple[1]}") self.storage.pop(0) return position, to_return to_return = self.storage[0] self.storage.pop(0) return position, to_return class NumericProcessor(DataProcessor): def validate(self, data: int | float | typing.List[int | float]) -> bool: try: if isinstance(data, int): self.counter += 1 return True elif isinstance(data, float): self.counter += 1 return True elif isinstance(data, typing.List): type_list = "yes" for item in data: self.counter += 1 if not isinstance(item, int | float): type_list = "no" self.counter -= 1 if type_list == "yes": return True else: raise ValueError() else: raise ValueError() except ValueError: return False def ingest(self, data: int | float | typing.List[int | float]) -> None: try: if isinstance(data, typing.List): for item in data: if isinstance(item, int | float): self.storage.append(item) elif isinstance(data, int | float): self.storage.append(data) else: raise ValueError() except ValueError: print("Improper numeric data", file=sys.stderr) def is_number(s: str) -> bool: try: float(s) return True except (ValueError, TypeError): return False class TextProcessor(DataProcessor): def validate(self, data: str | typing.List[str]) -> bool: try: if isinstance(data, str): if is_number(data): raise ValueError() else: self.counter += 1 return True elif isinstance(data, typing.List): for item in data: if isinstance(item, str): if is_number(item): raise ValueError() return False else: self.counter += 1 if isinstance(item, dict): raise ValueError() return False return True else: raise ValueError() except ValueError: return False def ingest(self, data: str | typing.List[str]) -> None: try: if isinstance(data, str): self.storage.append(data) elif isinstance(data, typing.List): for item in data: if is_number(item): raise ValueError() elif isinstance(item, dict): raise ValueError() else: self.storage.append(item) else: raise ValueError() except ValueError: print("Improper text data", file=sys.stderr) class LogProcessor(DataProcessor): def validate(self, data: dict | typing.List[dict]) -> bool: try: if isinstance(data, dict): return True if isinstance(data, typing.List): for item in data: if isinstance(item, dict): self.counter += 1 else: raise ValueError() return False return True else: raise ValueError() except ValueError: return False def ingest(self, data: dict | typing.List[dict]) -> None: try: if isinstance(data, dict): for item in data: self.storage.append(item) elif isinstance(data, typing.List): for item in data: if isinstance(item, dict): self.storage.append(item) else: raise ValueError() except ValueError: print("Improper log data", file=sys.stderr) class DataStream(): def __init__(self) -> None: self.processor: list = [] self.counter: int = 0 def register_processor(self, proc: DataProcessor) -> None: # method that allows you to register a new data processor to process # the data stream. self.processor.append(proc) def process_stream(self, stream: list[typing.Any]) -> None: # method that will analyze each element of the list received as a # parameter and send it to the appropriate registered data processor. # Error messages will be printed if no data processor can handle an # element for item in stream: try: for processor in self.processor: # route towards appropriate processors if processor.validate(item): processor.ingest(item) # stop when appropriate processors is found break else: raise ValueError() except ValueError: print(f"Data Stream Error - Can't process " f"element in stream: {item}", file=sys.stderr) def print_processors_stats(self) -> None: if self.processor == []: print("No processor found, no data") else: for processor in self.processor: remaining = len(processor.storage) print(f"{processor.name}: total {processor.counter} items " f"processed, remaining {remaining} on processor") def consume_processors(self, rm_1: int, rm_2: int, rm_3: int) -> None: for processor in self.processor: if processor.name == "Numeric Processor": for _ in range(rm_1): processor.output() elif processor.name == "Text Processor": for _ in range(rm_2): processor.output() elif processor.name == "Log Processor": for _ in range(rm_3): processor.output() def main() -> None: # Create a test scenario that demonstrates the correct processing of a data # stream. Display statistics on registered data processors, consume # elements using the output method of each data processor and show updated # statistics print("=== Code Nexus - Data Stream ===") print() print("Initialize Data Stream...") print() processors = DataStream() print("=== DataStream statistic ===") processors.print_processors_stats() print() # initialize processors is_num = NumericProcessor("Numeric Processor") print("Registering Numeric Processor") print() processors.register_processor(is_num) batch_one = ['Hello world', [3.14, -1, 2.71], [{ 'log_level': 'WARNING', ' log_message': 'Telnet access! Use ssh instead' }, { 'log_level': 'INFO', 'log_message': 'User wil is connected' }], 42, ['Hi', 'five']] print(f"Sending first batch of data on stream: {batch_one}") processors.process_stream(batch_one) print() print("=== DataStream statistic ===") processors.print_processors_stats() print() print("Registering other data processors") is_text = TextProcessor("Text Processor") is_log = LogProcessor("Log Processor") processors.register_processor(is_text) processors.register_processor(is_log) print() print("Send the same batch again") processors.process_stream(batch_one) print() print("=== DataStream statistic ===") processors.print_processors_stats() print() consume_num = 3 consume_text = 2 consume_log = 1 print(f"Consume some elements from the data processors:" f" Numeric {consume_num}, Text {consume_text}, Log {consume_log}") processors.consume_processors(consume_num, consume_text, consume_log) print("=== DataStream statistic ===") processors.print_processors_stats() if __name__ == "__main__": main()