File archive.py of Package obs-git
import fnmatch
import os
import re
import subprocess
import sys
import tarfile
import shutil
import glob
import locale
import logging
import tempfile
from TarSCM.helpers import Helpers
try:
from io import StringIO
except:
from StringIO import StringIO
METADATA_PATTERN = re.compile(r'.*/\.(bzr|git|hg|svn)(\/.*|$)')
class BaseArchive():
def __init__(self):
self.helpers = Helpers()
self.archivefile = None
self.metafile = None
def extract_from_archive(self, repodir, files, outdir):
"""Extract all files directly outside of the archive.
"""
if files is None:
return
for filename in files:
path = os.path.join(repodir, filename)
path_glob = glob.glob(path)
if not path_glob:
sys.exit("%s: No such file or directory" % path)
for src in path_glob:
r_src = os.path.realpath(src)
if not r_src.startswith(repodir):
sys.exit("%s: tries to escape the repository" % src)
shutil.copy2(src, outdir)
class ObsCpio(BaseArchive):
def create_archive(self, scm_object, **kwargs):
"""Create an OBS cpio archive of repodir in destination directory.
"""
basename = kwargs['basename']
dstname = kwargs['dstname']
version = kwargs['version']
args = kwargs['cli']
commit = scm_object.get_current_commit()
package_metadata = args.package_meta
(workdir, topdir) = os.path.split(scm_object.arch_dir)
extension = 'obscpio'
cwd = os.getcwd()
os.chdir(workdir)
archivefilename = os.path.join(args.outdir, dstname + '.' + extension)
archivefile = open(archivefilename, "w")
# detect reproducible support
params = ['cpio', '--create', '--format=newc', '--owner', '0:0']
chkcmd = "cpio --create --format=newc --reproducible "
chkcmd += "</dev/null >/dev/null 2>&1"
if os.system(chkcmd) == 0:
params.append('--reproducible')
proc = subprocess.Popen(
params,
shell = False,
stdin = subprocess.PIPE,
stdout = archivefile,
stderr = subprocess.STDOUT
)
# transform glob patterns to regular expressions
includes = ''
excludes = r'$.'
topdir_re = '(' + topdir + '/)('
if args.include:
incl_arr = [fnmatch.translate(x + '*') for x in args.include]
match_list = r'|'.join(incl_arr)
includes = topdir_re + match_list + ')'
if args.exclude:
excl_arr = [fnmatch.translate(x) for x in args.exclude]
excludes = topdir_re + r'|'.join(excl_arr) + ')'
# add topdir without filtering for now
cpiolist = []
for root, dirs, files in os.walk(topdir, topdown=False):
# excludes
dirs[:] = [os.path.join(root, d) for d in dirs]
dirs[:] = [d for d in dirs if not re.match(excludes, d)]
dirs[:] = [d for d in dirs if re.match(includes, d)]
# exclude/include files
files = [os.path.join(root, f) for f in files]
files = [f for f in files if not re.match(excludes, f)]
files = [f for f in files if re.match(includes, f)]
for name in dirs:
if not METADATA_PATTERN.match(name) or package_metadata:
cpiolist.append(name)
for name in files:
if not METADATA_PATTERN.match(name) or package_metadata:
cpiolist.append(name)
tstamp = self.helpers.get_timestamp(scm_object, args, topdir)
for name in sorted(cpiolist):
try:
os.utime(name, (tstamp, tstamp))
except OSError:
pass
# bytes() break in python2 with a TypeError as it expects only 1
# arg
try:
proc.stdin.write(name.encode('UTF-8', 'surrogateescape'))
except (TypeError, UnicodeDecodeError):
proc.stdin.write(name)
proc.stdin.write(b"\n")
proc.stdin.close()
ret_code = proc.wait()
if ret_code != 0:
raise SystemExit("Creating the cpio archive failed!")
archivefile.close()
# write meta data
infofile = os.path.join(args.outdir, basename + '.obsinfo')
logging.debug("Writing to obsinfo file '%s'", infofile)
metafile = open(infofile, "w")
metafile.write("name: " + basename + "\n")
metafile.write("version: " + version + "\n")
metafile.write("mtime: " + str(tstamp) + "\n")
if commit:
metafile.write("commit: " + commit + "\n")
metafile.close()
self.archivefile = archivefile.name
self.metafile = metafile.name
os.chdir(cwd)
class Tar(BaseArchive):
def create_archive(self, scm_object, **kwargs):
"""Create a tarball of repodir in destination directory."""
(workdir, topdir) = os.path.split(scm_object.arch_dir)
args = kwargs['cli']
outdir = args.outdir
dstname = kwargs['dstname']
extension = (args.extension or 'tar')
exclude = args.exclude
include = args.include
package_metadata = args.package_meta
timestamp = self.helpers.get_timestamp(
scm_object,
args,
scm_object.clone_dir
)
incl_patterns = []
excl_patterns = []
for i in include:
# for backward compatibility add a trailing '*' if i isn't a
# pattern
if fnmatch.translate(i) == fnmatch.translate(i + r''):
i += r'*'
pat = fnmatch.translate(os.path.join(topdir, i))
incl_patterns.append(re.compile(pat))
for exc in exclude:
pat = fnmatch.translate(os.path.join(topdir, exc))
excl_patterns.append(re.compile(pat))
def tar_exclude(filename):
"""
Exclude (return True) or add (return False) file to tar achive.
"""
if not package_metadata and METADATA_PATTERN.match(filename):
return True
if incl_patterns:
for pat in incl_patterns:
if pat.match(filename):
return False
return True
for pat in excl_patterns:
if pat.match(filename):
return True
return False
def reset(tarinfo):
"""Python 2.7 only: reset uid/gid to 0/0 (root)."""
tarinfo.uid = tarinfo.gid = 0
tarinfo.uname = tarinfo.gname = "root"
if timestamp != 0:
tarinfo.mtime = timestamp
return tarinfo
def tar_filter(tarinfo):
if tar_exclude(tarinfo.name):
return None
return reset(tarinfo)
cwd = os.getcwd()
os.chdir(workdir)
enc = locale.getpreferredencoding()
out_file = os.path.join(outdir, dstname + '.' + extension)
with tarfile.open(out_file, "w", encoding=enc) as tar:
try:
tar.add(topdir, recursive=False, filter=reset)
except TypeError:
# Python 2.6 compatibility
tar.add(topdir, recursive=False)
for entry in map(lambda x: os.path.join(topdir, x),
sorted(os.listdir(topdir))):
try:
tar.add(entry, filter=tar_filter)
except TypeError:
# Python 2.6 compatibility
tar.add(entry, exclude=tar_exclude)
self.archivefile = tar.name
os.chdir(cwd)
class Gbp(BaseArchive):
def create_archive(self, scm_object, **kwargs):
"""Create Debian source artefacts using git-buildpackage.
"""
args = kwargs['cli']
version = kwargs['version']
(workdir, topdir) = os.path.split(scm_object.clone_dir)
cwd = os.getcwd()
os.chdir(workdir)
if not args.revision:
revision = 'origin/master'
else:
revision = 'origin/' + args.revision
command = ['gbp', 'buildpackage', '--git-notify=off',
'--git-force-create', '--git-cleaner="true"']
# we are not on a proper local branch due to using git-reset but we
# anyway use the --git-export option
command.extend(['--git-ignore-branch',
"--git-export=%s" % revision])
# gbp can load submodules without having to run the git command, and
# will ignore submodules even if loaded manually unless this option is
# passed.
if args.submodules:
command.extend(['--git-submodules'])
# create local pristine-tar branch if present
ret, output = self.helpers.run_cmd(['git', 'rev-parse', '--verify',
'--quiet', 'origin/pristine-tar'],
cwd=scm_object.clone_dir)
if not ret:
ret, output = self.helpers.run_cmd(['git', 'update-ref',
'refs/heads/pristine-tar',
'origin/pristine-tar'],
cwd=scm_object.clone_dir)
if not ret:
command.append('--git-pristine-tar')
else:
command.append('--git-no-pristine-tar')
else:
command.append('--git-no-pristine-tar')
# Prevent potentially dangerous arguments from being passed to gbp,
# e.g. via cleaner, postexport or other hooks.
if args.gbp_build_args:
build_args = args.gbp_build_args.split(' ')
safe_args = re.compile(
'--git-verbose|--git-upstream-tree=.*|--git-no-pristine-tar')
p = re.compile('--git-.*|--hook-.*|--.*-hook=.*')
gbp_args = [arg for arg in build_args if safe_args.match(arg)]
dpkg_args = [arg for arg in build_args if not p.match(arg)]
ignored_args = list(set(build_args) - set(gbp_args + dpkg_args))
if ignored_args:
logging.info("Ignoring build_args: %s" % ignored_args)
command.extend(gbp_args + dpkg_args)
# Set the version in the changelog. Note that we can't simply use
# --source-option=-Dversion=$ver as it will not change the tarball
# name, which means dpkg-source -x pkg.dsc will fail as the names
# and version will not match
cl_path = os.path.join(scm_object.clone_dir, 'debian', 'changelog')
skip_versions = ['', '_none_', '_auto_', None]
if (os.path.isfile(cl_path) and version not in skip_versions):
# Some characters are legal in Debian's versions but not in a git
# tag, so they get substituted
version = re.sub(r'_', r'~', version)
version = re.sub(r'%', r':', version)
with open(cl_path, 'r') as cl:
lines = cl.readlines()
old_version = re.search(r'.+ \((.+)\) .+', lines[0]).group(1)
# non-native packages MUST have a debian revision (-xyz)
drev_ov = re.search(r'-', old_version)
drev_v = re.search(r'-', version)
if (drev_ov is not None and drev_v) is None:
logging.warning("Package is non-native but requested version"
" %s is native! Ignoring.", version)
else:
with open(cl_path, 'w+') as cl:
# A valid debian changelog has 'package (version) release'
# as the first line, if it's malformed we don't care as it
# will not even build
logging.debug("Setting version to %s", version)
# gbp by default complains about uncommitted changes
command.append("--git-ignore-new")
lines[0] = re.sub(r'^(.+) \(.+\) (.+)',
r'\1 (%s) \2' % version, lines[0])
cl.write("".join(lines))
logging.debug("Running in %s", scm_object.clone_dir)
self.helpers.safe_run(command, cwd=scm_object.clone_dir)
# Use dpkg to find out what source artefacts have been built and copy
# them back, which allows the script to be future-proof and work with
# all present and future package formats
sources = self.helpers.safe_run(['dpkg-scansources', workdir],
cwd=workdir)[1]
FILES_PATTERN = re.compile(
r'^Files:(.*(?:\n .*)+)', flags=re.MULTILINE)
for match in FILES_PATTERN.findall(sources):
logging.info("Files:")
for line in match.strip().split("\n"):
fname = line.strip().split(' ')[2]
logging.info(" %s", fname)
input_file = os.path.join(workdir, fname)
output_file = os.path.join(args.outdir, fname)
filename_matches_dsc = fnmatch.fnmatch(fname, '*.dsc')
if (args.gbp_dch_release_update and filename_matches_dsc):
# This tag is used by the build-recipe-dsc to set the OBS
# revision: https://github.com/openSUSE/obs-build/pull/192
logging.debug("Setting OBS-DCH-RELEASE in %s", input_file)
with open(input_file, "a") as dsc_file:
dsc_file.write("OBS-DCH-RELEASE: 1")
shutil.copy(input_file, output_file)
os.chdir(cwd)