summaryrefslogtreecommitdiff
path: root/py05/ex2/data_pipeline.py
diff options
context:
space:
mode:
Diffstat (limited to 'py05/ex2/data_pipeline.py')
-rwxr-xr-xpy05/ex2/data_pipeline.py338
1 files changed, 338 insertions, 0 deletions
diff --git a/py05/ex2/data_pipeline.py b/py05/ex2/data_pipeline.py
new file mode 100755
index 0000000..a558ac6
--- /dev/null
+++ b/py05/ex2/data_pipeline.py
@@ -0,0 +1,338 @@
+#!/usr/bin/env python3
+#
+# "Briefing: Your task is to integrate work from past exercices into a complete
+# data processing pipeline that demonstrates understanding of polymorphic
+# concepts. Use your code from Exercise 1 and improve it in order to obtain a
+# complete data pipeline. Your DataStream class already handles input streams.
+# You need now to handle the output part of the pipeline. This will be achieved
+# by using a plugin system for export classes, made export-compatible through
+# duck typing."
+#
+# https://typing.python.org/en/latest/spec/protocol.html
+
+from abc import ABC, abstractmethod
+import typing
+import sys
+
+
class ExportPlugin(typing.Protocol):
    """Structural interface expected from an export plugin.

    An object is usable as a plugin as soon as it provides a compatible
    ``process_output`` method.
    """

    def process_output(self, data: list[tuple[int, str]]) -> None:
        """Export one batch of data.

        Args:
            data: List of ``(int, str)`` tuples; the element type matches
                the declared return type of ``DataProcessor.output``.
        """
+
+
class ExportCSV(ExportPlugin):
    """Export plugin that prints a batch as one comma-separated row."""

    def process_output(self, data: list[tuple[int, str]]) -> None:
        """Print the values of *data* as a single CSV row on stdout.

        Only the second element of each entry is printed; the first one
        (the rank) is ignored. Values are written verbatim, without quoting
        or escaping.

        Args:
            data: Entries to export, in output order.
        """
        print("CSV Output:")
        # An empty batch prints the header only (no blank row after it).
        if data:
            # Joining places separators by position, so a batch that holds
            # the same tuple object more than once is still printed
            # correctly.
            print(",".join(str(entry[1]) for entry in data))
+
+
class ExportJSON(ExportPlugin):
    """Export plugin that prints a batch as a one-line JSON object."""

    def process_output(self, data: list[tuple[int, str]]) -> None:
        """Print *data* as a flat JSON object on stdout.

        Every entry ``(rank, value)`` becomes the member
        ``"item_<rank>": "<value>"``. Values are always written as JSON
        strings, whatever their Python type.

        Args:
            data: Entries to export, in output order.
        """
        # Local import: keeps this block independent of the file's imports.
        import json

        print("JSON Output:")
        members = []
        for entry in data:
            key = json.dumps(f"item_{entry[0]}", ensure_ascii=False)
            # json.dumps escapes quotes, backslashes and control characters
            # so the printed line stays valid JSON; ensure_ascii=False
            # leaves other text untouched.
            value = json.dumps(str(entry[1]), ensure_ascii=False)
            members.append(f"{key}: {value}")
        # Separators are placed by position, so a batch holding the same
        # tuple object twice is still printed correctly.
        print("{" + ", ".join(members) + "}")
+
+
class DataProcessor(ABC):
    """Abstract base for processors that queue data until it is exported.

    Subclasses implement ``validate`` and ``ingest``; this class provides
    the shared first-in, first-out ``output``.
    """

    def __init__(self, name: str) -> None:
        """Create an empty processor.

        Args:
            name: Label stored on the instance as ``name``.
        """
        # Pending items, oldest first.
        self.storage: typing.List[typing.Any] = []
        # Rank that output() attaches to the next item it returns.
        self.rank: int = 0
        # Item counter; this base class never modifies it.
        self.counter: int = 0
        self.name: str = name

    @abstractmethod
    def validate(self, data: typing.Any) -> bool:
        """Return True when this processor accepts *data*."""

    @abstractmethod
    def ingest(self, data: typing.Any) -> None:
        """Store *data* for a later call to ``output``."""

    def output(self) -> tuple[int, str]:
        """Remove the oldest stored item and return it with its rank.

        A dict is rendered as ``"<first value>: <second value>"`` (fewer
        values render what is available); any other item is rendered with
        ``str`` so the result matches the declared return type.

        Returns:
            ``(rank, text)`` where ranks start at 0 and grow by one for
            every item returned.

        Raises:
            IndexError: If nothing is stored. ``rank`` and ``storage`` are
                left untouched in that case.
        """
        if not self.storage:
            raise IndexError("no stored data to output")
        # Pop before touching rank so a failure cannot skip a rank or leave
        # an item stuck at the head of the queue.
        entry = self.storage.pop(0)
        position = self.rank
        self.rank += 1
        if isinstance(entry, dict):
            leading = list(entry.values())[:2]
            rendered = ": ".join(str(value) for value in leading)
        else:
            rendered = str(entry)
        return position, rendered
+
+
+class NumericProcessor(DataProcessor):
+
+ def validate(self, data: int | float | typing.List[int | float]) -> bool:
+ try:
+ if isinstance(data, int):
+ self.counter += 1
+ return True
+ elif isinstance(data, float):
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ type_list = "yes"
+ for item in data:
+ self.counter += 1
+ if not isinstance(item, int | float):
+ type_list = "no"
+ self.counter -= 1
+ if type_list == "yes":
+ return True
+ else:
+ raise ValueError()
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: int | float | typing.List[int | float]) -> None:
+ try:
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, int | float):
+ self.storage.append(item)
+ elif isinstance(data, int | float):
+ self.storage.append(data)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper numeric data", file=sys.stderr)
+
+
def is_number(s: str) -> bool:
    """Report whether ``float(s)`` succeeds.

    Args:
        s: Value to probe. Objects that ``float`` cannot accept at all
            (for example ``None`` or a dict) yield False instead of raising.

    Returns:
        True when the conversion works, False when it raises ValueError or
        TypeError.
    """
    try:
        float(s)
    except (ValueError, TypeError):
        return False
    else:
        return True
+
+
+class TextProcessor(DataProcessor):
+
+ def validate(self, data: str | typing.List[str]) -> bool:
+ try:
+ if isinstance(data, str):
+ if is_number(data):
+ raise ValueError()
+ else:
+ self.counter += 1
+ return True
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, str):
+ if is_number(item):
+ raise ValueError()
+ return False
+ else:
+ self.counter += 1
+ if isinstance(item, dict):
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: str | typing.List[str]) -> None:
+ try:
+ if isinstance(data, str):
+ self.storage.append(data)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if is_number(item):
+ raise ValueError()
+ elif isinstance(item, dict):
+ raise ValueError()
+ else:
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper text data", file=sys.stderr)
+
+
+class LogProcessor(DataProcessor):
+
+ def validate(self, data: dict | typing.List[dict]) -> bool:
+ try:
+ if isinstance(data, dict):
+ return True
+ if isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.counter += 1
+ else:
+ raise ValueError()
+ return False
+ return True
+ else:
+ raise ValueError()
+ except ValueError:
+ return False
+
+ def ingest(self, data: dict | typing.List[dict]) -> None:
+ try:
+ if isinstance(data, dict):
+ for item in data:
+ self.storage.append(item)
+ elif isinstance(data, typing.List):
+ for item in data:
+ if isinstance(item, dict):
+ self.storage.append(item)
+ else:
+ raise ValueError()
+ except ValueError:
+ print("Improper log data", file=sys.stderr)
+
+
class DataStream():
    """Dispatch stream elements to registered processors and export them."""

    def __init__(self) -> None:
        """Start with no registered processor."""
        # Registered processors, in registration order.
        self.processor: list = []
        # Initialised to 0; no method of this class reads or updates it.
        self.counter: int = 0

    def register_processor(self, proc: DataProcessor) -> None:
        """Add *proc* at the end of the list of registered processors."""
        self.processor.append(proc)

    def _first_accepting(self, element: typing.Any) -> typing.Any:
        """Return the first processor whose validate() accepts *element*.

        Processors are asked in registration order; None is returned when
        none of them accepts the element.
        """
        for candidate in self.processor:
            if candidate.validate(element):
                return candidate
        return None

    def process_stream(self, stream: list[typing.Any]) -> None:
        """Hand every element of *stream* to the first processor accepting it.

        When no processor accepts an element, or a ValueError is raised
        while handling it, an error line is printed on stderr and the next
        element is processed.
        """
        for element in stream:
            try:
                target = self._first_accepting(element)
                if target is None:
                    raise ValueError()
                target.ingest(element)
            except ValueError:
                print(f"Data Stream Error - Can't process "
                      f"element in stream: {element}", file=sys.stderr)

    def print_processors_stats(self) -> None:
        """Print one statistics line per registered processor."""
        if not self.processor:
            print("No processor found, no data")
            return
        for proc in self.processor:
            pending = len(proc.storage)
            print(f"{proc.name}: total {proc.counter} items "
                  f"processed, remaining {pending} on processor")

    def output_pipeline(self, nb: int, plugin: ExportPlugin) -> None:
        """Export up to *nb* items from every registered processor.

        For each processor, at most *nb* items are taken with output()
        (fewer when its storage runs out) and the resulting batch, possibly
        empty, is passed to ``plugin.process_output``.
        """
        for proc in self.processor:
            # The storage check runs before every output() call, so an
            # exhausted processor is never asked for more.
            batch = [proc.output() for _ in range(nb) if proc.storage]
            plugin.process_output(batch)
+
+
def main() -> None:
    """Run the demo scenario for the data pipeline.

    Registers one processor of each kind, sends two batches through the
    stream, exports 3 items per processor with the CSV plugin and 5 with the
    JSON plugin, and prints processor statistics after every step.
    """

    def show_stats() -> None:
        # Same banner + statistics pair printed after every step.
        print("=== DataStream statistic ===")
        stream.print_processors_stats()

    print("=== Code Nexus - Data Pipeline ===")
    print()
    print("Initialize Data Stream...")
    print()
    stream = DataStream()
    show_stats()
    print()

    numeric_proc = NumericProcessor("Numeric Processor")
    text_proc = TextProcessor("Text Processor")
    log_proc = LogProcessor("Log Processor")
    print("Registering Processors")
    print()
    # Registration order is the order in which processors are tried.
    stream.register_processor(numeric_proc)
    stream.register_processor(text_proc)
    stream.register_processor(log_proc)

    # Sample data: the stray leading spaces in ' log_message' and ' ERROR'
    # were removed so keys and levels are consistent across entries.
    batch_one = ['Hello world',
                 [3.14, -1, 2.71],
                 [{
                     'log_level': 'WARNING',
                     'log_message': 'Telnet access! Use ssh instead'
                  },
                  {
                     'log_level': 'INFO',
                     'log_message': 'User wil is connected'
                  }],
                 42,
                 ['Hi', 'five']]
    print(f"Sending first batch of data on stream: {batch_one}")
    stream.process_stream(batch_one)
    print()
    show_stats()
    print()

    print("Send 3 processed data from each processor to a CSV plugin:")
    csv_plugin = ExportCSV()
    stream.output_pipeline(3, csv_plugin)
    print()
    show_stats()
    print()

    batch_two = [21,
                 ['I love AI', 'LLMs are wonderful', 'Stay healthy'],
                 [{'log_level': 'ERROR', 'log_message': '500 server crash'},
                  {'log_level': 'NOTICE',
                   'log_message': 'Certificate expires in 10 days'}],
                 [32, 42, 64, 84, 128, 168],
                 'World hello']
    print(f"Send another batch of data : {batch_two}")
    stream.process_stream(batch_two)
    print()
    show_stats()
    print()

    print("Send 5 processed data from each processor to a JSON plugin:")
    json_plugin = ExportJSON()
    stream.output_pipeline(5, json_plugin)
    print()
    show_stats()


if __name__ == "__main__":
    main()