From: Mika Westerberg
To: linux-usb@vger.kernel.org
Cc: Yehezkel Bernat, Lukas Wunner, Andreas Noever, Alan Borzeszkowski,
	Andrew Lunn, "David S . Miller", Eric Dumazet, Jakub Kicinski,
	Paolo Abeni, netdev@vger.kernel.org, Mika Westerberg
Subject: [PATCH 9/9] thunderbolt: Add support for USB4STREAM
Date: Tue, 28 Apr 2026 09:22:09 +0200
Message-ID: <20260428072209.3084930-10-mika.westerberg@linux.intel.com>
In-Reply-To: <20260428072209.3084930-1-mika.westerberg@linux.intel.com>
References: <20260428072209.3084930-1-mika.westerberg@linux.intel.com>

Introduce USB4STREAM protocol and Linux implementation.
This allows two (or more) hosts to transfer data directly over a
Thunderbolt/USB4 cable through a character device, without the need to
go through the network stack. Any application that supports read(2) and
write(2) in some form should be able to use the device without changes.
The data is sent out to the other side over a tunnel inside the
Thunderbolt/USB4 fabric.

The character device is called /dev/tbstreamX where X is the minor
number, starting from 0. All stream devices need to be configured first.
This is done through the ConfigFS interface. There can be multiple
streams at the same time (this depends on the number of DMA rings and
available HopIDs) and a single stream supports traffic in both
directions. For example there could be an application that uses one
stream as a control channel and another one as a bi-directional data
channel.

A real use-case for this is taking a backup as part of recovery
initramfs tooling (no need to set up networking or have ssh or similar
tooling as part of the initramfs). Say we want to back up the disk of
host1 to host2. First a Thunderbolt/USB4 cable is connected between the
hosts (there can be devices in the middle too), then the receiving side
configures the stream:

host2 # mkdir /sys/kernel/config/thunderbolt/stream/0-1.0
host2 # mkdir /sys/kernel/config/thunderbolt/stream/0-1.0/backup
host2 # echo -1 > /sys/kernel/config/thunderbolt/stream/0-1.0/backup/in_hopid
host2 # echo -1 > /sys/kernel/config/thunderbolt/stream/0-1.0/backup/out_hopid

We use automatic HopID allocation (writing -1 to the HopIDs) for
simplicity.
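The configfs steps above can be wrapped in a small helper. This is a
minimal sketch, not part of the patch: the function name
tbstream_configure is hypothetical, and the configfs root is taken as a
parameter so the same function can be exercised against a plain
directory where /sys/kernel/config is not available.

```shell
# Hypothetical helper mirroring the host2 setup from the commit message.
# $1 = configfs stream root, $2 = XDomain group (e.g. 0-1.0), $3 = stream name
tbstream_configure() {
	local root="$1" xdomain="$2" name="$3"
	local dir="$root/$xdomain/$name"

	mkdir -p "$dir" || return 1
	# Writing -1 requests automatic HopID allocation from the driver
	echo -1 > "$dir/in_hopid"
	echo -1 > "$dir/out_hopid"
}

# Receiving side (host2) from the example above:
# tbstream_configure /sys/kernel/config/thunderbolt/stream 0-1.0 backup
```

On a real system the mkdir calls create configfs groups and the driver
fills in the allocated HopIDs, which can then be read back from the same
attributes.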
From this point forward /dev/tbstream0 can be used pretty much like a
regular file:

host2 # dd if=/dev/tbstream0 of=/tmp/host1.nvme0n1.backup-$(date +%F) bs=256k

The host that is being backed up then configures the stream accordingly:

host1 # mkdir /sys/kernel/config/thunderbolt/stream/0-503.0
host1 # mkdir /sys/kernel/config/thunderbolt/stream/0-503.0/backup

Here we take advantage of the fact that host2 also announces the active
streams through XDomain properties, so the name "backup" gives us the
HopIDs. It is also possible to configure them manually, the same way we
did for host2. Then it is just a matter of copying the data over:

host1 # dd if=/dev/nvme0n1 of=/dev/tbstream0 bs=256k

Similarly it is possible to transfer parts of the filesystem. For
example, to copy the contents of mydir over to host2:

host2 # gunzip < /dev/tbstream0 | tar xf -
host1 # tar cf - mydir | gzip > /dev/tbstream0

A use-case at the other end of the spectrum is "borrowing" the laptop
(host1) camera from the desktop (host2):

host2 # gst-launch-1.0 filesrc location=/dev/tbstream0 ! jpegdec ! videoconvert ! \
	autovideosink
host1 # gst-launch-1.0 v4l2src device=/dev/video0 ! video/x-raw,width=1920,height=1080 ! \
	jpegenc quality=90 ! \
	filesink location=/dev/tbstream0

Once the streams are no longer needed they can be removed:

host1 # cd /sys/kernel/config/thunderbolt/stream/
host1 # rmdir -p 0-503.0/backup
host2 # cd /sys/kernel/config/thunderbolt/stream
host2 # rmdir -p 0-1.0/backup

Co-developed-by: Alan Borzeszkowski
Signed-off-by: Alan Borzeszkowski
Signed-off-by: Mika Westerberg
---
 .../ABI/testing/configfs-thunderbolt_stream |   77 +
 drivers/thunderbolt/Kconfig                 |   11 +
 drivers/thunderbolt/Makefile                |    3 +
 drivers/thunderbolt/stream.c                | 1693 +++++++++++++++++
 4 files changed, 1784 insertions(+)
 create mode 100644 Documentation/ABI/testing/configfs-thunderbolt_stream
 create mode 100644 drivers/thunderbolt/stream.c

diff --git a/Documentation/ABI/testing/configfs-thunderbolt_stream b/Documentation/ABI/testing/configfs-thunderbolt_stream
new file mode 100644
index 000000000000..e403fda92765
--- /dev/null
+++ b/Documentation/ABI/testing/configfs-thunderbolt_stream
@@ -0,0 +1,77 @@
+What:		/sys/kernel/config/thunderbolt/stream/.
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		Configuration group for a stream Thunderbolt/USB4
+		service. It is possible to create groups even if there
+		is no connection yet to the other host. Once a
+		connection is established and there is a stream service
+		on the remote side that matches, this configuration is
+		applied to it.
+
+What:		/sys/kernel/config/thunderbolt/stream/./$name
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		Creates a new stream with $name and fills it with the
+		default values. If there is an advertised remote stream
+		with the same name, uses its values as the defaults.
+
+What:		/sys/kernel/config/thunderbolt/stream/./$name/index
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		This matches the X in /dev/tbstreamX and allows userspace
+		to map the configfs directory to the corresponding character
+		device.
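The index attribute described above is what lets scripts go from a
configured stream to its device node. A minimal sketch, with a
hypothetical helper name; the directory is taken as a parameter so the
lookup can also be exercised against a plain directory:

```shell
# Hypothetical helper: resolve the character device node belonging to a
# configured stream by reading its "index" attribute.
tbstream_devnode() {
	local dir="$1" idx

	idx=$(cat "$dir/index") || return 1
	printf '/dev/tbstream%s\n' "$idx"
}

# e.g. dd if="$(tbstream_devnode /sys/kernel/config/thunderbolt/stream/0-1.0/backup)" ...
```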
+
+What:		/sys/kernel/config/thunderbolt/stream/./$name/in_hopid
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		In HopID used with the read path of the tunnel. Available
+		HopIDs for tunneling start from 8. You can also pass -1
+		for automatic allocation. The allocated value can be read
+		here. Writing 0 de-allocates the HopID if the stream is
+		not in use.
+
+		To figure out the maximum HopID you can run tbget from
+		tbtools [1] for the lane adapter. For example below we
+		check for lane adapter number 1 (first USB4 port):
+
+		# tbget -r 0 -a 1 -D ADP_CS_5.Max\ Input\ HopID
+		19
+
+		This allows using anything between 8 and 19, inclusive.
+
+		[1] https://github.com/intel/tbtools
+
+What:		/sys/kernel/config/thunderbolt/stream/./$name/out_hopid
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		Out HopID used with the write path of the tunnel. Available
+		HopIDs for tunneling start from 8. You can also pass -1
+		for automatic allocation. The allocated value can be read
+		here. Writing 0 de-allocates the HopID if the stream is
+		not in use. See in_hopid for how to figure out the
+		maximum HopID.
+
+What:		/sys/kernel/config/thunderbolt/stream/./$name/ring_size
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		Size of the TX/RX rings. Can be adjusted between 32 and
+		4096. The default is 256.
+
+What:		/sys/kernel/config/thunderbolt/stream/./$name/throttling
+Date:		Sep 2026
+KernelVersion:	v7.2
+Contact:	Mika Westerberg
+Description:
+		Interrupt throttling rate in ns. Lower values can give
+		better latency. The default is 8192 ns.
diff --git a/drivers/thunderbolt/Kconfig b/drivers/thunderbolt/Kconfig
index 9b4aaa456e32..294b3227a545 100644
--- a/drivers/thunderbolt/Kconfig
+++ b/drivers/thunderbolt/Kconfig
@@ -64,4 +64,15 @@ config USB4_DMA_TEST
 	  To compile this driver a module, choose M here. The module will be
 	  called thunderbolt_dma_test.
+config USB4_STREAM
+	tristate "Stream data over Thunderbolt/USB4 cable"
+	depends on USB4_CONFIGFS
+	help
+	  This adds support for the USB4STREAM protocol that allows two
+	  hosts to stream data directly over a Thunderbolt/USB4 cable
+	  through /dev/tbstreamX devices.
+
+	  To compile this driver as a module, choose M here. The module
+	  will be called thunderbolt_stream.
+
 endif # USB4
diff --git a/drivers/thunderbolt/Makefile b/drivers/thunderbolt/Makefile
index d5b367dfda1e..c792b8084ffa 100644
--- a/drivers/thunderbolt/Makefile
+++ b/drivers/thunderbolt/Makefile
@@ -13,3 +13,6 @@ CFLAGS_test.o += $(DISABLE_STRUCTLEAK_PLUGIN)
 thunderbolt_dma_test-${CONFIG_USB4_DMA_TEST} += dma_test.o
 obj-$(CONFIG_USB4_DMA_TEST) += thunderbolt_dma_test.o
+
+thunderbolt_stream-${CONFIG_USB4_STREAM} += stream.o
+obj-$(CONFIG_USB4_STREAM) += thunderbolt_stream.o
diff --git a/drivers/thunderbolt/stream.c b/drivers/thunderbolt/stream.c
new file mode 100644
index 000000000000..684ab4e080c2
--- /dev/null
+++ b/drivers/thunderbolt/stream.c
@@ -0,0 +1,1693 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Stream data over Thunderbolt/USB4 cable
+ *
+ * Copyright (C) 2026, Intel Corporation
+ * Authors: Alan Borzeszkowski
+ *          Mika Westerberg
+ */
+
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+
+/*
+ * USB4STREAM - Stream data directly over Thunderbolt/USB4 cable
+ *
+ * HopIDs are configured by the user. In Linux this is done through
+ * ConfigFS. Once that is done, the paths are established the first time
+ * the stream is opened. Typically the read side is opened first to make
+ * sure all the data will be received.
+ *
+ * End-to-end flow control is mandatory on both sides.
+ *
+ * Data is sent to the other side as tunneled DATA packets. All the data
+ * is owned by the user and passed as-is from the writer to the reader.
+ *
+ * Once the stream device is closed, a CLOSE packet is sent to the peer
+ * so it can take the necessary action. On Linux this typically results
+ * in EOF being returned to the reader.
+ *
+ * Tunneled packet types:
+ *
+ * +-------+---------+------------------+
+ * | PDF   | Type    | Payload size     |
+ * +-------+---------+------------------+
+ * | 2     | DATA    | up to 4 KiB      |
+ * | 3     | CLOSE   | up to 256 bytes  |
+ * +-------+---------+------------------+
+ *
+ * Each stream can optionally publish configuration values under its own
+ * XDomain property directory. The name of the directory is the name of
+ * the stream in question and the UUID is up to the stream. For example
+ * if the stream exposes video output then the directory name could be
+ * "video".
+ *
+ * The values below are reserved and can be used by the stream:
+ *
+ * +----------+-----------+-------------------------+
+ * | Key      | Type      | Contents                |
+ * +----------+-----------+-------------------------+
+ * | inhopid  | IMMEDIATE | Configured input HopID  |
+ * | outhopid | IMMEDIATE | Configured output HopID |
+ * +----------+-----------+-------------------------+
+ *
+ * It is allowed to add more stream-specific properties as well if the
+ * above are not enough.
+ */
+
+#define TBSTREAM_DEV_MINORS		(MINORMASK + 1)
+#define TBSTREAM_DEV_RING_SIZE		256
+#define TBSTREAM_DEV_MIN_RING_SIZE	32
+#define TBSTREAM_DEV_MAX_RING_SIZE	4096
+#define TBSTREAM_DEV_THROTTLING		8192
+#define TBSTREAM_DEV_MAX_THROTTLING	16776960
+
+/**
+ * enum tbstream_frame_pdf - PDF numbers for tunneled frames
+ * @TBSTREAM_FRAME_START: PDF of the start of the frame
+ * @TBSTREAM_DATA: PDF of the DATA frame
+ * @TBSTREAM_CLOSE: PDF of the CLOSE frame
+ */
+enum tbstream_frame_pdf {
+	TBSTREAM_FRAME_START = 1,
+	TBSTREAM_DATA,
+	TBSTREAM_CLOSE,
+};
+
+/**
+ * struct tbstream_frame - Frame submitted to/from the rings
+ * @sdev: Pointer to the stream device
+ * @page: Page holding the packet
+ * @offset: Offset inside @page if partial read is done
+ * @completed: %true if the RX frame is completed
+ * @frame: Underlying frame structure
+ */
+struct tbstream_frame {
+	struct tbstream_dev *sdev;
+	struct page *page;
+	unsigned int offset;
+	bool completed;
+	struct ring_frame frame;
+};
+
+/**
+ * struct tbstream_ring - Stream RX/TX ring structure
+ * @ring: Pointer to the API ring
+ * @prod: Current value of producer
+ * @cons: Current value of consumer
+ * @frames: Holds the ring frames
+ */
+struct tbstream_ring {
+	struct tb_ring *ring;
+	unsigned long prod;
+	unsigned long cons;
+	struct tbstream_frame *frames;
+};
+
+/**
+ * struct tbstream_dev - Stream character device
+ * @group: ConfigFS group for this device
+ * @stream: Pointer to the stream if it is attached (%NULL otherwise)
+ * @cdev: Character device used for tunneling
+ * @dev: Stream device
+ * @index: Unique identifier for the character device
+ * @in_hopid: In HopID
+ * @out_hopid: Out HopID
+ * @ring_size: Size of the rings
+ * @throttling: Interrupt throttling rate in ns
+ * @users: Number of times @cdev has been opened
+ * @closed: CLOSE packet was received
+ * @wait: Waitqueue for open, read and write
+ * @lock: Lock protecting this structure
+ * @tx_ring: Transmit ring
+ * @rx_ring: Receive ring
+ * @list: Stream devices are linked through this
+ */
+struct tbstream_dev {
+	struct config_group group;
+	struct tbstream *stream;
+	struct cdev cdev;
+	struct device dev;
+	int index;
+	int in_hopid;
+	int out_hopid;
+	unsigned int ring_size;
+	unsigned int throttling;
+	int users;
+	bool closed;
+	wait_queue_head_t wait;
+	struct mutex lock;
+	struct tbstream_ring tx_ring;
+	struct tbstream_ring rx_ring;
+	struct list_head list;
+};
+
+/**
+ * struct tbstream_group - Config group for stream
+ * @group: ConfigFS group for @stream
+ * @stream: Stream the ConfigFS group is attached to. %NULL if there is
+ *	    no stream attached.
+ * @lock: Lock protecting this structure
+ * @dev_list: List of stream devices
+ *
+ * This is the ConfigFS directory for one connection to another host.
+ * There can be several &struct stream_dev linked through @dev_list of
+ * this structure.
+ */
+struct tbstream_group {
+	struct config_group group;
+	struct tbstream *stream;
+	struct mutex lock;
+	struct list_head dev_list;
+};
+
+/**
+ * struct tbstream - Stream service private data
+ * @kref: Reference count
+ * @svc: Pointer to the service device
+ * @list: Streams are linked through this in @stream_list
+ *
+ * This represents the actual physical connection between two domains.
+ */
+struct tbstream {
+	struct kref kref;
+	struct tb_service *svc;
+	struct list_head list;
+};
+
+/* Major and minor numbers of the stream devices (/dev/tbstreamX) */
+static dev_t tbstream_devt;
+static DEFINE_IDA(tbstream_minors);
+
+/* Protects tbstream_list */
+static DEFINE_MUTEX(tbstream_lock);
+static LIST_HEAD(tbstream_list);
+
+/* Stream property directory UUID: 3a1cb984-c4d9-4469-a277-ce2fdfd11f0d */
+static const uuid_t tbstream_dir_uuid =
+	UUID_INIT(0x3a1cb984, 0xc4d9, 0x4469,
+		  0xa2, 0x77, 0xce, 0x2f, 0xdf, 0xd1, 0x1f, 0x0d);
+
+static struct tb_property_dir *tbstream_dir;
+
+static const struct class tbstream_class = {
+	.name = "thunderbolt_stream",
+};
+
+static void tbstream_release(struct kref *kref)
+{
+	struct tbstream *stream = container_of(kref, typeof(*stream), kref);
+
+	tb_service_put(stream->svc);
+	kfree(stream);
+}
+
+static void tbstream_put(struct tbstream *stream)
+{
+	if (stream)
+		kref_put(&stream->kref, tbstream_release);
+}
+
+static struct tbstream *tbstream_get(struct tbstream *stream)
+{
+	if (stream)
+		kref_get(&stream->kref);
+	return stream;
+}
+
+static inline bool tbstream_valid(const struct tbstream *stream)
+{
+	if (!stream)
+		return false;
+	return !tb_service_parent(stream->svc)->is_unplugged;
+}
+
+static void tbstream_ring_free(struct tbstream_ring *ring)
+{
+	struct device *dma_dev = tb_ring_dma_device(ring->ring);
+	enum dma_data_direction dir;
+	int i;
+
+	if (ring->ring->is_tx)
+		dir = DMA_TO_DEVICE;
+	else
+		dir = DMA_FROM_DEVICE;
+
+	for (i = 0; i < tb_ring_size(ring->ring); i++) {
+		struct tbstream_frame *sf = &ring->frames[i];
+
+		if (sf->frame.buffer_phy)
+			dma_unmap_page(dma_dev, sf->frame.buffer_phy,
+				       tb_ring_frame_size(&sf->frame), dir);
+		sf->frame.buffer_phy = 0;
+		if (sf->page)
+			__free_page(sf->page);
+		sf->page = NULL;
+	}
+
+	ring->prod = 0;
+	ring->cons = 0;
+	kfree(ring->frames);
+}
+
+static inline bool tbstream_ring_available(const struct tbstream_ring *ring)
+{
+	return ring->prod > ring->cons;
+}
+
+static inline struct tbstream_dev *tbstream_dev_get(struct tbstream_dev *sdev)
+{
+	get_device(&sdev->dev);
+	return sdev;
+}
+
+static inline void tbstream_dev_put(struct tbstream_dev *sdev)
+{
+	put_device(&sdev->dev);
+}
+
+static inline struct tbstream_dev *to_tbstream_dev(struct device *dev)
+{
+	return container_of(dev, struct tbstream_dev, dev);
+}
+
+static inline struct tb_xdomain *tbstream_dev_xdomain(struct tbstream_dev *sdev)
+{
+	if (sdev->stream)
+		return tb_service_parent(sdev->stream->svc);
+	return NULL;
+}
+
+static inline int tbstream_dev_valid(const struct tbstream_dev *sdev)
+{
+	const struct tbstream *stream = sdev->stream;
+
+	if (!tbstream_valid(stream))
+		return -ENXIO;
+	if (sdev->in_hopid <= 0 || sdev->out_hopid <= 0)
+		return -EINVAL;
+	return 0;
+}
+
+static inline bool tbstream_dev_closed(const struct tbstream_dev *sdev)
+{
+	return sdev->closed;
+}
+
+static void tbstream_dev_release(struct device *dev)
+{
+	struct tbstream_dev *sdev = to_tbstream_dev(dev);
+
+	if (sdev->stream) {
+		struct tb_xdomain *xd = tbstream_dev_xdomain(sdev);
+
+		if (sdev->out_hopid > 0)
+			tb_xdomain_release_out_hopid(xd, sdev->out_hopid);
+		if (sdev->in_hopid > 0)
+			tb_xdomain_release_in_hopid(xd, sdev->in_hopid);
+
+		tbstream_put(sdev->stream);
+	}
+	ida_free(&tbstream_minors, sdev->index);
+	kfree(sdev);
+}
+
+static void
+tbstream_dev_rx_callback(struct tb_ring *ring, struct ring_frame *frame,
+			 bool canceled)
+{
+	struct tbstream_frame *sf = container_of(frame, typeof(*sf), frame);
+	struct tbstream_dev *sdev = sf->sdev;
+
+	if (canceled)
+		return;
+
+	sf->completed = true;
+	sdev->rx_ring.prod++;
+
+	if (sf->frame.flags & RING_DESC_CRC_ERROR)
+		dev_warn(&sdev->dev, "RX CRC error\n");
+	else if (sf->frame.flags & RING_DESC_BUFFER_OVERRUN)
+		dev_warn(&sdev->dev, "RX buffer overrun\n");
+	else
+		wake_up_interruptible_poll(&sdev->wait, EPOLLIN | EPOLLRDNORM);
+}
+
+static struct tbstream_frame *
+tbstream_dev_completed_rx(struct tbstream_dev *sdev)
+{
+	struct device *dma_dev = tb_ring_dma_device(sdev->rx_ring.ring);
+	struct tbstream_frame *sf;
+	int index;
+
+	index = sdev->rx_ring.cons % tb_ring_size(sdev->rx_ring.ring);
+	sf = &sdev->rx_ring.frames[index];
+	if (!sf->completed)
+		return NULL;
+
+	dma_sync_single_for_cpu(dma_dev, sf->frame.buffer_phy,
+				tb_ring_frame_size(&sf->frame),
+				DMA_FROM_DEVICE);
+	return sf;
+}
+
+static int tbstream_dev_consume_rx(struct tbstream_dev *sdev)
+{
+	struct device *dma_dev = tb_ring_dma_device(sdev->rx_ring.ring);
+	struct tbstream_frame *sf;
+	int index;
+
+	index = sdev->rx_ring.cons % tb_ring_size(sdev->rx_ring.ring);
+	sdev->rx_ring.cons++;
+
+	sf = &sdev->rx_ring.frames[index];
+	sf->completed = false;
+	sf->offset = 0;
+	sf->frame.size = 0;
+
+	dma_sync_single_for_device(dma_dev, sf->frame.buffer_phy,
+				   tb_ring_frame_size(&sf->frame),
+				   DMA_FROM_DEVICE);
+
+	return tb_ring_rx(sdev->rx_ring.ring, &sf->frame);
+}
+
+static int tbstream_dev_alloc_rx_buffers(struct tbstream_dev *sdev)
+{
+	size_t ring_size = tb_ring_size(sdev->rx_ring.ring);
+	int i;
+
+	sdev->rx_ring.frames = kcalloc(ring_size, sizeof(struct tbstream_frame),
+				       GFP_KERNEL);
+	if (!sdev->rx_ring.frames)
+		return -ENOMEM;
+
+	for (i = 0; i < ring_size; i++) {
+		struct device *dma_dev = tb_ring_dma_device(sdev->rx_ring.ring);
+		struct tbstream_frame *sf = &sdev->rx_ring.frames[i];
+		dma_addr_t dma_addr;
+
+		sf->page = alloc_page(GFP_KERNEL);
+		if (!sf->page)
+			return -ENOMEM;
+
+		dma_addr = dma_map_page(dma_dev, sf->page, 0, TB_MAX_FRAME_SIZE,
+					DMA_FROM_DEVICE);
+		if (dma_mapping_error(dma_dev, dma_addr)) {
+			__free_page(sf->page);
+			sf->page = NULL;
+			return -ENOMEM;
+		}
+
+		sf->sdev = sdev;
+		sf->frame.callback = tbstream_dev_rx_callback;
+		sf->frame.buffer_phy = dma_addr;
+
+		tb_ring_rx(sdev->rx_ring.ring, &sf->frame);
+	}
+
+	sdev->rx_ring.cons = 0;
+	sdev->rx_ring.prod = 0;
+	return 0;
+}
+
+static void
+tbstream_dev_tx_callback(struct tb_ring *ring, struct ring_frame *frame,
+			 bool canceled)
+{
+	struct tbstream_frame *sf = container_of(frame, typeof(*sf), frame);
+	struct tbstream_dev *sdev = sf->sdev;
+
+	if (canceled)
+		return;
+
+	sdev->tx_ring.prod++;
+	if (sf->frame.eof == TBSTREAM_DATA)
+		wake_up_interruptible_poll(&sdev->wait, EPOLLOUT | EPOLLWRNORM);
+}
+
+static int tbstream_dev_alloc_tx_buffers(struct tbstream_dev *sdev)
+{
+	struct device *dma_dev = tb_ring_dma_device(sdev->tx_ring.ring);
+	size_t ring_size = tb_ring_size(sdev->tx_ring.ring);
+	int i;
+
+	sdev->tx_ring.frames = kcalloc(ring_size, sizeof(struct tbstream_frame),
+				       GFP_KERNEL);
+	if (!sdev->tx_ring.frames)
+		return -ENOMEM;
+
+	for (i = 0; i < ring_size; i++) {
+		struct tbstream_frame *sf = &sdev->tx_ring.frames[i];
+		dma_addr_t dma_addr;
+
+		sf->page = alloc_page(GFP_KERNEL);
+		if (!sf->page)
+			return -ENOMEM;
+
+		dma_addr = dma_map_page(dma_dev, sf->page, 0, TB_MAX_FRAME_SIZE,
+					DMA_TO_DEVICE);
+		if (dma_mapping_error(dma_dev, dma_addr)) {
+			__free_page(sf->page);
+			sf->page = NULL;
+			return -ENOMEM;
+		}
+
+		sf->sdev = sdev;
+		sf->frame.callback = tbstream_dev_tx_callback;
+		sf->frame.buffer_phy = dma_addr;
+		sf->frame.sof = TBSTREAM_FRAME_START;
+	}
+
+	sdev->tx_ring.cons = 0;
+	sdev->tx_ring.prod = ring_size - 1;
+	return 0;
+}
+
+static struct tbstream_frame *
+tbstream_dev_alloc_tx(struct tbstream_dev *sdev, enum tbstream_frame_pdf pdf,
+		      struct iov_iter *from, size_t size)
+{
+	struct device *dma_dev = tb_ring_dma_device(sdev->tx_ring.ring);
+	struct tbstream_frame *sf;
+	int index;
+
+	if (!tbstream_ring_available(&sdev->tx_ring))
+		return ERR_PTR(-ENOBUFS);
+
+	index = sdev->tx_ring.cons % tb_ring_size(sdev->tx_ring.ring);
+	sdev->tx_ring.cons++;
+
+	sf = &sdev->tx_ring.frames[index];
+	sf->frame.size = size < TB_MAX_FRAME_SIZE ? size : 0;
+	sf->frame.eof = pdf;
+
+	dma_sync_single_for_cpu(dma_dev, sf->frame.buffer_phy, size,
+				DMA_TO_DEVICE);
+	if (pdf == TBSTREAM_DATA) {
+		if (copy_page_from_iter(sf->page, 0, size, from) != size)
+			return ERR_PTR(-EFAULT);
+	} else {
+		memset(page_address(sf->page), 0, size);
+	}
+	dma_sync_single_for_device(dma_dev, sf->frame.buffer_phy, size,
+				   DMA_TO_DEVICE);
+	return sf;
+}
+
+static int
+tbstream_dev_send_data(struct tbstream_dev *sdev, struct iov_iter *from,
+		       size_t size)
+{
+	struct tbstream_frame *sf;
+
+	sf = tbstream_dev_alloc_tx(sdev, TBSTREAM_DATA, from, size);
+	if (IS_ERR(sf))
+		return PTR_ERR(sf);
+	return tb_ring_tx(sdev->tx_ring.ring, &sf->frame);
+}
+
+static int tbstream_dev_send_close(struct tbstream_dev *sdev)
+{
+	struct tbstream_frame *sf;
+
+	sf = tbstream_dev_alloc_tx(sdev, TBSTREAM_CLOSE, NULL, SZ_256);
+	if (IS_ERR(sf))
+		return PTR_ERR(sf);
+	return tb_ring_tx(sdev->tx_ring.ring, &sf->frame);
+}
+
+static int tbstream_dev_start(struct tbstream_dev *sdev)
+{
+	struct tb_xdomain *xd = tbstream_dev_xdomain(sdev);
+	u16 sof_mask, eof_mask;
+	struct tb_ring *ring;
+	int ret, e2e_tx_hop;
+
+	ring = tb_ring_alloc_tx(xd->tb->nhi, -1, sdev->ring_size,
+				RING_FLAG_FRAME | RING_FLAG_E2E);
+	if (!ring)
+		return -ENOMEM;
+	sdev->tx_ring.ring = ring;
+
+	ret = tbstream_dev_alloc_tx_buffers(sdev);
+	if (ret)
+		goto err_free_tx;
+
+	e2e_tx_hop = ring->hop;
+	sof_mask = BIT(TBSTREAM_FRAME_START);
+	eof_mask = BIT(TBSTREAM_DATA) | BIT(TBSTREAM_CLOSE);
+
+	ring = tb_ring_alloc_rx(xd->tb->nhi, -1, sdev->ring_size,
+				RING_FLAG_FRAME | RING_FLAG_E2E, e2e_tx_hop,
+				sof_mask, eof_mask, NULL, NULL);
+	if (!ring) {
+		ret = -ENOMEM;
+		goto err_free_tx_buffers;
+	}
+	sdev->rx_ring.ring = ring;
+
+	ret = tb_xdomain_enable_paths(xd, sdev->out_hopid,
+				      sdev->tx_ring.ring->hop,
+				      sdev->in_hopid,
+				      sdev->rx_ring.ring->hop);
+	if (ret)
+		goto err_free_rx;
+
+	tb_ring_throttling(sdev->tx_ring.ring, sdev->throttling);
+	tb_ring_throttling(sdev->rx_ring.ring, sdev->throttling);
+
+	tb_ring_start(sdev->tx_ring.ring);
+	tb_ring_start(sdev->rx_ring.ring);
+
+	ret = tbstream_dev_alloc_rx_buffers(sdev);
+	if (ret)
+		goto err_stop;
+	return 0;
+
+err_stop:
+	tb_ring_stop(sdev->rx_ring.ring);
+	tb_ring_stop(sdev->tx_ring.ring);
+err_free_rx:
+	tb_ring_free(sdev->rx_ring.ring);
+err_free_tx_buffers:
+	tbstream_ring_free(&sdev->tx_ring);
+err_free_tx:
+	tb_ring_free(sdev->tx_ring.ring);
+
+	return ret;
+}
+
+static void tbstream_dev_stop(struct tbstream_dev *sdev)
+{
+	struct tb_xdomain *xd;
+
+	/* Wait for the ring to complete any outstanding frames */
+	tb_ring_flush(sdev->tx_ring.ring, 500);
+	tb_ring_stop(sdev->tx_ring.ring);
+	tb_ring_flush(sdev->rx_ring.ring, 500);
+	tb_ring_stop(sdev->rx_ring.ring);
+
+	xd = tbstream_dev_xdomain(sdev);
+	if (xd) {
+		tb_xdomain_disable_paths(xd, sdev->out_hopid,
+					 sdev->tx_ring.ring->hop,
+					 sdev->in_hopid,
+					 sdev->rx_ring.ring->hop);
+	}
+
+	tbstream_ring_free(&sdev->rx_ring);
+	tb_ring_free(sdev->rx_ring.ring);
+	sdev->rx_ring.ring = NULL;
+	tbstream_ring_free(&sdev->tx_ring);
+	tb_ring_free(sdev->tx_ring.ring);
+	sdev->tx_ring.ring = NULL;
+}
+
+static ssize_t
+tbstream_dev_fops_read_iter(struct kiocb *kiocb, struct iov_iter *to)
+{
+	struct file *file = kiocb->ki_filp;
+	struct tbstream_dev *sdev = file->private_data;
+	size_t nbytes;
+	int ret;
+
+	ret = tbstream_dev_valid(sdev);
+	if (ret)
+		return ret;
+
+	if (mutex_lock_interruptible(&sdev->lock))
+		return -ERESTARTSYS;
+
+	while (!tbstream_ring_available(&sdev->rx_ring)) {
+		mutex_unlock(&sdev->lock);
+
+		if (file->f_flags & O_NONBLOCK)
+			return -EAGAIN;
+		ret = wait_event_interruptible(sdev->wait,
+			tbstream_ring_available(&sdev->rx_ring) ||
+			tbstream_dev_valid(sdev) != 0 ||
+			tbstream_dev_closed(sdev));
+		if (ret)
+			return ret;
+
+		ret = tbstream_dev_valid(sdev);
+		if (ret)
+			return ret;
+
+		if (mutex_lock_interruptible(&sdev->lock))
+			return -ERESTARTSYS;
+	}
+
+	nbytes = 0;
+	while (nbytes < iov_iter_count(to)) {
+		struct tbstream_frame *sf;
+		size_t size, sf_size;
+
+		sf = tbstream_dev_completed_rx(sdev);
+		if (!sf)
+			break;
+		/*
+		 * CLOSE tunneled packet. If userspace already read
+		 * something then we stop processing now and return
+		 * those bytes. Next time the first frame will be CLOSE
+		 * in which case we return EOF to the user.
+		 */
+		if (sf->frame.eof == TBSTREAM_CLOSE) {
+			if (!nbytes) {
+				tbstream_dev_consume_rx(sdev);
+				sdev->closed = true;
+			}
+			break;
+		}
+
+		sf_size = tb_ring_frame_size(&sf->frame);
+		size = min(iov_iter_count(to) - nbytes, sf_size);
+
+		if (copy_page_to_iter(sf->page, sf->offset, size, to) != size) {
+			ret = -EFAULT;
+			break;
+		}
+
+		/*
+		 * If not all data from the frame was read, leave it in
+		 * place and update the offset accordingly so the next
+		 * read gets the rest.
+		 */
+		if (size < sf_size) {
+			sf->offset += size;
+			sf->frame.size = sf_size - size;
+		} else {
+			ret = tbstream_dev_consume_rx(sdev);
+			if (ret)
+				break;
+		}
+
+		nbytes += size;
+	}
+
+	mutex_unlock(&sdev->lock);
+	if (ret)
+		return ret;
+	return nbytes;
+}
+
+static ssize_t
+tbstream_dev_fops_write_iter(struct kiocb *kiocb, struct iov_iter *from)
+{
+	struct file *file = kiocb->ki_filp;
+	struct tbstream_dev *sdev = file->private_data;
+	size_t nbytes;
+	int ret;
+
+	ret = tbstream_dev_valid(sdev);
+	if (ret)
+		return ret;
+
+	if (mutex_lock_interruptible(&sdev->lock))
+		return -ERESTARTSYS;
+
+	while (!tbstream_ring_available(&sdev->tx_ring)) {
+		mutex_unlock(&sdev->lock);
+
+		if (file->f_flags & O_NONBLOCK)
+			return -EAGAIN;
+		ret = wait_event_interruptible(sdev->wait,
+			tbstream_ring_available(&sdev->tx_ring) ||
+			tbstream_dev_valid(sdev) != 0);
+		if (ret)
+			return ret;
+
+		ret = tbstream_dev_valid(sdev);
+		if (ret)
+			return ret;
+
+		if (tbstream_dev_closed(sdev))
+			return 0;
+
+		if (mutex_lock_interruptible(&sdev->lock))
+			return -ERESTARTSYS;
+	}
+
+	nbytes = 0;
+	while (nbytes < iov_iter_count(from)) {
+		size_t size;
+
+		size = min(iov_iter_count(from) - nbytes, TB_MAX_FRAME_SIZE);
+		ret = tbstream_dev_send_data(sdev, from, size);
+		if (ret) {
+			/*
+			 * If there are no more buffers we are done for
+			 * this write.
+			 */
+			if (ret == -ENOBUFS)
+				ret = 0;
+			break;
+		}
+
+		nbytes += size;
+	}
+
+	mutex_unlock(&sdev->lock);
+	if (ret)
+		return ret;
+	return nbytes;
+}
+
+static __poll_t
+tbstream_dev_fops_poll(struct file *file, struct poll_table_struct *wait)
+{
+	struct tbstream_dev *sdev = file->private_data;
+	__poll_t mask = 0;
+
+	poll_wait(file, &sdev->wait, wait);
+	guard(mutex)(&sdev->lock);
+	if (tbstream_dev_valid(sdev) != 0) {
+		mask |= EPOLLHUP | EPOLLERR;
+	} else {
+		if (tbstream_ring_available(&sdev->tx_ring))
+			mask |= EPOLLOUT | EPOLLWRNORM;
+		if (tbstream_ring_available(&sdev->rx_ring))
+			mask |= EPOLLIN | EPOLLRDNORM;
+	}
+	return mask;
+}
+
+static int tbstream_dev_fops_open(struct inode *inode, struct file *file)
+{
+	struct tbstream_dev *sdev;
+	struct device *dev;
+	int ret;
+
+	/*
+	 * The matching tbstream_dev_put() is done in tbstream_dev_fops_release()
+	 * to keep the reference as long as the device is open.
+	 */
+	dev = class_find_device_by_devt(&tbstream_class, inode->i_rdev);
+	if (!dev)
+		return -ENODEV;
+	sdev = to_tbstream_dev(dev);
+
+	if (mutex_lock_interruptible(&sdev->lock)) {
+		tbstream_dev_put(sdev);
+		return -ERESTARTSYS;
+	}
+
+	/*
+	 * If there is no stream attached yet, block until it appears
+	 * unless this is opened in non-blocking mode.
+	 */
+	while ((ret = tbstream_dev_valid(sdev))) {
+		mutex_unlock(&sdev->lock);
+
+		if (ret != -ENXIO || (file->f_flags & O_NONBLOCK))
+			goto err_put;
+
+		ret = wait_event_interruptible(sdev->wait,
+					       tbstream_dev_valid(sdev) == 0);
+		if (ret)
+			goto err_put;
+
+		if (mutex_lock_interruptible(&sdev->lock)) {
+			ret = -ERESTARTSYS;
+			goto err_put;
+		}
+	}
+
+	/* Only on first open we allocate rings and enable paths */
+	if (!sdev->users++) {
+		ret = tbstream_dev_start(sdev);
+		if (ret) {
+			sdev->users--;
+			goto err_unlock;
+		}
+		sdev->closed = false;
+	}
+
+	file->private_data = sdev;
+	mutex_unlock(&sdev->lock);
+	return 0;
+
+err_unlock:
+	mutex_unlock(&sdev->lock);
+err_put:
+	tbstream_dev_put(sdev);
+
+	return ret;
+}
+
+static int tbstream_dev_fops_release(struct inode *inode, struct file *file)
+{
+	struct tbstream_dev *sdev = file->private_data;
+
+	mutex_lock(&sdev->lock);
+	if (--sdev->users == 0) {
+		/*
+		 * Send CLOSE tunneled packet to notify the other end
+		 * that we are closing the file. We do this twice if the
+		 * first one fails.
+		 */
+		tbstream_dev_send_close(sdev);
+		tbstream_dev_stop(sdev);
+	}
+	mutex_unlock(&sdev->lock);
+
+	file->private_data = NULL;
+	tbstream_dev_put(sdev);
+	return 0;
+}
+
+static const struct file_operations tbstream_dev_fops = {
+	.owner = THIS_MODULE,
+	.llseek = noop_llseek,
+	.read_iter = tbstream_dev_fops_read_iter,
+	.write_iter = tbstream_dev_fops_write_iter,
+	.poll = tbstream_dev_fops_poll,
+	.open = tbstream_dev_fops_open,
+	.release = tbstream_dev_fops_release,
+};
+
+static inline struct tbstream_dev *
+tbstream_dev_from_group(struct config_group *group)
+{
+	return container_of(group, struct tbstream_dev, group);
+}
+
+static ssize_t tbstream_dev_index_show(struct config_item *item, char *buf)
+{
+	struct config_group *group = to_config_group(item);
+	struct tbstream_dev *sdev = tbstream_dev_from_group(group);
+
+	return sysfs_emit(buf, "%d\n", sdev->index);
+}
+CONFIGFS_ATTR_RO(tbstream_dev_, index);
+
+static ssize_t tbstream_dev_in_hopid_show(struct config_item *item, char *buf)
+{
+	struct config_group *group = to_config_group(item);
+	struct tbstream_dev *sdev = tbstream_dev_from_group(group);
+
+	return sysfs_emit(buf, "%d\n", sdev->in_hopid);
+}
+
+/* svc->lock must be held */
+static void service_remove_properties(struct tb_service *svc, const char *name)
+{
+	struct tb_property *p;
+
+	if (!svc->local_properties)
+		return;
+
+	p = tb_property_find(svc->local_properties, name,
+			     TB_PROPERTY_TYPE_DIRECTORY);
+	if (p) {
+		tb_property_free_dir(p->value.dir);
+		tb_property_remove(p);
+
+		dev_dbg(&svc->dev, "removed local directory %s\n", name);
+
+		/*
+		 * Is the service directory empty already? If it is then
+		 * we can release it as well.
+ */ + tb_property_for_each(svc->local_properties, p) { + if (p->type == TB_PROPERTY_TYPE_DIRECTORY) + return; + } + + tb_property_free_dir(svc->local_properties); + svc->local_properties = NULL; + } +} + +static int service_update_properties(struct tb_service *svc, const char *name, + int in_hopid, int out_hopid) +{ + struct tb_property_dir *dir; + struct tb_property *p; + + guard(mutex)(&svc->lock); + + if (in_hopid < 8 || out_hopid < 8) { + service_remove_properties(svc, name); + return 0; + } + + if (!svc->local_properties) { + /* + * Add the service directory first time we + * populate the entries. + */ + svc->local_properties = tb_property_copy_dir(tbstream_dir); + if (!svc->local_properties) + return -ENOMEM; + } + + p = tb_property_find(svc->local_properties, name, + TB_PROPERTY_TYPE_DIRECTORY); + if (p) { + dir = p->value.dir; + + p = tb_property_find(dir, "inhopid", TB_PROPERTY_TYPE_VALUE); + if (p && p->value.immediate != in_hopid) + p->value.immediate = in_hopid; + p = tb_property_find(dir, "outhopid", TB_PROPERTY_TYPE_VALUE); + if (p && p->value.immediate != out_hopid) + p->value.immediate = out_hopid; + + dev_dbg(&svc->dev, + "updated local directory %s: in HopID %d, out HopID %d\n", + name, in_hopid, out_hopid); + } else { + uuid_t uuid; + int ret; + + uuid_gen(&uuid); + dir = tb_property_create_dir(&uuid); + if (!dir) + return -ENOMEM; + + tb_property_add_immediate(dir, "inhopid", in_hopid); + tb_property_add_immediate(dir, "outhopid", out_hopid); + + ret = tb_property_add_dir(svc->local_properties, name, dir); + if (ret) { + tb_property_free_dir(dir); + return ret; + } + + dev_dbg(&svc->dev, + "added local directory %s: in HopID %d, out HopID %d\n", + name, in_hopid, out_hopid); + } + + return 0; +} + +static int tbstream_dev_update_properties(struct tbstream_dev *sdev) +{ + struct tbstream *stream; + int ret; + + stream = tbstream_get(sdev->stream); + if (!stream) + return 0; + + ret = service_update_properties(stream->svc, + 
config_item_name(&sdev->group.cg_item), + sdev->in_hopid, sdev->out_hopid); + if (!ret) + tb_service_properties_changed(stream->svc); + + tbstream_put(stream); + return ret; +} + +static int tbstream_dev_alloc_in_hopid(struct tbstream_dev *sdev, int hopid) +{ + struct tb_xdomain *xd = tbstream_dev_xdomain(sdev); + int ret; + + if (sdev->in_hopid > 0 && sdev->in_hopid != hopid) + tb_xdomain_release_in_hopid(xd, sdev->in_hopid); + if (!hopid) { + sdev->in_hopid = hopid; + return 0; + } + ret = tb_xdomain_alloc_in_hopid(xd, hopid); + if (ret < 0) + return ret; + /* + * If specific HopID was asked by the user and we did not get + * that one then release and return error instead. + */ + if (hopid > 0 && hopid != ret) { + tb_xdomain_release_in_hopid(xd, ret); + return -EBUSY; + } + sdev->in_hopid = ret; + return 0; +} + +static int tbstream_dev_alloc_out_hopid(struct tbstream_dev *sdev, int hopid) +{ + struct tb_xdomain *xd = tbstream_dev_xdomain(sdev); + int ret; + + if (sdev->out_hopid > 0 && sdev->out_hopid != hopid) + tb_xdomain_release_out_hopid(xd, sdev->out_hopid); + if (!hopid) { + sdev->out_hopid = hopid; + return 0; + } + ret = tb_xdomain_alloc_out_hopid(xd, hopid); + if (ret < 0) + return ret; + if (hopid > 0 && hopid != ret) { + tb_xdomain_release_out_hopid(xd, ret); + return -EBUSY; + } + sdev->out_hopid = ret; + return 0; +} + +static ssize_t +tbstream_dev_in_hopid_store(struct config_item *item, const char *buf, + size_t count) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + int ret, in_hopid; + + ret = kstrtoint(buf, 0, &in_hopid); + if (ret) + return ret; + + guard(mutex)(&sdev->lock); + if (sdev->users) + return -EBUSY; + if (sdev->stream) { + ret = tbstream_dev_alloc_in_hopid(sdev, in_hopid); + if (ret) + return ret; + ret = tbstream_dev_update_properties(sdev); + } else { + sdev->in_hopid = in_hopid; + } + return ret ? 
ret : count; +} +CONFIGFS_ATTR(tbstream_dev_, in_hopid); + +static ssize_t tbstream_dev_out_hopid_show(struct config_item *item, char *buf) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + + return sysfs_emit(buf, "%d\n", sdev->out_hopid); +} + +static ssize_t +tbstream_dev_out_hopid_store(struct config_item *item, const char *buf, + size_t count) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + int ret, out_hopid; + + ret = kstrtoint(buf, 0, &out_hopid); + if (ret) + return ret; + + guard(mutex)(&sdev->lock); + if (sdev->users) + return -EBUSY; + if (sdev->stream) { + ret = tbstream_dev_alloc_out_hopid(sdev, out_hopid); + if (ret) + return ret; + ret = tbstream_dev_update_properties(sdev); + } else { + sdev->out_hopid = out_hopid; + } + return ret ? ret : count; +} +CONFIGFS_ATTR(tbstream_dev_, out_hopid); + +static ssize_t tbstream_dev_ring_size_show(struct config_item *item, char *buf) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + + return sysfs_emit(buf, "%u\n", sdev->ring_size); +} + +static ssize_t +tbstream_dev_ring_size_store(struct config_item *item, const char *buf, + size_t count) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + unsigned int ring_size; + int ret; + + ret = kstrtouint(buf, 0, &ring_size); + if (ret) + return ret; + + if (ring_size < TBSTREAM_DEV_MIN_RING_SIZE || + ring_size > TBSTREAM_DEV_MAX_RING_SIZE) + return -EINVAL; + + guard(mutex)(&sdev->lock); + if (sdev->users) + return -EBUSY; + sdev->ring_size = ring_size; + return count; +} +CONFIGFS_ATTR(tbstream_dev_, ring_size); + +static ssize_t tbstream_dev_throttling_show(struct config_item *item, char *buf) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = 
tbstream_dev_from_group(group); + + return sysfs_emit(buf, "%u\n", sdev->throttling); +} + +static ssize_t +tbstream_dev_throttling_store(struct config_item *item, const char *buf, + size_t count) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + unsigned int throttling; + int ret; + + ret = kstrtouint(buf, 0, &throttling); + if (ret) + return ret; + + if (throttling > TBSTREAM_DEV_MAX_THROTTLING) + return -EINVAL; + + guard(mutex)(&sdev->lock); + if (sdev->users) + return -EBUSY; + sdev->throttling = throttling; + return count; +} +CONFIGFS_ATTR(tbstream_dev_, throttling); + +static struct configfs_attribute *tbstream_dev_attrs[] = { + &tbstream_dev_attr_index, + &tbstream_dev_attr_in_hopid, + &tbstream_dev_attr_out_hopid, + &tbstream_dev_attr_ring_size, + &tbstream_dev_attr_throttling, + NULL, +}; + +static void tbstream_dev_item_release(struct config_item *item) +{ + struct config_group *group = to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(group); + + /* Undo device_initialize() + cdev_device_add() */ + cdev_device_del(&sdev->cdev, &sdev->dev); + tbstream_dev_put(sdev); +} + +static struct configfs_item_operations tbstream_dev_item_ops = { + .release = tbstream_dev_item_release, +}; + +static const struct config_item_type tbstream_dev_type = { + .ct_owner = THIS_MODULE, + .ct_item_ops = &tbstream_dev_item_ops, + .ct_attrs = tbstream_dev_attrs, +}; + +static void service_get_hopids(struct tb_service *svc, const char *name, + int *in_hopid, int *out_hopid) +{ + struct tb_property_dir *dir; + struct tb_property *p; + + guard(mutex)(&svc->lock); + + /* See if we have directory entry with the matching name */ + p = tb_property_find(svc->remote_properties, name, + TB_PROPERTY_TYPE_DIRECTORY); + if (!p) + return; + + dir = p->value.dir; + + /* + * We need to reverse the HopIDs on our end so that in becomes + * out and vice versa. 
+ */ + p = tb_property_find(dir, "inhopid", TB_PROPERTY_TYPE_VALUE); + if (p && p->value.immediate >= 8) + *out_hopid = p->value.immediate; + p = tb_property_find(dir, "outhopid", TB_PROPERTY_TYPE_VALUE); + if (p && p->value.immediate >= 8) + *in_hopid = p->value.immediate; +} + +static void +tbstream_dev_attach_stream(struct tbstream_dev *sdev, struct tbstream_group *sg) +{ + const char *name = config_item_name(&sdev->group.cg_item); + struct tbstream *stream; + + stream = tbstream_get(sg->stream); + if (!stream) + return; + + scoped_guard(mutex, &sdev->lock) { + sdev->stream = stream; + /* + * If there is no existing configuration (or automatic + * configuration is being used) check if the other side + * has configuration for this and use it. + */ + if (sdev->in_hopid <= 0 && sdev->out_hopid <= 0) + service_get_hopids(stream->svc, name, &sdev->in_hopid, + &sdev->out_hopid); + if (sdev->in_hopid) + tbstream_dev_alloc_in_hopid(sdev, sdev->in_hopid); + if (sdev->out_hopid) + tbstream_dev_alloc_out_hopid(sdev, sdev->out_hopid); + } + + service_update_properties(stream->svc, name, sdev->in_hopid, + sdev->out_hopid); + tb_service_properties_changed(stream->svc); + + /* Notify any openerers that the stream is now attached */ + wake_up_interruptible(&sdev->wait); +} + +static void tbstream_dev_detach_stream(struct tbstream_dev *sdev) +{ + const char *name = config_item_name(&sdev->group.cg_item); + struct tbstream *stream; + struct tb_xdomain *xd; + + scoped_guard(mutex, &sdev->lock) { + stream = sdev->stream; + if (!stream) + return; + sdev->stream = NULL; + + xd = tb_service_parent(stream->svc); + if (sdev->out_hopid > 0) + tb_xdomain_release_out_hopid(xd, sdev->out_hopid); + if (sdev->in_hopid > 0) + tb_xdomain_release_in_hopid(xd, sdev->in_hopid); + } + + service_update_properties(stream->svc, name, 0, 0); + tb_service_properties_changed(stream->svc); + + tbstream_put(stream); + + /* Notify any task that the stream is not valid anymore */ + 
wake_up_interruptible_poll(&sdev->wait, EPOLLHUP | EPOLLERR); +} + +static inline struct tbstream_group * +to_tbstream_group(struct config_group *group) +{ + return container_of(group, struct tbstream_group, group); +} + +static struct config_group * +tbstream_dev_make_group(struct config_group *group, const char *name) +{ + struct tbstream_group *sg = to_tbstream_group(group); + struct tbstream_dev *sdev; + int ret, index; + + /* + * We want the names to be suitable for passing as property + * directory names. + */ + if (strlen(name) > TB_PROPERTY_KEY_SIZE) + return ERR_PTR(-ENAMETOOLONG); + + sdev = kzalloc_obj(*sdev, GFP_KERNEL); + if (!sdev) + return ERR_PTR(-ENOMEM); + + index = ida_alloc_max(&tbstream_minors, TBSTREAM_DEV_MINORS - 1, + GFP_KERNEL); + if (index < 0) { + kfree(sdev); + return ERR_PTR(index); + } + + sdev->index = index; + sdev->ring_size = TBSTREAM_DEV_RING_SIZE; + sdev->throttling = TBSTREAM_DEV_THROTTLING; + mutex_init(&sdev->lock); + init_waitqueue_head(&sdev->wait); + INIT_LIST_HEAD(&sdev->list); + + sdev->dev.devt = MKDEV(MAJOR(tbstream_devt), index); + sdev->dev.class = &tbstream_class; + sdev->dev.release = tbstream_dev_release; + /* This point forward tbstream_dev_put() must be used to release sdev */ + device_initialize(&sdev->dev); + + ret = dev_set_name(&sdev->dev, "tbstream%d", index); + if (ret) { + tbstream_dev_put(sdev); + return ERR_PTR(ret); + } + + config_group_init_type_name(&sdev->group, name, &tbstream_dev_type); + + scoped_guard(mutex, &sg->lock) + list_add_tail(&sdev->list, &sg->dev_list); + + tbstream_dev_attach_stream(sdev, sg); + + cdev_init(&sdev->cdev, &tbstream_dev_fops); + ret = cdev_device_add(&sdev->cdev, &sdev->dev); + if (ret) { + tbstream_dev_detach_stream(sdev); + /* Calls tbstream_dev_put() */ + config_group_put(&sdev->group); + return ERR_PTR(ret); + } + + return &sdev->group; +} + +static void +tbstream_dev_drop_item(struct config_group *group, struct config_item *item) +{ + struct config_group *sdev_group 
= to_config_group(item); + struct tbstream_dev *sdev = tbstream_dev_from_group(sdev_group); + struct tbstream_group *sg = to_tbstream_group(group); + + tbstream_dev_detach_stream(sdev); + scoped_guard(mutex, &sg->lock) + list_del(&sdev->list); + config_item_put(item); +} + +static struct configfs_group_operations tbstream_dev_group_ops = { + .make_group = tbstream_dev_make_group, + .drop_item = tbstream_dev_drop_item, +}; + +static void tbstream_item_release(struct config_item *item) +{ + struct config_group *group = to_config_group(item); + struct tbstream_group *sg = to_tbstream_group(group); + + tbstream_put(sg->stream); + kfree(sg); +} + +static struct configfs_item_operations tbstream_item_ops = { + .release = tbstream_item_release, +}; + +static const struct config_item_type tbstream_dev_group_type = { + .ct_owner = THIS_MODULE, + .ct_group_ops = &tbstream_dev_group_ops, + .ct_item_ops = &tbstream_item_ops, +}; + +static struct config_group * +tbstream_make_group(struct config_group *group, const char *name) +{ + struct tbstream_group *sg; + struct tbstream *stream; + int domain, index; + u64 route; + + /* Make sure the format is correct */ + if (sscanf(name, "%u-%llx.%u", &domain, &route, &index) != 3) + return ERR_PTR(-EINVAL); + + sg = kzalloc_obj(*sg, GFP_KERNEL); + if (!sg) + return ERR_PTR(-ENOMEM); + + mutex_init(&sg->lock); + INIT_LIST_HEAD(&sg->dev_list); + + guard(mutex)(&tbstream_lock); + list_for_each_entry(stream, &tbstream_list, list) { + if (sysfs_streq(name, dev_name(&stream->svc->dev))) { + sg->stream = tbstream_get(stream); + break; + } + } + + config_group_init_type_name(&sg->group, name, &tbstream_dev_group_type); + return &sg->group; +} + +static struct configfs_group_operations tbstream_group_ops = { + .make_group = tbstream_make_group, +}; + +static const struct config_item_type tbstream_group_type = { + .ct_owner = THIS_MODULE, + .ct_group_ops = &tbstream_group_ops, +}; + +static struct config_group tbstream_group = { + .cg_item = { + 
.ci_namebuf = "stream", + .ci_type = &tbstream_group_type, + }, +}; + +/* Returns reference count increased */ +static struct tbstream_group *tbstream_group_find(struct tbstream *stream) +{ + struct config_item *item; + + guard(mutex)(&tbstream_group.cg_subsys->su_mutex); + item = config_group_find_item(&tbstream_group, + dev_name(&stream->svc->dev)); + if (item) + return to_tbstream_group(to_config_group(item)); + return NULL; +} + +static void tbstream_group_attach_stream(struct tbstream *stream) +{ + struct tbstream_group *sg; + struct tbstream_dev *sdev; + + sg = tbstream_group_find(stream); + if (!sg) + return; + + guard(mutex)(&sg->lock); + if (WARN_ON(sg->stream)) { + config_group_put(&sg->group); + return; + } + sg->stream = tbstream_get(stream); + /* + * If there are existing stream devices, attach the stream to + * them now. + */ + list_for_each_entry(sdev, &sg->dev_list, list) + tbstream_dev_attach_stream(sdev, sg); + + config_group_put(&sg->group); +} + +static void tbstream_group_detach_stream(struct tbstream *stream) +{ + struct tbstream_group *sg; + struct tbstream_dev *sdev; + + sg = tbstream_group_find(stream); + if (!sg) + return; + + guard(mutex)(&sg->lock); + if (sg->stream) { + /* Detach this stream from the stream devices */ + list_for_each_entry_reverse(sdev, &sg->dev_list, list) + tbstream_dev_detach_stream(sdev); + tbstream_put(sg->stream); + sg->stream = NULL; + } + + config_group_put(&sg->group); +} + +static int tbstream_probe(struct tb_service *svc, const struct tb_service_id *id) +{ + struct tbstream *stream; + + stream = kzalloc_obj(*stream, GFP_KERNEL); + if (!stream) + return -ENOMEM; + + /* After this point, release stream by calling tbstream_put() */ + kref_init(&stream->kref); + stream->svc = tb_service_get(svc); + INIT_LIST_HEAD(&stream->list); + + scoped_guard(mutex, &tbstream_lock) + list_add_tail(&stream->list, &tbstream_list); + + tbstream_group_attach_stream(stream); + tb_service_set_drvdata(svc, stream); + return 0; +} + 
+static void tbstream_remove(struct tb_service *svc) +{ + struct tbstream *stream = tb_service_get_drvdata(svc); + + tbstream_group_detach_stream(stream); + scoped_guard(mutex, &tbstream_lock) + list_del(&stream->list); + tbstream_put(stream); +} + +static int __maybe_unused tbstream_suspend(struct device *dev) +{ + struct tb_service *svc = tb_to_service(dev); + struct tbstream *stream = tb_service_get_drvdata(svc); + struct tbstream_group *sg; + struct tbstream_dev *sdev; + + sg = tbstream_group_find(stream); + if (!sg) + return 0; + + list_for_each_entry_reverse(sdev, &sg->dev_list, list) { + /* Stop the stream (if it was open) */ + if (sdev->users) + tbstream_dev_stop(sdev); + } + + config_group_put(&sg->group); + return 0; +} + +static int __maybe_unused tbstream_resume(struct device *dev) +{ + struct tb_service *svc = tb_to_service(dev); + struct tbstream *stream = tb_service_get_drvdata(svc); + struct tbstream_group *sg; + struct tbstream_dev *sdev; + + sg = tbstream_group_find(stream); + if (!sg) + return 0; + + list_for_each_entry(sdev, &sg->dev_list, list) { + int ret; + + if (!sdev->users) + continue; + ret = tbstream_dev_start(sdev); + if (ret) { + config_group_put(&sg->group); + return ret; + } + } + + config_group_put(&sg->group); + return 0; +} + +static const struct dev_pm_ops tbstream_pm_ops = { + SET_SYSTEM_SLEEP_PM_OPS(tbstream_suspend, tbstream_resume) +}; + +static const struct tb_service_id tbstream_ids[] = { + { TB_SERVICE("stream", 1) }, + { }, +}; +MODULE_DEVICE_TABLE(tbsvc, tbstream_ids); + +static struct tb_service_driver tbstream_driver = { + .driver = { + .owner = THIS_MODULE, + .name = "thunderbolt_stream", + .pm = &tbstream_pm_ops, + }, + .probe = tbstream_probe, + .remove = tbstream_remove, + .id_table = tbstream_ids, +}; + +static int __init tbstream_init(void) +{ + int ret; + + ret = alloc_chrdev_region(&tbstream_devt, 0, TBSTREAM_DEV_MINORS, + "tbstream"); + if (ret) + return ret; + + ret = class_register(&tbstream_class); + if 
(ret) + goto err_unregister_chrdev; + + tbstream_dir = tb_property_create_dir(&tbstream_dir_uuid); + if (!tbstream_dir) { + ret = -ENOMEM; + goto err_unregister_class; + } + + tb_property_add_immediate(tbstream_dir, "prtcid", 1); + tb_property_add_immediate(tbstream_dir, "prtcvers", 1); + tb_property_add_immediate(tbstream_dir, "prtcrevs", 0); + tb_property_add_immediate(tbstream_dir, "prtcstns", 0); + + ret = tb_register_property_dir("stream", tbstream_dir); + if (ret) + goto err_free_dir; + + config_group_init(&tbstream_group); + ret = tb_configfs_register_group(&tbstream_group); + if (ret) + goto err_unregister_dir; + + ret = tb_register_service_driver(&tbstream_driver); + if (ret) + goto err_unregister_group; + return 0; + +err_unregister_group: + tb_configfs_unregister_group(&tbstream_group); +err_unregister_dir: + tb_unregister_property_dir("stream", tbstream_dir); +err_free_dir: + tb_property_free_dir(tbstream_dir); +err_unregister_class: + class_unregister(&tbstream_class); +err_unregister_chrdev: + unregister_chrdev_region(tbstream_devt, TBSTREAM_DEV_MINORS); + + return ret; +} +module_init(tbstream_init); + +static void __exit tbstream_exit(void) +{ + tb_unregister_service_driver(&tbstream_driver); + tb_configfs_unregister_group(&tbstream_group); + tb_unregister_property_dir("stream", tbstream_dir); + tb_property_free_dir(tbstream_dir); + class_unregister(&tbstream_class); + unregister_chrdev_region(tbstream_devt, TBSTREAM_DEV_MINORS); + ida_destroy(&tbstream_minors); +} +module_exit(tbstream_exit); + +MODULE_AUTHOR("Alan Borzeszkowski "); +MODULE_AUTHOR("Mika Westerberg "); +MODULE_DESCRIPTION("Stream data over Thunderbolt/USB4 cable"); +MODULE_LICENSE("GPL"); -- 2.50.1
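
For reviewers, here is a hypothetical usage sketch of the configfs interface this patch adds. It only uses the group/attribute names visible in the code above; the configfs mount point subdirectory, the service name `1-0.1`, the stream device name `nc`, and the HopID/ring values are made-up examples and may not match the final interface:

```shell
# Create a group matching a tb_service device name ("%u-%llx.%u",
# i.e. domain-route.index) under the configfs subsystem registered
# by tb_configfs_register_group(). The subdirectory name below is
# an assumption.
mkdir /sys/kernel/config/thunderbolt/stream/1-0.1

# Create a stream device in the group; this allocates a minor and
# registers the /dev/tbstreamN char device (see the "index" attribute).
mkdir /sys/kernel/config/thunderbolt/stream/1-0.1/nc
cat /sys/kernel/config/thunderbolt/stream/1-0.1/nc/index

# Optionally pin HopIDs and ring size before the first open; the
# stores return -EBUSY once the device has users.
echo 8   > /sys/kernel/config/thunderbolt/stream/1-0.1/nc/in_hopid
echo 9   > /sys/kernel/config/thunderbolt/stream/1-0.1/nc/out_hopid
echo 256 > /sys/kernel/config/thunderbolt/stream/1-0.1/nc/ring_size

# Data then moves with plain read()/write() (or poll()) on the device.
cat /dev/tbstream0 > received.bin &
cat tosend.bin > /dev/tbstream0
```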