From: Kevin Wolf <kwolf@redhat.com>
To: Amjad Alsharafi <amjadsharafi10@gmail.com>
Cc: qemu-devel@nongnu.org, Hanna Reitz <hreitz@redhat.com>,
"open list:vvfat" <qemu-block@nongnu.org>
Subject: Re: [PATCH v4 4/4] iotests: Add `vvfat` tests
Date: Mon, 10 Jun 2024 14:01:24 +0200
Message-ID: <ZmbrFAOjpuFB_pQG@redhat.com>
In-Reply-To: <5780279cb6d0ac2a25b359ab9bbd480f34a4265f.1717549035.git.amjadsharafi10@gmail.com>
On 05.06.2024 at 02:58, Amjad Alsharafi wrote:
> Added several tests to verify the implementation of the vvfat driver.
>
> We needed a way to interact with it, so created a basic `fat16.py` driver that handled writing correct sectors for us.
>
> Added `vvfat` to the non-generic formats, as it's not a normal image format.
>
> Signed-off-by: Amjad Alsharafi <amjadsharafi10@gmail.com>
> ---
> tests/qemu-iotests/check | 2 +-
> tests/qemu-iotests/fat16.py | 635 +++++++++++++++++++++++++++++
> tests/qemu-iotests/testenv.py | 2 +-
> tests/qemu-iotests/tests/vvfat | 440 ++++++++++++++++++++
> tests/qemu-iotests/tests/vvfat.out | 5 +
> 5 files changed, 1082 insertions(+), 2 deletions(-)
> create mode 100644 tests/qemu-iotests/fat16.py
> create mode 100755 tests/qemu-iotests/tests/vvfat
> create mode 100755 tests/qemu-iotests/tests/vvfat.out
>
> diff --git a/tests/qemu-iotests/check b/tests/qemu-iotests/check
> index 56d88ca423..545f9ec7bd 100755
> --- a/tests/qemu-iotests/check
> +++ b/tests/qemu-iotests/check
> @@ -84,7 +84,7 @@ def make_argparser() -> argparse.ArgumentParser:
> p.set_defaults(imgfmt='raw', imgproto='file')
>
> format_list = ['raw', 'bochs', 'cloop', 'parallels', 'qcow', 'qcow2',
> - 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg']
> + 'qed', 'vdi', 'vpc', 'vhdx', 'vmdk', 'luks', 'dmg', 'vvfat']
> g_fmt = p.add_argument_group(
> ' image format options',
> 'The following options set the IMGFMT environment variable. '
> diff --git a/tests/qemu-iotests/fat16.py b/tests/qemu-iotests/fat16.py
> new file mode 100644
> index 0000000000..baf801b4d5
> --- /dev/null
> +++ b/tests/qemu-iotests/fat16.py
> @@ -0,0 +1,635 @@
> +# A simple FAT16 driver that is used to test the `vvfat` driver in QEMU.
> +#
> +# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program. If not, see <http://www.gnu.org/licenses/>.
> +
> +from typing import List
> +import string
> +
> +SECTOR_SIZE = 512
> +DIRENTRY_SIZE = 32
> +ALLOWED_FILE_CHARS = \
> + set("!#$%&'()-@^_`{}~" + string.digits + string.ascii_uppercase)
> +
> +
> +class MBR:
> + def __init__(self, data: bytes):
> + assert len(data) == 512
> + self.partition_table = []
> + for i in range(4):
> + partition = data[446 + i * 16 : 446 + (i + 1) * 16]
> + self.partition_table.append(
> + {
> + "status": partition[0],
> + "start_head": partition[1],
> + "start_sector": partition[2] & 0x3F,
> + "start_cylinder":
> + ((partition[2] & 0xC0) << 2) | partition[3],
> + "type": partition[4],
> + "end_head": partition[5],
> + "end_sector": partition[6] & 0x3F,
> + "end_cylinder":
> + ((partition[6] & 0xC0) << 2) | partition[7],
> + "start_lba": int.from_bytes(partition[8:12], "little"),
> + "size": int.from_bytes(partition[12:16], "little"),
> + }
> + )
> +
> + def __str__(self):
> + return "\n".join(
> + [f"{i}: {partition}"
> + for i, partition in enumerate(self.partition_table)]
> + )
> +
> +
> +class FatBootSector:
> + def __init__(self, data: bytes):
> + assert len(data) == 512
> + self.bytes_per_sector = int.from_bytes(data[11:13], "little")
> + self.sectors_per_cluster = data[13]
> + self.reserved_sectors = int.from_bytes(data[14:16], "little")
> + self.fat_count = data[16]
> + self.root_entries = int.from_bytes(data[17:19], "little")
> + self.media_descriptor = data[21]
> + self.fat_size = int.from_bytes(data[22:24], "little")
> + self.sectors_per_fat = int.from_bytes(data[22:24], "little")
Why are there two different attributes, self.fat_size and
self.sectors_per_fat, that contain the same value?
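As a rough sketch of the alternative, the field could be parsed once and every user could go through that single attribute. The class name `BootSectorSketch` and the helper `fat_region_sectors` are hypothetical, introduced only for illustration; the offsets (byte 16 for the FAT count, bytes 22..24 for the sectors per FAT) are the ones the patch already uses:

```python
class BootSectorSketch:
    """Hypothetical, trimmed-down boot sector parser with a single FAT size field."""

    def __init__(self, data: bytes):
        assert len(data) == 512
        # Number of FAT copies (byte offset 16).
        self.fat_count = data[16]
        # Sectors per FAT (16-bit little-endian field at byte offset 22),
        # stored under one name only.
        self.sectors_per_fat = int.from_bytes(data[22:24], "little")

    def fat_region_sectors(self) -> int:
        """Total number of sectors occupied by all FAT copies."""
        return self.sectors_per_fat * self.fat_count
```

With this shape, the FAT-loading code in `Fat16.__init__` would use `sectors_per_fat` as well, so there is no second name to keep in sync.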
> + self.sectors_per_track = int.from_bytes(data[24:26], "little")
> + self.heads = int.from_bytes(data[26:28], "little")
> + self.hidden_sectors = int.from_bytes(data[28:32], "little")
> + self.total_sectors = int.from_bytes(data[32:36], "little")
This value should only be considered if the 16-bit value at byte 19
(which you don't store at all) is 0.
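A minimal sketch of that check, assuming the standard FAT BPB layout (16-bit total sector count at byte offset 19, 32-bit count at byte offset 32). The helper name `parse_total_sectors` is made up for illustration and is not part of the patch:

```python
def parse_total_sectors(data: bytes) -> int:
    """Return the total sector count from a 512-byte FAT boot sector."""
    assert len(data) == 512
    # 16-bit total sector count; a value of 0 means the 32-bit field is used.
    total_16 = int.from_bytes(data[19:21], "little")
    if total_16 != 0:
        return total_16
    # 32-bit total sector count, only meaningful when the 16-bit field is 0.
    return int.from_bytes(data[32:36], "little")
```

In the class itself, the result could simply be assigned to `self.total_sectors`, so callers would not need to know which of the two fields was populated.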
> + self.drive_number = data[36]
> + self.volume_id = int.from_bytes(data[39:43], "little")
> + self.volume_label = data[43:54].decode("ascii").strip()
> + self.fs_type = data[54:62].decode("ascii").strip()
> +
> + def root_dir_start(self):
> + """
> + Calculate the start sector of the root directory.
> + """
> + return self.reserved_sectors + self.fat_count * self.sectors_per_fat
> +
> + def root_dir_size(self):
> + """
> + Calculate the size of the root directory in sectors.
> + """
> + return (
> + self.root_entries * DIRENTRY_SIZE + self.bytes_per_sector - 1
> + ) // self.bytes_per_sector
> +
> + def data_sector_start(self):
> + """
> + Calculate the start sector of the data region.
> + """
> + return self.root_dir_start() + self.root_dir_size()
> +
> + def first_sector_of_cluster(self, cluster: int):
> + """
> + Calculate the first sector of the given cluster.
> + """
> + return self.data_sector_start() \
> + + (cluster - 2) * self.sectors_per_cluster
> +
> + def cluster_bytes(self):
> + """
> + Calculate the number of bytes in a cluster.
> + """
> + return self.bytes_per_sector * self.sectors_per_cluster
> +
> + def __str__(self):
> + return (
> + f"Bytes per sector: {self.bytes_per_sector}\n"
> + f"Sectors per cluster: {self.sectors_per_cluster}\n"
> + f"Reserved sectors: {self.reserved_sectors}\n"
> + f"FAT count: {self.fat_count}\n"
> + f"Root entries: {self.root_entries}\n"
> + f"Total sectors: {self.total_sectors}\n"
> + f"Media descriptor: {self.media_descriptor}\n"
> + f"Sectors per FAT: {self.sectors_per_fat}\n"
> + f"Sectors per track: {self.sectors_per_track}\n"
> + f"Heads: {self.heads}\n"
> + f"Hidden sectors: {self.hidden_sectors}\n"
> + f"Drive number: {self.drive_number}\n"
> + f"Volume ID: {self.volume_id}\n"
> + f"Volume label: {self.volume_label}\n"
> + f"FS type: {self.fs_type}\n"
> + )
> +
> +
> +class FatDirectoryEntry:
> + def __init__(self, data: bytes, sector: int, offset: int):
> + self.name = data[0:8].decode("ascii").strip()
> + self.ext = data[8:11].decode("ascii").strip()
> + self.attributes = data[11]
> + self.reserved = data[12]
> + self.create_time_tenth = data[13]
> + self.create_time = int.from_bytes(data[14:16], "little")
> + self.create_date = int.from_bytes(data[16:18], "little")
> + self.last_access_date = int.from_bytes(data[18:20], "little")
> + high_cluster = int.from_bytes(data[20:22], "little")
> + self.last_mod_time = int.from_bytes(data[22:24], "little")
> + self.last_mod_date = int.from_bytes(data[24:26], "little")
> + low_cluster = int.from_bytes(data[26:28], "little")
> + self.cluster = (high_cluster << 16) | low_cluster
> + self.size_bytes = int.from_bytes(data[28:32], "little")
> +
> + # extra (to help write back to disk)
> + self.sector = sector
> + self.offset = offset
> +
> + def as_bytes(self) -> bytes:
> + return (
> + self.name.ljust(8, " ").encode("ascii")
> + + self.ext.ljust(3, " ").encode("ascii")
> + + self.attributes.to_bytes(1, "little")
> + + self.reserved.to_bytes(1, "little")
> + + self.create_time_tenth.to_bytes(1, "little")
> + + self.create_time.to_bytes(2, "little")
> + + self.create_date.to_bytes(2, "little")
> + + self.last_access_date.to_bytes(2, "little")
> + + (self.cluster >> 16).to_bytes(2, "little")
> + + self.last_mod_time.to_bytes(2, "little")
> + + self.last_mod_date.to_bytes(2, "little")
> + + (self.cluster & 0xFFFF).to_bytes(2, "little")
> + + self.size_bytes.to_bytes(4, "little")
> + )
> +
> + def whole_name(self):
> + if self.ext:
> + return f"{self.name}.{self.ext}"
> + else:
> + return self.name
> +
> + def __str__(self):
> + return (
> + f"Name: {self.name}\n"
> + f"Ext: {self.ext}\n"
> + f"Attributes: {self.attributes}\n"
> + f"Reserved: {self.reserved}\n"
> + f"Create time tenth: {self.create_time_tenth}\n"
> + f"Create time: {self.create_time}\n"
> + f"Create date: {self.create_date}\n"
> + f"Last access date: {self.last_access_date}\n"
> + f"Last mod time: {self.last_mod_time}\n"
> + f"Last mod date: {self.last_mod_date}\n"
> + f"Cluster: {self.cluster}\n"
> + f"Size: {self.size_bytes}\n"
> + )
> +
> + def __repr__(self):
> + # convert to dict
> + return str(vars(self))
> +
> +
> +class Fat16:
> + def __init__(
> + self,
> + start_sector: int,
> + size: int,
> + sector_reader: callable,
> + sector_writer: callable,
> + ):
> + self.start_sector = start_sector
> + self.size_in_sectors = size
> + self.sector_reader = sector_reader
> + self.sector_writer = sector_writer
> +
> + self.boot_sector = FatBootSector(self.sector_reader(start_sector))
> +
> + fat_size_in_sectors = \
> + self.boot_sector.fat_size * self.boot_sector.fat_count
> + self.fats = self.read_sectors(
> + self.boot_sector.reserved_sectors, fat_size_in_sectors
> + )
> + self.fats_dirty_sectors = set()
> +
> + def read_sectors(self, start_sector: int, num_sectors: int) -> bytes:
> + return self.sector_reader(start_sector + self.start_sector, num_sectors)
> +
> + def write_sectors(self, start_sector: int, data: bytes):
> + return self.sector_writer(start_sector + self.start_sector, data)
> +
> + def directory_from_bytes(
> + self, data: bytes, start_sector: int
> + ) -> List[FatDirectoryEntry]:
> + """
> + Convert `bytes` into a list of `FatDirectoryEntry` objects.
> + Will ignore long file names.
> + Will stop when it encounters a 0x00 byte.
> + """
> +
> + entries = []
> + for i in range(0, len(data), DIRENTRY_SIZE):
> + entry = data[i : i + DIRENTRY_SIZE]
> +
> + current_sector = start_sector + (i // SECTOR_SIZE)
> + current_offset = i % SECTOR_SIZE
> +
> + if entry[0] == 0:
> + break
> + elif entry[0] == 0xE5:
> + # Deleted file
> + continue
> +
> + if entry[11] & 0xF == 0xF:
> + # Long file name
> + continue
> +
> + entries.append(
> + FatDirectoryEntry(entry, current_sector, current_offset))
> + return entries
> +
> + def read_root_directory(self) -> List[FatDirectoryEntry]:
> + root_dir = self.read_sectors(
> + self.boot_sector.root_dir_start(), self.boot_sector.root_dir_size()
> + )
> + return self.directory_from_bytes(root_dir,
> + self.boot_sector.root_dir_start())
> +
> + def read_fat_entry(self, cluster: int) -> int:
> + """
> + Read the FAT entry for the given cluster.
> + """
> + fat_offset = cluster * 2 # FAT16
> + return int.from_bytes(self.fats[fat_offset : fat_offset + 2], "little")
> +
> + def write_fat_entry(self, cluster: int, value: int):
> + """
> + Write the FAT entry for the given cluster.
> + """
> + fat_offset = cluster * 2
> + self.fats = (
> + self.fats[:fat_offset]
> + + value.to_bytes(2, "little")
> + + self.fats[fat_offset + 2 :]
> + )
> + self.fats_dirty_sectors.add(fat_offset // SECTOR_SIZE)
> +
> + def flush_fats(self):
> + """
> + Write the FATs back to the disk.
> + """
> + for sector in self.fats_dirty_sectors:
> + data = self.fats[sector * SECTOR_SIZE : (sector + 1) * SECTOR_SIZE]
> + sector = self.boot_sector.reserved_sectors + sector
> + self.write_sectors(sector, data)
> + self.fats_dirty_sectors = set()
> +
> + def next_cluster(self, cluster: int) -> int | None:
> + """
> + Get the next cluster in the chain.
> + If it's `None`, then it's the last cluster.
> + The function will crash if the next cluster
> + is `FREE` (unexpected) or invalid entry.
> + """
> + fat_entry = self.read_fat_entry(cluster)
> + if fat_entry == 0:
> + raise Exception("Unexpected: FREE cluster")
> + elif fat_entry == 1:
> + raise Exception("Unexpected: RESERVED cluster")
> + elif fat_entry >= 0xFFF8:
> + return None
> + elif fat_entry >= 0xFFF7:
> + raise Exception("Invalid FAT entry")
> + else:
> + return fat_entry
> +
> + def next_free_cluster(self) -> int:
> + """
> + Find the next free cluster.
> + """
> + # simple linear search
> + for i in range(2, 0xFFFF):
> + if self.read_fat_entry(i) == 0:
> + return i
> + raise Exception("No free clusters")
> +
> + def read_cluster(self, cluster: int) -> bytes:
> + """
> + Read the cluster at the given cluster.
> + """
> + return self.read_sectors(
> + self.boot_sector.first_sector_of_cluster(cluster),
> + self.boot_sector.sectors_per_cluster,
> + )
> +
> + def write_cluster(self, cluster: int, data: bytes):
> + """
> + Write the cluster at the given cluster.
> + """
> + assert len(data) == self.boot_sector.cluster_bytes()
> + return self.write_sectors(
> + self.boot_sector.first_sector_of_cluster(cluster),
> + data,
> + )
> +
> + def read_directory(self, cluster: int) -> List[FatDirectoryEntry]:
> + """
> + Read the directory at the given cluster.
> + """
> + entries = []
> + while cluster is not None:
> + data = self.read_cluster(cluster)
> + entries.extend(
> + self.directory_from_bytes(
> + data, self.boot_sector.first_sector_of_cluster(cluster)
> + )
> + )
> + cluster = self.next_cluster(cluster)
> + return entries
> +
> + def add_direntry(self,
> + cluster: int | None,
> + name: str, ext: str,
> + attributes: int):
> + """
> + Add a new directory entry to the given cluster.
> + If the cluster is `None`, then it will be added to the root directory.
> + """
> +
> + def find_free_entry(data: bytes):
> + for i in range(0, len(data), DIRENTRY_SIZE):
> + entry = data[i : i + DIRENTRY_SIZE]
> + if entry[0] == 0 or entry[0] == 0xE5:
> + return i
> + return None
> +
> + assert len(name) <= 8, "Name must be 8 characters or less"
> + assert len(ext) <= 3, "Ext must be 3 characters or less"
> + assert attributes % 0x15 != 0x15, "Invalid attributes"
> +
> + # initial dummy data
> + new_entry = FatDirectoryEntry(b"\0" * 32, 0, 0)
> + new_entry.name = name.ljust(8, " ")
> + new_entry.ext = ext.ljust(3, " ")
> + new_entry.attributes = attributes
> + new_entry.reserved = 0
> + new_entry.create_time_tenth = 0
> + new_entry.create_time = 0
> + new_entry.create_date = 0
> + new_entry.last_access_date = 0
> + new_entry.last_mod_time = 0
> + new_entry.last_mod_date = 0
> + new_entry.cluster = self.next_free_cluster()
> + new_entry.size_bytes = 0
> +
> + # mark as EOF
> + self.write_fat_entry(new_entry.cluster, 0xFFFF)
> +
> + if cluster is None:
> + for i in range(self.boot_sector.root_dir_size()):
> + sector_data = self.read_sectors(
> + self.boot_sector.root_dir_start() + i, 1
> + )
> + offset = find_free_entry(sector_data)
> + if offset is not None:
> + new_entry.sector = self.boot_sector.root_dir_start() + i
> + new_entry.offset = offset
> + self.update_direntry(new_entry)
> + return new_entry
> + else:
> + while cluster is not None:
> + data = self.read_cluster(cluster)
> + offset = find_free_entry(data)
> + if offset is not None:
> + new_entry.sector = self.boot_sector.first_sector_of_cluster(
> + cluster
> + ) + (offset // SECTOR_SIZE)
> + new_entry.offset = offset % SECTOR_SIZE
> + self.update_direntry(new_entry)
> + return new_entry
> + cluster = self.next_cluster(cluster)
> +
> + raise Exception("No free directory entries")
> +
> + def update_direntry(self, entry: FatDirectoryEntry):
> + """
> + Write the directory entry back to the disk.
> + """
> + sector = self.read_sectors(entry.sector, 1)
> + sector = (
> + sector[: entry.offset]
> + + entry.as_bytes()
> + + sector[entry.offset + DIRENTRY_SIZE :]
> + )
> + self.write_sectors(entry.sector, sector)
> +
> + def find_direntry(self, path: str) -> FatDirectoryEntry | None:
> + """
> + Find the directory entry for the given path.
> + """
> + assert path[0] == "/", "Path must start with /"
> +
> + path = path[1:] # remove the leading /
> + parts = path.split("/")
> + directory = self.read_root_directory()
> +
> + current_entry = None
> +
> + for i, part in enumerate(parts):
> + is_last = i == len(parts) - 1
> +
> + for entry in directory:
> + if entry.whole_name() == part:
> + current_entry = entry
> + break
> + if current_entry is None:
> + return None
> +
> + if is_last:
> + return current_entry
> + else:
> + if current_entry.attributes & 0x10 == 0:
> + raise Exception(
> + f"{current_entry.whole_name()} is not a directory")
> + else:
> + directory = self.read_directory(current_entry.cluster)
> +
> + def read_file(self, entry: FatDirectoryEntry) -> bytes:
> + """
> + Read the content of the file at the given path.
> + """
> + if entry is None:
> + return None
> + if entry.attributes & 0x10 != 0:
> + raise Exception(f"{entry.whole_name()} is a directory")
> +
> + data = b""
> + cluster = entry.cluster
> + while cluster is not None and len(data) <= entry.size_bytes:
> + data += self.read_cluster(cluster)
> + cluster = self.next_cluster(cluster)
> + return data[: entry.size_bytes]
> +
> + def truncate_file(self, entry: FatDirectoryEntry, new_size: int):
> + """
> + Truncate the file at the given path to the new size.
> + """
> + if entry is None:
> + raise Exception("entry is None")
> + if entry.attributes & 0x10 != 0:
> + raise Exception(f"{entry.whole_name()} is a directory")
> +
> + def clusters_from_size(size: int):
> + return (
> + size + self.boot_sector.cluster_bytes() - 1
> + ) // self.boot_sector.cluster_bytes()
> +
> + # First, allocate new FATs if we need to
> + required_clusters = clusters_from_size(new_size)
> + current_clusters = clusters_from_size(entry.size_bytes)
> +
> + affected_clusters = set()
> +
> + # Keep at least one cluster, easier to manage this way
> + if required_clusters == 0:
> + required_clusters = 1
> + if current_clusters == 0:
> + current_clusters = 1
> +
> + if required_clusters > current_clusters:
> + # Allocate new clusters
> + cluster = entry.cluster
> + to_add = required_clusters
> + for _ in range(current_clusters - 1):
> + to_add -= 1
> + cluster = self.next_cluster(cluster)
> + assert required_clusters > 0, "No new clusters to allocate"
> + assert cluster is not None, "Cluster is None"
> + assert self.next_cluster(cluster) is None, \
> + "Cluster is not the last cluster"
> +
> + # Allocate new clusters
> + for _ in range(to_add - 1):
> + new_cluster = self.next_free_cluster()
> + self.write_fat_entry(cluster, new_cluster)
> + self.write_fat_entry(new_cluster, 0xFFFF)
> + cluster = new_cluster
> +
> + elif required_clusters < current_clusters:
> + # Truncate the file
> + cluster = entry.cluster
> + for _ in range(required_clusters - 1):
> + cluster = self.next_cluster(cluster)
> + assert cluster is not None, "Cluster is None"
> +
> + next_cluster = self.next_cluster(cluster)
> + # mark last as EOF
> + self.write_fat_entry(cluster, 0xFFFF)
> + # free the rest
> + while next_cluster is not None:
> + cluster = next_cluster
> + next_cluster = self.next_cluster(next_cluster)
> + self.write_fat_entry(cluster, 0)
> +
> + self.flush_fats()
> +
> + # verify number of clusters
> + cluster = entry.cluster
> + count = 0
> + while cluster is not None:
> + count += 1
> + affected_clusters.add(cluster)
> + cluster = self.next_cluster(cluster)
> + assert (
> + count == required_clusters
> + ), f"Expected {required_clusters} clusters, got {count}"
> +
> + # update the size
> + entry.size_bytes = new_size
> + self.update_direntry(entry)
> +
> + # trigger every affected cluster
> + for cluster in affected_clusters:
> + first_sector = self.boot_sector.first_sector_of_cluster(cluster)
> + first_sector_data = self.read_sectors(first_sector, 1)
> + self.write_sectors(first_sector, first_sector_data)
> +
> + def write_file(self, entry: FatDirectoryEntry, data: bytes):
> + """
> + Write the content of the file at the given path.
> + """
> + if entry is None:
> + raise Exception("entry is None")
> + if entry.attributes & 0x10 != 0:
> + raise Exception(f"{entry.whole_name()} is a directory")
> +
> + data_len = len(data)
> +
> + self.truncate_file(entry, data_len)
> +
> + cluster = entry.cluster
> + while cluster is not None:
> + data_to_write = data[: self.boot_sector.cluster_bytes()]
> + last_data = False
> + if len(data_to_write) < self.boot_sector.cluster_bytes():
> + last_data = True
> + old_data = self.read_cluster(cluster)
> + data_to_write += old_data[len(data_to_write) :]
> +
> + self.write_cluster(cluster, data_to_write)
> + data = data[self.boot_sector.cluster_bytes() :]
> + if len(data) == 0:
> + break
> + cluster = self.next_cluster(cluster)
> +
> + assert len(data) == 0, \
> + "Data was not written completely, clusters missing"
> +
> + def create_file(self, path: str):
> + """
> + Create a new file at the given path.
> + """
> + assert path[0] == "/", "Path must start with /"
> +
> + path = path[1:] # remove the leading /
> +
> + parts = path.split("/")
> +
> + directory_cluster = None
> + directory = self.read_root_directory()
> +
> + parts, filename = parts[:-1], parts[-1]
> +
> + for i, part in enumerate(parts):
> + current_entry = None
> + for entry in directory:
> + if entry.whole_name() == part:
> + current_entry = entry
> + break
> + if current_entry is None:
> + return None
> +
> + if current_entry.attributes & 0x10 == 0:
> + raise Exception(
> + f"{current_entry.whole_name()} is not a directory")
> + else:
> + directory = self.read_directory(current_entry.cluster)
> + directory_cluster = current_entry.cluster
> +
> + # add new entry to the directory
> +
> + filename, ext = filename.split(".")
> +
> + if len(ext) > 3:
> + raise Exception("Ext must be 3 characters or less")
> + if len(filename) > 8:
> + raise Exception("Name must be 8 characters or less")
> +
> + for c in filename + ext:
> +
> + if c not in ALLOWED_FILE_CHARS:
> + raise Exception("Invalid character in filename")
> +
> + return self.add_direntry(directory_cluster, filename, ext, 0)
> diff --git a/tests/qemu-iotests/testenv.py b/tests/qemu-iotests/testenv.py
> index 588f30a4f1..4053d29de4 100644
> --- a/tests/qemu-iotests/testenv.py
> +++ b/tests/qemu-iotests/testenv.py
> @@ -250,7 +250,7 @@ def __init__(self, source_dir: str, build_dir: str,
> self.qemu_img_options = os.getenv('QEMU_IMG_OPTIONS')
> self.qemu_nbd_options = os.getenv('QEMU_NBD_OPTIONS')
>
> - is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg']
> + is_generic = self.imgfmt not in ['bochs', 'cloop', 'dmg', 'vvfat']
> self.imgfmt_generic = 'true' if is_generic else 'false'
>
> self.qemu_io_options = f'--cache {self.cachemode} --aio {self.aiomode}'
> diff --git a/tests/qemu-iotests/tests/vvfat b/tests/qemu-iotests/tests/vvfat
> new file mode 100755
> index 0000000000..113d7d3270
> --- /dev/null
> +++ b/tests/qemu-iotests/tests/vvfat
> @@ -0,0 +1,440 @@
> +#!/usr/bin/env python3
> +# group: rw vvfat
> +#
> +# Test vvfat driver implementation
> +# Here, we use a simple FAT16 implementation and check the behavior of the vvfat driver.
> +#
> +# Copyright (C) 2024 Amjad Alsharafi <amjadsharafi10@gmail.com>
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 2 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program. If not, see <http://www.gnu.org/licenses/>.
> +
> +import os, shutil
> +import iotests
> +from iotests import imgfmt, QMPTestCase
> +from fat16 import MBR, Fat16, DIRENTRY_SIZE
> +
> +filesystem = os.path.join(iotests.test_dir, "filesystem")
> +
> +nbd_sock = iotests.file_path("nbd.sock", base_dir=iotests.sock_dir)
> +nbd_uri = "nbd+unix:///disk?socket=" + nbd_sock
> +
> +SECTOR_SIZE = 512
> +
> +
> +class TestVVFatDriver(QMPTestCase):
> + def setUp(self) -> None:
> + if os.path.exists(filesystem):
> + if os.path.isdir(filesystem):
> + shutil.rmtree(filesystem)
> + else:
> + print(f"Error: {filesystem} exists and is not a directory")
> + exit(1)
> + os.mkdir(filesystem)
> +
> + # Add some text files to the filesystem
> + for i in range(10):
> + with open(os.path.join(filesystem, f"file{i}.txt"), "w") as f:
> + f.write(f"Hello, world! {i}\n")
> +
> + # Add 2 large files, above the cluster size (8KB)
> + with open(os.path.join(filesystem, "large1.txt"), "wb") as f:
> + # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ...
> + for i in range(8 * 2): # two clusters
> + f.write(bytes([0x41 + i] * 1024))
> +
> + with open(os.path.join(filesystem, "large2.txt"), "wb") as f:
> + # write 'A' * 1KB, 'B' * 1KB, 'C' * 1KB, ...
> + for i in range(8 * 3): # 3 clusters
> + f.write(bytes([0x41 + i] * 1024))
> +
> + self.vm = iotests.VM()
> +
> + self.vm.add_blockdev(
> + self.vm.qmp_to_opts(
> + {
> + "driver": imgfmt,
> + "node-name": "disk",
> + "rw": "true",
> + "fat-type": "16",
> + "dir": filesystem,
> + }
> + )
> + )
> +
> + self.vm.launch()
> +
> + self.vm.qmp_log("block-dirty-bitmap-add", **{"node": "disk", "name": "bitmap0"})
> +
> + # attach nbd server
> + self.vm.qmp_log(
> + "nbd-server-start",
> + **{"addr": {"type": "unix", "data": {"path": nbd_sock}}},
> + filters=[],
> + )
> +
> + self.vm.qmp_log(
> + "nbd-server-add",
> + **{"device": "disk", "writable": True, "bitmap": "bitmap0"},
> + )
> +
> + self.qio = iotests.QemuIoInteractive("-f", "raw", nbd_uri)
> +
> + def tearDown(self) -> None:
> + self.qio.close()
> + self.vm.shutdown()
> + # print(self.vm.get_log())
> + shutil.rmtree(filesystem)
> +
> + def read_sectors(self, sector: int, num: int = 1) -> bytes:
> + """
> + Read `num` sectors starting from `sector` from the `disk`.
> + This uses `QemuIoInteractive` to read the sectors into `stdout` and then parse the output.
> + """
> + self.assertGreater(num, 0)
> + # The output contains the content of the sector in hex dump format
> + # We need to extract the content from it
> + output = self.qio.cmd(f"read -v {sector * SECTOR_SIZE} {num * SECTOR_SIZE}")
> + # Each row is 16 bytes long, and we are reading `num` sectors
> + rows = num * SECTOR_SIZE // 16
> + output_rows = output.split("\n")[:rows]
> +
> + hex_content = "".join(
> + [(row.split(": ")[1]).split(" ")[0] for row in output_rows]
> + )
> + bytes_content = bytes.fromhex(hex_content)
> +
> + self.assertEqual(len(bytes_content), num * SECTOR_SIZE)
> +
> + return bytes_content
> +
> + def write_sectors(self, sector: int, data: bytes):
> + """
> + Write `data` to the `disk` starting from `sector`.
> + This uses `QemuIoInteractive` to write the data into the disk.
> + """
> +
> + self.assertGreater(len(data), 0)
> + self.assertEqual(len(data) % SECTOR_SIZE, 0)
> +
> + temp_file = os.path.join(iotests.test_dir, "temp.bin")
> + with open(temp_file, "wb") as f:
> + f.write(data)
> +
> + self.qio.cmd(f"write -s {temp_file} {sector * SECTOR_SIZE} {len(data)}")
> +
> + os.remove(temp_file)
> +
> + def init_fat16(self):
> + mbr = MBR(self.read_sectors(0))
> + return Fat16(
> + mbr.partition_table[0]["start_lba"],
> + mbr.partition_table[0]["size"],
> + self.read_sectors,
> + self.write_sectors,
> + )
> +
> + # Tests
> +
> + def test_fat_filesystem(self):
> + """
> + Test that vvfat produces valid FAT16 and MBR sectors
> + """
> + mbr = MBR(self.read_sectors(0))
> +
> + self.assertEqual(mbr.partition_table[0]["status"], 0x80)
> + self.assertEqual(mbr.partition_table[0]["type"], 6)
> +
> + fat16 = Fat16(
> + mbr.partition_table[0]["start_lba"],
> + mbr.partition_table[0]["size"],
> + self.read_sectors,
> + self.write_sectors,
> + )
> + self.assertEqual(fat16.boot_sector.bytes_per_sector, 512)
> + self.assertEqual(fat16.boot_sector.volume_label, "QEMU VVFAT")
> +
> + def test_read_root_directory(self):
> + """
> + Test the content of the root directory
> + """
> + fat16 = self.init_fat16()
> +
> + root_dir = fat16.read_root_directory()
> +
> + self.assertEqual(len(root_dir), 13) # 12 + 1 special file
> +
> + files = {
> + "QEMU VVF.AT": 0, # special empty file
> + "FILE0.TXT": 16,
> + "FILE1.TXT": 16,
> + "FILE2.TXT": 16,
> + "FILE3.TXT": 16,
> + "FILE4.TXT": 16,
> + "FILE5.TXT": 16,
> + "FILE6.TXT": 16,
> + "FILE7.TXT": 16,
> + "FILE8.TXT": 16,
> + "FILE9.TXT": 16,
> + "LARGE1.TXT": 0x2000 * 2,
> + "LARGE2.TXT": 0x2000 * 3,
> + }
> +
> + for entry in root_dir:
> + self.assertIn(entry.whole_name(), files)
> + self.assertEqual(entry.size_bytes, files[entry.whole_name()])
> +
> + def test_direntry_as_bytes(self):
> + """
> + Test if we can convert Direntry back to bytes, so that we can write it back to the disk safely.
> + """
> + fat16 = self.init_fat16()
> +
> + root_dir = fat16.read_root_directory()
> + first_entry_bytes = fat16.read_sectors(fat16.boot_sector.root_dir_start(), 1)
> + # The first entry won't be deleted, so we can compare it with the first entry in the root directory
> + self.assertEqual(root_dir[0].as_bytes(), first_entry_bytes[:DIRENTRY_SIZE])
> +
> + def test_read_files(self):
> + """
> + Test reading the content of the files
> + """
> + fat16 = self.init_fat16()
> +
> + for i in range(10):
> + file = fat16.find_direntry(f"/FILE{i}.TXT")
> + self.assertIsNotNone(file)
> + self.assertEqual(
> + fat16.read_file(file), f"Hello, world! {i}\n".encode("ascii")
> + )
> +
> + # test large files
> + large1 = fat16.find_direntry("/LARGE1.TXT")
> + with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
> + self.assertEqual(fat16.read_file(large1), f.read())
> +
> + large2 = fat16.find_direntry("/LARGE2.TXT")
> + self.assertIsNotNone(large2)
> + with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
> + self.assertEqual(fat16.read_file(large2), f.read())
> +
> + def test_write_file_same_content_direct(self):
> + """
> + Similar to `test_write_file_in_same_content`, but we write the file's clusters directly
> + and thus don't go through the modification of the direntry.
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/FILE0.TXT")
> + self.assertIsNotNone(file)
> +
> + data = fat16.read_cluster(file.cluster)
> + fat16.write_cluster(file.cluster, data)
> +
> + with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
> + self.assertEqual(fat16.read_file(file), f.read())
> +
> + def test_write_file_in_same_content(self):
> + """
> + Test writing the same content to the file back to it
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/FILE0.TXT")
> + self.assertIsNotNone(file)
> +
> + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
> +
> + fat16.write_file(file, b"Hello, world! 0\n")
> +
> + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
> +
> + with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
> + self.assertEqual(f.read(), b"Hello, world! 0\n")
> +
> + def test_modify_content_same_clusters(self):
> + """
> + Test modifying the content of the file without changing the number of clusters
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/FILE0.TXT")
> + self.assertIsNotNone(file)
> +
> + new_content = b"Hello, world! Modified\n"
> + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
> +
> + fat16.write_file(file, new_content)
> +
> + self.assertEqual(fat16.read_file(file), new_content)
> +
> + with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_truncate_file_same_clusters_less(self):
> + """
> + Test truncating the file without changing number of clusters
> + Test decreasing the file size
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/FILE0.TXT")
> + self.assertIsNotNone(file)
> +
> + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
> +
> + fat16.truncate_file(file, 5)
> +
> + new_content = fat16.read_file(file)
> +
> + self.assertEqual(new_content, b"Hello")
> +
> + with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_truncate_file_same_clusters_more(self):
> + """
> + Test truncating the file without changing number of clusters
> + Test increase the file size
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/FILE0.TXT")
> + self.assertIsNotNone(file)
> +
> + self.assertEqual(fat16.read_file(file), b"Hello, world! 0\n")
> +
> + fat16.truncate_file(file, 20)
> +
> + new_content = fat16.read_file(file)
> +
> + # random pattern will be appended to the file, and its not always the same
> + self.assertEqual(new_content[:16], b"Hello, world! 0\n")
> + self.assertEqual(len(new_content), 20)
> +
> + with open(os.path.join(filesystem, "file0.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_write_large_file(self):
> + """
> + Test writing a large file
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/LARGE1.TXT")
> + self.assertIsNotNone(file)
> +
> + # The content of LARGE1 is A * 1KB, B * 1KB, C * 1KB, ..., P * 1KB
> + # Lets change it to be Z * 1KB, Y * 1KB, X * 1KB, ..., K * 1KB
> + # without changing the number of clusters or filesize
> + new_content = b"".join([bytes([0x5A - i] * 1024) for i in range(16)])
> +
> + fat16.write_file(file, new_content)
> +
> + with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_truncate_file_change_clusters_less(self):
> + """
> + Test truncating a file by reducing the number of clusters
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/LARGE1.TXT")
> + self.assertIsNotNone(file)
> +
> + fat16.truncate_file(file, 1)
> +
> + self.assertEqual(fat16.read_file(file), b"A")
> +
> + with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
> + self.assertEqual(f.read(), b"A")
> +
> + def test_write_file_change_clusters_less(self):
> + """
> + Test truncating a file by reducing the number of clusters
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/LARGE2.TXT")
> + self.assertIsNotNone(file)
> +
> + new_content = b"Hello, world! This was a large file\n"
> + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024

This sets and then immediately overwrites new_content. What was intended
here?

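The two values are not interchangeable, because they would occupy very different numbers of clusters. A rough sketch of the arithmetic, assuming a hypothetical 4 KiB cluster size (the real cluster size of the image is not stated in this patch, and `clusters_needed` is a made-up helper, not part of fat16.py):

```python
# Hypothetical cluster size, chosen only for illustration; the value used by
# the vvfat image in the test may differ.
CLUSTER_SIZE = 4096


def clusters_needed(size_bytes: int, cluster_size: int = CLUSTER_SIZE) -> int:
    """Return how many clusters a file of size_bytes occupies (0 if empty)."""
    # Integer ceiling division, avoids going through floats.
    return (size_bytes + cluster_size - 1) // cluster_size


# The two candidate values assigned to new_content in the quoted test.
short_content = b"Hello, world! This was a large file\n"
long_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024

for name, content in (("short", short_content), ("long", long_content)):
    print(name, len(content), "bytes ->", clusters_needed(len(content)), "cluster(s)")
```

Under that assumption the short string fits in a single cluster, so it would shrink any multi-cluster file. Whether the 16 KiB value reduces the cluster count at all depends on how large LARGE2.TXT is to begin with.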
> +
> + fat16.write_file(file, new_content)
> +
> + self.assertEqual(fat16.read_file(file), new_content)
> +
> + with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_write_file_change_clusters_more(self):
> + """
> + Test truncating a file by increasing the number of clusters
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/LARGE2.TXT")
> + self.assertIsNotNone(file)
> +
> + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024
> +
> + fat16.write_file(file, new_content)
> +
> + with open(os.path.join(filesystem, "large2.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_write_file_change_clusters_more_non_last_file(self):
> + """
> + Test truncating a file by increasing the number of clusters
> + This is a special variant of the above test, where we write to
> + a file so that when allocating new clusters, it won't have contiguous clusters
> + """
> + fat16 = self.init_fat16()
> +
> + file = fat16.find_direntry("/LARGE1.TXT")
> + self.assertIsNotNone(file)
> +
> + new_content = b"X" * 8 * 1024 + b"Y" * 8 * 1024 + b"Z" * 8 * 1024
> +
> + fat16.write_file(file, new_content)
> +
> + with open(os.path.join(filesystem, "large1.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + def test_create_file(self):
> + """
> + Test creating a new file
> + """
> + fat16 = self.init_fat16()
> +
> + new_file = fat16.create_file("/NEWFILE.TXT")
> +
> + self.assertIsNotNone(new_file)
> + self.assertEqual(new_file.size_bytes, 0)
> +
> + new_content = b"Hello, world! New file\n"
> + fat16.write_file(new_file, new_content)
> +
> + self.assertEqual(fat16.read_file(new_file), new_content)
> +
> + with open(os.path.join(filesystem, "newfile.txt"), "rb") as f:
> + self.assertEqual(f.read(), new_content)
> +
> + # TODO: support deleting files
> +
> +
> +if __name__ == "__main__":
> + # This is a specific test for vvfat driver
> + iotests.main(supported_fmts=["vvfat"], supported_protocols=["file"])
> diff --git a/tests/qemu-iotests/tests/vvfat.out b/tests/qemu-iotests/tests/vvfat.out
> new file mode 100755
> index 0000000000..96961ed0b5
> --- /dev/null
> +++ b/tests/qemu-iotests/tests/vvfat.out
> @@ -0,0 +1,5 @@
> +...............
> +----------------------------------------------------------------------
> +Ran 15 tests
> +
> +OK

With the updated test, I can catch the problems that are fixed by
patches 1 and 2, but it still doesn't need patch 3 to pass.

Kevin