* [iptables PATCH 1/3] tests: xlate-test: Cleanup file reading loop
2022-11-07 13:08 [iptables PATCH 0/3] Extend xlate-test to replay results Phil Sutter
@ 2022-11-07 13:08 ` Phil Sutter
2022-11-07 13:08 ` [iptables PATCH 2/3] tests: xlate-test.py: Introduce run_proc() Phil Sutter
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Phil Sutter @ 2022-11-07 13:08 UTC (permalink / raw)
To: netfilter-devel
Put the actual translation test into a function to call from the loop
and clean it up a bit. Preparation work for running a second test on the
same data.
Signed-off-by: Phil Sutter <phil@nwl.cc>
---
xlate-test.py | 67 ++++++++++++++++++++++++++++-----------------------
1 file changed, 37 insertions(+), 30 deletions(-)
diff --git a/xlate-test.py b/xlate-test.py
index 03bef7e2e5934..ee393349da50d 100755
--- a/xlate-test.py
+++ b/xlate-test.py
@@ -33,6 +33,24 @@ xtables_nft_multi = 'xtables-nft-multi'
return colors["green"] + string + colors["end"]
+def test_one_xlate(name, sourceline, expected, result):
+ process = Popen([ xtables_nft_multi ] + shlex.split(sourceline), stdout=PIPE, stderr=PIPE)
+ (output, error) = process.communicate()
+ if process.returncode != 0:
+ result.append(name + ": " + red("Error: ") + "iptables-translate failure")
+ result.append(error.decode("utf-8"))
+ return False
+
+ translation = output.decode("utf-8").rstrip(" \n")
+ if translation != expected:
+ result.append(name + ": " + red("Fail"))
+ result.append(magenta("src: ") + sourceline.rstrip(" \n"))
+ result.append(magenta("exp: ") + expected)
+ result.append(magenta("res: ") + translation + "\n")
+ return False
+
+ return True
+
def run_test(name, payload):
global xtables_nft_multi
test_passed = True
@@ -41,37 +59,26 @@ xtables_nft_multi = 'xtables-nft-multi'
line = payload.readline()
while line:
- if line.startswith(keywords):
- sourceline = line
- tests += 1
- process = Popen([ xtables_nft_multi ] + shlex.split(line), stdout=PIPE, stderr=PIPE)
- (output, error) = process.communicate()
- if process.returncode == 0:
- translation = output.decode("utf-8").rstrip(" \n")
- expected = payload.readline().rstrip(" \n")
- next_expected = payload.readline()
- if next_expected.startswith("nft"):
- expected += "\n" + next_expected.rstrip(" \n")
- line = payload.readline()
- else:
- line = next_expected
- if translation != expected:
- test_passed = False
- failed += 1
- result.append(name + ": " + red("Fail"))
- result.append(magenta("src: ") + sourceline.rstrip(" \n"))
- result.append(magenta("exp: ") + expected)
- result.append(magenta("res: ") + translation + "\n")
- else:
- passed += 1
- else:
- test_passed = False
- errors += 1
- result.append(name + ": " + red("Error: ") + "iptables-translate failure")
- result.append(error.decode("utf-8"))
- line = payload.readline()
+ if not line.startswith(keywords):
+ line = payload.readline()
+ continue
+
+ sourceline = line
+ expected = payload.readline().rstrip(" \n")
+ next_expected = payload.readline()
+ if next_expected.startswith("nft"):
+ expected += "\n" + next_expected.rstrip(" \n")
+ line = payload.readline()
+ else:
+ line = next_expected
+
+ tests += 1
+ if test_one_xlate(name, sourceline, expected, result):
+ passed += 1
else:
- line = payload.readline()
+ errors += 1
+ test_passed = False
+
if (passed == tests) and not args.test:
print(name + ": " + green("OK"))
if not test_passed:
--
2.38.0
^ permalink raw reply related [flat|nested] 5+ messages in thread* [iptables PATCH 2/3] tests: xlate-test.py: Introduce run_proc()
2022-11-07 13:08 [iptables PATCH 0/3] Extend xlate-test to replay results Phil Sutter
2022-11-07 13:08 ` [iptables PATCH 1/3] tests: xlate-test: Cleanup file reading loop Phil Sutter
@ 2022-11-07 13:08 ` Phil Sutter
2022-11-07 13:08 ` [iptables PATCH 3/3] tests: xlate-test: Replay results for reverse direction testing Phil Sutter
2022-11-11 18:16 ` [iptables PATCH 0/3] Extend xlate-test to replay results Phil Sutter
3 siblings, 0 replies; 5+ messages in thread
From: Phil Sutter @ 2022-11-07 13:08 UTC (permalink / raw)
To: netfilter-devel
It's just a convenience wrapper around Popen(), simplifying the call.
Signed-off-by: Phil Sutter <phil@nwl.cc>
---
xlate-test.py | 16 +++++++++++-----
1 file changed, 11 insertions(+), 5 deletions(-)
diff --git a/xlate-test.py b/xlate-test.py
index ee393349da50d..bfcddde0f84a6 100755
--- a/xlate-test.py
+++ b/xlate-test.py
@@ -7,6 +7,13 @@ import shlex
import argparse
from subprocess import Popen, PIPE
+def run_proc(args, shell = False):
+ """A simple wrapper around Popen, returning (rc, stdout, stderr)"""
+ process = Popen(args, text = True, shell = shell,
+ stdout = PIPE, stderr = PIPE)
+ output, error = process.communicate()
+ return (process.returncode, output, error)
+
keywords = ("iptables-translate", "ip6tables-translate", "ebtables-translate")
xtables_nft_multi = 'xtables-nft-multi'
@@ -34,14 +41,13 @@ xtables_nft_multi = 'xtables-nft-multi'
def test_one_xlate(name, sourceline, expected, result):
- process = Popen([ xtables_nft_multi ] + shlex.split(sourceline), stdout=PIPE, stderr=PIPE)
- (output, error) = process.communicate()
- if process.returncode != 0:
+ rc, output, error = run_proc([xtables_nft_multi] + shlex.split(sourceline))
+ if rc != 0:
result.append(name + ": " + red("Error: ") + "iptables-translate failure")
- result.append(error.decode("utf-8"))
+ result.append(error)
return False
- translation = output.decode("utf-8").rstrip(" \n")
+ translation = output.rstrip(" \n")
if translation != expected:
result.append(name + ": " + red("Fail"))
result.append(magenta("src: ") + sourceline.rstrip(" \n"))
--
2.38.0
^ permalink raw reply related [flat|nested] 5+ messages in thread* [iptables PATCH 3/3] tests: xlate-test: Replay results for reverse direction testing
2022-11-07 13:08 [iptables PATCH 0/3] Extend xlate-test to replay results Phil Sutter
2022-11-07 13:08 ` [iptables PATCH 1/3] tests: xlate-test: Cleanup file reading loop Phil Sutter
2022-11-07 13:08 ` [iptables PATCH 2/3] tests: xlate-test.py: Introduce run_proc() Phil Sutter
@ 2022-11-07 13:08 ` Phil Sutter
2022-11-11 18:16 ` [iptables PATCH 0/3] Extend xlate-test to replay results Phil Sutter
3 siblings, 0 replies; 5+ messages in thread
From: Phil Sutter @ 2022-11-07 13:08 UTC (permalink / raw)
To: netfilter-devel
Call nft with translation output as input, then check xtables-save
output to make sure iptables-nft can handle anything it suggests nft to
turn its ruleset into.
This extends the test case syntax to cover for expected asymmetries.
When the existing syntax was something like this:
| <xlate command>
| <nft output1>
| [<nft output2>
The new syntax then is:
| <xlate command>[;<replay rule part>]
| <nft output1>
| [<nft output2>]
To keep things terse, <replay rule part> may omit the obligatory '-A
<chain>' argument. If missing, <xlate command> is sanitized for how it
would appear in xtables-save output: '-I' is converted into '-A' and an
optional table spec is removed.
Since replay mode has to manipulate the ruleset in-kernel, abort if
called by unprivileged user. Also try to run in own net namespace to
reduce collateral damage.
Signed-off-by: Phil Sutter <phil@nwl.cc>
---
xlate-test.py | 145 +++++++++++++++++++++++++++++++++++++++++++++++++-
1 file changed, 144 insertions(+), 1 deletion(-)
diff --git a/xlate-test.py b/xlate-test.py
index bfcddde0f84a6..f3fcd797af908 100755
--- a/xlate-test.py
+++ b/xlate-test.py
@@ -57,8 +57,92 @@ xtables_nft_multi = 'xtables-nft-multi'
return True
+def test_one_replay(name, sourceline, expected, result):
+ global args
+
+ searchline = None
+ if sourceline.find(';') >= 0:
+ sourceline, searchline = sourceline.split(';')
+
+ srcwords = sourceline.split()
+
+ srccmd = srcwords[0]
+ table_idx = -1
+ chain_idx = -1
+ table_name = "filter"
+ chain_name = None
+ for idx in range(1, len(srcwords)):
+ if srcwords[idx] in ["-A", "-I", "--append", "--insert"]:
+ chain_idx = idx
+ chain_name = srcwords[idx + 1]
+ elif srcwords[idx] in ["-t", "--table"]:
+ table_idx = idx
+ table_name = srcwords[idx + 1]
+
+ if not chain_name:
+ return True # nothing to do?
+
+ if searchline is None:
+ # adjust sourceline as required
+ srcwords[chain_idx] = "-A"
+ if table_idx >= 0:
+ srcwords.pop(table_idx)
+ srcwords.pop(table_idx)
+ searchline = " ".join(srcwords[1:])
+ elif not searchline.startswith("-A"):
+ tmp = ["-A", chain_name]
+ if len(searchline) > 0:
+ tmp.extend(searchline)
+ searchline = " ".join(tmp)
+
+ fam = ""
+ if srccmd.startswith("ip6"):
+ fam = "ip6 "
+ elif srccmd.startswith("ebt"):
+ fam = "bridge "
+ nft_input = [
+ "flush ruleset",
+ "add table " + fam + table_name,
+ "add chain " + fam + table_name + " " + chain_name
+ ] + [ l.removeprefix("nft ") for l in expected.split("\n") ]
+
+ # feed input via the pipe to make sure the shell "does its thing"
+ cmd = "echo \"" + "\n".join(nft_input) + "\" | " + args.nft + " -f -"
+ rc, output, error = run_proc(cmd, shell = True)
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append(args.nft + " call failed: " + error.rstrip('\n'))
+ for line in nft_input:
+ result.append(magenta("input: ") + line)
+ return False
+
+ ipt = srccmd.split('-')[0]
+ rc, output, error = run_proc([xtables_nft_multi, ipt + "-save"])
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append(ipt + "-save call failed: " + error)
+ return False
+
+ if output.find(searchline) < 0:
+ outline = None
+ for l in output.split('\n'):
+ if l.startswith('-A '):
+ output = l
+ break
+ result.append(name + ": " + red("Replay fail"))
+ result.append(magenta("src: '") + expected + "'")
+ result.append(magenta("exp: '") + searchline + "'")
+ for l in output.split('\n'):
+ result.append(magenta("res: ") + l)
+ return False
+
+ return True
+
+
def run_test(name, payload):
global xtables_nft_multi
+ global args
+
test_passed = True
tests = passed = failed = errors = 0
result = []
@@ -69,7 +153,10 @@ xtables_nft_multi = 'xtables-nft-multi'
line = payload.readline()
continue
- sourceline = line
+ sourceline = replayline = line.rstrip("\n")
+ if line.find(';') >= 0:
+ sourceline = line.split(';')[0]
+
expected = payload.readline().rstrip(" \n")
next_expected = payload.readline()
if next_expected.startswith("nft"):
@@ -84,6 +171,20 @@ xtables_nft_multi = 'xtables-nft-multi'
else:
errors += 1
test_passed = False
+ continue
+
+ if args.replay:
+ tests += 1
+ if test_one_replay(name, replayline, expected, result):
+ passed += 1
+ else:
+ errors += 1
+ test_passed = False
+
+ rc, output, error = run_proc([args.nft, "flush", "ruleset"])
+ if rc != 0:
+ result.append(name + ": " + red("Fail"))
+ result.append("nft flush ruleset call failed: " + error)
if (passed == tests) and not args.test:
print(name + ": " + green("OK"))
@@ -106,8 +207,44 @@ xtables_nft_multi = 'xtables-nft-multi'
return (test_files, total_tests, total_passed, total_failed, total_error)
+def spawn_netns():
+ # prefer unshare module
+ try:
+ import unshare
+ unshare.unshare(unshare.CLONE_NEWNET)
+ return True
+ except:
+ pass
+
+ # sledgehammer style:
+ # - call ourselves prefixed by 'unshare -n' if found
+ # - pass extra --no-netns parameter to avoid another recursion
+ try:
+ import shutil
+
+ unshare = shutil.which("unshare")
+ if unshare is None:
+ return False
+
+ sys.argv.append("--no-netns")
+ os.execv(unshare, [unshare, "-n", sys.executable] + sys.argv)
+ except:
+ pass
+
+ return False
+
+
def main():
global xtables_nft_multi
+
+ if args.replay:
+ if os.getuid() != 0:
+ print("Replay test requires root, sorry", file=sys.stderr)
+ return
+ if not args.no_netns and not spawn_netns():
+ print("Cannot run in own namespace, connectivity might break",
+ file=sys.stderr)
+
if not args.host:
os.putenv("XTABLES_LIBDIR", os.path.abspath("extensions"))
xtables_nft_multi = os.path.abspath(os.path.curdir) \
@@ -139,6 +276,12 @@ xtables_nft_multi = 'xtables-nft-multi'
parser = argparse.ArgumentParser()
parser.add_argument('-H', '--host', action='store_true',
help='Run tests against installed binaries')
+parser.add_argument('-R', '--replay', action='store_true',
+ help='Replay tests to check iptables-nft parser')
+parser.add_argument('-n', '--nft', type=str, default='nft',
+ help='Replay using given nft binary (default: \'%(default)s\')')
+parser.add_argument('--no-netns', action='store_true',
+ help='Do not run testsuite in own network namespace')
parser.add_argument("test", nargs="?", help="run only the specified test file")
args = parser.parse_args()
sys.exit(main())
--
2.38.0
^ permalink raw reply related [flat|nested] 5+ messages in thread