librsync  2.3.2
delta.c
Go to the documentation of this file.
1/*= -*- c-basic-offset: 4; indent-tabs-mode: nil; -*-
2 *
3 * librsync -- library for network deltas
4 *
5 * Copyright (C) 2000, 2001 by Martin Pool <mbp@sourcefrog.net>
6 * Copyright (C) 2003 by Donovan Baarda <abo@minkirri.apana.org.au>
7 *
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU Lesser General Public License as published by
10 * the Free Software Foundation; either version 2.1 of the License, or
11 * (at your option) any later version.
12 *
13 * This program is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU Lesser General Public License for more details.
17 *
18 * You should have received a copy of the GNU Lesser General Public License
19 * along with this program; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21 */
22
23 /*=
24 | Let's climb to the TOP of that
25 | MOUNTAIN and think about STRIP
26 | MINING!!
27 */
28
29/** \file delta.c
30 * Generate in streaming mode an rsync delta given a set of signatures, and a
31 * new file.
32 *
33 * The size of blocks for signature generation is determined by the block size
34 * in the incoming signature.
35 *
36 * To calculate a signature, we need to be able to see at least one block of
37 * the new file at a time. Once we have that, we calculate its weak signature,
38 * and see if there is any block in the signature hash table that has the same
39 * weak sum. If there is one, then we also compute the strong sum of the new
40 * block, and cross check that. If they're the same, then we can assume we have
41 * a match.
42 *
43 * The final block of the file has to be handled a little differently, because
44 * it may be a short match. Short blocks in the signature don't include their
45 * length -- we just allow for the final short block of the file to match any
46 * block in the signature, and if they have the same checksum we assume they
47 * must have the same length. Therefore, when we emit a COPY command, we have
48 * to send it with a length that is the same as the block matched, and not the
49 * block length from the signature.
50 *
51 * Profiling results as of v1.26, 2001-03-18:
52 *
53 * If everything matches, then we spend almost all our time in rs_mdfour64 and
54 * rs_weak_sum, which is unavoidable and therefore a good profile.
55 *
56 * If nothing matches, it is not so good.
57 *
58 * 2002-06-26: Donovan Baarda
59 *
60 * The following is based entirely on pysync. It is much cleaner than the
61 * previous incarnation of this code. It is slightly complicated because in
62 * this case the output can block, so the main delta loop needs to stop when
63 * this happens.
64 *
65 * In pysync a 'last' attribute is used to hold the last miss or match for
66 * extending if possible. In this code, basis_len and scoop_pos are used
67 * instead of 'last'. When basis_len > 0, last is a match. When basis_len = 0
68 * and scoop_pos is > 0, last is a miss. When both are 0, last is None (ie,
69 * nothing).
70 *
71 * Pysync is also slightly different in that a 'flush' method is available to
72 * force output of accumulated data. This 'flush' is used to finalise delta
73 * calculation. In librsync input is terminated with an eof flag on the input
74 * stream. I have structured this code similar to pysync with a separate flush
75 * function that is used when eof is reached. This allows for a flush style API
76 * if one is ever needed. Note that flush in pysync can be used for more than
77 * just terminating delta calculation, so a flush based API can in some ways be
78 * more flexible...
79 *
80 * The input data is first scanned, then processed. Scanning identifies input
81 * data as misses or matches, and emits the instruction stream. Processing the
82 * data consumes it off the input scoop and outputs the processed miss data
83 * into the tube.
84 *
85 * The scoop contains all data yet to be processed. The scoop_pos is an index
86 * into the scoop that indicates the point scanned to. As data is scanned,
87 * scoop_pos is incremented. As data is processed, it is removed from the scoop
88 * and scoop_pos adjusted. Everything gets complicated because the tube can
89 * block. When the tube is blocked, no data can be processed. */
90
#include "config.h"
#include <assert.h>
#include <limits.h>
#include <stdlib.h>
#include "librsync.h"
#include "job.h"
#include "sumset.h"
#include "checksum.h"
#include "stream.h"
#include "emit.h"
#include "trace.h"
101
103static rs_result rs_delta_s_flush(rs_job_t *job);
104static rs_result rs_delta_s_end(rs_job_t *job);
105static inline void rs_getinput(rs_job_t *job);
106static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
107 size_t *match_len);
108static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
109 size_t match_len);
110static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len);
111static inline rs_result rs_appendflush(rs_job_t *job);
112static inline rs_result rs_processmatch(rs_job_t *job);
113static inline rs_result rs_processmiss(rs_job_t *job);
114
115/** Get a block of data if possible, and see if it matches.
116 *
117 * On each call, we try to process all of the input data available on the scoop
118 * and input buffer. */
120{
121 const size_t block_len = job->signature->block_len;
122 rs_long_t match_pos;
123 size_t match_len;
124 rs_result result;
125
126 rs_job_check(job);
127 /* read the input into the scoop */
128 rs_getinput(job);
129 /* output any pending output from the tube */
130 result = rs_tube_catchup(job);
131 /* while output is not blocked and there is a block of data */
132 while ((result == RS_DONE)
133 && ((job->scoop_pos + block_len) < job->scoop_avail)) {
134 /* check if this block matches */
135 if (rs_findmatch(job, &match_pos, &match_len)) {
136 /* append the match and reset the weak_sum */
137 result = rs_appendmatch(job, match_pos, match_len);
138 weaksum_reset(&job->weak_sum);
139 } else {
140 /* rotate the weak_sum and append the miss byte */
141 weaksum_rotate(&job->weak_sum, job->scoop_next[job->scoop_pos],
142 job->scoop_next[job->scoop_pos + block_len]);
143 result = rs_appendmiss(job, 1);
144 }
145 }
146 /* if we completed OK */
147 if (result == RS_DONE) {
148 /* if we reached eof, we can flush the last fragment */
149 if (job->stream->eof_in) {
150 job->statefn = rs_delta_s_flush;
151 return RS_RUNNING;
152 } else {
153 /* we are blocked waiting for more data */
154 return RS_BLOCKED;
155 }
156 }
157 return result;
158}
159
160static rs_result rs_delta_s_flush(rs_job_t *job)
161{
162 rs_long_t match_pos;
163 size_t match_len;
164 rs_result result;
165
166 rs_job_check(job);
167 /* read the input into the scoop */
168 rs_getinput(job);
169 /* output any pending output */
170 result = rs_tube_catchup(job);
171 /* while output is not blocked and there is any remaining data */
172 while ((result == RS_DONE) && (job->scoop_pos < job->scoop_avail)) {
173 /* check if this block matches */
174 if (rs_findmatch(job, &match_pos, &match_len)) {
175 /* append the match and reset the weak_sum */
176 result = rs_appendmatch(job, match_pos, match_len);
177 weaksum_reset(&job->weak_sum);
178 } else {
179 /* rollout from weak_sum and append the miss byte */
180 weaksum_rollout(&job->weak_sum, job->scoop_next[job->scoop_pos]);
181 rs_trace("block reduced to " FMT_SIZE "",
182 weaksum_count(&job->weak_sum));
183 result = rs_appendmiss(job, 1);
184 }
185 }
186 /* if we are not blocked, flush and set end statefn. */
187 if (result == RS_DONE) {
188 result = rs_appendflush(job);
189 job->statefn = rs_delta_s_end;
190 }
191 if (result == RS_DONE) {
192 return RS_RUNNING;
193 }
194 return result;
195}
196
197static rs_result rs_delta_s_end(rs_job_t *job)
198{
199 rs_emit_end_cmd(job);
200 return RS_DONE;
201}
202
203static inline void rs_getinput(rs_job_t *job)
204{
205 size_t len;
206
207 len = rs_scoop_total_avail(job);
208 if (job->scoop_avail < len) {
209 rs_scoop_input(job, len);
210 }
211}
212
213/** find a match at scoop_pos, returning the match_pos and match_len.
214 *
215 * Note that this will calculate weak_sum if required. It will also determine
216 * the match_len.
217 *
218 * This routine could be modified to do xdelta style matches that would extend
219 * matches past block boundaries by matching backwards and forwards beyond the
220 * block boundaries. Extending backwards would require decrementing scoop_pos
221 * as appropriate. */
222static inline int rs_findmatch(rs_job_t *job, rs_long_t *match_pos,
223 size_t *match_len)
224{
225 const size_t block_len = job->signature->block_len;
226
227 /* calculate the weak_sum if we don't have one */
228 if (weaksum_count(&job->weak_sum) == 0) {
229 /* set match_len to min(block_len, scan_avail) */
230 *match_len = job->scoop_avail - job->scoop_pos;
231 if (*match_len > block_len) {
232 *match_len = block_len;
233 }
234 /* Update the weak_sum */
235 weaksum_update(&job->weak_sum, job->scoop_next + job->scoop_pos,
236 *match_len);
237 rs_trace("calculate weak sum from scratch length " FMT_SIZE "",
238 weaksum_count(&job->weak_sum));
239 } else {
240 /* set the match_len to the weak_sum count */
241 *match_len = weaksum_count(&job->weak_sum);
242 }
243 *match_pos =
244 rs_signature_find_match(job->signature, weaksum_digest(&job->weak_sum),
245 job->scoop_next + job->scoop_pos, *match_len);
246 return *match_pos != -1;
247}
248
249/** Append a match at match_pos of length match_len to the delta, extending a
250 * previous match if possible, or flushing any previous miss/match. */
251static inline rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos,
252 size_t match_len)
253{
254 rs_result result = RS_DONE;
255
256 /* if last was a match that can be extended, extend it */
257 if (job->basis_len && (job->basis_pos + job->basis_len) == match_pos) {
258 job->basis_len += match_len;
259 } else {
260 /* else appendflush the last value */
261 result = rs_appendflush(job);
262 /* make this the new match value */
263 job->basis_pos = match_pos;
264 job->basis_len = match_len;
265 }
266 /* increment scoop_pos to point at next unscanned data */
267 job->scoop_pos += match_len;
268 /* we can only process from the scoop if output is not blocked */
269 if (result == RS_DONE) {
270 /* process the match data off the scoop */
271 result = rs_processmatch(job);
272 }
273 return result;
274}
275
276/** Append a miss of length miss_len to the delta, extending a previous miss
277 * if possible, or flushing any previous match.
278 *
279 * This also breaks misses up into 32KB segments to avoid accumulating too much
280 * in memory. */
281static inline rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
282{
283 const size_t max_miss = 32768; /* For 0.01% 3 command bytes overhead. */
284 rs_result result = RS_DONE;
285
286 /* If last was a match, or max_miss misses, appendflush it. */
287 if (job->basis_len || (job->scoop_pos >= max_miss)) {
288 result = rs_appendflush(job);
289 }
290 /* increment scoop_pos */
291 job->scoop_pos += miss_len;
292 return result;
293}
294
295/** Flush any accumulating hit or miss, appending it to the delta. */
297{
298 /* if last is a match, emit it and reset last by resetting basis_len */
299 if (job->basis_len) {
300 rs_trace("matched " FMT_LONG " bytes at " FMT_LONG "!", job->basis_len,
301 job->basis_pos);
302 rs_emit_copy_cmd(job, job->basis_pos, job->basis_len);
303 job->basis_len = 0;
304 return rs_processmatch(job);
305 /* else if last is a miss, emit and process it */
306 } else if (job->scoop_pos) {
307 rs_trace("got " FMT_SIZE " bytes of literal data", job->scoop_pos);
308 rs_emit_literal_cmd(job, (int)job->scoop_pos);
309 return rs_processmiss(job);
310 }
311 /* otherwise, nothing to flush so we are done */
312 return RS_DONE;
313}
314
315/** Process matching data in the scoop.
316 *
317 * The scoop contains match data at scoop_next of length scoop_pos. This
318 * function processes that match data, returning RS_DONE if it completes, or
319 * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
320 * still point at the next unscanned data.
321 *
322 * This function currently just removes data from the scoop and adjusts
323 * scoop_pos appropriately. In the future this could be used for something like
324 * context compressing of miss data. Note that it also calls rs_tube_catchup to
325 * output any pending output. */
327{
328 job->scoop_avail -= job->scoop_pos;
329 job->scoop_next += job->scoop_pos;
330 job->scoop_pos = 0;
331 return rs_tube_catchup(job);
332}
333
334/** Process miss data in the scoop.
335 *
336 * The scoop contains miss data at scoop_next of length scoop_pos. This
337 * function processes that miss data, returning RS_DONE if it completes, or
338 * RS_BLOCKED if it gets blocked. After it completes scoop_pos is reset to
339 * still point at the next unscanned data.
340 *
341 * This function uses rs_tube_copy to queue copying from the scoop into output.
342 * and uses rs_tube_catchup to do the copying. This automaticly removes data
343 * from the scoop, but this can block. While rs_tube_catchup is blocked,
344 * scoop_pos does not point at legit data, so scanning can also not proceed.
345 *
346 * In the future this could do compression of miss data before outputing it. */
348{
349 rs_tube_copy(job, job->scoop_pos);
350 job->scoop_pos = 0;
351 return rs_tube_catchup(job);
352}
353
354/** State function that does a slack delta containing only literal data to
355 * recreate the input. */
357{
358 rs_buffers_t *const stream = job->stream;
359 size_t avail = stream->avail_in;
360
361 if (avail) {
362 rs_trace("emit slack delta for " FMT_SIZE " available bytes", avail);
363 rs_emit_literal_cmd(job, (int)avail);
364 rs_tube_copy(job, avail);
365 return RS_RUNNING;
366 } else if (rs_job_input_is_ending(job)) {
367 job->statefn = rs_delta_s_end;
368 return RS_RUNNING;
369 }
370 return RS_BLOCKED;
371}
372
373/** State function for writing out the header of the encoding job. */
375{
377 if (job->signature) {
379 } else {
380 rs_trace("no signature provided for delta, using slack deltas");
382 }
383 return RS_RUNNING;
384}
385
387{
388 rs_job_t *job;
389
390 job = rs_job_new("delta", rs_delta_s_header);
391 /* Caller can pass NULL sig or empty sig for "slack deltas". */
392 if (sig && sig->count > 0) {
393 rs_signature_check(sig);
394 /* Caller must have called rs_build_hash_table() by now. */
395 assert(sig->hashtable);
396 job->signature = sig;
397 weaksum_init(&job->weak_sum, rs_signature_weaksum_kind(sig));
398 }
399 return job;
400}
static int rs_findmatch(rs_job_t *job, rs_long_t *match_pos, size_t *match_len)
find a match at scoop_pos, returning the match_pos and match_len.
Definition: delta.c:222
static rs_result rs_processmiss(rs_job_t *job)
Process miss data in the scoop.
Definition: delta.c:347
static rs_result rs_processmatch(rs_job_t *job)
Process matching data in the scoop.
Definition: delta.c:326
rs_job_t * rs_delta_begin(rs_signature_t *sig)
Prepare to compute a streaming delta.
Definition: delta.c:386
static rs_result rs_delta_s_scan(rs_job_t *job)
Get a block of data if possible, and see if it matches.
Definition: delta.c:119
static rs_result rs_delta_s_slack(rs_job_t *job)
State function that does a slack delta containing only literal data to recreate the input.
Definition: delta.c:356
static rs_result rs_appendmiss(rs_job_t *job, size_t miss_len)
Append a miss of length miss_len to the delta, extending a previous miss if possible,...
Definition: delta.c:281
static rs_result rs_appendflush(rs_job_t *job)
Flush any accumulating hit or miss, appending it to the delta.
Definition: delta.c:296
static rs_result rs_delta_s_header(rs_job_t *job)
State function for writing out the header of the encoding job.
Definition: delta.c:374
static rs_result rs_appendmatch(rs_job_t *job, rs_long_t match_pos, size_t match_len)
Append a match at match_pos of length match_len to the delta, extending a previous match if possible,...
Definition: delta.c:251
void rs_emit_end_cmd(rs_job_t *job)
Write an END command.
Definition: emit.c:120
void rs_emit_literal_cmd(rs_job_t *job, int len)
Write a LITERAL command.
Definition: emit.c:47
void rs_emit_copy_cmd(rs_job_t *job, rs_long_t where, rs_long_t len)
Write a COPY command for given offset and length.
Definition: emit.c:80
void rs_emit_delta_header(rs_job_t *job)
Write the magic for the start of a delta.
Definition: emit.c:40
How to emit commands to the client.
Public header for librsync.
rs_result
Return codes from nonblocking rsync operations.
Definition: librsync.h:180
@ RS_RUNNING
The job is still running, and not yet finished or blocked.
Definition: librsync.h:183
@ RS_DONE
Completed successfully.
Definition: librsync.h:181
@ RS_BLOCKED
Blocked waiting for more data.
Definition: librsync.h:182
size_t rs_scoop_total_avail(rs_job_t *job)
Return the total number of bytes available including the scoop and input buffer.
Definition: scoop.c:227
void rs_scoop_input(rs_job_t *job, size_t len)
Try to accept a from the input buffer to get LEN bytes in the scoop.
Definition: scoop.c:67
Manage librsync streams of IO.
rs_result rs_tube_catchup(rs_job_t *job)
Put whatever will fit from the tube into the output of the stream.
Definition: tube.c:159
void rs_tube_copy(rs_job_t *job, size_t len)
Queue up a request to copy through len bytes from the input to the output of the stream.
Definition: tube.c:200
Description of input and output buffers.
Definition: librsync.h:322
size_t avail_in
Number of bytes available at next_in.
Definition: librsync.h:336
int eof_in
True if there is no more data after this.
Definition: librsync.h:339
The contents of this structure are private.
Definition: job.h:26
rs_long_t basis_pos
Copy from the basis position.
Definition: job.h:92
rs_result(* statefn)(rs_job_t *)
Callback for each processing step.
Definition: job.h:35
rs_signature_t * signature
Pointer to the signature that's being used by the operation.
Definition: job.h:51
weaksum_t weak_sum
The rollsum weak signature accumulator used by delta.c.
Definition: job.h:63
Signature of a whole file.
Definition: sumset.h:37
int count
Total number of blocks.
Definition: sumset.h:41
hashtable_t * hashtable
The hashtable for finding matches.
Definition: sumset.h:44
int block_len
The block length.
Definition: sumset.h:39
logging functions.