#define _XOPEN_SOURCE 500 /* Allows pread/pwrite prototypes */ #define _FILE_OFFSET_BITS 64 /* * * Copyright (c) 2003 Patricia Jewell Nance * * Permission is hereby granted, free of charge, to any person obtaining a * copy of this software and associated documentation files (the "Software"), * to deal in the Software without restriction, including without limitation * the rights to use, copy, modify, merge, publish, distribute, sublicense, * and/or sell copies of the Software, and to permit persons to whom the * Software is furnished to do so, subject to the following conditions: * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL * THE X CONSORTIUM BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, * WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF * OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE * SOFTWARE. * * Except as contained in this notice, the name of Patricia Nance shall not be * used in advertising or otherwise to promote the sale, use or other dealings * in this Software without prior written authorization from her. * */ #ifndef EXSORT_H #define EXSORT_H #include #include /* exsort() sorts a disk file. The algorithm used is an external merge * sort, and is optimized for sorting extreamly large files, specifically * files much larger than the physical memory of the machine. External * sorting (sorting files too large to hold in memory) is covered fairly * well in the literature (see Sedgwigk). This implementation differs * from most common implementations in two important ways: * * 1 - Care has been taken to maximize utilization of the operating systems * buffer cache. Many external sorting algorithms were developed for * sorting data stored on magnetic tape. Tape streams well and rewinds * poorly, so these algorithms read the tapes sequentially from beginning * to end. The same algorithms will work when implemented on top of * disk files rather than tape drives. However, since disk files support * efficient seeking, the algorithms can be redesigned to run faster by * allowing more seeks. * * 2 - This implementation supports sorting variable sized records. All * descriptions of external sorting (and most of sorting in general) * that I am aware of assume that the records being sorted are all * of the same size. This is often an inconvienient assumption. This * implementation supports sorting files with variable sized records. * To see why this might be useful, remember that alphabatizing a list * of words requires sorting a list of variable sized records. * * The algroithm is I/O bound. Its running time should be proportional * to the log of the number of records because the file must be read * log2(num_records) times. This number can be reduced by writing the * file in sorted chunks rather than in a compleatly random order. The * blksize paramater is used to specify the chunksize. The chunksize * is given in units of records NOT BYTES. All the chunks must have * an equal number of records, except for the last, which may be smaller. */ int exsort( /* File descriptor referencing the file to be sorted. Open for * Read/Write on a seekable file */ int fd, /* File descriptor referencing a scratch file for use by the sorting * algorithm. The contents of this file will be destroyed and it will * grow in size to 1/2 the size of the file being sorted. This descriptor * must be open Read/Write and be seekable */ int sfd, /* If file has been presorted into blocks of N sorted * records, the blocksize goes here. Note that any file * can be considered to have been presorted into blocks * of 1 sorted record, so 1 will always work. However, * significant speed improvements are possible if the * file has been sorted into larger blocks */ size_t blksize, /* The comparison function. Returns 1 if rec1>rect2, -1 if * rec2>rec1, and 0 if rec1==rec2. Token is the token passed in as * the last argument to exsort */ int (*cmp)(void *token, void *rec1, void *rec2), /* The size function. Returns the size of the record pointed to * by buff IFF the size is less than or equal to len. If the size * exceeds len, then this function returns 0 */ size_t (*size)(void *token, void *buff, size_t len), /* User supplied data. This pointer is passed as the token argument * to the size() and cmp() function pointers, and may be utilized * for any purpose by those functions. It is not dereferenced by * any exsort code */ void *token ); #endif /* End of exsort.h */ #define _XOPEN_SOURCE 500 /* Allows pread/pwrite prototypes */ #if 1 # define JLN_DEBUG(x) printf x #else # define JLN_DEBUG(x) #endif #include #include #include #include #include #include #include #include /* #include "exsort.h" */ static int write_at(int fd, char *buf, size_t count, off_t start) { size_t off = 0; do { ssize_t bw = pwrite(fd, buf+off, count-off, start+off); if(bw>0) { off += bw; } else { /* Some sort of write error occured */ if(bw==0 || errno!=EINTR) { return -1; } } } while(off0) { char buf[8192]; size_t rs = bleft>sizeof(buf)? sizeof(buf) : bleft; ssize_t br = pread(s_fd, buf, rs, roff); if(br>0) { if(write_at(d_fd, buf, br, woff)) { return -1; } /* If we made it here both the read and the write moved br bytes of * data. This is a good place to update our counters and offsets */ roff += br; woff += br; bleft -= br; } else { /* Some sort of read problem occured */ if(br==0 || (errno!=EINTR)) { return -1; } } } return 0; } struct IObuf { int ifd; char *buf; size_t length; size_t used; off_t off; }; struct MergeInfo { off_t left; size_t s; ssize_t off; }; struct SortBlock { off_t start; off_t length; off_t records; }; struct ExsortInfo { int (*cmp)(void *token, void *rec1, void *rec2); size_t (*size)(void *token, void *buff, size_t len); void *token; }; static void bind_buffer(int fd, struct IObuf *buf) { buf->ifd = fd; buf->used = 0; buf->off = 0; } static void seek_buffer(off_t start_pos, struct IObuf *buf) { buf->off = start_pos; } static void shift_buffer(size_t count, struct IObuf *buf) { if(count>=buf->used) { buf->used = 0; } else { size_t nused = buf->used - count; memmove(buf->buf, buf->buf+count, nused); buf->used = nused; } buf->off += count; } static ssize_t fill_buffer(struct IObuf *buf) { ssize_t rv; do { rv = pread(buf->ifd, buf->buf, buf->length, buf->off); } while(rv==-1 && errno==EINTR); buf->used = rv; return rv; } static ssize_t append_buffer(struct IObuf *buf) { size_t delta = buf->used; ssize_t rv; do { rv = pread(buf->ifd, buf->buf+delta, buf->length-delta, buf->off+delta); } while(rv==-1 && errno==EINTR); buf->used += rv; return rv; } static int double_buffer(struct IObuf *buf) { buf->length = buf->length? 2*buf->length : 4096; buf->buf = realloc(buf->buf, buf->length); return buf->buf? 0 : -1; } static void free_buffer(struct IObuf *buf) { if(!buf->buf) return; free(buf->buf); buf->buf = 0; } /* Originally I had ideas about making this function merge two chunks rather * than just determining how large a chunk was. We could save a read pass * through the file that way. However trying to combine the merge logic * along with the buffer resizing, EOF detection, and error handling makes * the logic of the code very difficult to follow. And after I got it * implemented it actually seemed to be slower than the original which * just looked for the chunk. While I a convinced that it is in theory * possible to make the code faster by merging two chunks here, I dont * feel like the gain is worth the pain of the complexity. */ static int find_chunk( struct SortBlock *s, int fd, int *hit_eof, off_t r_off, size_t blksize, struct IObuf *topbuf, struct IObuf *botbuf, struct ExsortInfo *info ) { size_t i; ssize_t r; ssize_t boff; off_t w_off = 0; s->start = r_off; s->length = 0; s->records = 0; boff = 0; bind_buffer(fd, botbuf); seek_buffer(r_off, botbuf); for(i=0;;) { ssize_t left = botbuf->used - boff; if(left>0) { char *buff = botbuf->buf + boff; size_t s = info->size(info->token, buff, left); if(s>0 && s<=left) { boff += s; if(++i>=blksize) { /* This is the last block, make sure to flush */ assert(boff<=botbuf->used); shift_buffer(boff, botbuf); w_off += boff; break; /* We are done with the bottom */ } continue; /* Skip the buffer filling code below */ } } /* If we get here it means we need to read a record which is not * fully in the buffer. This could either happen because the * buffer is too small, or because the buffer needs to be flushed * and refilled */ if(boff==0 && botbuf->length==botbuf->used) { /* The buffer is too small, we must make it larger. We grow both * topbuf and botbuf because latter we want to assue that any * record will fit into either buffer */ if(double_buffer(topbuf)) return -1; if(double_buffer(botbuf)) return -1; r = append_buffer(botbuf); if(r<1) { /* We have a partial record in the buffer, thus we should be * able to read the rest of the record. If the read failed * it is a fatal error */ return -1; } } else { /* The buffer has old data in it. Flush and refill and try again */ if(boff) { assert(boff<=botbuf->used); shift_buffer(boff, botbuf); w_off += boff; boff = 0; } r = append_buffer(botbuf); if(r<1) { if(r==0) { /* There is no more data, we hit EOF */ *hit_eof = 1; s->length = w_off; s->records = i; return 0; } else { return -1; /* Some sort of fatal error */ } } } } s->length = w_off; s->records = blksize; return 0; } static int write_vec(int fd, struct iovec *iov, size_t count) { for(;;) { ssize_t wb = writev(fd, iov, count); if(wb>0) return 0; if(wb!=-1 || errno!=EINTR) { return -1; } } } static int merge_adjacent( int fd, int sfd, struct SortBlock *tblock, struct SortBlock *bblock, struct IObuf *topbuf, struct IObuf *botbuf, struct ExsortInfo *info) { struct iovec iov[4096]; const size_t nio = sizeof(iov)/sizeof(iov[0]); size_t idx; off_t pos; struct MergeInfo binfo, tinfo, *inf; binfo.left = bblock->records; tinfo.left = tblock->records; /* Copy the bottom block into the scratch file */ if(move_bytes(fd, bblock->start, sfd, 0, bblock->length)) { return -1; } /* Position the file descriptor for the writev calls */ if(lseek(fd, bblock->start, SEEK_SET)==-1) { return -1; } /* Do the merging */ idx = 0; bind_buffer(sfd, botbuf); seek_buffer(0, botbuf); bind_buffer(fd, topbuf); seek_buffer(tblock->start, topbuf); tinfo.off = binfo.off = 0; pos = bblock->start; /* We know that the buffers are large enough to hold the largest record. * thus we can be sure that they contain at least 1 record. */ if(fill_buffer(botbuf)<1) return -1; if(fill_buffer(topbuf)<1) return -1; assert(tinfo.left>0 && binfo.left>0); tinfo.s = info->size(info->token, topbuf->buf, topbuf->used - tinfo.off); binfo.s = info->size(info->token, botbuf->buf, botbuf->used - binfo.off); if(tinfo.left>0 && binfo.left>0) for(;;) { struct IObuf *iobuf; size_t left; char *record; char *trecord = topbuf->buf + tinfo.off; char *brecord = botbuf->buf + binfo.off; int cval = info->cmp(info->token, brecord, trecord); if(cval<=0) { /* Get record from bottom */ inf = &binfo; record = brecord; iobuf = botbuf; } else { /* Get record from top */ inf = &tinfo; record = trecord; iobuf = topbuf; } left = iobuf->used - inf->off; inf->off += inf->s; if(idx>0 && ((char*)iov[idx-1].iov_base)+iov[idx-1].iov_len == record) { iov[idx-1].iov_len += inf->s; } else { iov[idx].iov_base = record; iov[idx].iov_len = inf->s; ++idx; } /* If there are reasons to flush our iovectors to disk, we set inf->s * to zero, which will cause the flush block to be entered below. Note * that the way things are set up we only call info->size() once. Either * here or at the bottom of the flush block */ if(--inf->left<1 || left<=inf->s || idx+1>nio) { inf->s = 0; } else { inf->s = info->size(info->token, record+inf->s, left-inf->s); } if(inf->s<1) { /* This is the flush block. We have an iovec with data that needs to * be written to disk. This is where we dump it. */ if(write_vec(fd, iov, idx)) { return -1; } idx = 0; if(binfo.off) { shift_buffer(binfo.off, botbuf); pos += binfo.off; binfo.off = 0; } if(tinfo.off) { shift_buffer(tinfo.off, topbuf); pos += tinfo.off; tinfo.off = 0; } if(inf->left<1) { /* This is the only non-error exit from the for(;;) loop. If we are * here one of the two blocks we are merging has become empty. */ break; } /* Refill the buffer */ if(append_buffer(iobuf)<1) { return -1; } inf->s = info->size(info->token, iobuf->buf, iobuf->used - inf->off); } } if(tinfo.left) { /* We do not have to do anything for this case. The records are * already in the correct positions */ } else if(binfo.left) { /* Move bytes from scratch file back into regular file */ off_t len = bblock->length - botbuf->off; if(move_bytes(sfd, botbuf->off, fd, pos, len)) { return -1; } } bblock->records += tblock->records; bblock->length += tblock->length; return 0; } static int exsort_real( int fd, int sfd, size_t blksize, struct ExsortInfo *info, struct IObuf *topbuf, struct IObuf *botbuf ) { struct SortBlock stack[64]; int sp = 0; int hit_eof = 0; off_t r_off = 0; int Scount = 0; for(;;) { ++Scount; if(hit_eof && sp==1) { /* We are done!! */ return 0; } else if(hit_eof || (sp>1 && 2*stack[sp-1].records>stack[sp-2].records)) { /* Merge stack[sp-1] and stack[sp-2] */ struct SortBlock *tb = &stack[sp-1]; struct SortBlock *bb = &stack[sp-2]; JLN_DEBUG(("Merge: depth=%d records/size %lld/%lld and %lld/%lld\n", sp-1, (long long)tb->records, (long long)tb->length, (long long) bb->records, (long long)bb->length)); if(merge_adjacent(fd, sfd, tb, bb, topbuf, botbuf, info)) { return -1; } --sp; } else { /* Scan the input file looking for a new chunk */ struct SortBlock *s = &stack[sp]; int rv; JLN_DEBUG(("Search: Looking for new data at offset %lld\n", (long long) r_off)); rv = find_chunk(s, fd, &hit_eof, r_off, blksize, topbuf, botbuf, info); if(rv<0) { return -1; /* error */ } if(s->records>0) { ++sp; r_off = s->start + s->length; } } #if 0 printf("Scount = %d ", Scount); fflush(0); system("md5sum testfile"); if(Scount==61) exit(1); #endif } } int exsort( int fd, int sfd, size_t blksize, int (*cmp)(void *token, void *rec1, void *rec2), size_t (*size)(void *token, void *buff, size_t len), void *token ) { struct ExsortInfo info = {cmp, size, token}; struct IObuf topbuf = {0}; struct IObuf botbuf = {0}; int rv = -1; if(!double_buffer(&topbuf) && !double_buffer(&botbuf)) { rv = exsort_real(fd, sfd, blksize, &info, &topbuf, &botbuf); } free_buffer(&botbuf); free_buffer(&topbuf); return rv; } /* End of exsort.c */ /* Start of test.c. Test program for exsort */ #include #include #include #include #include #include #include /* #include "exsort.h" */ static void die(char *fmt, ...) { va_list ap; va_start(ap, fmt); vfprintf(stderr, fmt, ap); va_end(ap); exit(-1); } /* Creates a test file to be sorted. The file contains max records, and * they are blocked into chunks of csize sorted records. */ static int create_test_file(const char *fname, size_t max, size_t csize) { int dmax; size_t ctmp; size_t i; FILE *fp = fopen(fname, "w"); if(!fp) return -1; for(dmax=0, ctmp=csize; ctmp>0; ctmp /= 26) ++dmax; /* We divide i by 2 in the loop below so that we generate duplicate entries */ for(i=0; i=0; --pos) { buf[pos]='a' + ctmp%26; ctmp /= 26; } buf[dmax+1] = '\n'; buf[dmax+2] = '\0'; if(fputs(buf, fp)<0) { return -1; } } if(fflush(fp)) { return -1; } fsync(fileno(fp)); if(fclose(fp)<0) { return -1; } return 0; } struct prof { size_t cmp; size_t size; }; static int str_cmp(void *token, void *rec1, void *rec2) { signed char *r1 = (char*) rec1; signed char *r2 = (char*) rec2; ((struct prof*)token)->cmp++; for(;;++r1, ++r2) { int del = *r2; /* Stop the comparison when the first whitespace character is reached * OR when the end of the line is reached */ if(del=='\n' || del==' ' || del=='\t') { if(*r1=='\n' || *r1==' ' || *r1=='\t') return 0; return -1; } del -= *r1; if(del) { if(*r1=='\n' || *r1==' ' || *r1=='\t') return 1; return del>0? -1 : 1; } } } static size_t str_size(void *token, void *buff, size_t len) { char *cp = (char*) buff; size_t i; ((struct prof*)token)->size++; for(i=0; i1) { size_t s = strtol(av[1], 0, 0); if(s>0) { nele = s; } if(ac>2) { s = strtol(av[2], 0, 0); if(s>0) { csize = s; } } } strcpy(tname, fname); strcat(tname, ".tmp"); printf("Creating file with %ld elements\n", (long)nele); if(create_test_file(fname, nele, csize)) { die("Failed to create test file\n"); } if(stat(fname, &sbuf)==0) { printf("File size is %lld bytes\n", (long long)sbuf.st_size); } fd = open(fname, O_RDWR, 0); if(fd==-1) die("Can not open %s\n", fname); tfd = open(tname, O_RDWR | O_CREAT | O_TRUNC, 0660); if(tfd==-1) die("Can not open %s\n", tname); gettimeofday(&tb, 0); if(exsort(fd, tfd, csize, str_cmp, str_size, &prof)) { die("exsort failed. Errno = %d\n", errno); } gettimeofday(&te, 0); delt = te.tv_sec - tb.tv_sec + 1e-6*(te.tv_usec - tb.tv_usec); printf("Sort of %d elements took %8.6f seconds\n", nele, delt); printf("Profile: %u cmp() calls, %u size() calls\n", prof.cmp, prof.size); return 0; }