#!/usr/bin/env python3 # # "Briefing: Your task is to integrate work from past exercices into a complete # data processing pipeline that demonstrates understanding of polymorphic # concepts. Use your code from Exercise 1 and improve it in order to obtain a # complete data pipeline. Your DataStream class already handles input streams. # You need now to handle the output part of the pipeline. This will be achieved # by using a plugin system for export classes, made export-compatible through # duck typing." # # https://typing.python.org/en/latest/spec/protocol.html from abc import ABC, abstractmethod import typing import sys class ExportPlugin(typing.Protocol): def process_output(self, data: list[tuple[int, str]]) -> None: # The type of the data parameter is a list of tuples that matches the # return value of the output method from the DataProcessor class. # output CSV or JSON # if list output as CSV ... class ExportCSV(ExportPlugin): def process_output(self, data: list[tuple[int, str]]) -> None: i = 0 print("CSV Output:") for item in data: print(data[i][1], end='') if item is not data[-1]: print(',', end='') else: print() i += 1 class ExportJSON(ExportPlugin): def process_output(self, data: list[tuple[int, str]]) -> None: i = 0 print("JSON Output:") print("{", end='') for item in data: print(f"\"item_{data[i][0]}\": \"{data[i][1]}\"", end='') if item is not data[-1]: print(', ', end='') else: print(end='') i += 1 print("}") class DataProcessor(ABC): def __init__(self, name: str) -> None: self.storage: typing.List[typing.Any] = [] self.rank: int = 0 self.counter: int = 0 self.name = name @abstractmethod def validate(self, data: typing.Any) -> bool: pass @abstractmethod def ingest(self, data: typing.Any) -> None: pass def output(self) -> tuple[int, str]: # extract the oldest piece of data # piece of data is removed position = self.rank self.rank += 1 if isinstance(self.storage[0], dict): entry = self.storage[0] output_tuple: tuple = tuple(entry.values()) to_return = (f"{output_tuple[0]}: {output_tuple[1]}") self.storage.pop(0) return position, to_return to_return = self.storage[0] self.storage.pop(0) return position, to_return class NumericProcessor(DataProcessor): def validate(self, data: int | float | typing.List[int | float]) -> bool: try: if isinstance(data, int): self.counter += 1 return True elif isinstance(data, float): self.counter += 1 return True elif isinstance(data, typing.List): type_list = "yes" for item in data: self.counter += 1 if not isinstance(item, int | float): type_list = "no" self.counter -= 1 if type_list == "yes": return True else: raise ValueError() else: raise ValueError() except ValueError: return False def ingest(self, data: int | float | typing.List[int | float]) -> None: try: if isinstance(data, typing.List): for item in data: if isinstance(item, int | float): self.storage.append(item) elif isinstance(data, int | float): self.storage.append(data) else: raise ValueError() except ValueError: print("Improper numeric data", file=sys.stderr) def is_number(s: str) -> bool: try: float(s) return True except (ValueError, TypeError): return False class TextProcessor(DataProcessor): def validate(self, data: str | typing.List[str]) -> bool: try: if isinstance(data, str): if is_number(data): raise ValueError() else: self.counter += 1 return True elif isinstance(data, typing.List): for item in data: if isinstance(item, str): if is_number(item): raise ValueError() return False else: self.counter += 1 if isinstance(item, dict): raise ValueError() return False return True else: raise ValueError() except ValueError: return False def ingest(self, data: str | typing.List[str]) -> None: try: if isinstance(data, str): self.storage.append(data) elif isinstance(data, typing.List): for item in data: if is_number(item): raise ValueError() elif isinstance(item, dict): raise ValueError() else: self.storage.append(item) else: raise ValueError() except ValueError: print("Improper text data", file=sys.stderr) class LogProcessor(DataProcessor): def validate(self, data: dict | typing.List[dict]) -> bool: try: if isinstance(data, dict): return True if isinstance(data, typing.List): for item in data: if isinstance(item, dict): self.counter += 1 else: raise ValueError() return False return True else: raise ValueError() except ValueError: return False def ingest(self, data: dict | typing.List[dict]) -> None: try: if isinstance(data, dict): for item in data: self.storage.append(item) elif isinstance(data, typing.List): for item in data: if isinstance(item, dict): self.storage.append(item) else: raise ValueError() except ValueError: print("Improper log data", file=sys.stderr) class DataStream(): def __init__(self) -> None: self.processor: list = [] self.counter: int = 0 def register_processor(self, proc: DataProcessor) -> None: # method that allows you to register a new data processor to process # the data stream. self.processor.append(proc) def process_stream(self, stream: list[typing.Any]) -> None: # method that will analyze each element of the list received as a # parameter and send it to the appropriate registered data processor. # Error messages will be printed if no data processor can handle an # element for item in stream: try: for processor in self.processor: # route towards appropriate processors if processor.validate(item): processor.ingest(item) # stop when appropriate processors is found break else: raise ValueError() except ValueError: print(f"Data Stream Error - Can't process " f"element in stream: {item}", file=sys.stderr) def print_processors_stats(self) -> None: if self.processor == []: print("No processor found, no data") else: for processor in self.processor: remaining = len(processor.storage) print(f"{processor.name}: total {processor.counter} items " f"processed, remaining {remaining} on processor") def output_pipeline(self, nb: int, plugin: ExportPlugin) -> None: # method, to be used after calling process_stream, that will consume nb # elements from all registered data processors and export them using # the provided compatible plugin. for processor in self.processor: tmp = [] for _ in range(nb): # to make check whether there is something to output if processor.storage: tmp.append(processor.output()) plugin.process_output(tmp) def main() -> None: # Create a test scenario that demonstrates the correct processing of a data # stream. Display statistics on registered data processors, consume # elements using the output method of each data processor and show updated # statistics print("=== Code Nexus - Data Pipeline ===") print() print("Initialize Data Stream...") print() processors = DataStream() print("=== DataStream statistic ===") processors.print_processors_stats() print() # initialize processors is_num = NumericProcessor("Numeric Processor") is_text = TextProcessor("Text Processor") is_log = LogProcessor("Log Processor") print("Registering Processors") print() processors.register_processor(is_num) processors.register_processor(is_text) processors.register_processor(is_log) batch_one = ['Hello world', [3.14, -1, 2.71], [{ 'log_level': 'WARNING', ' log_message': 'Telnet access! Use ssh instead' }, { 'log_level': 'INFO', 'log_message': 'User wil is connected' }], 42, ['Hi', 'five']] print(f"Sending first batch of data on stream: {batch_one}") processors.process_stream(batch_one) print() print("=== DataStream statistic ===") processors.print_processors_stats() print() print("Send 3 processed data from each processor to a CSV plugin:") PluginCSV = ExportCSV() processors.output_pipeline(3, PluginCSV) print() print("=== DataStream statistic ===") processors.print_processors_stats() print() batch_two = [21, ['I love AI', 'LLMs are wonderful', 'Stay healthy'], [{'log_level': ' ERROR', 'log_message': '500 server crash'}, {'log_level': 'NOTICE', 'log_message': 'Certificate expires in 10 days'}], [32, 42, 64, 84, 128, 168], 'World hello'] print(f"Send another batch of data : {batch_two}") processors.process_stream(batch_two) print() print("=== DataStream statistic ===") processors.print_processors_stats() print() print("Send 5 processed data from each processor to a JSON plugin:") PluginJSON = ExportJSON() processors.output_pipeline(5, PluginJSON) print() print("=== DataStream statistic ===") processors.print_processors_stats() if __name__ == "__main__": main()