From 15115b4c52bfda0d1cca9fa1155beecbb873ec35 Mon Sep 17 00:00:00 2001 From: yctct Date: Sun, 7 Jun 2026 08:59:04 +0200 Subject: First commit, add all files --- py05/ex2/data_pipeline.py | 338 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 338 insertions(+) create mode 100755 py05/ex2/data_pipeline.py (limited to 'py05/ex2') diff --git a/py05/ex2/data_pipeline.py b/py05/ex2/data_pipeline.py new file mode 100755 index 0000000..a558ac6 --- /dev/null +++ b/py05/ex2/data_pipeline.py @@ -0,0 +1,338 @@ +#!/usr/bin/env python3 +# +# "Briefing: Your task is to integrate work from past exercices into a complete +# data processing pipeline that demonstrates understanding of polymorphic +# concepts. Use your code from Exercise 1 and improve it in order to obtain a +# complete data pipeline. Your DataStream class already handles input streams. +# You need now to handle the output part of the pipeline. This will be achieved +# by using a plugin system for export classes, made export-compatible through +# duck typing." +# +# https://typing.python.org/en/latest/spec/protocol.html + +from abc import ABC, abstractmethod +import typing +import sys + + +class ExportPlugin(typing.Protocol): + + def process_output(self, data: list[tuple[int, str]]) -> None: + # The type of the data parameter is a list of tuples that matches the + # return value of the output method from the DataProcessor class. + # output CSV or JSON + # if list output as CSV + ... + + +class ExportCSV(ExportPlugin): + + def process_output(self, data: list[tuple[int, str]]) -> None: + i = 0 + print("CSV Output:") + for item in data: + print(data[i][1], end='') + if item is not data[-1]: + print(',', end='') + else: + print() + i += 1 + + +class ExportJSON(ExportPlugin): + + def process_output(self, data: list[tuple[int, str]]) -> None: + i = 0 + print("JSON Output:") + print("{", end='') + for item in data: + print(f"\"item_{data[i][0]}\": \"{data[i][1]}\"", end='') + if item is not data[-1]: + print(', ', end='') + else: + print(end='') + i += 1 + print("}") + + +class DataProcessor(ABC): + def __init__(self, name: str) -> None: + self.storage: typing.List[typing.Any] = [] + self.rank: int = 0 + self.counter: int = 0 + self.name = name + + @abstractmethod + def validate(self, data: typing.Any) -> bool: + pass + + @abstractmethod + def ingest(self, data: typing.Any) -> None: + pass + + def output(self) -> tuple[int, str]: + # extract the oldest piece of data + # piece of data is removed + position = self.rank + self.rank += 1 + if isinstance(self.storage[0], dict): + entry = self.storage[0] + output_tuple: tuple = tuple(entry.values()) + to_return = (f"{output_tuple[0]}: {output_tuple[1]}") + self.storage.pop(0) + return position, to_return + to_return = self.storage[0] + self.storage.pop(0) + return position, to_return + + +class NumericProcessor(DataProcessor): + + def validate(self, data: int | float | typing.List[int | float]) -> bool: + try: + if isinstance(data, int): + self.counter += 1 + return True + elif isinstance(data, float): + self.counter += 1 + return True + elif isinstance(data, typing.List): + type_list = "yes" + for item in data: + self.counter += 1 + if not isinstance(item, int | float): + type_list = "no" + self.counter -= 1 + if type_list == "yes": + return True + else: + raise ValueError() + else: + raise ValueError() + except ValueError: + return False + + def ingest(self, data: int | float | typing.List[int | float]) -> None: + try: + if isinstance(data, typing.List): + for item in data: + if isinstance(item, int | float): + self.storage.append(item) + elif isinstance(data, int | float): + self.storage.append(data) + else: + raise ValueError() + except ValueError: + print("Improper numeric data", file=sys.stderr) + + +def is_number(s: str) -> bool: + try: + float(s) + return True + except (ValueError, TypeError): + return False + + +class TextProcessor(DataProcessor): + + def validate(self, data: str | typing.List[str]) -> bool: + try: + if isinstance(data, str): + if is_number(data): + raise ValueError() + else: + self.counter += 1 + return True + elif isinstance(data, typing.List): + for item in data: + if isinstance(item, str): + if is_number(item): + raise ValueError() + return False + else: + self.counter += 1 + if isinstance(item, dict): + raise ValueError() + return False + return True + else: + raise ValueError() + except ValueError: + return False + + def ingest(self, data: str | typing.List[str]) -> None: + try: + if isinstance(data, str): + self.storage.append(data) + elif isinstance(data, typing.List): + for item in data: + if is_number(item): + raise ValueError() + elif isinstance(item, dict): + raise ValueError() + else: + self.storage.append(item) + else: + raise ValueError() + except ValueError: + print("Improper text data", file=sys.stderr) + + +class LogProcessor(DataProcessor): + + def validate(self, data: dict | typing.List[dict]) -> bool: + try: + if isinstance(data, dict): + return True + if isinstance(data, typing.List): + for item in data: + if isinstance(item, dict): + self.counter += 1 + else: + raise ValueError() + return False + return True + else: + raise ValueError() + except ValueError: + return False + + def ingest(self, data: dict | typing.List[dict]) -> None: + try: + if isinstance(data, dict): + for item in data: + self.storage.append(item) + elif isinstance(data, typing.List): + for item in data: + if isinstance(item, dict): + self.storage.append(item) + else: + raise ValueError() + except ValueError: + print("Improper log data", file=sys.stderr) + + +class DataStream(): + def __init__(self) -> None: + self.processor: list = [] + self.counter: int = 0 + + def register_processor(self, proc: DataProcessor) -> None: + # method that allows you to register a new data processor to process + # the data stream. + self.processor.append(proc) + + def process_stream(self, stream: list[typing.Any]) -> None: + # method that will analyze each element of the list received as a + # parameter and send it to the appropriate registered data processor. + # Error messages will be printed if no data processor can handle an + # element + for item in stream: + try: + for processor in self.processor: + # route towards appropriate processors + if processor.validate(item): + processor.ingest(item) + # stop when appropriate processors is found + break + else: + raise ValueError() + except ValueError: + print(f"Data Stream Error - Can't process " + f"element in stream: {item}", file=sys.stderr) + + def print_processors_stats(self) -> None: + if self.processor == []: + print("No processor found, no data") + else: + for processor in self.processor: + remaining = len(processor.storage) + print(f"{processor.name}: total {processor.counter} items " + f"processed, remaining {remaining} on processor") + + def output_pipeline(self, nb: int, plugin: ExportPlugin) -> None: + # method, to be used after calling process_stream, that will consume nb + # elements from all registered data processors and export them using + # the provided compatible plugin. + for processor in self.processor: + tmp = [] + for _ in range(nb): + # to make check whether there is something to output + if processor.storage: + tmp.append(processor.output()) + plugin.process_output(tmp) + + +def main() -> None: + + # Create a test scenario that demonstrates the correct processing of a data + # stream. Display statistics on registered data processors, consume + # elements using the output method of each data processor and show updated + # statistics + + print("=== Code Nexus - Data Pipeline ===") + print() + print("Initialize Data Stream...") + print() + processors = DataStream() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + # initialize processors + is_num = NumericProcessor("Numeric Processor") + is_text = TextProcessor("Text Processor") + is_log = LogProcessor("Log Processor") + print("Registering Processors") + print() + processors.register_processor(is_num) + processors.register_processor(is_text) + processors.register_processor(is_log) + batch_one = ['Hello world', + [3.14, -1, 2.71], + [{ + 'log_level': 'WARNING', + ' log_message': 'Telnet access! Use ssh instead' + }, + { + 'log_level': 'INFO', + 'log_message': 'User wil is connected' + }], + 42, + ['Hi', 'five']] + print(f"Sending first batch of data on stream: {batch_one}") + processors.process_stream(batch_one) + print() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + print("Send 3 processed data from each processor to a CSV plugin:") + PluginCSV = ExportCSV() + processors.output_pipeline(3, PluginCSV) + print() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + batch_two = [21, + ['I love AI', 'LLMs are wonderful', 'Stay healthy'], + [{'log_level': ' ERROR', 'log_message': '500 server crash'}, + {'log_level': 'NOTICE', + 'log_message': 'Certificate expires in 10 days'}], + [32, 42, 64, 84, 128, 168], + 'World hello'] + print(f"Send another batch of data : {batch_two}") + processors.process_stream(batch_two) + print() + print("=== DataStream statistic ===") + processors.print_processors_stats() + print() + print("Send 5 processed data from each processor to a JSON plugin:") + PluginJSON = ExportJSON() + processors.output_pipeline(5, PluginJSON) + print() + print("=== DataStream statistic ===") + processors.print_processors_stats() + + +if __name__ == "__main__": + main() -- cgit v1.2.3