summaryrefslogtreecommitdiff
path: root/py05/ex1
diff options
context:
space:
mode:
author: yctct <yctct@yctct.com> 2026-06-07 08:59:04 +0200
committer: yctct <yctct@yctct.com> 2026-06-07 08:59:04 +0200
commit: 15115b4c52bfda0d1cca9fa1155beecbb873ec35 (patch)
tree: b3f0975e63eb04dcba732a78ce9bd9abda8acf01 /py05/ex1
First commit, add all files
Diffstat (limited to 'py05/ex1')
-rwxr-xr-x py05/ex1/data_stream.py 278
1 files changed, 278 insertions, 0 deletions
diff --git a/py05/ex1/data_stream.py b/py05/ex1/data_stream.py
new file mode 100755
index 0000000..b457a24
--- /dev/null
+++ b/py05/ex1/data_stream.py
@@ -0,0 +1,278 @@
+#!/usr/bin/env python3
+
+from abc import ABC, abstractmethod
+import typing
+import sys
+
+
class DataProcessor(ABC):
    """Abstract base for processors that queue items and release them FIFO.

    Subclasses decide which payloads they accept (``validate``) and how
    those payloads are stored (``ingest``). The base class only provides
    the shared state and the extraction method ``output``.
    """

    def __init__(self, name: str) -> None:
        # Pending items, oldest first.
        self.storage: typing.List[typing.Any] = []
        # Sequence number given to the next item returned by output().
        self.rank: int = 0
        # Item counter; the base class never changes it, subclasses do.
        self.counter: int = 0
        self.name = name

    @abstractmethod
    def validate(self, data: typing.Any) -> bool:
        """Return True when this processor can handle *data*."""

    @abstractmethod
    def ingest(self, data: typing.Any) -> None:
        """Store *data* (or its elements) in ``self.storage``."""

    def output(self) -> tuple[int, typing.Any]:
        """Remove the oldest stored item and return it with its rank.

        Returns:
            ``(rank, item)`` where ``rank`` counts successful extractions
            starting at 0. A dict item is rendered as a string built from
            its first two values (``"<first>: <second>"``); any other item
            is returned exactly as it was stored.

        Raises:
            IndexError: if there is nothing left in storage. ``rank`` is
                left untouched in that case.
        """
        # Check before touching any state so a failed call has no effect.
        if not self.storage:
            raise IndexError(f"{self.name}: no stored data to output")

        entry = self.storage.pop(0)
        position = self.rank
        self.rank += 1

        if isinstance(entry, dict):
            values = list(entry.values())
            if len(values) >= 2:
                return position, f"{values[0]}: {values[1]}"
            # Fewer than two values: render what exists instead of failing
            # on a missing index.
            return position, ": ".join(str(value) for value in values)
        return position, entry
+
+
class NumericProcessor(DataProcessor):
    """Processor for ints, floats, and lists made up only of them.

    ``bool`` values are accepted as well because ``bool`` is a subclass of
    ``int``.
    """

    def validate(self, data: int | float | typing.List[int | float]) -> bool:
        """Return True when *data* is a number or a list of numbers.

        This check has no side effects; counting happens in ``ingest`` so
        that a rejected payload never changes the statistics.
        """
        if isinstance(data, list):
            return all(isinstance(item, (int, float)) for item in data)
        return isinstance(data, (int, float))

    def ingest(self, data: int | float | typing.List[int | float]) -> None:
        """Store *data* and count every stored item.

        A payload that fails ``validate`` is rejected as a whole: nothing
        is stored, the counter is unchanged, and "Improper numeric data"
        is written to stderr.
        """
        if not self.validate(data):
            print("Improper numeric data", file=sys.stderr)
            return
        items = data if isinstance(data, list) else [data]
        self.storage.extend(items)
        self.counter += len(items)
+
+
def is_number(s: str) -> bool:
    """Tell whether ``float()`` can convert *s*.

    Anything ``float()`` accepts counts as a number, which includes
    spellings such as ``"nan"`` and ``"inf"``. Values that ``float()``
    rejects with ``ValueError`` or ``TypeError`` yield False.
    """
    try:
        float(s)
    except (TypeError, ValueError):
        return False
    else:
        return True
+
+
class TextProcessor(DataProcessor):
    """Processor for non-numeric strings and lists made up only of them."""

    @staticmethod
    def _is_text(item: typing.Any) -> bool:
        """Return True for a string that does not parse as a number."""
        return isinstance(item, str) and not is_number(item)

    def validate(self, data: str | typing.List[str]) -> bool:
        """Return True when *data* is text or a list containing only text.

        Every list element must be a non-numeric string; elements of any
        other type make the whole list invalid. This check has no side
        effects; counting happens in ``ingest``.
        """
        if isinstance(data, list):
            return all(self._is_text(item) for item in data)
        return self._is_text(data)

    def ingest(self, data: str | typing.List[str]) -> None:
        """Store *data* and count every stored item.

        A payload that fails ``validate`` is rejected as a whole: nothing
        is stored, the counter is unchanged, and "Improper text data" is
        written to stderr.
        """
        if not self.validate(data):
            print("Improper text data", file=sys.stderr)
            return
        items = data if isinstance(data, list) else [data]
        self.storage.extend(items)
        self.counter += len(items)
+
+
class LogProcessor(DataProcessor):
    """Processor for log entries given as a dict or a list of dicts."""

    def validate(self, data: dict | typing.List[dict]) -> bool:
        """Return True when *data* is a dict or a list containing only dicts.

        This check has no side effects; counting happens in ``ingest``.
        """
        if isinstance(data, list):
            return all(isinstance(item, dict) for item in data)
        return isinstance(data, dict)

    def ingest(self, data: dict | typing.List[dict]) -> None:
        """Store *data* and count every stored entry.

        A single dict is stored as one entry (the dict itself, not its
        keys). A payload that fails ``validate`` is rejected as a whole:
        nothing is stored, the counter is unchanged, and "Improper log
        data" is written to stderr.
        """
        if not self.validate(data):
            print("Improper log data", file=sys.stderr)
            return
        entries = data if isinstance(data, list) else [data]
        self.storage.extend(entries)
        self.counter += len(entries)
+
+
class DataStream():
    """Route stream elements to the first registered processor accepting them."""

    def __init__(self) -> None:
        # Registered processors, tried in registration order.
        self.processor: list = []
        # Kept for compatibility; no method of this class updates it.
        self.counter: int = 0

    def register_processor(self, proc: "DataProcessor") -> None:
        """Add *proc* to the end of the list of processors."""
        self.processor.append(proc)

    def _route(self, item: typing.Any) -> bool:
        """Hand *item* to the first processor that validates it.

        Returns True when a processor took the item, False otherwise.
        """
        for processor in self.processor:
            if processor.validate(item):
                processor.ingest(item)
                return True
        return False

    def process_stream(self, stream: list[typing.Any]) -> None:
        """Dispatch every element of *stream* to a matching processor.

        An element that no processor validates is reported on stderr and
        skipped. A ``ValueError`` raised by a processor while handling an
        element is reported the same way, so one bad element never stops
        the rest of the stream.
        """
        for item in stream:
            try:
                handled = self._route(item)
            except ValueError:
                handled = False
            if not handled:
                print(f"Data Stream Error - Can't process "
                      f"element in stream: {item}", file=sys.stderr)

    def print_processors_stats(self) -> None:
        """Print one statistics line per processor, or a notice if none."""
        if not self.processor:
            print("No processor found, no data")
            return
        for processor in self.processor:
            remaining = len(processor.storage)
            print(f"{processor.name}: total {processor.counter} items "
                  f"processed, remaining {remaining} on processor")

    def consume_processors(self, rm_1: int, rm_2: int, rm_3: int) -> None:
        """Extract items from the processors, matched by name.

        Args:
            rm_1: items to take from the "Numeric Processor".
            rm_2: items to take from the "Text Processor".
            rm_3: items to take from the "Log Processor".

        Processors with any other name are left alone. A request larger
        than what a processor holds consumes everything it has instead of
        failing on the empty storage.
        """
        requested = {
            "Numeric Processor": rm_1,
            "Text Processor": rm_2,
            "Log Processor": rm_3,
        }
        for processor in self.processor:
            wanted = requested.get(processor.name, 0)
            # Never ask for more than is stored.
            for _ in range(min(wanted, len(processor.storage))):
                processor.output()
+
+
def main() -> None:
    """Run the demonstration scenario for the data stream.

    Steps: print statistics for an empty stream, register the numeric
    processor and send a mixed batch, register the text and log
    processors and send the same batch again, then consume a few items
    from each processor. Statistics are printed after every step.
    """
    print("=== Code Nexus - Data Stream ===")
    print()
    print("Initialize Data Stream...")
    print()
    stream = DataStream()
    print("=== DataStream statistic ===")
    stream.print_processors_stats()
    print()

    # Only the numeric processor is registered for the first pass, so the
    # text and log elements of the batch are reported as unprocessable.
    numeric = NumericProcessor("Numeric Processor")
    print("Registering Numeric Processor")
    print()
    stream.register_processor(numeric)
    batch_one = [
        'Hello world',
        [3.14, -1, 2.71],
        [
            {
                'log_level': 'WARNING',
                'log_message': 'Telnet access! Use ssh instead',
            },
            {
                'log_level': 'INFO',
                'log_message': 'User wil is connected',
            },
        ],
        42,
        ['Hi', 'five'],
    ]
    print(f"Sending first batch of data on stream: {batch_one}")
    stream.process_stream(batch_one)
    print()
    print("=== DataStream statistic ===")
    stream.print_processors_stats()
    print()

    print("Registering other data processors")
    text = TextProcessor("Text Processor")
    log = LogProcessor("Log Processor")
    stream.register_processor(text)
    stream.register_processor(log)
    print()
    print("Send the same batch again")
    stream.process_stream(batch_one)
    print()
    print("=== DataStream statistic ===")
    stream.print_processors_stats()
    print()

    consume_num = 3
    consume_text = 2
    consume_log = 1
    print(f"Consume some elements from the data processors:"
          f" Numeric {consume_num}, Text {consume_text}, Log {consume_log}")
    stream.consume_processors(consume_num, consume_text, consume_log)
    print("=== DataStream statistic ===")
    stream.print_processors_stats()


# Run the demonstration only when the file is executed as a script.
if __name__ == "__main__":
    main()