diff options
| author | yctct <yctct@yctct.com> | 2026-06-07 08:59:04 +0200 |
|---|---|---|
| committer | yctct <yctct@yctct.com> | 2026-06-07 08:59:04 +0200 |
| commit | 15115b4c52bfda0d1cca9fa1155beecbb873ec35 (patch) | |
| tree | b3f0975e63eb04dcba732a78ce9bd9abda8acf01 /py05/ex1 | |
First commit, add all files
Diffstat (limited to 'py05/ex1')
| -rwxr-xr-x | py05/ex1/data_stream.py | 278 |
1 files changed, 278 insertions, 0 deletions
diff --git a/py05/ex1/data_stream.py b/py05/ex1/data_stream.py new file mode 100755 index 0000000..b457a24 --- /dev/null +++ b/py05/ex1/data_stream.py @@ -0,0 +1,278 @@ +#!/usr/bin/env python3 + +from abc import ABC, abstractmethod +import typing +import sys + + +class DataProcessor(ABC): + def __init__(self, name: str) -> None: + self.storage: typing.List[typing.Any] = [] + self.rank: int = 0 + self.counter: int = 0 + self.name = name + + @abstractmethod + def validate(self, data: typing.Any) -> bool: + pass + + @abstractmethod + def ingest(self, data: typing.Any) -> None: + pass + + def output(self) -> tuple[int, str]: + # extract the oldest piece of data + # piece of data is removed + position = self.rank + self.rank += 1 + if isinstance(self.storage[0], dict): + entry = self.storage[0] + output_tuple: tuple = tuple(entry.values()) + to_return = (f"{output_tuple[0]}: {output_tuple[1]}") + self.storage.pop(0) + return position, to_return + to_return = self.storage[0] + self.storage.pop(0) + return position, to_return + + +class NumericProcessor(DataProcessor): + + def validate(self, data: int | float | typing.List[int | float]) -> bool: + try: + if isinstance(data, int): + self.counter += 1 + return True + elif isinstance(data, float): + self.counter += 1 + return True + elif isinstance(data, typing.List): + type_list = "yes" + for item in data: + self.counter += 1 + if not isinstance(item, int | float): + type_list = "no" + self.counter -= 1 + if type_list == "yes": + return True + else: + raise ValueError() + else: + raise ValueError() + except ValueError: + return False + + def ingest(self, data: int | float | typing.List[int | float]) -> None: + try: + if isinstance(data, typing.List): + for item in data: + if isinstance(item, int | float): + self.storage.append(item) + elif isinstance(data, int | float): + self.storage.append(data) + else: + raise ValueError() + except ValueError: + print("Improper numeric data", file=sys.stderr) + + +def is_number(s: str) -> bool: + try: + float(s) + return True + except (ValueError, TypeError): + return False + + +class TextProcessor(DataProcessor): + + def validate(self, data: str | typing.List[str]) -> bool: + try: + if isinstance(data, str): + if is_number(data): + raise ValueError() + else: + self.counter += 1 + return True + elif isinstance(data, typing.List): + for item in data: + if isinstance(item, str): + if is_number(item): + raise ValueError() + return False + else: + self.counter += 1 + if isinstance(item, dict): + raise ValueError() + return False + return True + else: + raise ValueError() + except ValueError: + return False + + def ingest(self, data: str | typing.List[str]) -> None: + try: + if isinstance(data, str): + self.storage.append(data) + elif isinstance(data, typing.List): + for item in data: + if is_number(item): + raise ValueError() + elif isinstance(item, dict): + raise ValueError() + else: + self.storage.append(item) + else: + raise ValueError() + except ValueError: + print("Improper text data", file=sys.stderr) + + +class LogProcessor(DataProcessor): + + def validate(self, data: dict | typing.List[dict]) -> bool: + try: + if isinstance(data, dict): + return True + if isinstance(data, typing.List): + for item in data: + if isinstance(item, dict): + self.counter += 1 + else: + raise ValueError() + return False + return True + else: + raise ValueError() + except ValueError: + return False + + def ingest(self, data: dict | typing.List[dict]) -> None: + try: + if isinstance(data, dict): + for item in data: + self.storage.append(item) + elif isinstance(data, typing.List): + for item in data: + if isinstance(item, dict): + self.storage.append(item) + else: + raise ValueError() + except ValueError: + print("Improper log data", file=sys.stderr) + + +class DataStream(): + def __init__(self) -> None: + self.processor: list = [] + self.counter: int = 0 + + def register_processor(self, proc: DataProcessor) -> None: + # method that allows you to register a new data processor to process + # the data stream. + self.processor.append(proc) + + def process_stream(self, stream: list[typing.Any]) -> None: + # method that will analyze each element of the list received as a + # parameter and send it to the appropriate registered data processor. + # Error messages will be printed if no data processor can handle an + # element + for item in stream: + try: + for processor in self.processor: + # route towards appropriate processors + if processor.validate(item): + processor.ingest(item) + # stop when appropriate processors is found + break + else: + raise ValueError() + except ValueError: + print(f"Data Stream Error - Can't process " + f"element in stream: {item}", file=sys.stderr) + + def print_processors_stats(self) -> None: + if self.processor == []: + print("No processor found, no data") + else: + for processor in self.processor: + remaining = len(processor.storage) + print(f"{processor.name}: total {processor.counter} items " + f"processed, remaining {remaining} on processor") + + def consume_processors(self, rm_1: int, rm_2: int, rm_3: int) -> None: + for processor in self.processor: + if processor.name == "Numeric Processor": + for _ in range(rm_1): + processor.output() + elif processor.name == "Text Processor": + for _ in range(rm_2): + processor.output() + elif processor.name == "Log Processor": + for _ in range(rm_3): + processor.output() + + +def main() -> None: + + # Create a test scenario that demonstrates the correct processing of a data + # stream. Display statistics on registered data processors, consume + # elements using the output method of each data processor and show updated + # statistics + + print("=== Code Nexus - Data Stream ===") + print() + print("Initialize Data Stream...") + print() + processors = DataStream() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + # initialize processors + is_num = NumericProcessor("Numeric Processor") + print("Registering Numeric Processor") + print() + processors.register_processor(is_num) + batch_one = ['Hello world', + [3.14, -1, 2.71], + [{ + 'log_level': 'WARNING', + ' log_message': 'Telnet access! Use ssh instead' + }, + { + 'log_level': 'INFO', + 'log_message': 'User wil is connected' + }], + 42, + ['Hi', 'five']] + print(f"Sending first batch of data on stream: {batch_one}") + processors.process_stream(batch_one) + print() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + print("Registering other data processors") + is_text = TextProcessor("Text Processor") + is_log = LogProcessor("Log Processor") + processors.register_processor(is_text) + processors.register_processor(is_log) + print() + print("Send the same batch again") + processors.process_stream(batch_one) + print() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + consume_num = 3 + consume_text = 2 + consume_log = 1 + print(f"Consume some elements from the data processors:" + f" Numeric {consume_num}, Text {consume_text}, Log {consume_log}") + processors.consume_processors(consume_num, consume_text, consume_log) + print("=== DataStream statistic ===") + processors.print_processors_stats() + + +if __name__ == "__main__": + main() |
