$treeview $search $mathjax $extrastylesheet
librsync
2.3.1
$projectbrief
|
$projectbrief
|
$searchbox |
00001 /*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*- 00002 * 00003 * librsync -- library for network deltas 00004 * 00005 * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net> 00006 * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au> 00007 * 00008 * This program is free software; you can redistribute it and/or modify 00009 * it under the terms of the GNU Lesser General Public License as published by 00010 * the Free Software Foundation; either version 2.1 of the License, or 00011 * (at your option) any later version. 00012 * 00013 * This program is distributed in the hope that it will be useful, 00014 * but WITHOUT ANY WARRANTY; without even the implied warranty of 00015 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 00016 * GNU Lesser General Public License for more details. 00017 * 00018 * You should have received a copy of the GNU Lesser General Public License 00019 * along with this program; if not, write to the Free Software 00020 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA. 00021 */ 00022 00023 /*= 00024 | Let's climb to the TOP of that 00025 | MOUNTAIN and think about STRIP 00026 | MINING!! 00027 */ 00028 00029 /** \file delta.c 00030 * Generate in streaming mode an rsync delta given a set of signatures, and a 00031 * new file. 00032 * 00033 * The size of blocks for signature generation is determined by the block size 00034 * in the incoming signature. 00035 * 00036 * To calculate a signature, we need to be able to see at least one block of 00037 * the new file at a time. Once we have that, we calculate its weak signature, 00038 * and see if there is any block in the signature hash table that has the same 00039 * weak sum. If there is one, then we also compute the strong sum of the new 00040 * block, and cross check that. If they're the same, then we can assume we have 00041 * a match. 00042 * 00043 * The final block of the file has to be handled a little differently, because 00044 * it may be a short match. Short blocks in the signature don't include their 00045 * length -- we just allow for the final short block of the file to match any 00046 * block in the signature, and if they have the same checksum we assume they 00047 * must have the same length. Therefore, when we emit a COPY command, we have 00048 * to send it with a length that is the same as the block matched, and not the 00049 * block length from the signature. 00050 * 00051 * Profiling results as of v1.26, 2001-03-18: 00052 * 00053 * If everything matches, then we spend almost all our time in rs_mdfour64 and 00054 * rs_weak_sum, which is unavoidable and therefore a good profile. 00055 * 00056 * If nothing matches, it is not so good. 00057 * 00058 * 2002-06-26: Donovan Baarda 00059 * 00060 * The following is based entirely on pysync. It is much cleaner than the 00061 * previous incarnation of this code. It is slightly complicated because in 00062 * this case the output can block, so the main delta loop needs to stop when 00063 * this happens. 00064 * 00065 * In pysync a 'last' attribute is used to hold the last miss or match for 00066 * extending if possible. In this code, basis_len and scoop_pos are used 00067 * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0 00068 * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie, 00069 * nothing). 00070 * 00071 * Pysync is also slightly different in that a 'flush' method is available to 00072 * force output of accumulated data. This 'flush' is use to finalise delta 00073 * calculation. In librsync input is terminated with an eof flag on the input 00074 * stream. I have structured this code similar to pysync with a seperate flush 00075 * function that is used when eof is reached. This allows for a flush style API 00076 * if one is ever needed. Note that flush in pysync can be used for more than 00077 * just terminating delta calculation, so a flush based API can in some ways be 00078 * more flexible... 00079 * 00080 * The input data is first scanned, then processed. Scanning identifies input 00081 * data as misses or matches, and emits the instruction stream. Processing the 00082 * data consumes it off the input scoop and outputs the processed miss data 00083 * into the tube. 00084 * 00085 * The scoop contains all data yet to be processed. The scoop_pos is an index 00086 * into the scoop that indicates the point scanned to. As data is scanned, 00087 * scoop_pos is incremented. As data is processed, it is removed from the scoop 00088 * and scoop_pos adjusted. Everything gets complicated because the tube can 00089 * block. When the tube is blocked, no data can be processed. */ 00090 00091 #include "config.h" 00092 #include <assert.h> 00093 #include <stdlib.h> 00094 #include "librsync.h" 00095 #include "job.h" 00096 #include "sumset.h" 00097 #include "checksum.h" 00098 #include "stream.h" 00099 #include "emit.h" 00100 #include "trace.h" 00101 00102 static rs_result rs_delta_s_scan(rs_job_t *job); 00103 static rs_result rs_delta_s_flush(rs_job_t *job); 00104 static rs_result rs_delta_s_end(rs_job_t *job); 00105 static inline void rs_getinput(rs_job_t *job); 00106 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, 00107 size_t *match_len); 00108 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, 00109 size_t match_len); 00110 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len); 00111 static inline rs_result rs_appendflush(rs_job_t *job); 00112 static inline rs_result rs_processmatch(rs_job_t *job); 00113 static inline rs_result rs_processmiss(rs_job_t *job); 00114 00115 /** Get a block of data if possible, and see if it matches. 00116 * 00117 * On each call, we try to process all of the input data available on the scoop 00118 * and input buffer. */ 00119 static rs_result rs_delta_s_scan(rs_job_t *job) 00120 { 00121 const size_t block_len = job->signature->block_len; 00122 rs_long_t match_pos; 00123 size_t match_len; 00124 rs_result result; 00125 00126 rs_job_check(job); 00127 /* read the input into the scoop */ 00128 rs_getinput(job); 00129 /* output any pending output from the tube */ 00130 result = rs_tube_catchup(job); 00131 /* while output is not blocked and there is a block of data */ 00132 while ((result == RS_DONE) 00133 && ((job->scoop_pos + block_len) < job->scoop_avail)) { 00134 /* check if this block matches */ 00135 if (rs_findmatch(job, &match_pos, &match_len)) { 00136 /* append the match and reset the weak_sum */ 00137 result = rs_appendmatch(job, match_pos, match_len); 00138 weaksum_reset(&job->weak_sum); 00139 } else { 00140 /* rotate the weak_sum and append the miss byte */ 00141 weaksum_rotate(&job->weak_sum, job->scoop_next[job->scoop_pos], 00142 job->scoop_next[job->scoop_pos + block_len]); 00143 result = rs_appendmiss(job, 1); 00144 } 00145 } 00146 /* if we completed OK */ 00147 if (result == RS_DONE) { 00148 /* if we reached eof, we can flush the last fragment */ 00149 if (job->stream->eof_in) { 00150 job->statefn = rs_delta_s_flush; 00151 return RS_RUNNING; 00152 } else { 00153 /* we are blocked waiting for more data */ 00154 return RS_BLOCKED; 00155 } 00156 } 00157 return result; 00158 } 00159 00160 static rs_result rs_delta_s_flush(rs_job_t *job) 00161 { 00162 rs_long_t match_pos; 00163 size_t match_len; 00164 rs_result result; 00165 00166 rs_job_check(job); 00167 /* read the input into the scoop */ 00168 rs_getinput(job); 00169 /* output any pending output */ 00170 result = rs_tube_catchup(job); 00171 /* while output is not blocked and there is any remaining data */ 00172 while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) { 00173 /* check if this block matches */ 00174 if (rs_findmatch(job, &match_pos, &match_len)) { 00175 /* append the match and reset the weak_sum */ 00176 result = rs_appendmatch(job, match_pos, match_len); 00177 weaksum_reset(&job->weak_sum); 00178 } else { 00179 /* rollout from weak_sum and append the miss byte */ 00180 weaksum_rollout(&job->weak_sum, job->scoop_next[job->scoop_pos]); 00181 rs_trace("block reduced to " FMT_SIZE "", 00182 weaksum_count(&job->weak_sum)); 00183 result = rs_appendmiss(job, 1); 00184 } 00185 } 00186 /* if we are not blocked, flush and set end statefn. */ 00187 if (result == RS_DONE) { 00188 result = rs_appendflush(job); 00189 job->statefn = rs_delta_s_end; 00190 } 00191 if (result == RS_DONE) { 00192 return RS_RUNNING; 00193 } 00194 return result; 00195 } 00196 00197 static rs_result rs_delta_s_end(rs_job_t *job) 00198 { 00199 rs_emit_end_cmd(job); 00200 return RS_DONE; 00201 } 00202 00203 static inline void rs_getinput(rs_job_t *job) 00204 { 00205 size_t len; 00206 00207 len = rs_scoop_total_avail(job); 00208 if (job->scoop_avail < len) { 00209 rs_scoop_input(job, len); 00210 } 00211 } 00212 00213 /** find a match at scoop_pos, returning the match_pos and match_len. 00214 * 00215 * Note that this will calculate weak_sum if required. It will also determine 00216 * the match_len. 00217 * 00218 * This routine could be modified to do xdelta style matches that would extend 00219 * matches past block boundaries by matching backwards and forwards beyond the 00220 * block boundaries. Extending backwards would require decrementing scoop_pos 00221 * as appropriate. */ 00222 static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, 00223 size_t *match_len) 00224 { 00225 const size_t block_len = job->signature->block_len; 00226 00227 /* calculate the weak_sum if we don't have one */ 00228 if (weaksum_count(&job->weak_sum) == 0) { 00229 /* set match_len to min(block_len, scan_avail) */ 00230 *match_len = job->scoop_avail - job->scoop_pos; 00231 if (*match_len > block_len) { 00232 *match_len = block_len; 00233 } 00234 /* Update the weak_sum */ 00235 weaksum_update(&job->weak_sum, job->scoop_next + job->scoop_pos, 00236 *match_len); 00237 rs_trace("calculate weak sum from scratch length " FMT_SIZE "", 00238 weaksum_count(&job->weak_sum)); 00239 } else { 00240 /* set the match_len to the weak_sum count */ 00241 *match_len = weaksum_count(&job->weak_sum); 00242 } 00243 *match_pos = 00244 rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum), 00245 job->scoop_next + job->scoop_pos, *match_len); 00246 return *match_pos != -1; 00247 } 00248 00249 /** Append a match at match_pos of length match_len to the delta, extending a 00250 * previous match if possible, or flushing any previous miss/match. */ 00251 static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, 00252 size_t match_len) 00253 { 00254 rs_result result = RS_DONE; 00255 00256 /* if last was a match that can be extended, extend it */ 00257 if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) { 00258 job->basis_len += match_len; 00259 } else { 00260 /* else appendflush the last value */ 00261 result = rs_appendflush(job); 00262 /* make this the new match value */ 00263 job->basis_pos = match_pos; 00264 job->basis_len = match_len; 00265 } 00266 /* increment scoop_pos to point at next unscanned data */ 00267 job->scoop_pos += match_len; 00268 /* we can only process from the scoop if output is not blocked */ 00269 if (result == RS_DONE) { 00270 /* process the match data off the scoop */ 00271 result = rs_processmatch(job); 00272 } 00273 return result; 00274 } 00275 00276 /** Append a miss of length miss_len to the delta, extending a previous miss 00277 * if possible, or flushing any previous match. 00278 * 00279 * This also breaks misses up into 32KB segments to avoid accumulating too much 00280 * in memory. */ 00281 static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len) 00282 { 00283 const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */ 00284 rs_result result = RS_DONE; 00285 00286 /* If last was a match, or max_miss misses, appendflush it. */ 00287 if (job->basis_len || (job->scoop_pos >= max_miss)) { 00288 result = rs_appendflush(job); 00289 } 00290 /* increment scoop_pos */ 00291 job->scoop_pos += miss_len; 00292 return result; 00293 } 00294 00295 /** Flush any accumulating hit or miss, appending it to the delta. */ 00296 static inline rs_result rs_appendflush(rs_job_t *job) 00297 { 00298 /* if last is a match, emit it and reset last by resetting basis_len */ 00299 if (job->basis_len) { 00300 rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len, 00301 job->basis_pos); 00302 rs_emit_copy_cmd(job, job->basis_pos, job->basis_len); 00303 job->basis_len = 0; 00304 return rs_processmatch(job); 00305 /* else if last is a miss, emit and process it */ 00306 } else if (job->scoop_pos) { 00307 rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos); 00308 rs_emit_literal_cmd(job, job->scoop_pos); 00309 return rs_processmiss(job); 00310 } 00311 /* otherwise, nothing to flush so we are done */ 00312 return RS_DONE; 00313 } 00314 00315 /** Process matching data in the scoop. 00316 * 00317 * The scoop contains match data at scoop_next of length scoop_pos. This 00318 * function processes that match data, returning RS_DONE if it completes, or 00319 * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to 00320 * still point at the next unscanned data. 00321 * 00322 * This function currently just removes data from the scoop and adjusts 00323 * scoop_pos appropriately. In the future this could be used for something like 00324 * context compressing of miss data. Note that it also calls rs_tube_catchup to 00325 * output any pending output. */ 00326 static inline rs_result rs_processmatch(rs_job_t *job) 00327 { 00328 job->scoop_avail -= job->scoop_pos; 00329 job->scoop_next += job->scoop_pos; 00330 job->scoop_pos = 0; 00331 return rs_tube_catchup(job); 00332 } 00333 00334 /** Process miss data in the scoop. 00335 * 00336 * The scoop contains miss data at scoop_next of length scoop_pos. This 00337 * function processes that miss data, returning RS_DONE if it completes, or 00338 * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to 00339 * still point at the next unscanned data. 00340 * 00341 * This function uses rs_tube_copy to queue copying from the scoop into output. 00342 * and uses rs_tube_catchup to do the copying. This automaticly removes data 00343 * from the scoop, but this can block. While rs_tube_catchup is blocked, 00344 * scoop_pos does not point at legit data, so scanning can also not proceed. 00345 * 00346 * In the future this could do compression of miss data before outputing it. */ 00347 static inline rs_result rs_processmiss(rs_job_t *job) 00348 { 00349 rs_tube_copy(job, job->scoop_pos); 00350 job->scoop_pos = 0; 00351 return rs_tube_catchup(job); 00352 } 00353 00354 /** State function that does a slack delta containing only literal data to 00355 * recreate the input. */ 00356 static rs_result rs_delta_s_slack(rs_job_t *job) 00357 { 00358 rs_buffers_t *const stream = job->stream; 00359 size_t avail = stream->avail_in; 00360 00361 if (avail) { 00362 rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail); 00363 rs_emit_literal_cmd(job, avail); 00364 rs_tube_copy(job, avail); 00365 return RS_RUNNING; 00366 } else if (rs_job_input_is_ending(job)) { 00367 job->statefn = rs_delta_s_end; 00368 return RS_RUNNING; 00369 } 00370 return RS_BLOCKED; 00371 } 00372 00373 /** State function for writing out the header of the encoding job. */ 00374 static rs_result rs_delta_s_header(rs_job_t *job) 00375 { 00376 rs_emit_delta_header(job); 00377 if (job->signature) { 00378 job->statefn = rs_delta_s_scan; 00379 } else { 00380 rs_trace("no signature provided for delta, using slack deltas"); 00381 job->statefn = rs_delta_s_slack; 00382 } 00383 return RS_RUNNING; 00384 } 00385 00386 rs_job_t *rs_delta_begin(rs_signature_t *sig) 00387 { 00388 rs_job_t *job; 00389 00390 job = rs_job_new("delta", rs_delta_s_header); 00391 /* Caller can pass NULL sig or empty sig for "slack deltas". */ 00392 if (sig && sig->count > 0) { 00393 rs_signature_check(sig); 00394 /* Caller must have called rs_build_hash_table() by now. */ 00395 assert(sig->hashtable); 00396 job->signature = sig; 00397 weaksum_init(&job->weak_sum, rs_signature_weaksum_kind(sig)); 00398 } 00399 return job; 00400 }