* [hail patch 1/3] chunkd: Add checksum table to on-disk format
2010-09-15 3:14 [hail patch 0/3] chunkd: on-disk checksumming and get-partial operation Jeff Garzik
@ 2010-09-15 3:15 ` Jeff Garzik
2010-09-15 3:16 ` [hail patch 2/3] chunkd: checksum data prior to returning via GET Jeff Garzik
` (2 subsequent siblings)
3 siblings, 0 replies; 5+ messages in thread
From: Jeff Garzik @ 2010-09-15 3:15 UTC (permalink / raw)
To: hail-devel
commit f1de17a6e2b3afdbfbfa581228280b65a4a17e5f
Author: Jeff Garzik <jeff@garzik.org>
Date: Thu Aug 5 17:47:03 2010 -0400
chunkd: Add checksum table to on-disk format, one sum per 64k of data
Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
chunkd/be-fs.c | 162 ++++++++++++++++++++++++++++++++++++++++++++++++---------
1 file changed, 137 insertions(+), 25 deletions(-)
diff --git a/chunkd/be-fs.c b/chunkd/be-fs.c
index 4b851a7..d714e7c 100644
--- a/chunkd/be-fs.c
+++ b/chunkd/be-fs.c
@@ -53,14 +53,23 @@ struct fs_obj {
int in_fd;
char *in_fn;
off_t sendfile_ofs;
+
+ size_t checked_bytes;
+ SHA_CTX checksum;
+ unsigned int csum_idx;
+ void *csum_tbl;
+ size_t csum_tbl_sz;
+
+ unsigned int n_blk;
};
struct be_fs_obj_hdr {
char magic[4];
uint32_t key_len;
uint64_t value_len;
+ uint32_t n_blk;
- char reserved[16];
+ char reserved[12];
unsigned char hash[CHD_CSUM_SZ];
char owner[128];
@@ -208,6 +217,8 @@ static struct fs_obj *fs_obj_alloc(void)
obj->out_fd = -1;
obj->in_fd = -1;
+ SHA1_Init(&obj->checksum);
+
return obj;
}
@@ -318,6 +329,17 @@ static bool key_valid(const void *key, size_t key_len)
return true;
}
+/*
+ * Number of CHUNK_BLK_SZ-sized blocks needed to hold data_len bytes.
+ * A trailing partial block counts as one more block; zero bytes need
+ * zero blocks.
+ */
+static unsigned int fs_blk_count(uint64_t data_len)
+{
+	uint64_t whole = data_len >> CHUNK_BLK_ORDER;
+	uint64_t partial = (data_len & (CHUNK_BLK_SZ - 1)) ? 1 : 0;
+
+	return (unsigned int) (whole + partial);
+}
+
struct backend_obj *fs_obj_new(uint32_t table_id,
const void *key, size_t key_len,
uint64_t data_len,
@@ -325,6 +347,7 @@ struct backend_obj *fs_obj_new(uint32_t table_id,
{
struct fs_obj *obj;
char *fn = NULL;
+ size_t csum_bytes;
enum chunk_errcode erc = che_InternalError;
off_t skip_len;
@@ -339,6 +362,13 @@ struct backend_obj *fs_obj_new(uint32_t table_id,
return NULL;
}
+ obj->n_blk = fs_blk_count(data_len);
+ csum_bytes = obj->n_blk * CHD_CSUM_SZ;
+ obj->csum_tbl = malloc(csum_bytes);
+ if (!obj->csum_tbl)
+ goto err_out;
+ obj->csum_tbl_sz = csum_bytes;
+
/* build local fs pathname */
fn = fs_obj_pathname(table_id, key, key_len);
if (!fn)
@@ -359,7 +389,7 @@ struct backend_obj *fs_obj_new(uint32_t table_id,
obj->out_fn = fn;
/* calculate size of front-of-file metadata area */
- skip_len = sizeof(struct be_fs_obj_hdr) + key_len;
+ skip_len = sizeof(struct be_fs_obj_hdr) + key_len + csum_bytes;
/* position file pointer where object data (as in, not metadata)
* will begin
@@ -397,7 +427,10 @@ struct backend_obj *fs_obj_open(uint32_t table_id, const char *user,
struct be_fs_obj_hdr hdr;
ssize_t rrc;
uint64_t value_len, tmp64;
+ size_t csum_bytes;
enum chunk_errcode erc = che_InternalError;
+ struct iovec iov[2];
+ size_t total_rd_len;
if (!key_valid(key, key_len)) {
*err_code = che_InvalidKey;
@@ -457,23 +490,45 @@ struct backend_obj *fs_obj_open(uint32_t table_id, const char *user,
goto err_out;
value_len = GUINT64_FROM_LE(hdr.value_len);
+ obj->n_blk = GUINT32_FROM_LE(hdr.n_blk);
+ csum_bytes = obj->n_blk * CHD_CSUM_SZ;
/* verify file size large enough to contain value */
- tmp64 = value_len + sizeof(hdr) + key_len;
+ tmp64 = value_len + sizeof(hdr) + key_len + csum_bytes;
if (G_UNLIKELY(st.st_size < tmp64)) {
applog(LOG_ERR, "obj(%s) size error, too small", obj->in_fn);
goto err_out;
}
+ /* verify expected size of checksum table */
+ if (G_UNLIKELY(fs_blk_count(value_len) != obj->n_blk)) {
+ applog(LOG_ERR, "obj(%s) unexpected blk count "
+ "(%u from val sz, %u from hdr)",
+ obj->in_fn, fs_blk_count(value_len), obj->n_blk);
+ goto err_out;
+ }
+
+ obj->csum_tbl = malloc(csum_bytes);
+ if (!obj->csum_tbl)
+ goto err_out;
+ obj->csum_tbl_sz = csum_bytes;
+
obj->bo.key = malloc(key_len);
obj->bo.key_len = key_len;
if (!obj->bo.key)
goto err_out;
- /* read object variable-length header */
- rrc = read(obj->in_fd, obj->bo.key, key_len);
- if ((rrc != key_len) || (memcmp(key, obj->bo.key, key_len))) {
- applog(LOG_ERR, "read hdr key obj(%s) failed: %s",
+ /* init additional header segment list */
+ iov[0].iov_base = obj->bo.key;
+ iov[0].iov_len = key_len;
+ iov[1].iov_base = obj->csum_tbl;
+ iov[1].iov_len = csum_bytes;
+ total_rd_len = iov[0].iov_len + iov[1].iov_len;
+
+ /* read additional header segments (key, checksum table) */
+ rrc = readv(obj->in_fd, iov, ARRAY_SIZE(iov));
+ if ((rrc != total_rd_len) || (memcmp(key, obj->bo.key, key_len))) {
+ applog(LOG_ERR, "read addnl hdrs(%s) failed: %s",
obj->in_fn,
(rrc < 0) ? strerror(errno) : "<unknown reasons>");
goto err_out;
@@ -516,6 +571,7 @@ void fs_obj_free(struct backend_obj *bo)
if (obj->in_fd >= 0)
close(obj->in_fd);
+ free(obj->csum_tbl);
free(obj);
}
@@ -532,19 +588,58 @@ ssize_t fs_obj_read(struct backend_obj *bo, void *ptr, size_t len)
return rc;
}
+/*
+ * Finalize the running SHA-1 of the block just completed, store the
+ * digest in the next free slot of the in-memory checksum table
+ * (obj->csum_tbl), and restart the running checksum and byte counter
+ * for the following block.
+ *
+ * If every slot (n_blk of them) is already filled, this logs a BUG
+ * message and returns without touching the table or the checksum state.
+ */
+static void obj_flush_csum(struct backend_obj *bo)
+{
+	struct fs_obj *obj = bo->private;
+	unsigned char md[CHD_CSUM_SZ];
+	unsigned char *slot;
+
+	if (G_UNLIKELY(obj->csum_idx >= obj->n_blk)) {
+		applog(LOG_ERR, "BUG %s: cidx %u, n_blk %u",
+		       __func__, obj->csum_idx, obj->n_blk);
+		return;
+	}
+
+	SHA1_Final(md, &obj->checksum);
+
+	/* csum_tbl is a void *: index it through a byte pointer, because
+	 * arithmetic on void * is a GNU extension rather than ISO C, and
+	 * compute the offset in size_t so it cannot wrap at 32 bits
+	 */
+	slot = (unsigned char *) obj->csum_tbl +
+	       (size_t) obj->csum_idx * CHD_CSUM_SZ;
+	memcpy(slot, md, CHD_CSUM_SZ);
+	obj->csum_idx++;
+
+	obj->checked_bytes = 0;
+	SHA1_Init(&obj->checksum);
+}
+
ssize_t fs_obj_write(struct backend_obj *bo, const void *ptr, size_t len)
{
struct fs_obj *obj = bo->private;
- ssize_t rc;
+ ssize_t total_written = 0;
- rc = write(obj->out_fd, ptr, len);
- if (rc < 0)
- applog(LOG_ERR, "obj write(%s) failed: %s",
- obj->out_fn, strerror(errno));
- else
- obj->written_bytes += rc;
+ while (len > 0) {
+ size_t unchecked;
+ ssize_t wrc;
- return rc;
+ unchecked = CHUNK_BLK_SZ - obj->checked_bytes;
+
+ wrc = write(obj->out_fd, ptr, MIN(unchecked, len));
+ if (wrc < 0) {
+ applog(LOG_ERR, "obj write(%s) failed: %s",
+ obj->out_fn, strerror(errno));
+ return wrc;
+ }
+
+ SHA1_Update(&obj->checksum, ptr, wrc);
+
+ total_written += wrc;
+ obj->written_bytes += wrc;
+ obj->checked_bytes += wrc;
+ ptr += wrc;
+ len -= wrc;
+
+ /* if at end of 64k block, update csum table with new csum */
+ if (obj->checked_bytes == CHUNK_BLK_SZ)
+ obj_flush_csum(bo);
+ }
+
+ return total_written;
}
#if defined(HAVE_SENDFILE) && defined(__linux__)
@@ -554,10 +649,11 @@ ssize_t fs_obj_sendfile(struct backend_obj *bo, int out_fd, size_t len)
struct fs_obj *obj = bo->private;
ssize_t rc;
- if (obj->sendfile_ofs == 0) {
- obj->sendfile_ofs += sizeof(struct be_fs_obj_hdr);
- obj->sendfile_ofs += bo->key_len;
- }
+ if (obj->sendfile_ofs == 0)
+ obj->sendfile_ofs =
+ sizeof(struct be_fs_obj_hdr) +
+ bo->key_len +
+ obj->csum_tbl_sz;
rc = sendfile(out_fd, obj->in_fd, &obj->sendfile_ofs, len);
if (rc < 0)
@@ -575,10 +671,11 @@ ssize_t fs_obj_sendfile(struct backend_obj *bo, int out_fd, size_t len)
ssize_t rc;
off_t sbytes = 0;
- if (obj->sendfile_ofs == 0) {
- obj->sendfile_ofs += sizeof(struct be_fs_obj_hdr);
- obj->sendfile_ofs += bo->key_len;
- }
+ if (obj->sendfile_ofs == 0)
+ obj->sendfile_ofs =
+ sizeof(struct be_fs_obj_hdr) +
+ bo->key_len +
+ obj->csum_tbl_sz;
rc = sendfile(obj->in_fd, out_fd, obj->sendfile_ofs, len,
NULL, &sbytes, 0);
@@ -610,7 +707,7 @@ bool fs_obj_write_commit(struct backend_obj *bo, const char *user,
struct be_fs_obj_hdr hdr;
ssize_t wrc;
size_t total_wr_len;
- struct iovec iov[2];
+ struct iovec iov[3];
if (G_UNLIKELY(obj->bo.size != obj->written_bytes)) {
applog(LOG_ERR, "BUG(%s): size/written_bytes mismatch: %llu/%llu",
@@ -626,6 +723,19 @@ bool fs_obj_write_commit(struct backend_obj *bo, const char *user,
strncpy(hdr.owner, user, sizeof(hdr.owner));
hdr.key_len = GUINT32_TO_LE(bo->key_len);
hdr.value_len = GUINT64_TO_LE(obj->written_bytes);
+ hdr.n_blk = GUINT32_TO_LE(obj->n_blk);
+
+ /* update checksum table with final csum, if necessary */
+ if (obj->checked_bytes > 0)
+ obj_flush_csum(bo);
+
+ if (G_UNLIKELY(obj->csum_idx != obj->n_blk)) {
+ applog(LOG_ERR, "BUG(%s): csum_idx/n_blk mismatch: %u/%u",
+ obj->out_fn, obj->csum_idx, obj->n_blk);
+ return false;
+ }
+
+ obj->csum_idx = 0;
/* go back to beginning of file */
if (lseek(obj->out_fd, 0, SEEK_SET) < 0) {
@@ -639,7 +749,9 @@ bool fs_obj_write_commit(struct backend_obj *bo, const char *user,
iov[0].iov_len = sizeof(hdr);
iov[1].iov_base = bo->key;
iov[1].iov_len = bo->key_len;
- total_wr_len = iov[0].iov_len + iov[1].iov_len;
+ iov[2].iov_base = obj->csum_tbl;
+ iov[2].iov_len = obj->csum_tbl_sz;
+ total_wr_len = iov[0].iov_len + iov[1].iov_len + iov[2].iov_len;
/* write object header segments */
wrc = writev(obj->out_fd, iov, ARRAY_SIZE(iov));
^ permalink raw reply related [flat|nested] 5+ messages in thread

* [hail patch 3/3] chunkd: new get-partial operation
2010-09-15 3:14 [hail patch 0/3] chunkd: on-disk checksumming and get-partial operation Jeff Garzik
2010-09-15 3:15 ` [hail patch 1/3] chunkd: Add checksum table to on-disk format Jeff Garzik
2010-09-15 3:16 ` [hail patch 2/3] chunkd: checksum data prior to returning via GET Jeff Garzik
@ 2010-09-15 3:16 ` Jeff Garzik
2010-09-16 4:04 ` [hail patch 0/3] chunkd: on-disk checksumming and " Jeff Garzik
3 siblings, 0 replies; 5+ messages in thread
From: Jeff Garzik @ 2010-09-15 3:16 UTC (permalink / raw)
To: hail-devel
commit f2493782233b8b581edd5975c3b2019d80ca6552
Author: Jeff Garzik <jeff@garzik.org>
Date: Tue Sep 14 22:59:50 2010 -0400
chunk: add get-partial operation
Signed-off-by: Jeff Garzik <jgarzik@redhat.com>
chunkd/be-fs.c | 25 +++++++++-
chunkd/chunkd.h | 6 +-
chunkd/object.c | 113 +++++++++++++++++++++++++++++++++++++++++++++++
chunkd/server.c | 14 +++++
doc/chcli.8 | 4 +
include/chunk_msg.h | 17 ++++++-
include/chunkc.h | 33 +++++++++++++
lib/chunkdc.c | 125 ++++++++++++++++++++++++++++++++++++++++++++++++++++
lib/chunksrv.c | 3 +
tools/chcli.c | 102 ++++++++++++++++++++++++++++++++++++++++++
10 files changed, 435 insertions(+), 7 deletions(-)
diff --git a/chunkd/be-fs.c b/chunkd/be-fs.c
index 24baad7..c2dc901 100644
--- a/chunkd/be-fs.c
+++ b/chunkd/be-fs.c
@@ -55,6 +55,8 @@ struct fs_obj {
off_t in_pos;
off_t sendfile_ofs;
+ off_t value_ofs;
+
off_t tail_pos;
size_t tail_len;
@@ -396,12 +398,13 @@ struct backend_obj *fs_obj_new(uint32_t table_id,
/* calculate size of front-of-file metadata area */
skip_len = sizeof(struct be_fs_obj_hdr) + key_len + csum_bytes;
+ obj->value_ofs = skip_len;
/* position file pointer where object data (as in, not metadata)
* will begin
*/
errno = 0;
- if (lseek(obj->out_fd, skip_len, SEEK_SET) != skip_len) {
+ if (lseek(obj->out_fd, obj->value_ofs, SEEK_SET) != obj->value_ofs) {
applog(LOG_ERR, "obj hdr seek(%s) failed: %s",
fn, strerror(errno));
goto err_out;
@@ -500,9 +503,10 @@ struct backend_obj *fs_obj_open(uint32_t table_id, const char *user,
csum_bytes = obj->n_blk * CHD_CSUM_SZ;
obj->tail_pos = value_len & ~(CHUNK_BLK_SZ - 1);
obj->tail_len = value_len & (CHUNK_BLK_SZ - 1);
+ obj->value_ofs = sizeof(hdr) + key_len + csum_bytes;
/* verify file size large enough to contain value */
- tmp64 = value_len + sizeof(hdr) + key_len + csum_bytes;
+ tmp64 = obj->value_ofs + value_len;
if (G_UNLIKELY(st.st_size < tmp64)) {
applog(LOG_ERR, "obj(%s) size error, too small", obj->in_fn);
goto err_out;
@@ -596,6 +600,23 @@ static bool can_csum_blk(struct fs_obj *obj, size_t len)
return false;
}
+/*
+ * Position the object's input descriptor at byte `ofs` of the object
+ * value, i.e. at file offset obj->value_ofs + ofs, and record `ofs` as
+ * the new logical read position (obj->in_pos).
+ *
+ * Returns 0 on success, or a negative errno value if lseek(2) fails.
+ */
+int fs_obj_seek(struct backend_obj *bo, off_t ofs)
+{
+	struct fs_obj *obj = bo->private;
+
+	if (lseek(obj->in_fd, obj->value_ofs + ofs, SEEK_SET) == (off_t)-1) {
+		/* capture errno first: the logging call below may change it,
+		 * which would make us return the wrong code (or even 0)
+		 */
+		int err = errno;
+
+		applog(LOG_ERR, "obj seek(%s) failed: %s",
+		       obj->in_fn, strerror(err));
+		return -err;
+	}
+
+	obj->in_pos = ofs;
+
+	return 0;
+}
+
ssize_t fs_obj_read(struct backend_obj *bo, void *ptr, size_t len)
{
struct fs_obj *obj = bo->private;
diff --git a/chunkd/chunkd.h b/chunkd/chunkd.h
index 7da2c5f..15e96be 100644
--- a/chunkd/chunkd.h
+++ b/chunkd/chunkd.h
@@ -36,9 +36,6 @@
#endif
enum {
- CHUNK_BLK_ORDER = 16, /* 64k blocks */
- CHUNK_BLK_SZ = 1 << CHUNK_BLK_ORDER,
-
CLI_DATA_BUF_SZ = CHUNK_BLK_SZ,
CHD_TRASH_MAX = 1000,
@@ -95,6 +92,7 @@ struct client {
bool writing;
struct chunksrv_req creq;
+ struct chunksrv_req_getpart creq_getpart;
unsigned int req_used; /* amount of req_buf in use */
void *req_ptr; /* start of unexamined data */
uint16_t key_len;
@@ -264,6 +262,7 @@ extern struct backend_obj *fs_obj_open(uint32_t table_id, const char *user,
enum chunk_errcode *err_code);
extern ssize_t fs_obj_write(struct backend_obj *bo, const void *ptr, size_t len);
extern ssize_t fs_obj_read(struct backend_obj *bo, void *ptr, size_t len);
+extern int fs_obj_seek(struct backend_obj *bo, off_t ofs);
extern void fs_obj_free(struct backend_obj *bo);
extern bool fs_obj_write_commit(struct backend_obj *bo, const char *user,
unsigned char *md, bool sync_data);
@@ -290,6 +289,7 @@ extern int fs_obj_do_sum(const char *fn, unsigned int klen, unsigned char *md);
extern bool object_del(struct client *cli);
extern bool object_put(struct client *cli);
extern bool object_get(struct client *cli, bool want_body);
+extern bool object_get_part(struct client *cli);
extern bool object_cp(struct client *cli);
extern bool cli_evt_data_in(struct client *cli, unsigned int events);
extern void cli_out_end(struct client *cli);
diff --git a/chunkd/object.c b/chunkd/object.c
index d7d3cb6..3242ba0 100644
--- a/chunkd/object.c
+++ b/chunkd/object.c
@@ -354,6 +354,119 @@ start_write:
return cli_write_start(cli);
}
+/*
+ * Handle a GET_PART request: return a sub-range of an object's value.
+ *
+ * The requested offset comes from cli->creq_getpart.offset and the
+ * requested length from cli->creq.data_len (0 means "until end of
+ * object").  The length is clamped to the bytes remaining after the
+ * offset and to CHUNK_MAX_GETPART blocks.  Data is read from the backend
+ * in whole CHUNK_BLK_SZ blocks (block-aligned start, length rounded up
+ * but never past end of object); only the requested `length` bytes are
+ * queued to the client.
+ *
+ * The response carries data_len = length, sets CHF_GET_PART_LAST when
+ * the range reaches end of object, and a SHA-1 hash.
+ * NOTE(review): the hash covers the whole block-aligned span read from
+ * disk, not only the `length` bytes sent -- confirm clients expect this.
+ *
+ * Returns the result of cli_err() on failure, true if a response could
+ * not be queued, else the result of cli_write_start().
+ */
+bool object_get_part(struct client *cli)
+{
+	static const uint64_t max_getpart = CHUNK_MAX_GETPART * CHUNK_BLK_SZ;
+	int rc;
+	enum chunk_errcode err = che_InternalError;
+	struct backend_obj *obj;
+	struct chunksrv_resp_get *get_resp = NULL;
+	uint64_t offset, length, remain;
+	uint64_t aligned_ofs, aligned_len, aligned_rem;
+	ssize_t rrc;
+	void *mem = NULL;
+	const unsigned char *hash_src;
+
+	get_resp = calloc(1, sizeof(*get_resp));
+	if (!get_resp) {
+		cli->state = evt_dispose;
+		return true;
+	}
+
+	resp_init_req(&get_resp->resp, &cli->creq);
+
+	cli->in_obj = obj = fs_obj_open(cli->table_id, cli->user, cli->key,
+					cli->key_len, &err);
+	if (!obj)
+		goto err_out;
+
+	cli->in_len = obj->size;
+
+	/* obtain requested offset */
+	offset = le64_to_cpu(cli->creq_getpart.offset);
+	if (offset > obj->size) {
+		err = che_InvalidSeek;
+		goto err_out;
+	}
+
+	/* align to block boundary */
+	aligned_ofs = offset & ~CHUNK_BLK_MASK;
+	remain = obj->size - offset;
+	aligned_rem = obj->size - aligned_ofs;
+
+	/* obtain requested length; 0 == "until end of object" */
+	length = le64_to_cpu(cli->creq.data_len);
+	if (length == 0 || length > remain)
+		length = remain;
+	if (length > max_getpart)
+		length = max_getpart;
+
+	/* calculate length based on block size */
+	aligned_len = length + (offset - aligned_ofs);
+	if (aligned_len & CHUNK_BLK_MASK)
+		aligned_len += (CHUNK_BLK_SZ - (aligned_len & CHUNK_BLK_MASK));
+	if (aligned_len > aligned_rem)
+		aligned_len = aligned_rem;
+
+	if (length) {
+		/* seek to offset */
+		rc = fs_obj_seek(obj, aligned_ofs);
+		if (rc) {
+			err = che_InvalidSeek;
+			goto err_out;
+		}
+
+		/* allocate buffer to hold all get_part request data */
+		mem = malloc(aligned_len);
+		if (!mem)
+			goto err_out;
+
+		/* read requested data in its entirety */
+		rrc = fs_obj_read(obj, mem, aligned_len);
+		if (rrc != aligned_len)
+			goto err_out;
+	} else {
+		/* Nothing was read (offset == end of object).  With an
+		 * unaligned offset aligned_len is still non-zero here, so
+		 * hashing it would read from the NULL buffer.
+		 */
+		aligned_len = 0;
+	}
+
+	/* fill in response */
+	if (length == remain)
+		get_resp->resp.flags |= CHF_GET_PART_LAST;
+	get_resp->resp.data_len = cpu_to_le64(length);
+	hash_src = mem ? mem : (const unsigned char *) "";
+	SHA1(hash_src, aligned_len, get_resp->resp.hash);
+	get_resp->mtime = cpu_to_le64(obj->mtime);
+
+	/* write response header */
+	rc = cli_writeq(cli, get_resp, sizeof(*get_resp), cli_cb_free, get_resp);
+	if (rc) {
+		free(mem);
+		free(get_resp);
+		return true;
+	}
+
+	if (length) {
+		/* write response data */
+		rc = cli_writeq(cli, (char *) mem + (offset - aligned_ofs),
+				length, cli_cb_free, mem);
+		if (rc) {
+			/* get_resp is already queued with cli_cb_free as
+			 * its completion callback (presumably that frees
+			 * it -- TODO confirm), so freeing it here as well
+			 * would be a double free.  Only mem is still ours.
+			 */
+			free(mem);
+			return true;
+		}
+	}
+
+	cli_in_end(cli);
+
+	return cli_write_start(cli);
+
+err_out:
+	/* free(NULL) is a no-op, so this serves every failure point */
+	free(mem);
+	free(get_resp);
+	return cli_err(cli, err, true);
+}
+
static void worker_cp_thr(struct worker_info *wi)
{
void *buf = NULL;
diff --git a/chunkd/server.c b/chunkd/server.c
index d3b593f..b816416 100644
--- a/chunkd/server.c
+++ b/chunkd/server.c
@@ -149,6 +149,10 @@ static struct {
[che_KeyExists] =
{ "che_KeyExists", 403,
"Key already exists" },
+
+ [che_InvalidSeek] =
+ { "che_InvalidSeek", 404,
+ "Invalid seek" },
};
void applog(int prio, const char *fmt, ...)
@@ -1063,6 +1067,7 @@ static const char *op2str(enum chunksrv_ops op)
case CHO_CHECK_STATUS: return "CHO_CHECK_STATUS";
case CHO_START_TLS: return "CHO_START_TLS";
case CHO_CP: return "CHO_CP";
+ case CHO_GET_PART: return "CHO_GET_PART";
default:
return "BUG/UNKNOWN!";
@@ -1151,6 +1156,9 @@ static bool cli_evt_exec_req(struct client *cli, unsigned int events)
case CHO_GET_META:
rcb = object_get(cli, false);
break;
+ case CHO_GET_PART:
+ rcb = object_get_part(cli);
+ break;
case CHO_PUT:
rcb = object_put(cli);
break;
@@ -1273,6 +1281,12 @@ static bool cli_evt_read_var(struct client *cli, unsigned int events)
cli->req_used = 0;
cli->state = evt_read_var;
cli->second_var = true;
+ } else if (cli->creq.op == CHO_GET_PART && !cli->second_var) {
+ cli->req_ptr = &cli->creq_getpart;
+ cli->var_len = sizeof(cli->creq_getpart);
+ cli->req_used = 0;
+ cli->state = evt_read_var;
+ cli->second_var = true;
} else
cli->state = evt_exec_req;
diff --git a/doc/chcli.8 b/doc/chcli.8
index 0de059b..3092fca 100644
--- a/doc/chcli.8
+++ b/doc/chcli.8
@@ -123,6 +123,10 @@ The following commands are available:
.B GET key
Retrieve the data object associated with the specified key.
.TP
+.B GETPART key offset length
+Retrieve a subset of the data object associated with the specified key,
+starting at given offset, for the given length.
+.TP
.B PUT key val
Store data object associated with the specified key.
.TP
diff --git a/include/chunk_msg.h b/include/chunk_msg.h
index 4c170e4..91ca1fb 100644
--- a/include/chunk_msg.h
+++ b/include/chunk_msg.h
@@ -31,6 +31,13 @@ enum {
CHD_SIG_SZ = 64,
};
+enum {
+ CHUNK_BLK_ORDER = 16, /* 64k blocks */
+ CHUNK_BLK_SZ = 1ULL << CHUNK_BLK_ORDER,
+ CHUNK_BLK_MASK = CHUNK_BLK_SZ - 1ULL,
+ CHUNK_MAX_GETPART = 4, /* max GET_PART req: 256k */
+};
+
enum chunksrv_ops {
CHO_NOP = 0, /* No-op (ping server) */
CHO_GET = 1, /* GET object */
@@ -50,6 +57,7 @@ enum chunksrv_ops {
CHO_START_TLS = 10, /* Encrypt all subsequent msgs */
CHO_CP = 11, /* local object copy (intra-table) */
+ CHO_GET_PART = 12, /* GET subset of object */
};
enum chunk_errcode {
@@ -64,12 +72,14 @@ enum chunk_errcode {
che_InvalidTable = 8,
che_Busy = 9,
che_KeyExists = 10,
+ che_InvalidSeek = 11,
};
enum chunk_flags {
CHF_SYNC = (1 << 0), /* force write to media */
CHF_TBL_CREAT = (1 << 1), /* create tbl, if needed */
CHF_TBL_EXCL = (1 << 2), /* fail, if tbl exists */
+ CHF_GET_PART_LAST = (1 << 3), /* true, if end-of-obj*/
};
struct chunksrv_req {
@@ -84,10 +94,15 @@ struct chunksrv_req {
/* variable-length key */
};
+struct chunksrv_req_getpart {
+ uint64_t offset; /* GET_PART offset */
+};
+
struct chunksrv_resp {
uint8_t magic[CHD_MAGIC_SZ]; /* CHUNKD_MAGIC */
uint8_t resp_code; /* chunk_errcode's */
- uint8_t rsv1[3];
+ uint8_t flags; /* CHF_xxx */
+ uint8_t rsv1[2];
uint32_t nonce; /* txn id, copied from request */
uint64_t data_len; /* len of addn'l data */
unsigned char hash[CHD_CSUM_SZ]; /* SHA1 checksum */
diff --git a/include/chunkc.h b/include/chunkc.h
index 1fd7066..e3c2bb7 100644
--- a/include/chunkc.h
+++ b/include/chunkc.h
@@ -51,7 +51,8 @@ struct st_client {
SSL_CTX *ssl_ctx;
SSL *ssl;
- char req_buf[sizeof(struct chunksrv_req) + CHD_KEY_SZ];
+ char req_buf[sizeof(struct chunksrv_req) + CHD_KEY_SZ +
+ sizeof(struct chunksrv_req_getpart)];
};
extern void stc_free(struct st_client *stc);
@@ -74,6 +75,19 @@ extern bool stc_get_start(struct st_client *stc, const void *key,
size_t key_len,int *pfd, uint64_t *len);
extern size_t stc_get_recv(struct st_client *stc, void *data, size_t len);
+extern bool stc_get_part(struct st_client *stc, const void *key, size_t key_len,
+ uint64_t offset, uint64_t max_len,
+ size_t (*write_cb)(void *, size_t, size_t, void *),
+ void *user_data);
+extern void *stc_get_part_inline(struct st_client *stc,
+ const void *key, size_t key_len,
+ uint64_t offset, uint64_t max_len,
+ size_t *len);
+extern bool stc_get_part_start(struct st_client *stc, const void *key,
+ size_t key_len,
+ uint64_t offset, uint64_t max_len,
+ int *pfd, uint64_t *len);
+
extern bool stc_put(struct st_client *stc, const void *key, size_t key_len,
size_t (*read_cb)(void *, size_t, size_t, void *),
uint64_t len, void *user_data, uint32_t flags);
@@ -113,6 +127,23 @@ static inline bool stc_get_startz(struct st_client *stc, const char *key,
return stc_get_start(stc, key, strlen(key) + 1, pfd, len);
}
+/*
+ * stc_get_part_inline() for a NUL-terminated string key; the key length
+ * passed on includes the terminating NUL.
+ */
+static inline void *stc_get_part_inlinez(struct st_client *stc,
+					 const char *key,
+					 uint64_t offset, uint64_t max_len,
+					 size_t *len)
+{
+	size_t key_len = strlen(key) + 1;
+
+	return stc_get_part_inline(stc, key, key_len, offset, max_len, len);
+}
+
+/*
+ * stc_get_part_start() for a NUL-terminated string key; the key length
+ * passed on includes the terminating NUL.
+ */
+static inline bool stc_get_part_startz(struct st_client *stc, const char *key,
+				       uint64_t offset, uint64_t max_len,
+				       int *pfd, uint64_t *len)
+{
+	size_t key_len = strlen(key) + 1;
+
+	return stc_get_part_start(stc, key, key_len, offset, max_len,
+				  pfd, len);
+}
+
static inline bool stc_put_inlinez(struct st_client *stc, const char *key,
void *data, uint64_t len, uint32_t flags)
{
diff --git a/lib/chunkdc.c b/lib/chunkdc.c
index 13f5d36..6ee9fc4 100644
--- a/lib/chunkdc.c
+++ b/lib/chunkdc.c
@@ -521,6 +521,131 @@ size_t stc_get_recv(struct st_client *stc, void *data, size_t data_len)
return done_cnt;
}
+/*
+ * Send a GET_PART request to the chunk server and read the response
+ * header.
+ *
+ * The request is built in stc->req_buf as: request header (op
+ * CHO_GET_PART, data_len = max_len), the key, then a
+ * struct chunksrv_req_getpart holding the little-endian offset.
+ *
+ * The parameter order (key, key_len, offset, max_len) matches how
+ * stc_get_part() and stc_get_part_start() call this function.  It was
+ * previously declared as (key, offset, max_len, key_len); since all of
+ * these are integer types that compiled silently while scrambling the
+ * key length, offset and length.
+ *
+ * On success, returns true and stores in *plen the number of data bytes
+ * the server will send next.  Returns false on an invalid key, a network
+ * error, or a non-success response code.
+ */
+static bool stc_get_part_req(struct st_client *stc, const void *key,
+			     size_t key_len,
+			     uint64_t offset, uint64_t max_len,
+			     uint64_t *plen)
+{
+	struct chunksrv_resp_get get_resp;
+	struct chunksrv_req *req = (struct chunksrv_req *) stc->req_buf;
+	struct chunksrv_req_getpart gpr;
+
+	if (stc->verbose)
+		fprintf(stderr, "libstc: GET(%u)\n", (unsigned int) key_len);
+
+	if (!key_valid(key, key_len))
+		return false;
+
+	/* initialize request */
+	req_init(stc, req);
+	req->op = CHO_GET_PART;
+	req->data_len = cpu_to_le64(max_len);
+	req_set_key(req, key, key_len);
+
+	/* the getpart segment directly follows the variable-length key */
+	gpr.offset = cpu_to_le64(offset);
+	memcpy(stc->req_buf + sizeof(struct chunksrv_req) + key_len,
+	       &gpr, sizeof(struct chunksrv_req_getpart));
+
+	/* sign request */
+	chreq_sign(req, stc->key, req->sig);
+
+	/* write request */
+	if (!net_write(stc, req, req_len(req)))
+		return false;
+
+	/* read response header */
+	if (!resp_read(stc, &get_resp.resp))
+		return false;
+
+	/* check response code */
+	if (get_resp.resp.resp_code != che_Success) {
+		if (stc->verbose)
+			fprintf(stderr, "GET resp code: %d\n", get_resp.resp.resp_code);
+		return false;
+	}
+
+	/* read rest of response header */
+	if (!net_read(stc, &get_resp.mtime,
+		      sizeof(get_resp) - sizeof(get_resp.resp)))
+		return false;
+
+	*plen = le64_to_cpu(get_resp.resp.data_len);
+	return true;
+}
+
+/*
+ * Fetch part of an object and hand the data to write_cb.
+ *
+ * Issues the GET_PART request, then reads the announced number of bytes
+ * from the server in pieces of at most sizeof(netbuf) bytes, calling
+ * write_cb(buf, piece_len, 1, user_data) for each piece.  The callback's
+ * return value is not examined.
+ *
+ * Returns false if the request or any network read fails, else true.
+ */
+bool stc_get_part(struct st_client *stc, const void *key, size_t key_len,
+		  uint64_t offset, uint64_t max_len,
+		  size_t (*write_cb)(void *, size_t, size_t, void *),
+		  void *user_data)
+{
+	char netbuf[4096];
+	uint64_t remaining;
+
+	if (!stc_get_part_req(stc, key, key_len, offset, max_len, &remaining))
+		return false;
+
+	/* read response data */
+	while (remaining != 0) {
+		size_t piece = MIN(remaining, sizeof(netbuf));
+
+		if (!net_read(stc, netbuf, piece))
+			return false;
+
+		write_cb(netbuf, piece, 1, user_data);
+		remaining -= piece;
+	}
+
+	return true;
+}
+
+/*
+ * Begin a streaming GET_PART transfer.
+ * Sends the request and, on success, stores the number of data bytes
+ * that will follow in *psize and the client's descriptor in *pfd; the
+ * caller is expected to read exactly that many bytes from it.
+ * On failure returns false and leaves *pfd untouched.
+ */
+bool stc_get_part_start(struct st_client *stc, const void *key, size_t key_len,
+			uint64_t offset, uint64_t max_len,
+			int *pfd, uint64_t *psize)
+{
+	bool ok = stc_get_part_req(stc, key, key_len, offset, max_len, psize);
+
+	if (ok)
+		*pfd = stc->fd;
+
+	return ok;
+}
+
+/*
+ * Fetch part of an object into a newly allocated memory buffer.
+ *
+ * The data is accumulated in a GByteArray through all_data_cb.  On
+ * success the array's data segment is detached and returned, and its
+ * size is stored in *len when len is non-NULL.
+ * Returns NULL if the array cannot be created or the transfer fails.
+ */
+void *stc_get_part_inline(struct st_client *stc, const void *key,
+			  size_t key_len, uint64_t offset, uint64_t max_len,
+			  size_t *len)
+{
+	GByteArray *buf = g_byte_array_new();
+	void *data;
+
+	if (!buf)
+		return NULL;
+
+	if (!stc_get_part(stc, key, key_len, offset, max_len,
+			  all_data_cb, buf)) {
+		g_byte_array_free(buf, TRUE);
+		return NULL;
+	}
+
+	if (len)
+		*len = buf->len;
+
+	/* release only the array wrapper; the data segment is returned */
+	data = buf->data;
+	g_byte_array_free(buf, FALSE);
+
+	return data;
+}
+
bool stc_table_open(struct st_client *stc, const void *key, size_t key_len,
uint32_t flags)
{
diff --git a/lib/chunksrv.c b/lib/chunksrv.c
index a9bfc46..6bda870 100644
--- a/lib/chunksrv.c
+++ b/lib/chunksrv.c
@@ -32,6 +32,9 @@ size_t req_len(const struct chunksrv_req *req)
len = sizeof(struct chunksrv_req) + GUINT16_FROM_LE(req->key_len);
+ if (req->op == CHO_GET_PART)
+ len += sizeof(struct chunksrv_req_getpart);
+
return len;
}
diff --git a/tools/chcli.c b/tools/chcli.c
index 160af18..c310a06 100644
--- a/tools/chcli.c
+++ b/tools/chcli.c
@@ -94,6 +94,7 @@ enum chcli_cmd {
CHC_CHECKSTATUS,
CHC_CHECKSTART,
CHC_CP,
+ CHC_GET_PART,
};
struct chcli_host {
@@ -187,6 +188,7 @@ static void show_cmds(void)
"Supported chcli commands:\n"
"\n"
"GET key Retrieve key, send to output (def: stdout)\n"
+"GETPART key offset length Retrieve key subset, send to output (def: stdout)\n"
"PUT key val Store key\n"
"DEL key Delete key\n"
"PING Ping server\n"
@@ -341,6 +343,8 @@ static error_t parse_opt (int key, char *arg, struct argp_state *state)
cmd_mode = CHC_CHECKSTART;
else if (!strcasecmp(arg, "cp"))
cmd_mode = CHC_CP;
+ else if (!strcasecmp(arg, "getpart"))
+ cmd_mode = CHC_GET_PART;
else
argp_usage(state); /* invalid cmd */
break;
@@ -714,6 +718,102 @@ static int cmd_get(void)
return 0;
}
+/*
+ * GETPART command: fetch `length` bytes starting at `offset` of the
+ * object named by the key, and write them to the output file (stdout
+ * when no output file is set or it is "-").
+ *
+ * The key comes from key_data if already set, otherwise from the first
+ * command argument; exactly two further arguments (offset, length) are
+ * required.
+ *
+ * Returns 0 on success, 1 on any failure.  The client handle and any
+ * output file opened here are released on every path.
+ */
+static int cmd_get_part(void)
+{
+	struct st_client *stc;
+	int rfd = -1, wfd = STDOUT_FILENO;
+	unsigned long long pofs = 0, plen = 0;
+	uint64_t ofs, len;
+	bool to_file;
+	int ret = 1;
+
+	/* if key data not supplied via file, absorb first cmd arg */
+	if (!key_data) {
+		if (!n_cmd_args) {
+			fprintf(stderr, "GETPART requires key arg\n");
+			return 1;
+		}
+
+		key_data = cmd_args[0];
+		key_data_len = strlen(cmd_args[0]) + 1;
+
+		cmd_args++;
+		n_cmd_args--;
+	}
+
+	/* parse offset, length required args */
+	if (n_cmd_args != 2) {
+		fprintf(stderr, "GETPART requires offset, length args\n");
+		return 1;
+	}
+	if ((sscanf(cmd_args[0], "%llu", &pofs) != 1) ||
+	    (sscanf(cmd_args[1], "%llu", &plen) != 1)) {
+		fprintf(stderr, "invalid GETPART offset and/or length arg\n");
+		return 1;
+	}
+	ofs = pofs;
+	len = plen;
+
+	if (key_data_len < 1 || key_data_len > CHD_KEY_SZ) {
+		fprintf(stderr, "GET: invalid key size %u\n",
+			(unsigned int) key_data_len);
+		return 1;
+	}
+
+	stc = chcli_stc_new();
+	if (!stc)
+		return 1;
+
+	if (!stc_get_part_start(stc, key_data, key_data_len,
+				ofs, len, &rfd, &get_len)) {
+		fprintf(stderr, "GET initiation failed\n");
+		goto out_stc;
+	}
+
+	/* only a real output file is opened, closed and unlinked here */
+	to_file = output_fn && strcmp(output_fn, "-");
+	if (to_file) {
+		wfd = open(output_fn, O_CREAT | O_TRUNC | O_WRONLY, 0666);
+		if (wfd < 0) {
+			fprintf(stderr, "GET output file %s open failed: %s\n",
+				output_fn,
+				strerror(errno));
+			goto out_stc;
+		}
+	}
+
+	while (get_len > 0) {
+		size_t need_len, done = 0;
+
+		need_len = MIN(GET_BUFSZ, get_len);
+
+		if (!recv_buf(stc, rfd, get_buf, need_len)) {
+			fprintf(stderr, "GET buffer failed\n");
+			goto out_close;
+		}
+
+		/* write(2) may write fewer bytes than asked; keep going
+		 * until the whole received buffer is on its way, or the
+		 * unwritten tail would be silently dropped
+		 */
+		while (done < need_len) {
+			ssize_t rc;
+
+			rc = write(wfd, (char *) get_buf + done,
+				   need_len - done);
+			if (rc < 0) {
+				fprintf(stderr,
+					"GET write to output failed: %s\n",
+					strerror(errno));
+				/* never unlink when writing to stdout:
+				 * output_fn is NULL or "-" in that case
+				 */
+				if (to_file)
+					unlink(output_fn);
+				goto out_close;
+			}
+
+			done += rc;
+		}
+
+		get_len -= need_len;
+	}
+
+	ret = 0;
+
+out_close:
+	if (to_file)
+		close(wfd);
+out_stc:
+	stc_free(stc);
+
+	return ret;
+}
+
static int cmd_check_start(void)
{
struct st_client *stc;
@@ -830,6 +930,8 @@ int main (int argc, char *argv[])
return 1;
case CHC_GET:
return cmd_get();
+ case CHC_GET_PART:
+ return cmd_get_part();
case CHC_PUT:
return cmd_put();
case CHC_DEL:
^ permalink raw reply related [flat|nested] 5+ messages in thread