Compare commits
1 commit
main
...
python-tes
Author | SHA1 | Date | |
---|---|---|---|
|
9d8f0f5e5c |
3 changed files with 165 additions and 40 deletions
130
test/agenix_testing.py
Normal file
130
test/agenix_testing.py
Normal file
|
@ -0,0 +1,130 @@
|
||||||
|
"""Provide a class and helper methods for agenix integration tests."""
|
||||||
|
|
||||||
|
|
||||||
|
import typing as t
|
||||||
|
|
||||||
|
T = t.TypeVar("T", str, list[str])
|
||||||
|
|
||||||
|
|
||||||
|
class AgenixTester:
|
||||||
|
"""Provide a class to help reduce repetition in setup."""
|
||||||
|
|
||||||
|
def __init__(self, system, user, password) -> None:
|
||||||
|
"""Necessary setup can be put here."""
|
||||||
|
self.system = system
|
||||||
|
self.user = user
|
||||||
|
self.password = password
|
||||||
|
self.setup()
|
||||||
|
|
||||||
|
def login(self) -> None:
|
||||||
|
self.system.wait_for_unit("multi-user.target")
|
||||||
|
self.system.wait_until_succeeds("pgrep -f 'agetty.*tty1'")
|
||||||
|
self.system.sleep(2)
|
||||||
|
self.system.send_key("alt-f2")
|
||||||
|
self.system.wait_until_succeeds("[ $(fgconsole) = 2 ]")
|
||||||
|
self.system.wait_for_unit("getty@tty2.service")
|
||||||
|
self.system.wait_until_succeeds("pgrep -f 'agetty.*tty2'")
|
||||||
|
self.system.wait_until_tty_matches("2", "login: ")
|
||||||
|
self.system.send_chars(f"{self.user}\n")
|
||||||
|
self.system.wait_until_tty_matches("2", f"login: {self.user}")
|
||||||
|
self.system.wait_until_succeeds("pgrep login")
|
||||||
|
self.system.sleep(2)
|
||||||
|
self.system.send_chars(f"{self.password}\n")
|
||||||
|
|
||||||
|
def setup(self) -> None:
|
||||||
|
"""Run common setup code."""
|
||||||
|
self.login()
|
||||||
|
|
||||||
|
def user_succeed(
|
||||||
|
self,
|
||||||
|
cmds: T,
|
||||||
|
directory: str | None = None,
|
||||||
|
debug: bool = False,
|
||||||
|
) -> T:
|
||||||
|
"""Run cmds as `self.user`, optionally in a specified directory.
|
||||||
|
|
||||||
|
For convenience, if cmds is a sequence, returns output as a list of
|
||||||
|
outputs corresponding with each line in cmds. if cmds is a string,
|
||||||
|
returns output as a string.
|
||||||
|
"""
|
||||||
|
|
||||||
|
context: list[str] = [
|
||||||
|
"set -Eeu -o pipefail",
|
||||||
|
"shopt -s inherit_errexit",
|
||||||
|
]
|
||||||
|
if debug:
|
||||||
|
context.append("set -x")
|
||||||
|
|
||||||
|
if directory:
|
||||||
|
context.append(f"cd {directory}")
|
||||||
|
|
||||||
|
if isinstance(cmds, str):
|
||||||
|
commands_str = "\n".join([*context, cmds])
|
||||||
|
final_command = f"sudo -u {self.user} -- bash -c '{commands_str}'"
|
||||||
|
return self.system.succeed(final_command)
|
||||||
|
|
||||||
|
results: list[str] = []
|
||||||
|
for cmd in cmds:
|
||||||
|
commands_str = "\n".join([*context, cmd])
|
||||||
|
final_command = f"sudo -u {self.user} -- bash -c '{commands_str}'"
|
||||||
|
result = self.system.succeed(final_command)
|
||||||
|
results.append(result.strip())
|
||||||
|
return t.cast(T, results)
|
||||||
|
|
||||||
|
def run_all(self) -> None:
|
||||||
|
self.test_rekeying()
|
||||||
|
self.test_user_edit()
|
||||||
|
|
||||||
|
def test_rekeying(self) -> None:
|
||||||
|
"""Ensure we can rekey a file and its hash changes."""
|
||||||
|
|
||||||
|
before_hash, _, after_hash = self.user_succeed(
|
||||||
|
[
|
||||||
|
"sha256sum passwordfile-user1.age",
|
||||||
|
f"agenix -r -i /home/{self.user}/.ssh/id_ed25519",
|
||||||
|
"sha256sum passwordfile-user1.age",
|
||||||
|
],
|
||||||
|
directory="/tmp/secrets",
|
||||||
|
)
|
||||||
|
|
||||||
|
# Ensure we actually have hashes
|
||||||
|
for line in [before_hash, after_hash]:
|
||||||
|
h = line.split()
|
||||||
|
assert len(h) == 2, f"hash should be [hash, filename], got {h}"
|
||||||
|
assert h[1] == "passwordfile-user1.age", "filename is incorrect"
|
||||||
|
assert len(h[0].strip()) == 64, "hash length is incorrect"
|
||||||
|
assert (
|
||||||
|
before_hash[0] != after_hash[0]
|
||||||
|
), "hash did not change with rekeying"
|
||||||
|
|
||||||
|
def test_user_edit(self):
|
||||||
|
"""Ensure user1 can edit passwordfile-user1.age."""
|
||||||
|
self.user_succeed(
|
||||||
|
"EDITOR=cat agenix -e passwordfile-user1.age",
|
||||||
|
directory="/tmp/secrets",
|
||||||
|
)
|
||||||
|
|
||||||
|
self.user_succeed("echo bogus > ~/.ssh/id_rsa")
|
||||||
|
|
||||||
|
# Cannot edit with bogus default id_rsa
|
||||||
|
self.system.fail(
|
||||||
|
f"sudo -u {self.user} -- bash -c '"
|
||||||
|
"cd /tmp/secrets; "
|
||||||
|
"EDITOR=cat agenix -e /tmp/secrets/passwordfile-user1.age; "
|
||||||
|
"'"
|
||||||
|
)
|
||||||
|
|
||||||
|
# user1 can still edit if good identity specified
|
||||||
|
*_, pw = self.user_succeed(
|
||||||
|
[
|
||||||
|
(
|
||||||
|
"EDITOR=cat agenix -e passwordfile-user1.age "
|
||||||
|
"-i /home/user1/.ssh/id_ed25519"
|
||||||
|
),
|
||||||
|
"rm ~/.ssh/id_rsa",
|
||||||
|
"echo 'secret1234' | agenix -e passwordfile-user1.age",
|
||||||
|
"EDITOR=cat agenix -e passwordfile-user1.age",
|
||||||
|
],
|
||||||
|
directory="/tmp/secrets",
|
||||||
|
)
|
||||||
|
assert pw == "secret1234", f"password didn't match, got '{pw}'"
|
|
@ -9,6 +9,24 @@
|
||||||
}:
|
}:
|
||||||
pkgs.nixosTest {
|
pkgs.nixosTest {
|
||||||
name = "agenix-integration";
|
name = "agenix-integration";
|
||||||
|
extraPythonPackages = ps: let
|
||||||
|
agenixTesting = let
|
||||||
|
version = (pkgs.callPackage ../pkgs/agenix.nix {}).version;
|
||||||
|
in
|
||||||
|
ps.buildPythonPackage rec {
|
||||||
|
inherit version;
|
||||||
|
pname = "agenix_testing";
|
||||||
|
src = ./.;
|
||||||
|
format = "pyproject";
|
||||||
|
propagatedBuildInputs = [ps.setuptools];
|
||||||
|
postPatch = ''
|
||||||
|
# Keep a default version makes for easy installation outside of
|
||||||
|
# nix for debugging
|
||||||
|
substituteInPlace pyproject.toml \
|
||||||
|
--replace 'version = "0.1.0"' 'version = "${version}"'
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
in [agenixTesting];
|
||||||
nodes.system1 = {
|
nodes.system1 = {
|
||||||
config,
|
config,
|
||||||
pkgs,
|
pkgs,
|
||||||
|
@ -49,47 +67,17 @@ pkgs.nixosTest {
|
||||||
user = "user1";
|
user = "user1";
|
||||||
password = "password1234";
|
password = "password1234";
|
||||||
in ''
|
in ''
|
||||||
system1.wait_for_unit("multi-user.target")
|
# Skipping analyzing "agenix_testing": module is installed, but missing
|
||||||
system1.wait_until_succeeds("pgrep -f 'agetty.*tty1'")
|
# library stubs or py.typed marker
|
||||||
system1.sleep(2)
|
from agenix_testing import AgenixTester # type: ignore
|
||||||
system1.send_key("alt-f2")
|
tester = AgenixTester(system=system1, user="${user}", password="${password}")
|
||||||
system1.wait_until_succeeds("[ $(fgconsole) = 2 ]")
|
|
||||||
system1.wait_for_unit("getty@tty2.service")
|
# Can still be used as before
|
||||||
system1.wait_until_succeeds("pgrep -f 'agetty.*tty2'")
|
|
||||||
system1.wait_until_tty_matches("2", "login: ")
|
|
||||||
system1.send_chars("${user}\n")
|
|
||||||
system1.wait_until_tty_matches("2", "login: ${user}")
|
|
||||||
system1.wait_until_succeeds("pgrep login")
|
|
||||||
system1.sleep(2)
|
|
||||||
system1.send_chars("${password}\n")
|
|
||||||
system1.send_chars("whoami > /tmp/1\n")
|
system1.send_chars("whoami > /tmp/1\n")
|
||||||
system1.wait_for_file("/tmp/1")
|
# Or from `tester.system`
|
||||||
assert "${user}" in system1.succeed("cat /tmp/1")
|
tester.system.wait_for_file("/tmp/1")
|
||||||
|
assert "${user}" in tester.system.succeed("cat /tmp/1")
|
||||||
|
|
||||||
userDo = lambda input : f"sudo -u user1 -- bash -c 'set -eou pipefail; cd /tmp/secrets; {input}'"
|
tester.run_all()
|
||||||
|
|
||||||
before_hash = system1.succeed(userDo('sha256sum passwordfile-user1.age')).split()
|
|
||||||
print(system1.succeed(userDo('agenix -r -i /home/user1/.ssh/id_ed25519')))
|
|
||||||
after_hash = system1.succeed(userDo('sha256sum passwordfile-user1.age')).split()
|
|
||||||
|
|
||||||
# Ensure we actually have hashes
|
|
||||||
for h in [before_hash, after_hash]:
|
|
||||||
assert len(h) == 2, "hash should be [hash, filename]"
|
|
||||||
assert h[1] == "passwordfile-user1.age", "filename is incorrect"
|
|
||||||
assert len(h[0].strip()) == 64, "hash length is incorrect"
|
|
||||||
assert before_hash[0] != after_hash[0], "hash did not change with rekeying"
|
|
||||||
|
|
||||||
# user1 can edit passwordfile-user1.age
|
|
||||||
system1.succeed(userDo("EDITOR=cat agenix -e passwordfile-user1.age"))
|
|
||||||
|
|
||||||
# user1 can edit even if bogus id_rsa present
|
|
||||||
system1.succeed(userDo("echo bogus > ~/.ssh/id_rsa"))
|
|
||||||
system1.fail(userDo("EDITOR=cat agenix -e passwordfile-user1.age"))
|
|
||||||
system1.succeed(userDo("EDITOR=cat agenix -e passwordfile-user1.age -i /home/user1/.ssh/id_ed25519"))
|
|
||||||
system1.succeed(userDo("rm ~/.ssh/id_rsa"))
|
|
||||||
|
|
||||||
# user1 can edit a secret by piping in contents
|
|
||||||
system1.succeed(userDo("echo 'secret1234' | agenix -e passwordfile-user1.age"))
|
|
||||||
assert "secret1234" in system1.succeed(userDo("EDITOR=cat agenix -e passwordfile-user1.age"))
|
|
||||||
'';
|
'';
|
||||||
}
|
}
|
||||||
|
|
7
test/pyproject.toml
Normal file
7
test/pyproject.toml
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
[build-system]
|
||||||
|
requires = ["setuptools"]
|
||||||
|
build-backend = "setuptools.build_meta"
|
||||||
|
|
||||||
|
[project]
|
||||||
|
name = "agenix_testing"
|
||||||
|
version = "0.1.0"
|
Loading…
Reference in a new issue