refactor illegal char handling and file adding for media

- add writeData() so we can add files to media folder without having to
  save them in an intermediate folder first
- we were stripping or checking for illegal filename characters in
  multiple places; factor those checks out into separate routines
- add * to invalid char list, and disallow both forward and backslash
- remove size checks in syncAdd(); is handled server-side
This commit is contained in:
Damien Elmes 2013-07-11 15:33:24 +09:00
parent a3190d52f1
commit a538e29480
2 changed files with 36 additions and 37 deletions

View file

@ -85,25 +85,29 @@ class MediaManager(object):
##########################################################################
def addFile(self, opath):
"""Copy PATH to MEDIADIR, and return new filename.
If the same name exists, compare checksums."""
mdir = self.dir()
return self.writeData(opath, open(opath, "rb").read())
def writeData(self, opath, data):
# if fname is a full path, use only the basename
fname = os.path.basename(opath)
# remove any dangerous characters
base = re.sub(r"[][<>:/\\&?\"\|]", "", os.path.basename(opath))
base = self.stripIllegal(fname)
(root, ext) = os.path.splitext(base)
def repl(match):
n = int(match.group(1))
return " (%d)" % (n+1)
# find the first available name
csum = checksum(data)
while True:
path = os.path.join(mdir, root + ext)
fname = root + ext
path = os.path.join(self.dir(), fname)
# if it doesn't exist, copy it directly
if not os.path.exists(path):
shutil.copyfile(opath, path)
return os.path.basename(os.path.basename(path))
open(path, "wb").write(data)
return fname
# if it's identical, reuse
if self.filesIdentical(opath, path):
return os.path.basename(path)
if checksum(open(path, "rb").read()) == csum:
return fname
# otherwise, increment the index in the filename
reg = " \((\d+)\)$"
if not re.search(reg, root):
@ -111,11 +115,6 @@ If the same name exists, compare checksums."""
else:
root = re.sub(reg, repl, root)
def filesIdentical(self, path1, path2):
"True if files are the same."
return (checksum(open(path1, "rb").read()) ==
checksum(open(path2, "rb").read()))
# String manipulation
##########################################################################
@ -262,16 +261,11 @@ If the same name exists, compare checksums."""
finished = False
meta = None
media = []
sizecnt = 0
# get meta info first
assert z.getinfo("_meta").file_size < 100000
meta = json.loads(z.read("_meta"))
nextUsn = int(z.read("_usn"))
# then loop through all files
for i in z.infolist():
# check for zip bombs
sizecnt += i.file_size
assert sizecnt < 100*1024*1024
if i.filename == "_meta" or i.filename == "_usn":
# ignore previously-retrieved meta
continue
@ -282,9 +276,6 @@ If the same name exists, compare checksums."""
data = z.read(i)
csum = checksum(data)
name = meta[i.filename]
# can we store the file on this system?
if self.illegal(name):
continue
# save file
open(name, "wb").write(data)
# update db
@ -302,15 +293,16 @@ If the same name exists, compare checksums."""
self.syncMod()
return finished
def illegal(self, f):
if isWin:
for c in f:
if c in "<>:\"/\\|?*^":
return True
elif isMac:
for c in f:
if c in ":\\/":
return True
# Illegal characters
##########################################################################
_illegalCharReg = re.compile(r'[][><:"/?*^\\|\0]')
def stripIllegal(self, str):
return re.sub(self._illegalCharReg, "", str)
def hasIllegal(self, str):
return not not re.search(self._illegalCharReg, str)
# Media syncing - bundling zip files to send to server
##########################################################################
@ -431,12 +423,7 @@ create table log (fname text primary key, type int);
if f.lower() == "thumbs.db":
continue
# and files with invalid chars
bad = False
for c in "\0", "/", "\\", ":":
if c in f:
bad = True
break
if bad:
if self.hasIllegal(f):
continue
# empty files are invalid; clean them up and continue
if not os.path.getsize(f):

View file

@ -97,3 +97,15 @@ def test_changes():
d.media.findChanges()
assert len(list(added())) == 1
assert len(list(d.media.removed())) == 1
def test_illegal():
d = getEmptyDeck()
aString = "a:b|cd\\e/f\0g*h"
good = "abcdefgh"
assert d.media.stripIllegal(aString) == good
for c in aString:
bad = d.media.hasIllegal("somestring"+c+"morestring")
if bad:
assert(c not in good)
else:
assert(c in good)