-
Notifications
You must be signed in to change notification settings - Fork 63
unittest_load #199
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
unittest_load #199
Changes from 1 commit
762918b
bd86c90
0b52950
9e5b05b
c3dd666
181ceab
9305c91
4b7b7f1
bbc1c79
322b373
28ac31b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import unittest | ||
from unittest.mock import patch | ||
|
||
from uniflow.node import Node | ||
from uniflow.op.extract.load.html_op import ExtractHTMLOp, ProcessHTMLOp | ||
|
||
|
||
class TestExtractHTMLOp(unittest.TestCase): | ||
def test_extract_html_op_with_url(self): | ||
extract_op = ExtractHTMLOp("test_extract") | ||
node = Node("test_node", {"url": "http://testsite.com"}) | ||
with patch( | ||
"uniflow.op.extract.load.html_op.read_file", | ||
return_value="<html><body><p>Hello World</p></body></html>", | ||
): | ||
output = extract_op([node]) | ||
self.assertEqual(len(output), 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shall we add a new line here?
|
||
self.assertEqual(output[0].value_dict["text"], "Hello World") | ||
|
||
def test_extract_html_op_with_filename(self): | ||
extract_op = ExtractHTMLOp("test_extract") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. same above |
||
node = Node("test_node", {"filename": "testfile.html"}) | ||
with patch( | ||
"uniflow.op.extract.load.html_op.read_file", | ||
return_value="<html><body><p>Hello File</p></body></html>", | ||
): | ||
output = extract_op([node]) | ||
self.assertEqual(len(output), 1) | ||
self.assertEqual(output[0].value_dict["text"], "Hello File") | ||
|
||
def test_extract_html_op_with_no_url_or_filename(self): | ||
extract_op = ExtractHTMLOp("test_extract") | ||
node = Node("test_node", {}) | ||
with self.assertRaises(ValueError): | ||
extract_op([node]) | ||
|
||
def test_extract_html_op_with_container(self): | ||
extract_op = ExtractHTMLOp("test_extract") | ||
node = Node("test_node", {"url": "http://testsite.com"}) | ||
html_content = "<html><body><div>Hello, <span>World!</span></div></body></html>" | ||
with patch( | ||
"uniflow.op.extract.load.html_op.read_file", return_value=html_content | ||
): | ||
output = extract_op([node]) | ||
self.assertEqual(len(output), 1) | ||
self.assertEqual(output[0].value_dict["text"], "Hello, World!") | ||
|
||
def test_extract_html_op_with_empty_container(self): | ||
extract_op = ExtractHTMLOp("test_extract") | ||
node = Node("test_node", {"url": "http://testsite.com"}) | ||
|
||
html_content = "<html><body><div><span></span></div></body></html>" | ||
with patch( | ||
"uniflow.op.extract.load.html_op.read_file", return_value=html_content | ||
): | ||
output = extract_op([node]) | ||
self.assertEqual(len(output), 1) | ||
self.assertEqual(output[0].value_dict["text"], "") | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: it might be worth adding two more unit tests to test with more than one node and no node. https://medium.com/@samarthgvasist/parameterized-unit-testing-in-python-9be82fa7e17f |
||
|
||
class TestProcessHTMLOp(unittest.TestCase): | ||
def test_process_html_op(self): | ||
process_op = ProcessHTMLOp("test_process") | ||
node = Node("test_node", {"text": "\n Hello World \n"}) | ||
output = process_op([node]) | ||
self.assertEqual(len(output), 1) | ||
self.assertEqual(output[0].value_dict["text"], "Hello World") | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,98 @@ | ||
import unittest | ||
from unittest.mock import Mock | ||
|
||
from uniflow.node import Node | ||
from uniflow.op.extract.load.image_op import ExtractImageOp, ProcessImageOp | ||
from uniflow.op.model.abs_model import AbsModel | ||
|
||
|
||
class TestExtractImageOp(unittest.TestCase): | ||
|
||
def test_extract_image_with_empty_sequence(self): | ||
model = Mock(spec=AbsModel) | ||
op = ExtractImageOp("test_op", model) | ||
self.assertEqual(op([]), []) | ||
|
||
def test_extract_image_with_single_node(self): | ||
model = Mock(spec=AbsModel) | ||
model.run = Mock(return_value={"response": ["Extracted text"]}) | ||
|
||
node = Node("input_node", {"data": "image_data"}) | ||
op = ExtractImageOp("test_op", model) | ||
result_nodes = op([node]) | ||
|
||
self.assertEqual(len(result_nodes), 1) | ||
self.assertEqual(result_nodes[0].value_dict["text"], "Extracted text") | ||
self.assertIn(node, result_nodes[0].prev_nodes) | ||
|
||
def test_extract_image_with_multiple_nodes(self): | ||
model = Mock(spec=AbsModel) | ||
model.run = Mock(return_value={"response": ["Extracted text"]}) | ||
|
||
nodes = [Node(f"input_node_{i}", {"data": "image_data"}) for i in range(3)] | ||
op = ExtractImageOp("test_op", model) | ||
result_nodes = op(nodes) | ||
|
||
self.assertEqual(len(result_nodes), 3) | ||
for i, result_node in enumerate(result_nodes): | ||
self.assertEqual(result_node.value_dict["text"], "Extracted text") | ||
self.assertIn(nodes[i], result_node.prev_nodes) | ||
|
||
|
||
class TestProcessImageOp(unittest.TestCase): | ||
|
||
def test_process_image_with_empty_sequence(self): | ||
op = ProcessImageOp("test_op") | ||
self.assertEqual(op([]), []) | ||
|
||
def test_process_image_with_single_node(self): | ||
node = Node("input_node", {"text": "Hello\n\n\nWorld"}) | ||
op = ProcessImageOp("test_op") | ||
result_nodes = op([node]) | ||
|
||
self.assertEqual(len(result_nodes), 1) | ||
self.assertEqual(result_nodes[0].value_dict["text"], "Hello\n\nWorld") | ||
self.assertIn(node, result_nodes[0].prev_nodes) | ||
|
||
def test_process_image_with_multiple_nodes(self): | ||
nodes = [ | ||
Node(f"input_node_{i}", {"text": f"Text with\n\n\n\n{i} newlines\n\n\n"}) | ||
for i in range(3) | ||
] | ||
op = ProcessImageOp("test_op") | ||
result_nodes = op(nodes) | ||
|
||
self.assertEqual(len(result_nodes), 3) | ||
for i, result_node in enumerate(result_nodes): | ||
self.assertEqual( | ||
result_node.value_dict["text"], f"Text with\n\n{i} newlines" | ||
) | ||
self.assertIn(nodes[i], result_node.prev_nodes) | ||
|
||
def test_process_image_with_leading_and_trailing_whitespace(self): | ||
node = Node("input_node", {"text": "\n\n\n Hello World \n\n\n\n"}) | ||
op = ProcessImageOp("test_op") | ||
result_nodes = op([node]) | ||
|
||
self.assertEqual(result_nodes[0].value_dict["text"], "Hello World") | ||
self.assertIn(node, result_nodes[0].prev_nodes) | ||
|
||
def test_process_image_without_extra_newlines(self): | ||
node = Node("input_node", {"text": "Hello\nWorld"}) | ||
op = ProcessImageOp("test_op") | ||
result_nodes = op([node]) | ||
|
||
self.assertEqual(result_nodes[0].value_dict["text"], "Hello\nWorld") | ||
self.assertIn(node, result_nodes[0].prev_nodes) | ||
|
||
def test_process_image_with_only_whitespace(self): | ||
node = Node("input_node", {"text": " \n\n\n \n \n "}) | ||
op = ProcessImageOp("test_op") | ||
result_nodes = op([node]) | ||
|
||
self.assertEqual(result_nodes[0].value_dict["text"], "") | ||
self.assertIn(node, result_nodes[0].prev_nodes) | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,101 @@ | ||
import unittest | ||
from unittest.mock import MagicMock, mock_open, patch | ||
|
||
from uniflow.node import Node | ||
from uniflow.op.extract.load.ipynb_op import ExtractIpynbOp, ProcessIpynbOp | ||
|
||
|
||
class TestExtractIpynbOp(unittest.TestCase): | ||
@patch("nbformat.read") | ||
@patch("nbconvert.MarkdownExporter") | ||
def test_extract_with_valid_nodes(self, mock_markdown_exporter, mock_nbformat_read): | ||
mock_file_content = '{"cells": [{"cell_type": "markdown", "source": "Some markdown content"}], "metadata": {}, "nbformat": 4, "nbformat_minor": 4}' | ||
with patch("builtins.open", mock_open(read_data=mock_file_content)): | ||
mock_nb = MagicMock() | ||
mock_nbformat_read.return_value = mock_nb | ||
|
||
mock_md_exporter_instance = mock_markdown_exporter.return_value | ||
mock_md_exporter_instance.from_notebook_node.return_value = ( | ||
"# Converted Markdown", | ||
None, | ||
) | ||
|
||
extract_op = ExtractIpynbOp("test_op") | ||
test_nodes = [ | ||
Node(name="test_node_1", value_dict={"filename": "dummy.ipynb"}) | ||
] | ||
|
||
output_nodes = extract_op(test_nodes) | ||
|
||
self.assertEqual(len(output_nodes), 1) | ||
self.assertIn("# Converted Markdown", output_nodes[0].value_dict["text"]) | ||
mock_nbformat_read.assert_called_once() | ||
mock_md_exporter_instance.from_notebook_node.assert_called_once() | ||
|
||
def test_extract_ipynb_with_no_nodes(self): | ||
op = ExtractIpynbOp("extract_ipynb") | ||
result_nodes = op([]) | ||
self.assertEqual(result_nodes, []) | ||
|
||
def test_extract_ipynb_with_missing_file(self): | ||
op = ExtractIpynbOp("extract_ipynb") | ||
mock_node = Node( | ||
name="test_node", value_dict={"filename": "non_existent.ipynb"} | ||
) | ||
nodes = [mock_node] | ||
|
||
with patch("builtins.open", mock_open()) as mock_file: | ||
mock_file.side_effect = FileNotFoundError | ||
with self.assertRaises(FileNotFoundError): | ||
op(nodes) | ||
|
||
def test_extract_ipynb_with_invalid_file_content(self): | ||
op = ExtractIpynbOp("extract_ipynb") | ||
mock_node = Node( | ||
name="test_node", value_dict={"filename": "invalid_content.ipynb"} | ||
) | ||
nodes = [mock_node] | ||
|
||
with patch("builtins.open", mock_open(read_data="invalid")), patch( | ||
"nbformat.read", side_effect=ValueError | ||
): | ||
with self.assertRaises(ValueError): | ||
op(nodes) | ||
|
||
|
||
class TestProcessIpynbOp(unittest.TestCase): | ||
def test_process_ipynb_with_valid_text(self): | ||
op = ProcessIpynbOp("process_ipynb") | ||
mock_node = Node(name="test_node", value_dict={"text": "\nValid text\n"}) | ||
nodes = [mock_node] | ||
result_nodes = op(nodes) | ||
|
||
self.assertEqual(len(result_nodes), 1) | ||
self.assertEqual(result_nodes[0].value_dict["text"], "Valid text") | ||
|
||
def test_process_ipynb_with_empty_text(self): | ||
op = ProcessIpynbOp("process_ipynb") | ||
mock_node = Node(name="test_node", value_dict={"text": ""}) | ||
nodes = [mock_node] | ||
result_nodes = op(nodes) | ||
|
||
self.assertEqual(len(result_nodes), 1) | ||
self.assertEqual(result_nodes[0].value_dict["text"], "") | ||
|
||
def test_process_ipynb_with_no_nodes(self): | ||
op = ProcessIpynbOp("process_ipynb") | ||
result_nodes = op([]) | ||
self.assertEqual(result_nodes, []) | ||
|
||
def test_process_ipynb_with_whitespace_only_text(self): | ||
op = ProcessIpynbOp("process_ipynb") | ||
mock_node = Node(name="test_node", value_dict={"text": " \n \t "}) | ||
nodes = [mock_node] | ||
result_nodes = op(nodes) | ||
|
||
self.assertEqual(len(result_nodes), 1) | ||
self.assertEqual(result_nodes[0].value_dict["text"], "") | ||
|
||
|
||
if __name__ == "__main__": | ||
unittest.main() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we init the function in the class?
e.g.
self.extract_op = ExtractHTMLOp("test_splitter")