File python-rpmfile-1.0.0+git20190702.208ac80.obscpio of Package python-rpmfile
07070100000000000081A4000003E800000064000000015D1BA0290000016A000000000000000000000000000000000000003200000000python-rpmfile-1.0.0+git20190702.208ac80/.project<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
<name>rpmquery</name>
<comment></comment>
<projects>
</projects>
<buildSpec>
<buildCommand>
<name>org.python.pydev.PyDevBuilder</name>
<arguments>
</arguments>
</buildCommand>
</buildSpec>
<natures>
<nature>org.python.pydev.pythonNature</nature>
</natures>
</projectDescription>
07070100000001000081A4000003E800000064000000015D1BA0290000019E000000000000000000000000000000000000003700000000python-rpmfile-1.0.0+git20190702.208ac80/.pydevproject<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<?eclipse-pydev version="1.0"?>
<pydev_project>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_INTERPRETER">Default</pydev_property>
<pydev_property name="org.python.pydev.PYTHON_PROJECT_VERSION">python 2.7</pydev_property>
<pydev_pathproperty name="org.python.pydev.PROJECT_SOURCE_PATH">
<path>/rpmquery</path>
</pydev_pathproperty>
</pydev_project>
07070100000002000041ED000003E800000064000000025D1BA02900000000000000000000000000000000000000000000003300000000python-rpmfile-1.0.0+git20190702.208ac80/.settings07070100000003000081A4000003E800000064000000015D1BA02900000095000000000000000000000000000000000000005400000000python-rpmfile-1.0.0+git20190702.208ac80/.settings/org.eclipse.core.resources.prefseclipse.preferences.version=1
encoding//rpmfile/__init__.py=iso-8859-15
encoding//rpmfile/cpiofile.py=utf-8
encoding//rpmfile/rpmdefs.py=iso-8859-15
07070100000004000081A4000003E800000064000000015D1BA02900000062000000000000000000000000000000000000003500000000python-rpmfile-1.0.0+git20190702.208ac80/.travis.ymllanguage: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
script:
- python setup.py test
07070100000005000081A4000003E800000064000000015D1BA0290000042F000000000000000000000000000000000000003100000000python-rpmfile-1.0.0+git20190702.208ac80/LICENSECopyright (c) 2015 Sean Ross-Ross
MIT License
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
07070100000006000081A4000003E800000064000000015D1BA02900000389000000000000000000000000000000000000003300000000python-rpmfile-1.0.0+git20190702.208ac80/README.md# rpmfile
[![Build Status](https://travis-ci.org/srossross/rpmfile.svg?branch=master)](https://travis-ci.org/srossross/rpmfile)
Tools for inspecting RPM files in python. This module is modeled after the
[tarfile](https://docs.python.org/3/library/tarfile.html) module.
## Example
```python
import rpmfile
with rpmfile.open('file.rpm') as rpm:
# Inspect the RPM headers
print(rpm.headers.keys())
print(rpm.headers.get('arch', 'noarch'))
# Extract a fileobject from the archive
fd = rpm.extractfile('./usr/bin/script')
print(fd.read())
for member in rpm.getmembers():
print(member)
```
## Classes
* rpmfile.RPMFile: The RPMFile object provides an interface to an RPM archive.
* rpmfile.RPMInfo: An RPMInfo object represents one member in an RPMFile.
## Code in this module was borrowed from:
* https://bitbucket.org/krp/cpiofile
* https://github.com/mjvm/pyrpm
07070100000007000041ED000003E800000064000000025D1BA02900000000000000000000000000000000000000000000003100000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile07070100000008000081A4000003E800000064000000015D1BA02900001668000000000000000000000000000000000000003D00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/__init__.py
from __future__ import print_function, unicode_literals, absolute_import
from .headers import get_headers
import sys
import io
import gzip
try:
import lzma
except ImportError:
pass
import struct
from rpmfile import cpiofile
from functools import wraps
from rpmfile.io_extra import _SubFile
def pad(fileobj):
    """Return the number of bytes needed to reach the next 4-byte boundary.

    The value is derived from ``fileobj.tell()`` and is always in ``0..3``;
    it is ``0`` when the current position is already a multiple of four.
    """
    remainder = fileobj.tell() % 4
    return (4 - remainder) % 4
class NoLZMAModuleError(NotImplementedError):
    """Raised when an xz-compressed payload is requested but ``lzma`` is missing."""
class RPMInfo(object):
    '''
    Description of one member of an RPM payload archive.

    Instances carry the member name, the offset and size of its data
    inside the uncompressed payload, the offset at which its header
    was read, and a directory flag.  They are normally produced by
    RPMFile.getmember() / RPMFile.getmembers() rather than built by hand.
    '''

    # Thirteen 8-character ASCII fields; the 6-byte magic that precedes
    # them has already been consumed by the caller.
    _new_coder = struct.Struct(b'8s' * 13)

    def __init__(self, name, file_start, file_size, initial_offset, isdir):
        self.name = name
        self.file_start = file_start
        self.size = file_size
        self.initial_offset = initial_offset
        self._isdir = isdir

    @property
    def isdir(self):
        '''True when the entry was classified as a directory while reading.'''
        return self._isdir

    def __repr__(self):
        return '<RPMMember %r>' % self.name

    @classmethod
    def _read(cls, magic, fileobj):
        '''
        Dispatch on the 6-byte `magic' and parse one entry from `fileobj'.

        Only b'070701' is understood; anything else raises Exception.
        '''
        if magic != b'070701':
            raise Exception('bad magic number %r' % magic)
        return cls._read_new(fileobj, magic=magic)

    @classmethod
    def _read_new(cls, fileobj, magic=None):
        '''
        Parse one entry whose magic has already been read and leave
        `fileobj' positioned just past the entry's padded data.
        '''
        header = cls._new_coder
        entry_offset = fileobj.tell()
        fields = header.unpack_from(fileobj.read(header.size))

        # Field 11 holds the name length including its terminating NUL.
        name_length = int(fields[11], 16)
        member_name = fileobj.read(name_length)[:-1].decode('utf-8')
        fileobj.seek(pad(fileobj), 1)

        data_start = fileobj.tell()
        data_size = int(fields[6], 16)
        # Skip over the data and the alignment padding that follows it.
        fileobj.seek(data_size, 1)
        fileobj.seek(pad(fileobj), 1)

        link_count = int(fields[4], 16)
        # Heuristic: an empty entry with exactly two links is a directory.
        is_directory = link_count == 2 and data_size == 0
        return cls(member_name, data_start, data_size, entry_offset,
                   is_directory)
class RPMFile(object):
    '''
    Open an RPM archive `name'. `mode' must be 'rb' to
    read from an existing archive; any other mode raises
    NotImplementedError.

    If `fileobj' is given, it is used for reading data instead of opening
    `name'.  `fileobj' is not closed when the RPMFile is closed; a file
    opened from `name' is.
    '''

    # Lazily populated caches (see getmembers() and data_file).
    _members = None
    _data_file = None

    def __init__(self, name=None, mode='rb', fileobj=None):
        if mode != 'rb':
            raise NotImplementedError("currently the only supported mode is 'rb'")
        # Ownership and the choice of file object use the same test so that
        # a file we opened ourselves is always the one we later close.
        self._ownes_fd = fileobj is None
        self._fileobj = io.open(name, mode) if fileobj is None else fileobj
        try:
            self._header_range, self._headers = get_headers(self._fileobj)
        except Exception:
            # Do not leak the descriptor we opened when the headers are bad.
            if self._ownes_fd:
                self._fileobj.close()
            raise

    @property
    def data_offset(self):
        'Offset in the RPM file at which the compressed payload starts'
        return self._header_range[1]

    @property
    def header_range(self):
        '(start, end) offsets of the main header inside the RPM file'
        return self._header_range

    @property
    def headers(self):
        'RPM headers'
        return self._headers

    def __enter__(self):
        return self

    def __exit__(self, *excinfo):
        self.close()

    def close(self):
        '''
        Close the underlying file if this object opened it.  A file object
        supplied by the caller is left open (and so is the payload reader
        built on top of it).
        '''
        if self._ownes_fd:
            if self._data_file is not None:
                self._data_file.close()
            self._fileobj.close()

    def getmembers(self):
        '''
        Return the members of the archive as a list of RPMInfo objects. The
        list has the same order as the members in the archive.  Entries
        flagged as directories are not included.
        '''
        if self._members is None:
            # Build into a local list and publish it only once parsing
            # succeeded, so a failure never caches a truncated listing.
            members = []
            g = self.data_file
            magic = g.read(2)
            while magic:
                if magic == b'07':
                    magic += g.read(4)
                    member = RPMInfo._read(magic, g)
                    if member.name == 'TRAILER!!!':
                        break
                    if not member.isdir:
                        members.append(member)
                magic = g.read(2)
            self._members = members
        return self._members

    def getmember(self, name):
        '''
        Return an RPMInfo object for member `name'. If `name' can not be
        found in the archive, KeyError is raised. If a member occurs more
        than once in the archive, its last occurrence is assumed to be the
        most up-to-date version.
        '''
        for m in reversed(self.getmembers()):
            if m.name == name:
                return m
        raise KeyError("member %s could not be found" % name)

    def extractfile(self, member):
        '''
        Extract a member from the archive as a file object. `member' may be
        a filename or an RPMInfo object.

        The file-like object is read-only and provides the following
        methods: read(), readline(), readlines(), seek() and tell()
        '''
        if not isinstance(member, RPMInfo):
            member = self.getmember(member)
        return _SubFile(self.data_file, member.file_start, member.size)

    @property
    def data_file(self):
        """Return the uncompressed raw CPIO data of the RPM archive."""
        if self._data_file is None:
            fileobj = _SubFile(self._fileobj, self.data_offset)
            # A package without an "archive_compression" header is handled
            # like any non-xz payload, i.e. as gzip.
            if self.headers.get("archive_compression") == b"xz":
                if not getattr(sys.modules[__name__], 'lzma', False):
                    raise NoLZMAModuleError('lzma module not present')
                self._data_file = lzma.LZMAFile(fileobj)
            else:
                self._data_file = gzip.GzipFile(fileobj=fileobj)
        return self._data_file
def open(name=None, mode='rb', fileobj=None):
    '''
    Open an RPM archive for reading and return the RPMFile wrapping it.

    The arguments are forwarded unchanged to the RPMFile constructor.
    '''
    archive = RPMFile(name, mode, fileobj)
    return archive
def main():
    '''
    Print the path given as first command line argument, then the headers
    and every member of that RPM, followed by the word "done".
    '''
    rpm_path = sys.argv[1]
    print(rpm_path)
    with open(rpm_path) as rpm:
        print(rpm.headers)
        for member in rpm.getmembers():
            print(member)
        print('done')
if __name__ == '__main__':
main()
07070100000009000081A4000003E800000064000000015D1BA0290000453F000000000000000000000000000000000000003D00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/cpiofile.py#!/usr/bin/env python -3
# -*- coding: utf-8 -*-
#
# Copyright © 2011, 2013 K Richard Pixley
#
# See LICENSE for details.
#
# Time-stamp: <30-Jun-2013 19:07:22 PDT by rich@noir.com>
"""
Cpiofile is a library which reads and writes unix style 'cpio' format
archives.
.. todo:: open vs context manager
.. todo:: make is_cpiofile work on fileobj
"""
from __future__ import unicode_literals, print_function
__docformat__ = 'restructuredtext en'
__all__ = [
'CheckSumError',
'CpioError',
'CpioFile',
'CpioMember',
'HeaderError',
'InvalidFileFormat',
'InvalidFileFormatNull',
'is_cpiofile',
'valid_magic',
]
import abc
import io
import mmap
import os
import struct
class CpioError(Exception):
    """Root of the exception hierarchy raised by this module."""


class CheckSumError(CpioError):
    """Raised when a stored member checksum does not match its content."""


class InvalidFileFormat(CpioError):
    """Raised when data does not look like a supported cpio format."""


class InvalidFileFormatNull(InvalidFileFormat):
    """Raised when the data to inspect is empty."""


class HeaderError(CpioError):
    """Exception indicating a header error."""
def valid_magic(block):
    """
    Tell whether *block* starts with a known cpio magic number.

    This simply forwards to :py:meth:`CpioMember.valid_magic`, so the
    result is the matching member class on success and ``False`` otherwise.
    """
    verdict = CpioMember.valid_magic(block)
    return verdict
def is_cpiofile(name):
    """
    Tell whether the file called *name* starts with a cpio magic number.

    Only the first 16 bytes of the file are read and handed to
    :py:func:`valid_magic`; its result is returned unchanged.
    """
    handle = io.open(name, 'rb')
    try:
        leading_bytes = handle.read(16)
        return valid_magic(leading_bytes)
    finally:
        handle.close()
class StructBase(object):
    """
    Common base for objects whose on-disk form is described by a
    :py:class:`struct.Struct`.

    Subclasses provide :py:attr:`coder` and implement
    :py:meth:`unpack_from` and :py:meth:`pack_into`.
    """

    # Python 2 spelling of the metaclass declaration; Python 3 ignores
    # this attribute, so the abstract methods are not enforced there.
    __metaclass__ = abc.ABCMeta

    # The struct.Struct used to encode/decode this object into a block of
    # memory.  Expected to be overridden by subclasses.
    coder = None

    # Instances are explicitly unhashable.
    __hash__ = None

    @property
    def size(self):
        """
        Exact number of bytes needed to pack this instance, as reported
        by :py:attr:`coder`.
        """
        return self.coder.size

    def unpack(self, block):
        """Shorthand for :py:meth:`unpack_from` starting at offset 0."""
        return self.unpack_from(block)

    @abc.abstractmethod
    def unpack_from(self, block, offset=0):
        """
        Load this instance's values from their in-memory representation.

        :param string block: block of memory from which to unpack
        :param int offset: optional offset into the memory block from
            which to start unpacking
        """
        raise NotImplementedError

    def pack(self):
        """
        Allocate a zeroed ``bytearray`` of :py:attr:`size` bytes, pack
        this instance into it and return it.
        """
        buf = bytearray(self.size)
        self.pack_into(buf)
        return buf

    @abc.abstractmethod
    def pack_into(self, block, offset=0):
        """
        Write this instance's values into their in-memory representation.

        :param string block: block of memory into which to pack
        :param int offset: optional offset into the memory block into
            which to start packing
        """
        raise NotImplementedError

    def __eq__(self, other):
        raise NotImplementedError

    def __ne__(self, other):
        return not self.__eq__(other)

    def close_enough(self, other):
        """
        Looser sibling of ``__eq__``: decide whether two objects are
        "close enough" even though they may have been produced at
        different times or in different places in the file system.

        The base implementation is plain equality.
        """
        return self == other
class CpioFile(StructBase):
    """Class representing an entire cpio file"""

    # Class-level fallback only; every instance gets its own list in
    # __init__ so members are never shared between archives.
    _members = []

    def __init__(self):
        self._members = []

    @property
    def members(self):
        """accessor for a list of the members of this cpio file"""
        return self._members

    @property
    def names(self):
        """accessor for a list of names of the members of this cpio file"""
        return [member.name for member in self.members]

    def __enter__(self):
        return self

    def __exit__(self, thingy, value, traceback):
        self.close()

    @classmethod
    def open(cls, name=None, mode=None):
        """
        Create an instance and load it from the file called *name*.

        *mode* is accepted but not used; the file is always read.
        """
        return cls._open(cls(), name)

    def _open(self, name=None, fileobj=None, mymap=None, block=None):
        """
        The _open function takes some form of file identifier and creates
        an :py:class:`CpioFile` instance from it.

        :param :py:class:`str` name: a file name
        :param :py:class:`file` fileobj: if given, this overrides *name*
        :param :py:class:`mmap.mmap` mymap: if given, this overrides *fileobj*
        :param :py:class:`bytes` block: file contents in a block of memory, (if given, this overrides *mymap*)

        The file to be used can be specified in any of four different
        forms, (in reverse precedence):

        #. a file name
        #. :py:class:`file` object
        #. :py:mod:`mmap.mmap`, or
        #. a block of memory

        *fileobj*, whether passed in or opened here, is closed before
        returning.  :py:class:`ValueError` is raised when no source at
        all is given.
        """
        try:
            if block is None:
                if mymap is None:
                    if not fileobj:
                        if not name:
                            # Was ``assert False``, which is stripped
                            # under -O and then recursed without end.
                            raise ValueError(
                                'one of name, fileobj, mymap or block'
                                ' is required')
                        fileobj = io.open(
                            os.path.normpath(os.path.expanduser(name)), 'rb')
                    try:
                        mymap = mmap.mmap(fileobj.fileno(), 0,
                                          mmap.MAP_SHARED, mmap.PROT_READ)
                    except (AttributeError, ValueError, EnvironmentError):
                        # No usable descriptor or mmap flavour: fall back
                        # to reading the whole file into memory.
                        block = fileobj.read()
                if block is None:
                    block = mymap
            self.unpack_from(block)
        finally:
            if fileobj:
                fileobj.close()
        return self

    def close(self):
        """noop - here for completeness"""
        pass

    def unpack_from(self, block, offset=0):
        """
        Append to :py:attr:`members` every member found in *block*,
        starting at *offset* and stopping at the ``TRAILER!!!`` entry,
        which is not kept.

        Member names are slices of *block*, so the trailer is recognised
        by comparing bytes.
        """
        pointer = offset
        while True:
            cmem = CpioMember.encoded_class(block, pointer)()
            cmem.unpack_from(block, pointer)
            pointer += cmem.size
            if bytes(cmem.name) == b'TRAILER!!!':
                break
            self.members.append(cmem)

    def pack_into(self, block, offset=0):
        """
        Pack every member into *block* starting at *offset*, followed by
        a ``TRAILER!!!`` entry of the same class as the first member
        (:py:class:`CpioMemberNew` when there are no members).

        NOTE(review): :py:meth:`pack` on this class relies on
        :py:attr:`size`, which needs a ``coder`` this class never
        defines -- callers presumably must size *block* themselves.
        """
        pointer = offset
        for member in self.members:
            member.pack_into(block, pointer)
            pointer += member.size

        if self.members:
            cmtype = type(self.members[0])
            magic = self.members[0].magic
        else:
            cmtype = CpioMemberNew
            magic = b'070701'

        # Give the trailer concrete values: the member classes cannot
        # pack the ``None`` defaults.
        cmt = cmtype()
        cmt.magic = magic
        cmt.name = b'TRAILER!!!'
        cmt.devmajor = cmt.devminor = 0
        cmt.rdevmajor = cmt.rdevminor = 0
        cmt.ino = cmt.mode = cmt.uid = cmt.gid = 0
        cmt.nlink = 1
        cmt.mtime = 0
        cmt.filesize = 0
        cmt.content = b''
        cmt.pack_into(block, pointer)

    def get_member(self, name):
        """return a member by *name*, or ``None`` when there is none"""
        for member in self.members:
            if member.name == name:
                return member
        return None

    def __eq__(self, other):
        raise NotImplementedError
class CpioMember(StructBase):
    """
    class representing a member of a cpio archive

    The methods defined here implement the 16-bit binary header layout
    (thirteen fields, with mtime and filesize split in two halves);
    textual formats override them in subclasses.
    """

    coder = None
    name = None
    magic = None
    devmajor = None
    devminor = None
    ino = None
    mode = None
    uid = None
    gid = None
    nlink = None
    rdevmajor = None
    rdevminor = None
    mtime = None
    filesize = None
    content = None

    @staticmethod
    def valid_magic(block, offset=0):
        """
        predicate indicating whether a block of memory has a valid magic
        number; returns the matching member class, or ``False``
        """
        try:
            return CpioMember.encoded_class(block, offset)
        except InvalidFileFormat:
            return False

    @staticmethod
    def encoded_class(block, offset=0):
        """
        Return the member class whose magic number is found in *block*
        at *offset*.

        :raises InvalidFileFormatNull: when *block* is empty
        :raises InvalidFileFormat: when no known magic number matches
        """
        if not block:
            raise InvalidFileFormatNull

        for key in __magicmap__:
            if block.find(key, offset, offset + len(key)) > -1:
                return __magicmap__[key]

        raise InvalidFileFormat

    def unpack_from(self, block, offset=0):
        """
        Fill this member from the header, name and content stored in
        *block* at *offset*; returns ``self``.
        """
        (self.magic, dev, self.ino, self.mode,
         self.uid, self.gid, self.nlink, rdev,
         mtimehigh, mtimelow, namesize, filesizehigh,
         filesizelow) = self.coder.unpack_from(block, offset)

        self.devmajor = os.major(dev)
        self.devminor = os.minor(dev)
        self.rdevmajor = os.major(rdev)
        self.rdevminor = os.minor(rdev)

        self.mtime = (mtimehigh << 16) | mtimelow
        self.filesize = (filesizehigh << 16) | filesizelow

        # namesize counts the NUL that terminates the name.
        namestart = offset + self.coder.size
        datastart = namestart + namesize

        self.name = block[namestart:datastart - 1]  # drop the null

        if isinstance(self, CpioMemberBin) and (namesize & 1):
            datastart += 1  # skip a pad byte if necessary

        self.content = block[datastart:datastart + self.filesize]

        return self

    def pack_into(self, block, offset=0):
        """
        Write header, NUL-terminated name and content into the mutable
        byte buffer *block* at *offset*; returns ``self``.

        The layout mirrors :py:meth:`unpack_from`: the stored name size
        includes the terminating NUL, and binary members get a pad byte
        after an odd-sized name or content.
        """
        namesize = len(self.name) + 1  # include the terminating NUL
        dev = os.makedev(self.devmajor, self.devminor)
        rdev = os.makedev(self.rdevmajor, self.rdevminor)

        mtimehigh = self.mtime >> 16
        mtimelow = self.mtime & 0xffff

        filesizehigh = self.filesize >> 16
        filesizelow = self.filesize & 0xffff

        self.coder.pack_into(block, offset, self.magic, dev,
                             self.ino, self.mode, self.uid, self.gid,
                             self.nlink, rdev, mtimehigh, mtimelow,
                             namesize, filesizehigh, filesizelow)

        namestart = offset + self.coder.size
        datastart = namestart + namesize

        block[namestart:datastart - 1] = self.name
        # Integer assignment is valid for a bytearray on Python 2 and 3.
        block[datastart - 1] = 0

        if isinstance(self, CpioMemberBin) and (namesize & 1):
            block[datastart] = 0
            datastart += 1

        block[datastart:datastart + self.filesize] = self.content

        if isinstance(self, CpioMemberBin) and (self.filesize & 1):
            block[datastart + self.filesize] = 0

        return self

    @property
    def size(self):
        """header size plus NUL-terminated name plus content, in bytes"""
        return (self.coder.size
                + len(self.name) + 1
                + self.filesize)

    def __repr__(self):
        # The pieces are joined before formatting; previously ``.format``
        # only applied to the last fragment.
        template = ('<{0}@{1}: coder={2}, name=\'{3}\', magic=\'{4}\''
                    ', devmajor={5}, devminor={6}, ino={7}, mode={8}'
                    ', uid={9}, gid={10}, nlink={11}, rdevmajor={12}'
                    ', rdevminor={13}, mtime={14}, filesize={15}>')
        return template.format(
            self.__class__.__name__, hex(id(self)), self.coder,
            self.name, self.magic, self.devmajor, self.devminor,
            self.ino, self.mode, self.uid, self.gid, self.nlink,
            self.rdevmajor, self.rdevminor, self.mtime,
            self.filesize)

    def __eq__(self, other):
        """
        Members are equal when they are of a compatible class and all
        header values match; name and content are not compared.
        """
        return (isinstance(other, self.__class__)
                and self.coder == other.coder
                and self.magic == other.magic
                and self.devmajor == other.devmajor
                and self.devminor == other.devminor
                and self.ino == other.ino
                and self.mode == other.mode
                and self.uid == other.uid
                and self.gid == other.gid
                and self.nlink == other.nlink
                and self.rdevmajor == other.rdevmajor
                and self.rdevminor == other.rdevminor
                and self.mtime == other.mtime
                and self.filesize == other.filesize)

    close_enough = __eq__
class CpioMemberBin(CpioMember):
    """intermediate class indicating binary members - for subclassing only"""

    @property
    def size(self):
        """
        Bytes occupied by this member: header, NUL-terminated name and
        content, where an odd-sized name or content is followed by one
        pad byte.
        """
        name_bytes = len(self.name) + 1  # add null
        total = self.coder.size + name_bytes + (name_bytes & 1)
        total += self.filesize + (self.filesize & 1)
        return total
class CpioMember32b(CpioMemberBin):
    """
    class representing a big endian binary member

    .. todo:: need to pad after name and after content for old binary.
    """

    # 2-byte magic followed by twelve unsigned 16-bit fields.
    coder = struct.Struct(b'>2s' + b'H' * 12)
class CpioMember32l(CpioMemberBin):
    """class representing a little endian binary member"""

    # 2-byte magic followed by twelve unsigned 16-bit fields.
    coder = struct.Struct(b'<2s' + b'H' * 12)
class CpioMemberODC(CpioMember):
    """
    class representing an ODC member

    Every header field is a fixed-width run of ASCII octal digits.
    """

    coder = struct.Struct(b'=6s6s6s6s6s6s6s6s11s6s11s')

    @staticmethod
    def _octal(value, width):
        """Return *value* as *width* zero-padded ASCII octal digits."""
        return '{0:0{1}o}'.format(value, width).encode('ascii')

    def unpack_from(self, block, offset=0):
        """
        Fill this member from the header, name and content stored in
        *block* at *offset*; returns ``self``.
        """
        (self.magic, dev, ino, mode,
         uid, gid, nlink, rdev,
         mtime, namesize, filesize) = self.coder.unpack_from(block, offset)

        self.ino = int(ino, 8)
        self.mode = int(mode, 8)
        self.uid = int(uid, 8)
        self.gid = int(gid, 8)
        self.nlink = int(nlink, 8)

        dev = int(dev, 8)
        rdev = int(rdev, 8)
        self.devmajor = os.major(dev)
        self.devminor = os.minor(dev)
        self.rdevmajor = os.major(rdev)
        self.rdevminor = os.minor(rdev)

        self.mtime = int(mtime, 8)
        namesize = int(namesize, 8)  # includes the terminating NUL
        self.filesize = int(filesize, 8)

        namestart = offset + self.coder.size
        datastart = namestart + namesize

        self.name = block[namestart:datastart - 1]  # drop the null
        self.content = block[datastart:datastart + self.filesize]

        return self

    def pack_into(self, block, offset=0):
        """
        Write header, NUL-terminated name and content into the mutable
        byte buffer *block* at *offset*; returns ``self``.

        Numeric fields are written as zero-padded octal text, the same
        representation :py:meth:`unpack_from` parses.  A value with more
        digits than its field is truncated by ``struct``.
        """
        octal = self._octal
        namesize = len(self.name) + 1  # add a null
        dev = os.makedev(self.devmajor, self.devminor)
        rdev = os.makedev(self.rdevmajor, self.rdevminor)

        self.coder.pack_into(block, offset, self.magic, octal(dev, 6),
                             octal(self.ino, 6), octal(self.mode, 6),
                             octal(self.uid, 6), octal(self.gid, 6),
                             octal(self.nlink, 6), octal(rdev, 6),
                             octal(self.mtime, 11), octal(namesize, 6),
                             octal(self.filesize, 11))

        namestart = offset + self.coder.size
        datastart = namestart + namesize

        block[namestart:datastart - 1] = self.name
        block[datastart - 1] = 0
        block[datastart:datastart + self.filesize] = self.content

        return self
class CpioMemberNew(CpioMember):
    """
    class representing a new member

    The header is a 6-byte magic followed by thirteen fields of eight
    ASCII hexadecimal digits; name and content are each padded to a
    multiple of four bytes.
    """

    coder = struct.Struct(b'6s8s8s8s8s8s8s8s8s8s8s8s8s8s')

    # pylint: disable=W0613
    @staticmethod
    def _checksum(block, offset, length):
        """return a checksum for *block* at *offset* and *length*"""
        return 0
    # pylint: enable=W0613

    @staticmethod
    def _hex(value):
        """Return *value* as eight zero-padded ASCII hexadecimal digits."""
        return '{0:08X}'.format(value).encode('ascii')

    def unpack_from(self, block, offset=0):
        """
        Fill this member from the header, name and content stored in
        *block* at *offset*; returns ``self``.

        :raises CheckSumError: when the stored check value differs from
            :py:meth:`_checksum` of the content
        """
        unpacks = self.coder.unpack_from(block, offset)

        self.magic = unpacks[0]
        self.ino = int(unpacks[1], 16)
        self.mode = int(unpacks[2], 16)
        self.uid = int(unpacks[3], 16)
        self.gid = int(unpacks[4], 16)
        self.nlink = int(unpacks[5], 16)

        self.mtime = int(unpacks[6], 16)
        self.filesize = int(unpacks[7], 16)

        self.devmajor = int(unpacks[8], 16)
        self.devminor = int(unpacks[9], 16)
        self.rdevmajor = int(unpacks[10], 16)
        self.rdevminor = int(unpacks[11], 16)

        namesize = int(unpacks[12], 16)  # includes the terminating NUL
        check = int(unpacks[13], 16)

        namestart = offset + self.coder.size
        nameend = namestart + namesize

        datastart = nameend + ((4 - (nameend % 4)) % 4)  # pad
        dataend = datastart + self.filesize

        self.name = block[namestart:nameend - 1]  # drop the null
        self.content = block[datastart:dataend]

        if check != self._checksum(self.content, 0, self.filesize):
            raise CheckSumError

        return self

    def pack_into(self, block, offset=0):
        """
        Write header, NUL-terminated name, content and their padding into
        the mutable byte buffer *block* at *offset*; returns ``self``.

        Numeric fields are written as eight hexadecimal digits, the same
        representation :py:meth:`unpack_from` parses.
        """
        as_hex = self._hex
        namesize = len(self.name) + 1
        checksum = self._checksum(self.content, 0, self.filesize)

        self.coder.pack_into(
            block, offset, self.magic, as_hex(self.ino), as_hex(self.mode),
            as_hex(self.uid), as_hex(self.gid), as_hex(self.nlink),
            as_hex(self.mtime), as_hex(self.filesize),
            as_hex(self.devmajor), as_hex(self.devminor),
            as_hex(self.rdevmajor), as_hex(self.rdevminor),
            as_hex(namesize), as_hex(checksum))

        namestart = offset + self.coder.size
        nameend = namestart + namesize
        datastart = nameend + ((4 - (nameend % 4)) % 4)  # pad
        dataend = datastart + self.filesize

        block[namestart:nameend - 1] = self.name
        # Terminating NUL plus the alignment bytes before the content.
        for i in range(nameend - 1, datastart):
            block[i] = 0

        block[datastart:dataend] = self.content

        # Trailing pad is measured from the end of the content.
        padend = dataend + ((4 - (dataend % 4)) % 4)  # pad
        for i in range(dataend, padend):
            block[i] = 0

        return self

    @property
    def size(self):
        """
        Bytes occupied by this member, with the name and the content
        each rounded up to a multiple of four.
        """
        retval = self.coder.size
        retval += len(self.name) + 1
        retval += ((4 - (retval % 4)) % 4)
        retval += self.filesize
        retval += ((4 - (retval % 4)) % 4)
        return retval
class CpioMemberCRC(CpioMemberNew):
    """class representing a cpio archive member with a CRC"""

    @staticmethod
    def _checksum(block, offset, length):
        """
        Return the sum of the *length* bytes of *block* starting at
        *offset*, truncated to 32 bits.
        """
        # Going through bytearray yields integers on Python 2 and 3 alike;
        # indexing bytes already gives ints on Python 3, where ``ord``
        # would raise TypeError.
        window = bytearray(block[offset:offset + length])
        return sum(window) & 0xffffffff
# Maps the magic number found at the start of a member header to the
# CpioMember subclass that parses it (used by CpioMember.encoded_class).
__magicmap__ = {
# the two-byte binary magic, in either byte order
b'\x71\xc7': CpioMember32b,
b'\xc7\x71': CpioMember32l,
# six-character ASCII magics
b'070707': CpioMemberODC,
b'070701': CpioMemberNew,
b'070702': CpioMemberCRC,
}
0707010000000A000081A4000003E800000064000000015D1BA02900000052000000000000000000000000000000000000003B00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/errors.py'''
Created on Jan 10, 2014
@author: sean
'''
class RPMError(Exception):
pass0707010000000B000081A4000003E800000064000000015D1BA029000012ED000000000000000000000000000000000000003C00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/headers.py'''
Created on Jan 10, 2014
@author: sean
'''
from __future__ import print_function, unicode_literals, absolute_import
import sys
import struct
from pprint import pprint
from .errors import RPMError
# Header tag name -> numeric tag id.
tags = {
    'signature': 267,
    'md5': 269,
    'name': 1000,
    'version': 1001,
    'release': 1002,
    'serial': 1003,
    'summary': 1004,
    'description': 1005,
    'buildtime': 1006,
    'buildhost': 1007,
    'installtime': 1008,
    'size': 1009,
    'distribution': 1010,
    'vendor': 1011,
    'gif': 1012,
    'xpm': 1013,
    'copyright': 1014,
    'packager': 1015,
    'group': 1016,
    'changelog': 1017,
    'source': 1018,
    'patch': 1019,
    'url': 1020,
    'os': 1021,
    'arch': 1022,
    'prein': 1023,
    'postin': 1024,
    'preun': 1025,
    'postun': 1026,
    'filenames': 1027,
    'filesizes': 1028,
    'filestates': 1029,
    'filemodes': 1030,
    'fileuids': 1031,
    'filegids': 1032,
    'filerdevs': 1033,
    'filemtimes': 1034,
    'filemd5s': 1035,
    'filelinktos': 1036,
    'fileflags': 1037,
    'root': 1038,
    'fileusername': 1039,
    'filegroupname': 1040,
    'exclude': 1041,
    'exclusive': 1042,
    'icon': 1043,
    'sourcerpm': 1044,
    'fileverifyflags': 1045,
    'archivesize': 1046,
    'provides': 1047,
    'requireflags': 1048,
    'requirename': 1049,
    'requireversion': 1050,
    'nosource': 1051,
    'nopatch': 1052,
    'conflictflags': 1053,
    'conflictname': 1054,
    'conflictversion': 1055,
    'defaultprefix': 1056,
    'buildroot': 1057,
    'installprefix': 1058,
    'excludearch': 1059,
    'excludeos': 1060,
    'exclusivearch': 1061,
    'exclusiveos': 1062,
    'autoreqprov': 1063,
    'rpmversion': 1064,
    'triggerscripts': 1065,
    'triggername': 1066,
    'triggerversion': 1067,
    'triggerflags': 1068,
    'triggerindex': 1069,
    'verifyscript': 1079,
    'basenames': 1117,
    'archive_format': 1124,
    'archive_compression': 1125,
    'target': 1132,
    'authors': 1081,
    'comments': 1082,
}

# Reverse lookup: numeric tag id -> header tag name.
rtags = dict((number, label) for label, number in tags.items())
def extract_string(offset, count, store):
    '''
    Return the bytes of `store' from `offset' up to, but not including,
    the next NUL byte.  `count' is expected to be 1; ValueError is raised
    when no NUL follows `offset'.
    '''
    assert count == 1
    terminator = store.index(b'\x00', offset)
    return store[offset:terminator]
def extract_array(offset, count, store):
    '''
    Return a list of the NUL-separated byte strings found in `store'
    from `offset' on.  At most `count' splits are made and the final
    piece (whatever follows the last split) is discarded.
    '''
    pieces = store[offset:].split(b'\x00', count)
    del pieces[-1]
    return pieces
def extract_bin(offset, count, store):
    '''Return the `count' raw bytes of `store' that start at `offset'.'''
    stop = offset + count
    return store[offset:stop]
def extract_int32(offset, count, store):
    '''
    Decode `count' signed 32-bit integers in network byte order from
    `store' at `offset'.  A single value is returned bare, several as
    a tuple.
    '''
    layout = struct.Struct(b'!' + b'i' * count)
    numbers = layout.unpack(store[offset:offset + layout.size])
    if count == 1:
        return numbers[0]
    return numbers
def extract_int16(offset, count, store):
    '''
    Decode `count' signed 16-bit integers in network byte order from
    `store' at `offset'.  A single value is returned bare, several as
    a tuple.
    '''
    layout = struct.Struct(b'!' + b'h' * count)
    numbers = layout.unpack(store[offset:offset + layout.size])
    if count == 1:
        return numbers[0]
    return numbers
# Maps the numeric type code of a header entry to the function that
# decodes it; codes missing here are reported by extract_data() as
# not extractable.
ty_map = {
3: extract_int16,
4: extract_int32,
6: extract_string,
7: extract_bin,
8: extract_array,
# type 9 is decoded exactly like a plain string (type 6)
9: extract_string,
}
def extract_data(ty, offset, count, store):
    '''
    Decode one header value of type code `ty' from `store'.

    The decoder is looked up in ty_map; for an unknown type code the
    string 'could not extract <ty>' is returned instead of a value.
    '''
    extractor = ty_map.get(ty)
    if not extractor:
        return 'could not extract %s' % ty
    return extractor(offset, count, store)
def _readheader(fileobj):
    '''
    Read the next header structure from `fileobj'.

    Bytes are skipped until the first 0x8e, which must start the magic
    8e ad e8.  The index entries and the data store that follow are read
    and every entry is decoded with extract_data().

    Returns ((header_start, header_end), headers) where the offsets are
    positions in `fileobj' and `headers' maps tag names (or the numeric
    tag when the name is unknown) to decoded values.

    Raises RPMError when the end of the file is reached before a magic
    byte is found or when the magic is malformed.
    '''
    from binascii import hexlify

    while True:
        char = fileobj.read(1)
        if char == b'\x8e':
            break
        if not char:
            raise RPMError("reached end of file without finding magic char \x8e")

    magic = b'\x8e' + fileobj.read(2)
    # An explicit check instead of ``assert`` so it survives ``python -O``.
    if magic != b'\x8e\xad\xe8':
        raise RPMError('bad header magic %r' % hexlify(magic))

    fileobj.read(1)  # version byte, not used

    header_start = fileobj.tell() - 4  # -4 for magic

    fileobj.read(4)  # four bytes that are skipped unread

    num_entries, = struct.unpack(b'!i', fileobj.read(4))
    header_structure_size, = struct.unpack(b'!i', fileobj.read(4))

    # Each index entry is (tag, type, offset, count).
    entry_coder = struct.Struct(b'!iiii')
    entries = [entry_coder.unpack(fileobj.read(entry_coder.size))
               for _ in range(num_entries)]

    store = fileobj.read(header_structure_size)

    headers = {}
    for tag, ty, offset, count in entries:
        key = rtags.get(tag, tag)
        headers[key] = extract_data(ty, offset, count, store)

    header_end = fileobj.tell()

    return (header_start, header_end), headers
def get_headers(fileobj):
    '''
    Read the lead and the two header structures of an RPM from `fileobj'.

    Returns (range, headers): `range' is the (start, end) offset pair of
    the second header structure and `headers' merges both structures,
    with values of the second one winning on duplicate keys.
    '''
    lead = struct.Struct(b'!4sBBhh66shh16s')
    # The lead is decoded only to consume it; its fields are not used.
    lead.unpack(fileobj.read(lead.size))

    # NOTE(review): the first structure is presumably the signature
    # header -- the original author was unsure what it is for.
    _, merged = _readheader(fileobj)
    main_range, main_headers = _readheader(fileobj)

    merged.update(main_headers)
    return main_range, merged
def main():
    '''
    Pretty-print the result of get_headers() for the RPM file named by
    the first command line argument.
    '''
    # The parser compares and unpacks bytes, so the file must be opened
    # in binary mode.
    with open(sys.argv[1], 'rb') as fileobj:
        headers = get_headers(fileobj)
    pprint(headers)
if __name__ == '__main__':
main()
0707010000000C000081A4000003E800000064000000015D1BA02900000813000000000000000000000000000000000000003D00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/io_extra.py'''
Created on Jan 11, 2014
@author: sean
'''
import io
def _doc(from_func):
'''copy doc from one function to another
use as a decorator eg::
@_doc(file.tell)
def tell(..):
...
'''
def decorator(to_func):
to_func.__doc__ = from_func.__doc__
return to_func
return decorator
class _SubFile(object):
"""A thin wrapper around an existing file object that
provides a part of its data as an individual file
object.
"""
def __init__(self, fileobj, start=0, size=None):
self._fileobj = fileobj
self._start = start
if size is None:
fileobj.seek(0, 2)
pos = fileobj.tell()
self._size = pos - start
else:
self._size = size
self._pos = 0
def __getattr__(self, attr):
return getattr(self._fileobj, attr)
@_doc(io.FileIO.tell)
def tell(self):
return self._pos
@_doc(io.FileIO.seek)
def seek(self, offset, whence=0):
if whence == 0:
self._pos = offset
elif whence == 1:
self._pos += offset
else:
self._pos = self._size + offset
self._pos = max(0, self._pos)
def _n(self, size=None):
if not size:
size = self._size
return min(size, self._size - self._pos)
@_doc(io.FileIO.read)
def read(self, size=None):
self._fileobj.seek(self._pos + self._start, 0)
n = self._n(size)
self._pos += n
return self._fileobj.read(n)
@_doc(io.FileIO.readline)
def readline(self, size=None):
self._fileobj.seek(self._pos + self._start, 0)
n = self._n(size)
line = self._fileobj.readline(n)
self._pos += len(line)
return line
@_doc(io.FileIO.readlines)
def readlines(self, size=None):
n = self._n(size)
line = self.readline(n)
n -= len(line)
while line:
yield line
line = self.readline(n)
n -= len(line)
0707010000000D000081A4000003E800000064000000015D1BA0290000061F000000000000000000000000000000000000003C00000000python-rpmfile-1.0.0+git20190702.208ac80/rpmfile/rpmdefs.py# -*- coding: iso-8859-15 -*-
# -*- Mode: Python; py-ident-offset: 4 -*-
# vim:ts=4:sw=4:et
'''
rpm definitions
Mario Morgado (BSD) https://github.com/mjvm/pyrpm
'''
# Revision keyword placeholder; with the unexpanded '$Rev$' this slice is
# the empty string.
__revision__ = '$Rev$'[6:-2]
# Magic numbers, per their names for the lead and for a header structure.
RPM_LEAD_MAGIC_NUMBER = '\xed\xab\xee\xdb'
RPM_HEADER_MAGIC_NUMBER = '\x8e\xad\xe8'
# Lowest and highest tag number covered by these definitions.
RPMTAG_MIN_NUMBER = 1000
RPMTAG_MAX_NUMBER = 1146
# signature tags
RPMSIGTAG_SIZE = 1000
RPMSIGTAG_LEMD5_1 = 1001
RPMSIGTAG_PGP = 1002
RPMSIGTAG_LEMD5_2 = 1003
RPMSIGTAG_MD5 = 1004
RPMSIGTAG_GPG = 1005
RPMSIGTAG_PGP5 = 1006
MD5_SIZE = 16 # 16 bytes long
PGP_SIZE = 152 # 152 bytes long
# data type codes
RPM_DATA_TYPE_NULL = 0
RPM_DATA_TYPE_CHAR = 1
RPM_DATA_TYPE_INT8 = 2
RPM_DATA_TYPE_INT16 = 3
RPM_DATA_TYPE_INT32 = 4
RPM_DATA_TYPE_INT64 = 5
RPM_DATA_TYPE_STRING = 6
RPM_DATA_TYPE_BIN = 7
RPM_DATA_TYPE_STRING_ARRAY = 8
RPM_DATA_TYPE_I18NSTRING_TYPE = 9
# NOTE(review): RPM_DATA_TYPE_I18NSTRING_TYPE is defined above but is not
# part of this tuple -- confirm whether the omission is intentional.
RPM_DATA_TYPES = (RPM_DATA_TYPE_NULL,
RPM_DATA_TYPE_CHAR,
RPM_DATA_TYPE_INT8,
RPM_DATA_TYPE_INT16,
RPM_DATA_TYPE_INT32,
RPM_DATA_TYPE_INT64,
RPM_DATA_TYPE_STRING,
RPM_DATA_TYPE_BIN,
RPM_DATA_TYPE_STRING_ARRAY,)
# header tag numbers
RPMTAG_NAME = 1000
RPMTAG_VERSION = 1001
RPMTAG_RELEASE = 1002
RPMTAG_DESCRIPTION = 1005
RPMTAG_COPYRIGHT = 1014
RPMTAG_URL = 1020
RPMTAG_ARCH = 1022
# All header tag numbers defined in this module.
RPMTAGS = (RPMTAG_NAME,
RPMTAG_VERSION,
RPMTAG_RELEASE,
RPMTAG_DESCRIPTION,
RPMTAG_COPYRIGHT,
RPMTAG_URL,
RPMTAG_ARCH,)
0707010000000E000081A4000003E800000064000000015D1BA02900000246000000000000000000000000000000000000003200000000python-rpmfile-1.0.0+git20190702.208ac80/setup.py'''
@author: sean
'''
import io
from os.path import isfile

from setuptools import setup, find_packages

# Use the README as the long description when it is shipped alongside
# this script.  It is decoded as UTF-8 explicitly (via io.open, which
# also works on Python 2) so the result does not depend on the locale
# of the machine running the build.
if isfile('README.md'):
    with io.open('README.md', encoding='utf-8') as readme:
        long_description = readme.read()
else:
    long_description = '???'

setup(
    name='rpmfile',
    description='Read rpm archive files',
    version="0.1.5",
    author='Sean Ross-Ross',
    author_email='srossross@gmail.com',
    url='https://github.com/srossross/rpmfile',
    license='MIT',
    long_description=long_description,
    long_description_content_type='text/markdown',
    packages=find_packages(),
)
0707010000000F000041ED000003E800000064000000025D1BA02900000000000000000000000000000000000000000000002F00000000python-rpmfile-1.0.0+git20190702.208ac80/tests07070100000010000081A4000003E800000064000000015D1BA02900000000000000000000000000000000000000000000003B00000000python-rpmfile-1.0.0+git20190702.208ac80/tests/__init__.py07070100000011000081A4000003E800000064000000015D1BA02900000A62000000000000000000000000000000000000003F00000000python-rpmfile-1.0.0+git20190702.208ac80/tests/test_extract.pyimport os
import sys
import shutil
import hashlib
import tempfile
import unittest
from functools import wraps
try:
from urllib2 import urlopen
except ImportError:
from urllib.request import urlopen
import rpmfile
def download(url, rpmname):
    """Decorator factory that fetches `url` before the wrapped test runs.

    The wrapped callable's first positional argument must expose a
    ``tempdir`` attribute.  The content of `url` is stored there as
    `rpmname` and the resulting path is appended to the positional
    arguments of the wrapped callable.
    """
    def _downloader(func):
        @wraps(func)
        def wrapper(*args, **kwds):
            args = list(args)
            rpmpath = os.path.join(args[0].tempdir, rpmname)
            args.append(rpmpath)
            response = urlopen(url)
            try:
                with open(rpmpath, 'wb') as target_file:
                    target_file.write(response.read())
            finally:
                # Release the connection even when reading or writing fails.
                response.close()
            return func(*args, **kwds)
        return wrapper
    return _downloader
class TempDirTest(unittest.TestCase):
    """Tests that download an RPM into a temporary directory and inspect it."""

    @classmethod
    def setUpClass(cls):
        cls.prevdir = os.getcwd()
        cls.tempdir = tempfile.mkdtemp()
        os.chdir(cls.tempdir)

    @classmethod
    def tearDownClass(cls):
        # Leave the directory before deleting it; removing the current
        # working directory is not possible on every platform.
        os.chdir(cls.prevdir)
        shutil.rmtree(cls.tempdir)

    # Compare the version as a tuple: testing major and minor separately
    # would skip the test on releases such as a future x.0.
    @unittest.skipUnless(sys.version_info >= (3, 3), 'Need lzma module')
    @download('https://download.clearlinux.org/releases/10540/clear/x86_64/os/Packages/sudo-setuid-1.8.17p1-34.x86_64.rpm', 'sudo.rpm')
    def test_lzma_sudo(self, rpmpath):
        """Read headers and extract a member from an xz-compressed package"""
        with rpmfile.open(rpmpath) as rpm:

            # Inspect the RPM headers
            self.assertIn('name', rpm.headers.keys())
            self.assertEqual(rpm.headers.get('arch', 'noarch'), b'x86_64')

            members = list(rpm.getmembers())
            self.assertEqual(len(members), 1)

            fd = rpm.extractfile('./usr/bin/sudo')

            calculated = hashlib.md5(fd.read()).hexdigest()
            self.assertEqual(calculated, 'a208f3d9170ecfa69a0f4ccc78d2f8f6')

    @download('https://download.fedoraproject.org/pub/fedora/linux/releases/30/Everything/source/tree/Packages/r/rpm-4.14.2.1-4.fc30.1.src.rpm', 'sample.rpm')
    def test_autoclose(self, rpmpath):
        """Test that RPMFile.open context manager properly closes rpm file"""

        rpm_ref = None
        with rpmfile.open(rpmpath) as rpm:
            rpm_ref = rpm

            # Inspect the RPM headers
            self.assertIn('name', rpm.headers.keys())
            self.assertEqual(rpm.headers.get('arch', 'noarch'), b'x86_64')

            members = list(rpm.getmembers())
            self.assertEqual(len(members), 13)

        # Test that RPMFile owned file descriptor and that underlying file is really closed
        self.assertTrue(rpm_ref._fileobj.closed)
        self.assertTrue(rpm_ref._ownes_fd)
07070100000012000081A4000003E800000064000000015D1BA029000003E9000000000000000000000000000000000000003F00000000python-rpmfile-1.0.0+git20190702.208ac80/tests/test_subfile.py'''
Created on Jan 11, 2014
@author: sean
'''
import unittest
import rpmfile
import io
class Test(unittest.TestCase):
    '''Checks for the `_SubFile' window over an in-memory file.'''

    def test_seek(self):
        '''seek()/tell() are relative to the window and clamp at zero'''
        sub = rpmfile._SubFile(io.BytesIO(b'Hello world'), start=2, size=4)

        # (arguments passed to seek, position expected afterwards)
        steps = [
            ((0,), 0),
            ((1,), 1),
            ((1, 1), 2),
            ((-1, 1), 1),
            ((-10, 1), 0),
        ]
        for seek_args, expected in steps:
            sub.seek(*seek_args)
            self.assertEqual(sub.tell(), expected)

    def test_read(self):
        '''read() never returns data from outside the window'''
        sub = rpmfile._SubFile(io.BytesIO(b'Hello world'), start=2, size=4)

        first = sub.read()
        self.assertEqual(first, b'llo ')
        second = sub.read()
        self.assertEqual(second, b'')

        sub.seek(0)
        self.assertEqual(sub.read(2), b'll')

        sub.seek(0)
        self.assertEqual(sub.read(10), b'llo ')
if __name__ == "__main__":
# import sys;sys.argv = ['', 'Test.testSeek']
unittest.main()
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!84 blocks