File python-iptables-1.0.0.obscpio of Package python-iptables
07070100000000000081A40000000000000000000000015EE6923E000001DD000000000000000000000000000000000000002100000000python-iptables-1.0.0/.gitignore*.o
*.so
*.pyc
*.pyo
*.bak
/build
/doc/_build
/.project
/MANIFEST
/dist
/scripts
/Vagrantfile
/.vagrant
/tags
*/__pycache__/
*.pyc
/build/
/.pybuild/
*.egg-info
# Debian build related files
/debian/python-iptables/
/debian/python3-iptables/
/debian/python-iptables-dbg/
/debian/python3-iptables-dbg/
/debian/python-iptables-doc/
/debian/files
/debian/outfile
/debian/*.debhelper
/debian/*.log
/debian/*-stamp
/debian/*.substvars
# Added exclusion for PyCharm files
.idea/*
07070100000001000081A40000000000000000000000015EE6923E00000108000000000000000000000000000000000000002200000000python-iptables-1.0.0/.travis.ymldist: xenial
language: python
python:
- "2.7"
- "3.4"
- "3.5"
- "3.6"
- "3.7"
install:
- python setup.py build
- python setup.py install
- sudo pip install coveralls
script:
- sudo PATH=$PATH coverage run setup.py test
after_success:
coveralls
07070100000002000081A40000000000000000000000015EE6923E00000043000000000000000000000000000000000000002200000000python-iptables-1.0.0/MANIFEST.ininclude doc/* iptc/* libxtwrapper/* NOTICE README setup.py test.py
07070100000003000081A40000000000000000000000015EE6923E000029BD000000000000000000000000000000000000001D00000000python-iptables-1.0.0/NOTICE
Copyright (c) 2010-, Vilmos Nebehaj and others
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Apache License
Version 2.0, January 2004
http://www.apache.org/licenses/
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
1. Definitions.
"License" shall mean the terms and conditions for use, reproduction,
and distribution as defined by Sections 1 through 9 of this document.
"Licensor" shall mean the copyright owner or entity authorized by
the copyright owner that is granting the License.
"Legal Entity" shall mean the union of the acting entity and all
other entities that control, are controlled by, or are under common
control with that entity. For the purposes of this definition,
"control" means (i) the power, direct or indirect, to cause the
direction or management of such entity, whether by contract or
otherwise, or (ii) ownership of fifty percent (50%) or more of the
outstanding shares, or (iii) beneficial ownership of such entity.
"You" (or "Your") shall mean an individual or Legal Entity
exercising permissions granted by this License.
"Source" form shall mean the preferred form for making modifications,
including but not limited to software source code, documentation
source, and configuration files.
"Object" form shall mean any form resulting from mechanical
transformation or translation of a Source form, including but
not limited to compiled object code, generated documentation,
and conversions to other media types.
"Work" shall mean the work of authorship, whether in Source or
Object form, made available under the License, as indicated by a
copyright notice that is included in or attached to the work
(an example is provided in the Appendix below).
"Derivative Works" shall mean any work, whether in Source or Object
form, that is based on (or derived from) the Work and for which the
editorial revisions, annotations, elaborations, or other modifications
represent, as a whole, an original work of authorship. For the purposes
of this License, Derivative Works shall not include works that remain
separable from, or merely link (or bind by name) to the interfaces of,
the Work and Derivative Works thereof.
"Contribution" shall mean any work of authorship, including
the original version of the Work and any modifications or additions
to that Work or Derivative Works thereof, that is intentionally
submitted to Licensor for inclusion in the Work by the copyright owner
or by an individual or Legal Entity authorized to submit on behalf of
the copyright owner. For the purposes of this definition, "submitted"
means any form of electronic, verbal, or written communication sent
to the Licensor or its representatives, including but not limited to
communication on electronic mailing lists, source code control systems,
and issue tracking systems that are managed by, or on behalf of, the
Licensor for the purpose of discussing and improving the Work, but
excluding communication that is conspicuously marked or otherwise
designated in writing by the copyright owner as "Not a Contribution."
"Contributor" shall mean Licensor and any individual or Legal Entity
on behalf of whom a Contribution has been received by Licensor and
subsequently incorporated within the Work.
2. Grant of Copyright License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
copyright license to reproduce, prepare Derivative Works of,
publicly display, publicly perform, sublicense, and distribute the
Work and such Derivative Works in Source or Object form.
3. Grant of Patent License. Subject to the terms and conditions of
this License, each Contributor hereby grants to You a perpetual,
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
(except as stated in this section) patent license to make, have made,
use, offer to sell, sell, import, and otherwise transfer the Work,
where such license applies only to those patent claims licensable
by such Contributor that are necessarily infringed by their
Contribution(s) alone or by combination of their Contribution(s)
with the Work to which such Contribution(s) was submitted. If You
institute patent litigation against any entity (including a
cross-claim or counterclaim in a lawsuit) alleging that the Work
or a Contribution incorporated within the Work constitutes direct
or contributory patent infringement, then any patent licenses
granted to You under this License for that Work shall terminate
as of the date such litigation is filed.
4. Redistribution. You may reproduce and distribute copies of the
Work or Derivative Works thereof in any medium, with or without
modifications, and in Source or Object form, provided that You
meet the following conditions:
(a) You must give any other recipients of the Work or
Derivative Works a copy of this License; and
(b) You must cause any modified files to carry prominent notices
stating that You changed the files; and
(c) You must retain, in the Source form of any Derivative Works
that You distribute, all copyright, patent, trademark, and
attribution notices from the Source form of the Work,
excluding those notices that do not pertain to any part of
the Derivative Works; and
(d) If the Work includes a "NOTICE" text file as part of its
distribution, then any Derivative Works that You distribute must
include a readable copy of the attribution notices contained
within such NOTICE file, excluding those notices that do not
pertain to any part of the Derivative Works, in at least one
of the following places: within a NOTICE text file distributed
as part of the Derivative Works; within the Source form or
documentation, if provided along with the Derivative Works; or,
within a display generated by the Derivative Works, if and
wherever such third-party notices normally appear. The contents
of the NOTICE file are for informational purposes only and
do not modify the License. You may add Your own attribution
notices within Derivative Works that You distribute, alongside
or as an addendum to the NOTICE text from the Work, provided
that such additional attribution notices cannot be construed
as modifying the License.
You may add Your own copyright statement to Your modifications and
may provide additional or different license terms and conditions
for use, reproduction, or distribution of Your modifications, or
for any such Derivative Works as a whole, provided Your use,
reproduction, and distribution of the Work otherwise complies with
the conditions stated in this License.
5. Submission of Contributions. Unless You explicitly state otherwise,
any Contribution intentionally submitted for inclusion in the Work
by You to the Licensor shall be under the terms and conditions of
this License, without any additional terms or conditions.
Notwithstanding the above, nothing herein shall supersede or modify
the terms of any separate license agreement you may have executed
with Licensor regarding such Contributions.
6. Trademarks. This License does not grant permission to use the trade
names, trademarks, service marks, or product names of the Licensor,
except as required for reasonable and customary use in describing the
origin of the Work and reproducing the content of the NOTICE file.
7. Disclaimer of Warranty. Unless required by applicable law or
agreed to in writing, Licensor provides the Work (and each
Contributor provides its Contributions) on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied, including, without limitation, any warranties or conditions
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
PARTICULAR PURPOSE. You are solely responsible for determining the
appropriateness of using or redistributing the Work and assume any
risks associated with Your exercise of permissions under this License.
8. Limitation of Liability. In no event and under no legal theory,
whether in tort (including negligence), contract, or otherwise,
unless required by applicable law (such as deliberate and grossly
negligent acts) or agreed to in writing, shall any Contributor be
liable to You for damages, including any direct, indirect, special,
incidental, or consequential damages of any character arising as a
result of this License or out of the use or inability to use the
Work (including but not limited to damages for loss of goodwill,
work stoppage, computer failure or malfunction, or any and all
other commercial damages or losses), even if such Contributor
has been advised of the possibility of such damages.
9. Accepting Warranty or Additional Liability. While redistributing
the Work or Derivative Works thereof, You may choose to offer,
and charge a fee for, acceptance of support, warranty, indemnity,
or other liability obligations and/or rights consistent with this
License. However, in accepting such obligations, You may act only
on Your own behalf and on Your sole responsibility, not on behalf
of any other Contributor, and only if You agree to indemnify,
defend, and hold each Contributor harmless for any liability
incurred by, or claims asserted against, such Contributor by reason
of your accepting any such warranty or additional liability.
END OF TERMS AND CONDITIONS
07070100000004000081ED0000000000000000000000015EE6923E0000545A000000000000000000000000000000000000002000000000python-iptables-1.0.0/README.mdIntroduction
============
About python-iptables
---------------------
**Iptables** is the tool that is used to manage **netfilter**, the
standard packet filtering and manipulation framework under Linux. As the
iptables manpage puts it:
> Iptables is used to set up, maintain, and inspect the tables of IPv4
> packet filter rules in the Linux kernel. Several different tables may
> be defined.
>
> Each table contains a number of built-in chains and may also contain
> user- defined chains.
>
> Each chain is a list of rules which can match a set of packets. Each
> rule specifies what to do with a packet that matches. This is called a
> target, which may be a jump to a user-defined chain in the same table.
`Python-iptables` provides a pythonesque wrapper via python bindings to
iptables under Linux. Interoperability with iptables is achieved via
using the iptables C libraries (`libiptc`, `libxtables`, and the
iptables extensions), not calling the iptables binary and parsing its
output. It is meant primarily for dynamic and/or complex routers and
firewalls, where rules are often updated or changed, or Python programs
wish to interface with the Linux iptables framework..
If you are looking for `ebtables` python bindings, check out
[python-ebtables](https://github.com/ldx/python-ebtables/).
`Python-iptables` supports Python 2.6, 2.7 and 3.4.
[](https://flattr.com/submit/auto?user_id=ldx&url=https%3A%2F%2Fgithub.com%2Fldx%2Fpython-iptables)
[](https://pypi.python.org/pypi/python-iptables)
[](https://travis-ci.org/ldx/python-iptables)
[](https://coveralls.io/r/ldx/python-iptables?branch=codecoverage)
[](https://landscape.io/github/ldx/python-iptables/codecoverage)
[](https://pypi.python.org/pypi/python-iptables)
[](https://pypi.python.org/pypi/python-iptables)
Installing via pip
------------------
The usual way:
pip install --upgrade python-iptables
Compiling from source
---------------------
First make sure you have iptables installed (most Linux distributions
install it by default). `Python-iptables` needs the shared libraries
`libiptc.so` and `libxtables.so` coming with iptables, they are
installed in `/lib` on Ubuntu.
You can compile `python-iptables` in the usual distutils way:
% cd python-iptables
% python setup.py build
If you like, `python-iptables` can also be installed into a
`virtualenv`:
% mkvirtualenv python-iptables
% python setup.py install
If you install `python-iptables` as a system package, make sure the
directory where `distutils` installs shared libraries is in the dynamic
linker's search path (it's in `/etc/ld.so.conf` or in one of the files
in the folder `/etc/ld.co.conf.d`). Under Ubuntu `distutils` by default
installs into `/usr/local/lib`.
Now you can run the tests:
% sudo PATH=$PATH python setup.py test
WARNING: this test will manipulate iptables rules.
Don't do this on a production machine.
Would you like to continue? y/n y
[...]
The `PATH=$PATH` part is necessary after `sudo` if you have installed
into a `virtualenv`, since `sudo` will reset your environment to a
system setting otherwise..
Once everything is in place you can fire up python to check whether the
package can be imported:
% sudo PATH=$PATH python
>>> import iptc
>>>
Of course you need to be root to be able to use iptables.
Using a custom iptables install
-------------------------------
If you are stuck on a system with an old version of `iptables`, you can
install a more up to date version to a custom location, and ask
`python-iptables` to use libraries at that location.
To install `iptables` to `/tmp/iptables`:
% git clone git://git.netfilter.org/iptables && cd iptables
% ./autogen.sh
% ./configure --prefix=/tmp/iptables
% make
% make install
Make sure the dependencies `iptables` needs are installed.
Now you can point `python-iptables` to this install path via:
% sudo PATH=$PATH IPTABLES_LIBDIR=/tmp/iptables/lib XTABLES_LIBDIR=/tmp/iptables/lib/xtables python
>>> import iptc
>>>
What is supported
-----------------
The basic iptables framework and all the match/target extensions are
supported by `python-iptables`, including IPv4 and IPv6 ones. All IPv4
and IPv6 tables are supported as well.
Full documentation with API reference is available
[here](http://ldx.github.com/python-iptables/).
Examples
========
High level abstractions
-----------------------
``python-iptables`` implements a low-level interface that tries to closely
match the underlying C libraries. The module ``iptc.easy`` improves the
usability of the library by providing a rich set of high-level functions
designed to simplify the interaction with the library, for example:
>>> import iptc
>>> iptc.easy.dump_table('nat', ipv6=False)
{'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []}
>>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False)
[{'comment': {'comment': 'DNS traffic to Google'},
'counters': (1, 56),
'dst': '8.8.8.8/32',
'protocol': 'udp',
'target': 'ACCEPT',
'udp': {'dport': '53'}}]
>>> iptc.easy.add_chain('filter', 'TestChain')
True
>>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
>>> iptc.easy.insert_rule('filter', 'TestChain', rule_d)
>>> iptc.easy.dump_chain('filter', 'TestChain')
[{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}]
>>> iptc.easy.delete_chain('filter', 'TestChain', flush=True)
>>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto
>>> iptc.easy.add_chain('filter', 'TestChainGoto')
>>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}}
>>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d)
Rules
-----
In `python-iptables`, you usually first create a rule, and set any
source/destination address, in/out interface and protocol specifiers,
for example:
>>> import iptc
>>> rule = iptc.Rule()
>>> rule.in_interface = "eth0"
>>> rule.src = "192.168.1.0/255.255.255.0"
>>> rule.protocol = "tcp"
This creates a rule that will match TCP packets coming in on eth0, with
a source IP address of 192.168.1.0/255.255.255.0.
A rule may contain matches and a target. A match is like a filter
matching certain packet attributes, while a target tells what to do with
the packet (drop it, accept it, transform it somehow, etc). One can
create a match or target via a Rule:
>>> rule = iptc.Rule()
>>> m = rule.create_match("tcp")
>>> t = rule.create_target("DROP")
Match and target parameters can be changed after creating them. It is
also perfectly valid to create a match or target via instantiating them
with their constructor, but you still need a rule and you have to add
the matches and the target to their rule manually:
>>> rule = iptc.Rule()
>>> match = iptc.Match(rule, "tcp")
>>> target = iptc.Target(rule, "DROP")
>>> rule.add_match(match)
>>> rule.target = target
Any parameters a match or target might take can be set via the
attributes of the object. To set the destination port for a TCP match:
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> match = rule.create_match("tcp")
>>> match.dport = "80"
To set up a rule that matches packets marked with 0xff:
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> match = rule.create_match("mark")
>>> match.mark = "0xff"
Parameters are always strings. You can supply any string as the
parameter value, but note that most extensions validate their
parameters. For example this:
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> rule.target = iptc.Target(rule, "ACCEPT")
>>> match = iptc.Match(rule, "state")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> match.state = "RELATED,ESTABLISHED"
>>> rule.add_match(match)
>>> chain.insert_rule(rule)
will work. However, if you change the state parameter:
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> rule.target = iptc.Target(rule, "ACCEPT")
>>> match = iptc.Match(rule, "state")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> match.state = "RELATED,ESTABLISHED,FOOBAR"
>>> rule.add_match(match)
>>> chain.insert_rule(rule)
`python-iptables` will throw an exception:
Traceback (most recent call last):
File "state.py", line 7, in <module>
match.state = "RELATED,ESTABLISHED,FOOBAR"
File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 369, in __setattr__
self.parse(name.replace("_", "-"), value)
File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 286, in parse
self._parse(argv, inv, entry)
File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 516, in _parse
ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p)))
File "/home/user/Projects/python-iptables/iptc/xtables.py", line 736, in new
ret = fn(*args)
File "/home/user/Projects/python-iptables/iptc/xtables.py", line 1031, in parse_match
argv[1]))
iptc.xtables.XTablesError: state: parameter error -2 (RELATED,ESTABLISHED,FOOBAR)
Certain parameters take a string that optionally consists of multiple
words. The comment match is a good example:
>>> rule = iptc.Rule()
>>> rule.src = "127.0.0.1"
>>> rule.protocol = "udp"
>>> rule.target = rule.create_target("ACCEPT")
>>> match = rule.create_match("comment")
>>> match.comment = "this is a test comment"
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
Note that this is still just one parameter value.
However, when a match or a target takes multiple parameter values, that
needs to be passed in as a list. Let's assume you have created and set
up an `ipset` called `blacklist` via the `ipset` command. To create a
rule with a match for this set:
>>> rule = iptc.Rule()
>>> m = rule.create_match("set")
>>> m.match_set = ['blacklist', 'src']
Note how this time a list was used for the parameter value, since the
`set` match `match_set` parameter expects two values. See the `iptables`
manpages to find out what the extensions you use expect. See
[ipset](http://ipset.netfilter.org/) for more information.
When you are ready constructing your rule, add them to the chain you
want it to show up in:
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
This will put your rule into the INPUT chain in the filter table.
Chains and tables
-----------------
You can of course also check what a rule's source/destination address,
in/out inteface etc is. To print out all rules in the FILTER table:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> for chain in table.chains:
>>> print "======================="
>>> print "Chain ", chain.name
>>> for rule in chain.rules:
>>> print "Rule", "proto:", rule.protocol, "src:", rule.src, "dst:", \
>>> rule.dst, "in:", rule.in_interface, "out:", rule.out_interface,
>>> print "Matches:",
>>> for match in rule.matches:
>>> print match.name,
>>> print "Target:",
>>> print rule.target.name
>>> print "======================="
As you see in the code snippet above, rules are organized into chains,
and chains are in tables. You have a fixed set of tables; for IPv4:
- `FILTER`,
- `NAT`,
- `MANGLE` and
- `RAW`.
For IPv6 the tables are:
- `FILTER`,
- `MANGLE`,
- `RAW` and
- `SECURITY`.
To access a table:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> print table.name
filter
To create a new chain in the FILTER table:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = table.create_chain("testchain")
$ sudo iptables -L -n
[...]
Chain testchain (0 references)
target prot opt source destination
To access an existing chain:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, "INPUT")
>>> chain.name
'INPUT'
>>> len(chain.rules)
10
>>>
More about matches and targets
------------------------------
There are basic targets, such as `DROP` and `ACCEPT`. E.g. to reject
packets with source address `127.0.0.1/255.0.0.0` coming in on any of
the `eth` interfaces:
>>> import iptc
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> rule = iptc.Rule()
>>> rule.in_interface = "eth+"
>>> rule.src = "127.0.0.1/255.0.0.0"
>>> target = iptc.Target(rule, "DROP")
>>> rule.target = target
>>> chain.insert_rule(rule)
To instantiate a target or match, we can either create an object like
above, or use the `rule.create_target(target_name)` and
`rule.create_match(match_name)` methods. For example, in the code above
target could have been created as:
>>> target = rule.create_target("DROP")
instead of:
>>> target = iptc.Target(rule, "DROP")
>>> rule.target = target
The former also adds the match or target to the rule, saving a call.
Another example, using a target which takes parameters. Let's mark
packets going to `192.168.1.2` UDP port `1234` with `0xffff`:
>>> import iptc
>>> chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "PREROUTING")
>>> rule = iptc.Rule()
>>> rule.dst = "192.168.1.2"
>>> rule.protocol = "udp"
>>> match = iptc.Match(rule, "udp")
>>> match.dport = "1234"
>>> rule.add_match(match)
>>> target = iptc.Target(rule, "MARK")
>>> target.set_mark = "0xffff"
>>> rule.target = target
>>> chain.insert_rule(rule)
Matches are optional (specifying a target is mandatory). E.g. to insert
a rule to NAT TCP packets going out via `eth0`:
>>> import iptc
>>> chain = iptc.Chain(iptc.Table(iptc.Table.NAT), "POSTROUTING")
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> rule.out_interface = "eth0"
>>> target = iptc.Target(rule, "MASQUERADE")
>>> target.to_ports = "1234"
>>> rule.target = target
>>> chain.insert_rule(rule)
Here only the properties of the rule decide whether the rule will be
applied to a packet.
Matches are optional, but we can add multiple matches to a rule. In the
following example we will do that, using the `iprange` and the `tcp`
matches:
>>> import iptc
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> match = iptc.Match(rule, "tcp")
>>> match.dport = "22"
>>> rule.add_match(match)
>>> match = iptc.Match(rule, "iprange")
>>> match.src_range = "192.168.1.100-192.168.1.200"
>>> match.dst_range = "172.22.33.106"
>>> rule.add_match(match)
>>> rule.target = iptc.Target(rule, "DROP")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
This is the `python-iptables` equivalent of the following iptables
command:
# iptables -A INPUT -p tcp –destination-port 22 -m iprange –src-range 192.168.1.100-192.168.1.200 –dst-range 172.22.33.106 -j DROP
You can of course negate matches, just like when you use `!` in front of
a match with iptables. For example:
>>> import iptc
>>> rule = iptc.Rule()
>>> match = iptc.Match(rule, "mac")
>>> match.mac_source = "!00:11:22:33:44:55"
>>> rule.add_match(match)
>>> rule.target = iptc.Target(rule, "ACCEPT")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
This results in:
$ sudo iptables -L -n
Chain INPUT (policy ACCEPT)
target prot opt source destination
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 MAC ! 00:11:22:33:44:55
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
Counters
--------
You can query rule and chain counters, e.g.:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
However, the counters are only refreshed when the underlying low-level
iptables connection is refreshed in `Table` via `table.refresh()`. For
example:
>>> import time, sys
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
>>> print "Please send some traffic"
>>> sys.stdout.flush()
>>> time.sleep(3)
>>> for rule in chain.rules:
>>> # Here you will get back the same counter values as above
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
This will show you the same counter values even if there was traffic
hitting your rules. You have to refresh your table to get update your
counters:
>>> import time, sys
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
>>> print "Please send some traffic"
>>> sys.stdout.flush()
>>> time.sleep(3)
>>> table.refresh() # Here: refresh table to update rule counters
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
What is more, if you add:
iptables -A OUTPUT -p tcp --sport 80
iptables -A OUTPUT -p tcp --sport 22
you can query rule and chain counters together with the protocol and
sport(or dport), e.g.:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> for match in rule.matches:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes, match.name, match.sport
Autocommit
----------
`Python-iptables` by default automatically performs an iptables commit
after each operation. That is, after you add a rule in
`python-iptables`, that will take effect immediately.
It may happen that you want to batch together certain operations. A
typical use case is traversing a chain and removing rules matching a
specific criteria. If you do this with autocommit enabled, after the
first delete operation, your chain's state will change and you have to
restart the traversal. You can do something like this:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> removed = True
>>> chain = iptc.Chain(table, "FORWARD")
>>> while removed == True:
>>> removed = False
>>> for rule in chain.rules:
>>> if rule.out_interface and "eth0" in rule.out_interface:
>>> chain.delete_rule(rule)
>>> removed = True
>>> break
This is clearly not ideal and the code is not very readable. An
alternative is to disable autocommits, traverse the chain, removing one
or more rules, than commit it:
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> table.autocommit = False
>>> chain = iptc.Chain(table, "FORWARD")
>>> for rule in chain.rules:
>>> if rule.out_interface and "eth0" in rule.out_interface:
>>> chain.delete_rule(rule)
>>> table.commit()
>>> table.autocommit = True
The drawback is that Table is a singleton, and if you disable
autocommit, it will be disabled for all instances of that Table.
Easy rules with dictionaries
----------------------------
To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries.
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, "INPUT")
>>> # Create an iptc.Rule object from dictionary
>>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
>>> rule = iptc.easy.encode_iptc_rule(rule_d)
>>> # Obtain a dictionary representation from the iptc.Rule
>>> iptc.easy.decode_iptc_rule(rule)
{'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'}
Known Issues
============
These issues are mainly caused by complex interaction with upstream's
Netfilter implementation, and will require quite significant effort to
fix. Workarounds are available.
- The `hashlimit` match requires explicitly setting
`hashlimit_htable_expire`. See [Issue
\#201](https://github.com/ldx/python-iptables/issues/201).
- The `NOTRACK` target is problematic; use `CT --notrack` instead. See
[Issue \#204](https://github.com/ldx/python-iptables/issues/204).
07070100000005000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001D00000000python-iptables-1.0.0/debian07070100000006000081A40000000000000000000000015EE6923E000000B9000000000000000000000000000000000000002400000000python-iptables-1.0.0/debian/READMEThe Debian Package python-iptables
----------------------------
Comments regarding the Package
-- Juliano Martinez <juliano.martinez@locaweb.com.br> Tue, 13 Dec 2011 10:10:44 -0200
07070100000007000081A40000000000000000000000015EE6923E000000D4000000000000000000000000000000000000002B00000000python-iptables-1.0.0/debian/README.Debianpython-iptables for Debian
--------------------------
<possible notes regarding this package - if none, delete this file>
-- Juliano Martinez <juliano.martinez@locaweb.com.br> Tue, 13 Dec 2011 10:10:44 -0200
07070100000008000081A40000000000000000000000015EE6923E000000D1000000000000000000000000000000000000002B00000000python-iptables-1.0.0/debian/README.sourcepython-iptables for Debian
--------------------------
<this file describes information about the source package, see Debian policy
manual section 4.14. You WILL either need to modify or delete this file>
07070100000009000081A40000000000000000000000015EE6923E0000020C000000000000000000000000000000000000002700000000python-iptables-1.0.0/debian/changelogpython-iptables (0.12.0) xenial; urgency=low
* update debian changelog
-- Dan Fuhry <df@datto.com> Fri, 11 Nov 2016 14:03:32 -0500
python-iptables (0.5-git-20140925) precise; urgency=low
* update debian/
remove cdbs
build python3 packages
build debug packages
-- Markus Kötter <koetter@rrzn.uni-hannover.de> Thu, 25 Sep 2014 10:48:41 +0200
python-iptables (0.1.1) unstable; urgency=low
* Initial Release.
-- Juliano Martinez <juliano.martinez@locaweb.com.br> Tue, 13 Dec 2011 10:10:44 -0200
0707010000000A000081A40000000000000000000000015EE6923E00000002000000000000000000000000000000000000002400000000python-iptables-1.0.0/debian/compat7
0707010000000B000081A40000000000000000000000015EE6923E0000067C000000000000000000000000000000000000002500000000python-iptables-1.0.0/debian/controlSource: python-iptables
Section: net
Priority: extra
Maintainer: Juliano Martinez <juliano@martinez.io>
Build-depends: python-all-dev (>= 2.7), python-all-dbg (>= 2.7), python3-all-dev (>= 3.2), python3-all-dbg, debhelper (>= 7)
X-Python-Version: >= 2.7
X-Python3-Version: >= 3.2
Standards-Version: 3.8.4
Homepage: https://github.com/ldx/python-iptables
Package: python-iptables
Architecture: any
Depends: ${python:Depends}, ${shlibs:Depends}, ${misc:Depends}
Provides: ${python:Provides}
Description: Python bindings for iptables
Python-iptables is a Python project that provides bindings to the iptables C libraries in Linux.
Interoperability with iptables is achieved using the iptables C libraries (libiptc, libxtables,
and iptables extensions), not calling the iptables executable and parsing its output as most other
iptables wrapper libraries do; this makes python-iptables faster and not prone to parsing errors,
at the same time leveraging all available iptables match and target extensions without further work.
Package: python-iptables-dbg
Section: debug
Priority: extra
Architecture: any
Depends: ${python:Depends}, ${shlibs:Depends}, ${misc:Depends}, python-iptables (= ${binary:Version})
Provides: ${python:Provides}
Description: Python bindings for iptables
Package: python3-iptables
Architecture: any
Depends: ${python3:Depends}, ${shlibs:Depends}, ${misc:Depends}
Description: Python3 bindings for iptables
Package: python3-iptables-dbg
Section: debug
Priority: extra
Architecture: any
Depends: ${python3:Depends}, ${shlibs:Depends}, ${misc:Depends}, python3-iptables (= ${binary:Version})
Description: Python3 bindings for iptables
0707010000000C000081A40000000000000000000000015EE6923E00000444000000000000000000000000000000000000002700000000python-iptables-1.0.0/debian/copyrightThis work was packaged for Debian by:
Juliano Martinez <juliano.martinez@locaweb.com.br> on Tue, 13 Dec 2011 10:10:44 -0200
It was downloaded from:
https://github.com/ldx/python-iptables
Upstream Author(s):
Vilmos Nebehaj -> https://github.com/ldx
Copyright:
Vilmos Nebehaj
License:
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
On Debian systems, the complete text of the Apache version 2.0 license
can be found in "/usr/share/common-licenses/Apache-2.0".
The Debian packaging is:
Copyright (C) 2011 Juliano Martinez a.k.a (ncode) <juliano@martinez.io>
0707010000000D000081A40000000000000000000000015EE6923E0000000A000000000000000000000000000000000000002200000000python-iptables-1.0.0/debian/docsREADME.md
0707010000000E000081A40000000000000000000000015EE6923E0000001A000000000000000000000000000000000000003700000000python-iptables-1.0.0/debian/python3-iptables.postinst#!/bin/sh
/sbin/ldconfig
0707010000000F000081ED0000000000000000000000015EE6923E000005ED000000000000000000000000000000000000002300000000python-iptables-1.0.0/debian/rules#!/usr/bin/make -f
# This file was automatically generated by stdeb 0.6.0+git at
# Thu, 14 Nov 2013 16:04:50 +0100
PY3VERS := $(shell py3versions -r)
PY2VERS := $(shell pyversions -r)
%:
dh $@ --with python2,python3 --buildsystem=python_distutils
.PHONY: override_dh_clean
override_dh_clean:
rm -rf build/*
dh_clean
.PHONY: override_dh_auto_install
override_dh_auto_install:
set -e; \
for py in $(PY3VERS); do \
$$py -B setup.py install --root debian/python3-iptables --install-layout deb; \
$$py-dbg -B setup.py install --root debian/python3-iptables-dbg --install-layout deb; \
done
# On Ubuntu 14.04 and possibly other versions, /usr/lib/python3/dist-packages is not
# included in the system library search path, so we add it here.
set -e; \
mkdir -p $(CURDIR)/debian/python3-iptables/etc/ld.so.conf.d; \
echo "/usr/lib/python3/dist-packages" > $(CURDIR)/debian/python3-iptables/etc/ld.so.conf.d/python3-dist-packages.conf
set -e; \
for py in $(PY2VERS); do \
$$py -B setup.py install --root debian/python-iptables --install-layout deb; \
$$py-dbg -B setup.py install --root debian/python-iptables-dbg --install-layout deb; \
done
.PHONY: override_dh_strip
override_dh_strip:
set -e; \
dh_strip -ppython-ev --dbg-package=python-iptables-dbg; \
dh_strip -ppython3-ev --dbg-package=python3-iptables-dbg;
override_dh_python2:
dh_python2 -ppython-iptables
dh_python2 -ppython-iptables-dbg
override_dh_python3:
dh_python3 -ppython3-iptables
dh_python3 -ppython3-iptables-dbg
07070100000010000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000002400000000python-iptables-1.0.0/debian/source07070100000011000081A40000000000000000000000015EE6923E0000000D000000000000000000000000000000000000002B00000000python-iptables-1.0.0/debian/source/format3.0 (native)
07070100000012000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001A00000000python-iptables-1.0.0/doc07070100000013000081A40000000000000000000000015EE6923E00000E14000000000000000000000000000000000000002300000000python-iptables-1.0.0/doc/Makefile# Makefile for Sphinx documentation
#
# You can set these variables from the command line.
SPHINXOPTS =
SPHINXBUILD = sphinx-build
PAPER =
BUILDDIR = _build
# Internal variables.
PAPEROPT_a4 = -D latex_paper_size=a4
PAPEROPT_letter = -D latex_paper_size=letter
ALLSPHINXOPTS = -d $(BUILDDIR)/doctrees $(PAPEROPT_$(PAPER)) $(SPHINXOPTS) .
GH_PAGES_SOURCES = ../doc ../iptc
.PHONY: help clean html dirhtml pickle json htmlhelp qthelp latex changes linkcheck doctest
help:
@echo "Please use \`make <target>' where <target> is one of"
@echo " html to make standalone HTML files"
@echo " dirhtml to make HTML files named index.html in directories"
@echo " pickle to make pickle files"
@echo " json to make JSON files"
@echo " htmlhelp to make HTML files and a HTML help project"
@echo " qthelp to make HTML files and a qthelp project"
@echo " latex to make LaTeX files, you can set PAPER=a4 or PAPER=letter"
@echo " changes to make an overview of all changed/added/deprecated items"
@echo " linkcheck to check all external links for integrity"
@echo " doctest to run all doctests embedded in the documentation (if enabled)"
clean:
-rm -rf $(BUILDDIR)/*
html:
$(SPHINXBUILD) -b html $(ALLSPHINXOPTS) $(BUILDDIR)/html
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/html."
dirhtml:
$(SPHINXBUILD) -b dirhtml $(ALLSPHINXOPTS) $(BUILDDIR)/dirhtml
@echo
@echo "Build finished. The HTML pages are in $(BUILDDIR)/dirhtml."
pickle:
$(SPHINXBUILD) -b pickle $(ALLSPHINXOPTS) $(BUILDDIR)/pickle
@echo
@echo "Build finished; now you can process the pickle files."
json:
$(SPHINXBUILD) -b json $(ALLSPHINXOPTS) $(BUILDDIR)/json
@echo
@echo "Build finished; now you can process the JSON files."
htmlhelp:
$(SPHINXBUILD) -b htmlhelp $(ALLSPHINXOPTS) $(BUILDDIR)/htmlhelp
@echo
@echo "Build finished; now you can run HTML Help Workshop with the" \
".hhp project file in $(BUILDDIR)/htmlhelp."
qthelp:
$(SPHINXBUILD) -b qthelp $(ALLSPHINXOPTS) $(BUILDDIR)/qthelp
@echo
@echo "Build finished; now you can run "qcollectiongenerator" with the" \
".qhcp project file in $(BUILDDIR)/qthelp, like this:"
@echo "# qcollectiongenerator $(BUILDDIR)/qthelp/python-iptables.qhcp"
@echo "To view the help file:"
@echo "# assistant -collectionFile $(BUILDDIR)/qthelp/python-iptables.qhc"
latex:
$(SPHINXBUILD) -b latex $(ALLSPHINXOPTS) $(BUILDDIR)/latex
@echo
@echo "Build finished; the LaTeX files are in $(BUILDDIR)/latex."
@echo "Run \`make all-pdf' or \`make all-ps' in that directory to" \
"run these through (pdf)latex."
changes:
$(SPHINXBUILD) -b changes $(ALLSPHINXOPTS) $(BUILDDIR)/changes
@echo
@echo "The overview file is in $(BUILDDIR)/changes."
linkcheck:
$(SPHINXBUILD) -b linkcheck $(ALLSPHINXOPTS) $(BUILDDIR)/linkcheck
@echo
@echo "Link check complete; look for any errors in the above output " \
"or in $(BUILDDIR)/linkcheck/output.txt."
doctest:
$(SPHINXBUILD) -b doctest $(ALLSPHINXOPTS) $(BUILDDIR)/doctest
@echo "Testing of doctests in the sources finished, look at the " \
"results in $(BUILDDIR)/doctest/output.txt."
gh-pages:
git checkout gh-pages
git checkout master $(GH_PAGES_SOURCES)
make html && cp -rf _build/html/* ../ && make clean
git add ../*.* ../_*
git commit -m "Generated gh-pages for `git log master -1 --pretty=short --abbrev-commit`" && git push origin gh-pages
git reset --hard
git clean -f ..
git checkout master
markdown: intro.rst examples.rst
pandoc -o ../README.md -f rst -t markdown intro.rst examples.rst
07070100000014000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000002500000000python-iptables-1.0.0/doc/_templates07070100000015000081A40000000000000000000000015EE6923E00000229000000000000000000000000000000000000003100000000python-iptables-1.0.0/doc/_templates/layout.html{% extends "!layout.html" %}
{% block footer %}
{{ super() }}
<script type="text/javascript">
var _gaq = _gaq || [];
_gaq.push(['_setAccount', 'UA-38352912-1']);
_gaq.push(['_trackPageview']);
(function() {
var ga = document.createElement('script'); ga.type = 'text/javascript'; ga.async = true;
ga.src = ('https:' == document.location.protocol ? 'https://ssl' : 'http://www') + '.google-analytics.com/ga.js';
var s = document.getElementsByTagName('script')[0]; s.parentNode.insertBefore(ga, s);
})();
</script>
{% endblock %}
07070100000016000081A40000000000000000000000015EE6923E00001A1B000000000000000000000000000000000000002200000000python-iptables-1.0.0/doc/conf.py# -*- coding: utf-8 -*-
#
# python-iptables documentation build configuration file, created by
# sphinx-quickstart on Wed Oct 27 21:54:51 2010.
#
# This file is execfile()d with the current directory set to its containing dir.
#
# Note that not all possible configuration values are present in this
# autogenerated file.
#
# All configuration values have a default; values that are commented out
# serve to show the default.
import sys, os
# If extensions (or modules to document with autodoc) are in another directory,
# add these directories to sys.path here. If the directory is relative to the
# documentation root, use os.path.abspath to make it absolute, like shown here.
#sys.path.append(os.path.abspath('.'))
sys.path.append(os.path.abspath('../'))
# -- General configuration -----------------------------------------------------
# Add any Sphinx extension module names here, as strings. They can be extensions
# coming with Sphinx (named 'sphinx.ext.*') or your custom ones.
extensions = ['sphinx.ext.autodoc', 'sphinx.ext.doctest', 'sphinx.ext.intersphinx', 'sphinx.ext.todo', 'sphinx.ext.coverage', 'sphinx.ext.ifconfig']
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
# The suffix of source filenames.
source_suffix = '.rst'
# The encoding of source files.
#source_encoding = 'utf-8'
# The master toctree document.
master_doc = 'index'
# General information about the project.
project = u'python-iptables'
copyright = u'2010-2014, Vilmos Nebehaj'
# The version info for the project you're documenting, acts as replacement for
# |version| and |release|, also used in various other places throughout the
# built documents.
#
# The short X.Y version.
version = '0.4.0'
# The full version, including alpha/beta/rc tags.
release = '0.4.0-dev'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
#language = None
# There are two options for replacing |today|: either, you set today to some
# non-false value, then it is used:
#today = ''
# Else, today_fmt is used as the format for a strftime call.
#today_fmt = '%B %d, %Y'
# List of documents that shouldn't be included in the build.
#unused_docs = []
# List of directories, relative to source directory, that shouldn't be searched
# for source files.
exclude_trees = ['_build']
# The reST default role (used for this markup: `text`) to use for all documents.
#default_role = None
# If true, '()' will be appended to :func: etc. cross-reference text.
add_function_parentheses = True
# If true, the current module name will be prepended to all description
# unit titles (such as .. function::).
#add_module_names = True
# If true, sectionauthor and moduleauthor directives will be shown in the
# output. They are ignored by default.
#show_authors = False
# The name of the Pygments (syntax highlighting) style to use.
pygments_style = 'sphinx'
# A list of ignored prefixes for module index sorting.
#modindex_common_prefix = []
# -- Options for HTML output ---------------------------------------------------
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
html_theme = 'default'
# Theme options are theme-specific and customize the look and feel of a theme
# further. For a list of options available for each theme, see the
# documentation.
#html_theme_options = {}
# Add any paths that contain custom themes here, relative to this directory.
#html_theme_path = []
# The name for this set of Sphinx documents. If None, it defaults to
# "<project> v<release> documentation".
#html_title = None
# A shorter title for the navigation bar. Default is the same as html_title.
#html_short_title = None
# The name of an image file (relative to this directory) to place at the top
# of the sidebar.
#html_logo = None
# The name of an image file (within the static path) to use as favicon of the
# docs. This file should be a Windows icon file (.ico) being 16x16 or 32x32
# pixels large.
#html_favicon = None
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
# If not '', a 'Last updated on:' timestamp is inserted at every page bottom,
# using the given strftime format.
#html_last_updated_fmt = '%b %d, %Y'
# If true, SmartyPants will be used to convert quotes and dashes to
# typographically correct entities.
#html_use_smartypants = True
# Custom sidebar templates, maps document names to template names.
#html_sidebars = {}
# Additional templates that should be rendered to pages, maps page names to
# template names.
#html_additional_pages = {}
# If false, no module index is generated.
#html_use_modindex = True
# If false, no index is generated.
#html_use_index = True
# If true, the index is split into individual pages for each letter.
#html_split_index = False
# If true, links to the reST sources are added to the pages.
#html_show_sourcelink = True
# If true, an OpenSearch description file will be output, and all pages will
# contain a <link> tag referring to it. The value of this option must be the
# base URL from which the finished HTML is served.
#html_use_opensearch = ''
# If nonempty, this is the file name suffix for HTML files (e.g. ".xhtml").
#html_file_suffix = ''
# Output file base name for HTML help builder.
htmlhelp_basename = 'python-iptablesdoc'
# -- Options for LaTeX output --------------------------------------------------
# The paper size ('letter' or 'a4').
#latex_paper_size = 'letter'
# The font size ('10pt', '11pt' or '12pt').
#latex_font_size = '10pt'
# Grouping the document tree into LaTeX files. List of tuples
# (source start file, target name, title, author, documentclass [howto/manual]).
latex_documents = [
('index', 'python-iptables.tex', u'python-iptables Documentation',
u'Vilmos Nebehaj', 'manual'),
]
# The name of an image file (relative to this directory) to place at the top of
# the title page.
#latex_logo = None
# For "manual" documents, if this is true, then toplevel headings are parts,
# not chapters.
#latex_use_parts = False
# Additional stuff for the LaTeX preamble.
#latex_preamble = ''
# Documents to append as an appendix to all manuals.
#latex_appendices = []
# If false, no module index is generated.
#latex_use_modindex = True
# Example configuration for intersphinx: refer to the Python standard library.
intersphinx_mapping = {'http://docs.python.org/': None}
autoclass_content="both"
07070100000017000081A40000000000000000000000015EE6923E00004195000000000000000000000000000000000000002700000000python-iptables-1.0.0/doc/examples.rstExamples
========
High level abstractions
-----------------------
``python-iptables`` implements a low-level interface that tries to closely
match the underlying C libraries. The module ``iptc.easy`` improves the
usability of the library by providing a rich set of high-level functions
designed to simplify the interaction with the library, for example:
>>> import iptc
>>> iptc.easy.dump_table('nat', ipv6=False)
{'INPUT': [], 'OUTPUT': [], 'POSTROUTING': [], 'PREROUTING': []}
>>> iptc.easy.dump_chain('filter', 'OUTPUT', ipv6=False)
[{'comment': {'comment': 'DNS traffic to Google'},
'counters': (1, 56),
'dst': '8.8.8.8/32',
'protocol': 'udp',
'target': 'ACCEPT',
'udp': {'dport': '53'}}]
>>> iptc.easy.add_chain('filter', 'TestChain')
True
>>> rule_d = {'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
>>> iptc.easy.insert_rule('filter', 'TestChain', rule_d)
>>> iptc.easy.dump_chain('filter', 'TestChain')
[{'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}]
>>> iptc.easy.delete_chain('filter', 'TestChain', flush=True)
>>> # Example of goto rule // iptables -A FORWARD -p gre -g TestChainGoto
>>> iptc.easy.add_chain('filter', 'TestChainGoto')
>>> rule_goto_d = {'protocol': 'gre', 'target': {'goto': 'TestChainGoto'}}
>>> iptc.easy.insert_rule('filter', 'FORWARD', rule_goto_d)
Rules
-----
In ``python-iptables``, you usually first create a rule, and set any
source/destination address, in/out interface and protocol specifiers, for
example::
>>> import iptc
>>> rule = iptc.Rule()
>>> rule.in_interface = "eth0"
>>> rule.src = "192.168.1.0/255.255.255.0"
>>> rule.protocol = "tcp"
This creates a rule that will match TCP packets coming in on eth0, with a
source IP address of 192.168.1.0/255.255.255.0.
A rule may contain matches and a target. A match is like a filter matching
certain packet attributes, while a target tells what to do with the packet
(drop it, accept it, transform it somehow, etc). One can create a match or
target via a Rule::
>>> rule = iptc.Rule()
>>> m = rule.create_match("tcp")
>>> t = rule.create_target("DROP")
Match and target parameters can be changed after creating them. It is also
perfectly valid to create a match or target via instantiating them with
their constructor, but you still need a rule and you have to add the matches
and the target to their rule manually::
>>> rule = iptc.Rule()
>>> match = iptc.Match(rule, "tcp")
>>> target = iptc.Target(rule, "DROP")
>>> rule.add_match(match)
>>> rule.target = target
Any parameters a match or target might take can be set via the attributes of
the object. To set the destination port for a TCP match::
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> match = rule.create_match("tcp")
>>> match.dport = "80"
To set up a rule that matches packets marked with 0xff::
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> match = rule.create_match("mark")
>>> match.mark = "0xff"
Parameters are always strings. You can supply any string as the parameter
value, but note that most extensions validate their parameters. For example
this::
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> rule.target = iptc.Target(rule, "ACCEPT")
>>> match = iptc.Match(rule, "state")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> match.state = "RELATED,ESTABLISHED"
>>> rule.add_match(match)
>>> chain.insert_rule(rule)
will work. However, if you change the `state` parameter::
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> rule.target = iptc.Target(rule, "ACCEPT")
>>> match = iptc.Match(rule, "state")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> match.state = "RELATED,ESTABLISHED,FOOBAR"
>>> rule.add_match(match)
>>> chain.insert_rule(rule)
``python-iptables`` will throw an exception::
Traceback (most recent call last):
File "state.py", line 7, in <module>
match.state = "RELATED,ESTABLISHED,FOOBAR"
File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 369, in __setattr__
self.parse(name.replace("_", "-"), value)
File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 286, in parse
self._parse(argv, inv, entry)
File "/home/user/Projects/python-iptables/iptc/ip4tc.py", line 516, in _parse
ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p)))
File "/home/user/Projects/python-iptables/iptc/xtables.py", line 736, in new
ret = fn(*args)
File "/home/user/Projects/python-iptables/iptc/xtables.py", line 1031, in parse_match
argv[1]))
iptc.xtables.XTablesError: state: parameter error -2 (RELATED,ESTABLISHED,FOOBAR)
Certain parameters take a string that optionally consists of multiple words.
The comment match is a good example::
>>> rule = iptc.Rule()
>>> rule.src = "127.0.0.1"
>>> rule.protocol = "udp"
>>> rule.target = rule.create_target("ACCEPT")
>>> match = rule.create_match("comment")
>>> match.comment = "this is a test comment"
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
Note that this is still just one parameter value.
However, when a match or a target takes multiple parameter values, that needs
to be passed in as a list. Let's assume you have created and set up an
``ipset`` called ``blacklist`` via the ``ipset`` command. To create a rule
with a match for this set::
>>> rule = iptc.Rule()
>>> m = rule.create_match("set")
>>> m.match_set = ['blacklist', 'src']
Note how this time a list was used for the parameter value, since the ``set``
match ``match_set`` parameter expects two values. See the ``iptables``
manpages to find out what the extensions you use expect. See ipset_ for more
information.
.. _ipset: http://ipset.netfilter.org/
When you are ready constructing your rule, add them to the chain you want it
to show up in::
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
This will put your rule into the INPUT chain in the filter table.
Chains and tables
-----------------
You can of course also check what a rule's source/destination address,
in/out inteface etc is. To print out all rules in the FILTER table::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> for chain in table.chains:
>>> print "======================="
>>> print "Chain ", chain.name
>>> for rule in chain.rules:
>>> print "Rule", "proto:", rule.protocol, "src:", rule.src, "dst:", \
>>> rule.dst, "in:", rule.in_interface, "out:", rule.out_interface,
>>> print "Matches:",
>>> for match in rule.matches:
>>> print match.name,
>>> print "Target:",
>>> print rule.target.name
>>> print "======================="
As you see in the code snippet above, rules are organized into chains, and
chains are in tables. You have a fixed set of tables; for IPv4:
* ``FILTER``,
* ``NAT``,
* ``MANGLE`` and
* ``RAW``.
For IPv6 the tables are:
* ``FILTER``,
* ``MANGLE``,
* ``RAW`` and
* ``SECURITY``.
To access a table::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> print table.name
filter
To create a new chain in the FILTER table::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = table.create_chain("testchain")
$ sudo iptables -L -n
[...]
Chain testchain (0 references)
target prot opt source destination
To access an existing chain::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, "INPUT")
>>> chain.name
'INPUT'
>>> len(chain.rules)
10
>>>
More about matches and targets
------------------------------
There are basic targets, such as ``DROP`` and ``ACCEPT``. E.g. to reject
packets with source address ``127.0.0.1/255.0.0.0`` coming in on any of the
``eth`` interfaces::
>>> import iptc
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> rule = iptc.Rule()
>>> rule.in_interface = "eth+"
>>> rule.src = "127.0.0.1/255.0.0.0"
>>> target = iptc.Target(rule, "DROP")
>>> rule.target = target
>>> chain.insert_rule(rule)
To instantiate a target or match, we can either create an object like above,
or use the ``rule.create_target(target_name)`` and
``rule.create_match(match_name)`` methods. For example, in the code above
target could have been created as::
>>> target = rule.create_target("DROP")
instead of::
>>> target = iptc.Target(rule, "DROP")
>>> rule.target = target
The former also adds the match or target to the rule, saving a call.
Another example, using a target which takes parameters. Let's mark packets
going to ``192.168.1.2`` UDP port ``1234`` with ``0xffff``::
>>> import iptc
>>> chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "PREROUTING")
>>> rule = iptc.Rule()
>>> rule.dst = "192.168.1.2"
>>> rule.protocol = "udp"
>>> match = iptc.Match(rule, "udp")
>>> match.dport = "1234"
>>> rule.add_match(match)
>>> target = iptc.Target(rule, "MARK")
>>> target.set_mark = "0xffff"
>>> rule.target = target
>>> chain.insert_rule(rule)
Matches are optional (specifying a target is mandatory). E.g. to insert a rule
to NAT TCP packets going out via ``eth0``::
>>> import iptc
>>> chain = iptc.Chain(iptc.Table(iptc.Table.NAT), "POSTROUTING")
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> rule.out_interface = "eth0"
>>> target = iptc.Target(rule, "MASQUERADE")
>>> target.to_ports = "1234"
>>> rule.target = target
>>> chain.insert_rule(rule)
Here only the properties of the rule decide whether the rule will be applied
to a packet.
Matches are optional, but we can add multiple matches to a rule. In the
following example we will do that, using the ``iprange`` and the ``tcp``
matches::
>>> import iptc
>>> rule = iptc.Rule()
>>> rule.protocol = "tcp"
>>> match = iptc.Match(rule, "tcp")
>>> match.dport = "22"
>>> rule.add_match(match)
>>> match = iptc.Match(rule, "iprange")
>>> match.src_range = "192.168.1.100-192.168.1.200"
>>> match.dst_range = "172.22.33.106"
>>> rule.add_match(match)
>>> rule.target = iptc.Target(rule, "DROP")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
This is the ``python-iptables`` equivalent of the following iptables command::
# iptables -A INPUT -p tcp –destination-port 22 -m iprange –src-range 192.168.1.100-192.168.1.200 –dst-range 172.22.33.106 -j DROP
You can of course negate matches, just like when you use ``!`` in front of a
match with iptables. For example::
>>> import iptc
>>> rule = iptc.Rule()
>>> match = iptc.Match(rule, "mac")
>>> match.mac_source = "!00:11:22:33:44:55"
>>> rule.add_match(match)
>>> rule.target = iptc.Target(rule, "ACCEPT")
>>> chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "INPUT")
>>> chain.insert_rule(rule)
This results in::
$ sudo iptables -L -n
Chain INPUT (policy ACCEPT)
target prot opt source destination
ACCEPT all -- 0.0.0.0/0 0.0.0.0/0 MAC ! 00:11:22:33:44:55
Chain FORWARD (policy ACCEPT)
target prot opt source destination
Chain OUTPUT (policy ACCEPT)
target prot opt source destination
Counters
--------
You can query rule and chain counters, e.g.::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
However, the counters are only refreshed when the underlying low-level
iptables connection is refreshed in ``Table`` via ``table.refresh()``. For
example::
>>> import time, sys
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
>>> print "Please send some traffic"
>>> sys.stdout.flush()
>>> time.sleep(3)
>>> for rule in chain.rules:
>>> # Here you will get back the same counter values as above
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
This will show you the same counter values even if there was traffic hitting
your rules. You have to refresh your table to get update your counters::
>>> import time, sys
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
>>> print "Please send some traffic"
>>> sys.stdout.flush()
>>> time.sleep(3)
>>> table.refresh() # Here: refresh table to update rule counters
>>> for rule in chain.rules:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes
What is more, if you add::
iptables -A OUTPUT -p tcp --sport 80
iptables -A OUTPUT -p tcp --sport 22
you can query rule and chain counters together with the protocol and sport(or
dport), e.g.::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, 'OUTPUT')
>>> for rule in chain.rules:
>>> for match in rule.matches:
>>> (packets, bytes) = rule.get_counters()
>>> print packets, bytes, match.name, match.sport
Autocommit
----------
``Python-iptables`` by default automatically performs an iptables commit after
each operation. That is, after you add a rule in ``python-iptables``, that
will take effect immediately.
It may happen that you want to batch together certain operations. A typical
use case is traversing a chain and removing rules matching a specific
criteria. If you do this with autocommit enabled, after the first delete
operation, your chain's state will change and you have to restart the
traversal. You can do something like this::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> removed = True
>>> chain = iptc.Chain(table, "FORWARD")
>>> while removed == True:
>>> removed = False
>>> for rule in chain.rules:
>>> if rule.out_interface and "eth0" in rule.out_interface:
>>> chain.delete_rule(rule)
>>> removed = True
>>> break
This is clearly not ideal and the code is not very readable. An alternative is
to disable autocommits, traverse the chain, removing one or more rules, than
commit it::
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> table.autocommit = False
>>> chain = iptc.Chain(table, "FORWARD")
>>> for rule in chain.rules:
>>> if rule.out_interface and "eth0" in rule.out_interface:
>>> chain.delete_rule(rule)
>>> table.commit()
>>> table.autocommit = True
The drawback is that `Table` is a singleton, and if you disable autocommit, it
will be disabled for all instances of that `Table`.
Easy rules with dictionaries
----------------------------
To simplify operations with ``python-iptables`` rules we have included support to define and convert Rules object into python dictionaries.
>>> import iptc
>>> table = iptc.Table(iptc.Table.FILTER)
>>> chain = iptc.Chain(table, "INPUT")
>>> # Create an iptc.Rule object from dictionary
>>> rule_d = {'comment': {'comment': 'Match tcp.22'}, 'protocol': 'tcp', 'target': 'ACCEPT', 'tcp': {'dport': '22'}}
>>> rule = iptc.easy.encode_iptc_rule(rule_d)
>>> # Obtain a dictionary representation from the iptc.Rule
>>> iptc.easy.decode_iptc_rule(rule)
{'tcp': {'dport': '22'}, 'protocol': 'tcp', 'comment': {'comment': 'Match tcp.22'}, 'target': 'ACCEPT'}
Known Issues
============
These issues are mainly caused by complex interaction with upstream's
Netfilter implementation, and will require quite significant effort to
fix. Workarounds are available.
- The ``hashlimit`` match requires explicitly setting ``hashlimit_htable_expire``. See `Issue #201 <https://github.com/ldx/python-iptables/issues/201>`_.
- The ``NOTRACK`` target is problematic; use ``CT --notrack`` instead. See `Issue #204 <https://github.com/ldx/python-iptables/issues/204>`_.
07070100000018000081A40000000000000000000000015EE6923E000001DF000000000000000000000000000000000000002400000000python-iptables-1.0.0/doc/index.rst.. python-iptables documentation master file, created by
sphinx-quickstart on Wed Oct 27 21:54:51 2010.
You can adapt this file completely to your liking, but it should at least
contain the root `toctree` directive.
Welcome to python-iptables's documentation!
===========================================
Contents:
.. toctree::
:maxdepth: 2
intro
usage
examples
Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
07070100000019000081A40000000000000000000000015EE6923E0000142A000000000000000000000000000000000000002400000000python-iptables-1.0.0/doc/intro.rstIntroduction
============
About python-iptables
---------------------
**Iptables** is the tool that is used to manage **netfilter**, the standard
packet filtering and manipulation framework under Linux. As the iptables
manpage puts it:
Iptables is used to set up, maintain, and inspect the tables of IPv4
packet filter rules in the Linux kernel. Several different tables may be
defined.
Each table contains a number of built-in chains and may also contain
user- defined chains.
Each chain is a list of rules which can match a set of packets. Each
rule specifies what to do with a packet that matches. This is called a
`target`, which may be a jump to a user-defined chain in the same table.
``Python-iptables`` provides a pythonesque wrapper via python bindings to
iptables under Linux. Interoperability with iptables is achieved via using
the iptables C libraries (``libiptc``, ``libxtables``, and the iptables
extensions), not calling the iptables binary and parsing its output. It is
meant primarily for dynamic and/or complex routers and firewalls, where rules
are often updated or changed, or Python programs wish to interface with the
Linux iptables framework..
If you are looking for ``ebtables`` python bindings, check out
`python-ebtables <https://github.com/ldx/python-ebtables/>`_.
``Python-iptables`` supports Python 2.6, 2.7 and 3.4.
.. image:: http://api.flattr.com/button/flattr-badge-large.png
:target: https://flattr.com/submit/auto?user_id=ldx&url=https%3A%2F%2Fgithub.com%2Fldx%2Fpython-iptables
:alt: Flattr
.. image:: https://pypip.in/v/python-iptables/badge.png
:target: https://pypi.python.org/pypi/python-iptables
:alt: Latest Release
.. image:: https://travis-ci.org/ldx/python-iptables.png?branch=master
:target: https://travis-ci.org/ldx/python-iptables
:alt: Build Status
.. image:: https://coveralls.io/repos/ldx/python-iptables/badge.svg?branch=codecoverage
:target: https://coveralls.io/r/ldx/python-iptables?branch=codecoverage
:alt: Coverage Status
.. image:: https://landscape.io/github/ldx/python-iptables/codecoverage/landscape.svg
:target: https://landscape.io/github/ldx/python-iptables/codecoverage
:alt: Code Health
.. image:: https://pypip.in/d/python-iptables/badge.png
:target: https://pypi.python.org/pypi/python-iptables
:alt: Number of Downloads
.. image:: https://pypip.in/license/python-iptables/badge.png
:target: https://pypi.python.org/pypi/python-iptables
:alt: License
Installing via pip
------------------
The usual way::
pip install --upgrade python-iptables
Compiling from source
----------------------
First make sure you have iptables installed (most Linux distributions install
it by default). ``Python-iptables`` needs the shared libraries ``libiptc.so``
and ``libxtables.so`` coming with iptables, they are installed in ``/lib`` on
Ubuntu.
You can compile ``python-iptables`` in the usual distutils way::
% cd python-iptables
% python setup.py build
If you like, ``python-iptables`` can also be installed into a ``virtualenv``::
% mkvirtualenv python-iptables
% python setup.py install
If you install ``python-iptables`` as a system package, make sure the
directory where ``distutils`` installs shared libraries is in the dynamic
linker's search path (it's in ``/etc/ld.so.conf`` or in one of the files in
the folder ``/etc/ld.co.conf.d``). Under Ubuntu ``distutils`` by default
installs into ``/usr/local/lib``.
Now you can run the tests::
% sudo PATH=$PATH python setup.py test
WARNING: this test will manipulate iptables rules.
Don't do this on a production machine.
Would you like to continue? y/n y
[...]
The ``PATH=$PATH`` part is necessary after ``sudo`` if you have installed into
a ``virtualenv``, since ``sudo`` will reset your environment to a system
setting otherwise..
Once everything is in place you can fire up python to check whether the
package can be imported::
% sudo PATH=$PATH python
>>> import iptc
>>>
Of course you need to be root to be able to use iptables.
Using a custom iptables install
-------------------------------
If you are stuck on a system with an old version of ``iptables``, you can
install a more up to date version to a custom location, and ask
``python-iptables`` to use libraries at that location.
To install ``iptables`` to ``/tmp/iptables``::
% git clone git://git.netfilter.org/iptables && cd iptables
% ./autogen.sh
% ./configure --prefix=/tmp/iptables
% make
% make install
Make sure the dependencies ``iptables`` needs are installed.
Now you can point ``python-iptables`` to this install path via::
% sudo PATH=$PATH IPTABLES_LIBDIR=/tmp/iptables/lib XTABLES_LIBDIR=/tmp/iptables/lib/xtables python
>>> import iptc
>>>
What is supported
-----------------
The basic iptables framework and all the match/target extensions are supported
by ``python-iptables``, including IPv4 and IPv6 ones. All IPv4 and IPv6 tables
are supported as well.
Full documentation with API reference is available here_.
.. _here: http://ldx.github.com/python-iptables/
0707010000001A000081A40000000000000000000000015EE6923E00000900000000000000000000000000000000000000002400000000python-iptables-1.0.0/doc/usage.rstUsage
=====
The python API in ``python-iptables`` tries to mimic the logic of iptables.
You have
* **Tables**, **Table.FILTER**, **Table.NAT**, **Table.MANGLE** and
**Table.RAW** for IPv4; **Table6.FILTER**, **Table6.SECURITY**,
**Table6.MANGLE** and **Table6.RAW** for IPv6. They can be used to
filter packets, do network address translation or modify packets in
various ways.
* **Chains** inside tables. Each table has a few built-in chains, but you
can also create your own chains and jump into them from other chains.
When you create your chains you should also specify which table it will
be used in. **Chains** have **Policies**, which tell what to do when the
end of a chain is reached.
* Each chain has zero or more **rules**. A rule specifies what kind of
packets to match (matches, each rule can have zero, one or more matches)
and what to do with them (target, each rule has one of them). Iptables
implements a plethora of match and target extensions. For IPv4, the
class implementing this is called *Rule*, for IPv6 it is called *Rule6*.
* **Matches**, specifying when a rule needs to be applied to a packet. To
create a match object you also has to specify the rule to which it
belongs.
* **Targets**, specifying what to do when a rule is applied to a packet.
To create a target object you also has to specify the rule to which it
belongs.
The python API is quite high-level and hides the low-level details from the
user. Using only the classes *Table*, *Chain*, *Rule*, *Match* and *Target*
virtually anything can be achieved that you can do with iptables from the
command line.
.. currentmodule:: iptc
Table
-----
.. autoclass:: Table
:members:
Table6
------
.. autoclass:: Table6
:members:
:inherited-members:
Chain
-----
.. autoclass:: Chain
:members:
Policy
------
.. autoclass:: Policy
:members:
Match
-----
.. autoclass:: Match
:members:
:inherited-members:
Target
------
.. autoclass:: Target
:members:
:inherited-members:
Rule
----
.. autoclass:: Rule
:members:
:inherited-members:
Rule6
-----
.. autoclass:: Rule6
:members:
:inherited-members:
IPTCError
---------
.. autoexception:: IPTCError
0707010000001B000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001B00000000python-iptables-1.0.0/iptc0707010000001C000081A40000000000000000000000015EE6923E00000156000000000000000000000000000000000000002700000000python-iptables-1.0.0/iptc/__init__.py# -*- coding: utf-8 -*-
"""
.. module:: iptc
:synopsis: Python bindings for libiptc.
.. moduleauthor:: Vilmos Nebehaj
"""
from iptc.ip4tc import (is_table_available, Table, Chain, Rule, Match, Target, Policy, IPTCError)
from iptc.ip6tc import is_table6_available, Table6, Rule6
from iptc.errors import *
import iptc.easy
__all__ = []
0707010000001D000081A40000000000000000000000015EE6923E000043A4000000000000000000000000000000000000002300000000python-iptables-1.0.0/iptc/easy.py# -*- coding: utf-8 -*-
# TODO:
# - Add documentation
# - Add HowToUse examples
from .ip4tc import Rule, Table, Chain, IPTCError
from .ip6tc import Rule6, Table6
_BATCH_MODE = False
def flush_all(ipv6=False):
""" Flush all available tables """
for table in get_tables(ipv6):
flush_table(table, ipv6)
def flush_table(table, ipv6=False, raise_exc=True):
""" Flush a table """
try:
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.flush()
except Exception as e:
if raise_exc: raise
def flush_chain(table, chain, ipv6=False, raise_exc=True):
""" Flush a chain in table """
try:
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_chain.flush()
except Exception as e:
if raise_exc: raise
def zero_all(table, ipv6=False):
""" Zero all tables """
for table in get_tables(ipv6):
zero_table(table, ipv6)
def zero_table(table, ipv6=False):
""" Zero a table """
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.zero_entries()
def zero_chain(table, chain, ipv6=False):
""" Zero a chain in table """
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_chain.zero_counters()
def has_chain(table, chain, ipv6=False):
""" Return True if chain exists in table False otherwise """
return _iptc_gettable(table, ipv6).is_chain(chain)
def has_rule(table, chain, rule_d, ipv6=False):
""" Return True if rule exists in chain False otherwise """
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
return iptc_rule in iptc_chain.rules
def add_chain(table, chain, ipv6=False, raise_exc=True):
""" Return True if chain was added successfully to a table, raise Exception otherwise """
try:
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.create_chain(chain)
return True
except Exception as e:
if raise_exc: raise
return False
def add_rule(table, chain, rule_d, position=0, ipv6=False):
""" Add a rule to a chain in a given position. 0=append, 1=first, n=nth position """
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
if position == 0:
# Insert rule in last position -> append
iptc_chain.append_rule(iptc_rule)
elif position > 0:
# Insert rule in given position -> adjusted as iptables CLI
iptc_chain.insert_rule(iptc_rule, position - 1)
elif position < 0:
# Insert rule in given position starting from bottom -> not available in iptables CLI
nof_rules = len(iptc_chain.rules)
_position = position + nof_rules
# Insert at the top if the position has looped over
if _position <= 0:
_position = 0
iptc_chain.insert_rule(iptc_rule, _position)
def insert_rule(table, chain, rule_d, ipv6=False):
""" Add a rule to a chain in the 1st position """
add_rule(table, chain, rule_d, position=1, ipv6=ipv6)
def delete_chain(table, chain, ipv6=False, flush=False, raise_exc=True):
""" Delete a chain """
try:
if flush:
flush_chain(table, chain, ipv6, raise_exc)
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.delete_chain(chain)
except Exception as e:
if raise_exc: raise
def delete_rule(table, chain, rule_d, ipv6=False, raise_exc=True):
""" Delete a rule from a chain """
try:
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
iptc_chain.delete_rule(iptc_rule)
except Exception as e:
if raise_exc: raise
def get_tables(ipv6=False):
""" Get all tables """
iptc_tables = _iptc_gettables(ipv6)
return [t.name for t in iptc_tables]
def get_chains(table, ipv6=False):
""" Return the existing chains of a table """
iptc_table = _iptc_gettable(table, ipv6)
return [iptc_chain.name for iptc_chain in iptc_table.chains]
def get_rule(table, chain, position=0, ipv6=False, raise_exc=True):
""" Get a rule from a chain in a given position. 0=all rules, 1=first, n=nth position """
try:
if position == 0:
# Return all rules
return dump_chain(table, chain, ipv6)
elif position > 0:
# Return specific rule by position
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = iptc_chain.rules[position - 1]
return decode_iptc_rule(iptc_rule, ipv6)
elif position < 0:
# Return last rule -> not available in iptables CLI
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = iptc_chain.rules[position]
return decode_iptc_rule(iptc_rule, ipv6)
except Exception as e:
if raise_exc: raise
def replace_rule(table, chain, old_rule_d, new_rule_d, ipv6=False):
""" Replaces an existing rule of a chain """
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_old_rule = encode_iptc_rule(old_rule_d, ipv6)
iptc_new_rule = encode_iptc_rule(new_rule_d, ipv6)
iptc_chain.replace_rule(iptc_new_rule, iptc_chain.rules.index(iptc_old_rule))
def get_rule_counters(table, chain, rule_d, ipv6=False):
""" Return a tuple with the rule counters (numberOfBytes, numberOfPackets) """
if not has_rule(table, chain, rule_d, ipv6):
raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d))
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
iptc_rule_index = iptc_chain.rules.index(iptc_rule)
return iptc_chain.rules[iptc_rule_index].get_counters()
def get_rule_position(table, chain, rule_d, ipv6=False):
""" Return the position of a rule within a chain """
if not has_rule(table, chain, rule_d):
raise AttributeError('Chain <{}@{}> has no rule <{}>'.format(chain, table, rule_d))
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
return iptc_chain.rules.index(iptc_rule)
def test_rule(rule_d, ipv6=False):
""" Return True if the rule is a well-formed dictionary, False otherwise """
try:
encode_iptc_rule(rule_d, ipv6)
return True
except:
return False
def test_match(name, value, ipv6=False):
""" Return True if the match is valid, False otherwise """
try:
iptc_rule = Rule6() if ipv6 else Rule()
_iptc_setmatch(iptc_rule, name, value)
return True
except:
return False
def test_target(name, value, ipv6=False):
""" Return True if the target is valid, False otherwise """
try:
iptc_rule = Rule6() if ipv6 else Rule()
_iptc_settarget(iptc_rule, {name:value})
return True
except:
return False
def get_policy(table, chain, ipv6=False):
""" Return the default policy of chain in a table """
iptc_chain = _iptc_getchain(table, chain, ipv6)
return iptc_chain.get_policy().name
def set_policy(table, chain, policy='ACCEPT', ipv6=False):
""" Set the default policy of chain in a table """
iptc_chain = _iptc_getchain(table, chain, ipv6)
iptc_chain.set_policy(policy)
def dump_all(ipv6=False):
""" Return a dictionary representation of all tables """
return {table: dump_table(table, ipv6) for table in get_tables(ipv6)}
def dump_table(table, ipv6=False):
""" Return a dictionary representation of a table """
return {chain: dump_chain(table, chain, ipv6) for chain in get_chains(table, ipv6)}
def dump_chain(table, chain, ipv6=False):
""" Return a list with the dictionary representation of the rules of a table """
iptc_chain = _iptc_getchain(table, chain, ipv6)
return [decode_iptc_rule(iptc_rule, ipv6) for iptc_rule in iptc_chain.rules]
def batch_begin(table = None, ipv6=False):
""" Disable autocommit on a table """
_BATCH_MODE = True
if table:
tables = (table, )
else:
tables = get_tables(ipv6)
for table in tables:
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.autocommit = False
def batch_end(table = None, ipv6=False):
""" Enable autocommit on table and commit changes """
_BATCH_MODE = False
if table:
tables = (table, )
else:
tables = get_tables(ipv6)
for table in tables:
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.autocommit = True
def batch_add_chains(table, chains, ipv6=False, flush=True):
""" Add multiple chains to a table """
iptc_table = _batch_begin_table(table, ipv6)
for chain in chains:
if iptc_table.is_chain(chain):
iptc_chain = Chain(iptc_table, chain)
else:
iptc_chain = iptc_table.create_chain(chain)
if flush:
iptc_chain.flush()
_batch_end_table(table, ipv6)
def batch_delete_chains(table, chains, ipv6=False):
""" Delete multiple chains of a table """
iptc_table = _batch_begin_table(table, ipv6)
for chain in chains:
if iptc_table.is_chain(chain):
iptc_chain = Chain(iptc_table, chain)
iptc_chain.flush()
iptc_table.delete_chain(chain)
_batch_end_table(table, ipv6)
def batch_add_rules(table, batch_rules, ipv6=False):
""" Add multiple rules to a table with format (chain, rule_d, position) """
iptc_table = _batch_begin_table(table, ipv6)
for (chain, rule_d, position) in batch_rules:
iptc_chain = Chain(iptc_table, chain)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
if position == 0:
# Insert rule in last position -> append
iptc_chain.append_rule(iptc_rule)
elif position > 0:
# Insert rule in given position -> adjusted as iptables CLI
iptc_chain.insert_rule(iptc_rule, position-1)
elif position < 0:
# Insert rule in given position starting from bottom -> not available in iptables CLI
nof_rules = len(iptc_chain.rules)
iptc_chain.insert_rule(iptc_rule, position + nof_rules)
_batch_end_table(table, ipv6)
def batch_delete_rules(table, batch_rules, ipv6=False, raise_exc=True):
""" Delete multiple rules from table with format (chain, rule_d) """
try:
iptc_table = _batch_begin_table(table, ipv6)
for (chain, rule_d) in batch_rules:
iptc_chain = Chain(iptc_table, chain)
iptc_rule = encode_iptc_rule(rule_d, ipv6)
iptc_chain.delete_rule(iptc_rule)
_batch_end_table(table, ipv6)
except Exception as e:
if raise_exc: raise
def encode_iptc_rule(rule_d, ipv6=False):
""" Return a Rule(6) object from the input dictionary """
# Sanity check
assert(isinstance(rule_d, dict))
# Basic rule attributes
rule_attr = ('src', 'dst', 'protocol', 'in-interface', 'out-interface', 'fragment')
iptc_rule = Rule6() if ipv6 else Rule()
# Set default target
rule_d.setdefault('target', '')
# Avoid issues with matches that require basic parameters to be configured first
for name in rule_attr:
if name in rule_d:
setattr(iptc_rule, name.replace('-', '_'), rule_d[name])
for name, value in rule_d.items():
try:
if name in rule_attr:
continue
elif name == 'counters':
_iptc_setcounters(iptc_rule, value)
elif name == 'target':
_iptc_settarget(iptc_rule, value)
else:
_iptc_setmatch(iptc_rule, name, value)
except Exception as e:
#print('Ignoring unsupported field <{}:{}>'.format(name, value))
continue
return iptc_rule
def decode_iptc_rule(iptc_rule, ipv6=False):
""" Return a dictionary representation of the Rule(6) object
Note: host IP addresses are appended their corresponding CIDR """
d = {}
if ipv6==False and iptc_rule.src != '0.0.0.0/0.0.0.0':
_ip, _netmask = iptc_rule.src.split('/')
_netmask = _netmask_v4_to_cidr(_netmask)
d['src'] = '{}/{}'.format(_ip, _netmask)
elif ipv6==True and iptc_rule.src != '::/0':
d['src'] = iptc_rule.src
if ipv6==False and iptc_rule.dst != '0.0.0.0/0.0.0.0':
_ip, _netmask = iptc_rule.dst.split('/')
_netmask = _netmask_v4_to_cidr(_netmask)
d['dst'] = '{}/{}'.format(_ip, _netmask)
elif ipv6==True and iptc_rule.dst != '::/0':
d['dst'] = iptc_rule.dst
if iptc_rule.protocol != 'ip':
d['protocol'] = iptc_rule.protocol
if iptc_rule.in_interface is not None:
d['in-interface'] = iptc_rule.in_interface
if iptc_rule.out_interface is not None:
d['out-interface'] = iptc_rule.out_interface
if ipv6 == False and iptc_rule.fragment:
d['fragment'] = iptc_rule.fragment
for m in iptc_rule.matches:
if m.name not in d:
d[m.name] = m.get_all_parameters()
elif isinstance(d[m.name], list):
d[m.name].append(m.get_all_parameters())
else:
d[m.name] = [d[m.name], m.get_all_parameters()]
if iptc_rule.target and iptc_rule.target.name and len(iptc_rule.target.get_all_parameters()):
name = iptc_rule.target.name.replace('-', '_')
d['target'] = {name:iptc_rule.target.get_all_parameters()}
elif iptc_rule.target and iptc_rule.target.name:
if iptc_rule.target.goto:
d['target'] = {'goto':iptc_rule.target.name}
else:
d['target'] = iptc_rule.target.name
# Get counters
d['counters'] = iptc_rule.counters
# Return a filtered dictionary
return _filter_empty_field(d)
### INTERNAL FUNCTIONS ###
def _iptc_table_available(table, ipv6=False):
""" Return True if the table is available, False otherwise """
try:
iptc_table = Table6(table) if ipv6 else Table(table)
return True
except:
return False
def _iptc_gettables(ipv6=False):
""" Return an updated view of all available iptc_table """
iptc_cls = Table6 if ipv6 else Table
return [_iptc_gettable(t, ipv6) for t in iptc_cls.ALL if _iptc_table_available(t, ipv6)]
def _iptc_gettable(table, ipv6=False):
""" Return an updated view of an iptc_table """
iptc_table = Table6(table) if ipv6 else Table(table)
if _BATCH_MODE is False:
iptc_table.commit()
iptc_table.refresh()
return iptc_table
def _iptc_getchain(table, chain, ipv6=False, raise_exc=True):
""" Return an iptc_chain of an updated table """
try:
iptc_table = _iptc_gettable(table, ipv6)
if not iptc_table.is_chain(chain):
raise AttributeError('Table <{}> has no chain <{}>'.format(table, chain))
return Chain(iptc_table, chain)
except Exception as e:
if raise_exc: raise
def _iptc_setcounters(iptc_rule, value):
# Value is a tuple (numberOfBytes, numberOfPackets)
iptc_rule.counters = value
def _iptc_setmatch(iptc_rule, name, value):
# Iterate list/tuple recursively
if isinstance(value, list) or isinstance(value, tuple):
for inner_value in value:
_iptc_setmatch(iptc_rule, name, inner_value)
# Assign dictionary value
elif isinstance(value, dict):
iptc_match = iptc_rule.create_match(name)
[iptc_match.set_parameter(k, v) for k, v in value.items()]
# Assign value directly
else:
iptc_match = iptc_rule.create_match(name)
iptc_match.set_parameter(name, value)
def _iptc_settarget(iptc_rule, value):
# Target is dictionary - Use only 1st pair key/value
if isinstance(value, dict):
t_name, t_value = next(iter(value.items()))
if t_name == 'goto':
iptc_target = iptc_rule.create_target(t_value, goto=True)
else:
iptc_target = iptc_rule.create_target(t_name)
[iptc_target.set_parameter(k, v) for k, v in t_value.items()]
# Simple target
else:
iptc_target = iptc_rule.create_target(value)
def _batch_begin_table(table, ipv6=False):
""" Disable autocommit on a table """
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.autocommit = False
return iptc_table
def _batch_end_table(table, ipv6=False):
""" Enable autocommit on table and commit changes """
iptc_table = _iptc_gettable(table, ipv6)
iptc_table.autocommit = True
return iptc_table
def _filter_empty_field(data_d):
"""
Remove empty lists from dictionary values
Before: {'target': {'CHECKSUM': {'checksum-fill': []}}}
After: {'target': {'CHECKSUM': {'checksum-fill': ''}}}
Before: {'tcp': {'dport': ['22']}}}
After: {'tcp': {'dport': '22'}}}
"""
for k, v in data_d.items():
if isinstance(v, dict):
data_d[k] = _filter_empty_field(v)
elif isinstance(v, list) and len(v) != 0:
v = [_filter_empty_field(_v) if isinstance(_v, dict) else _v for _v in v ]
if isinstance(v, list) and len(v) == 1:
data_d[k] = v.pop()
elif isinstance(v, list) and len(v) == 0:
data_d[k] = ''
return data_d
def _netmask_v4_to_cidr(netmask_addr):
# Implement Subnet Mask conversion without dependencies
return sum([bin(int(x)).count('1') for x in netmask_addr.split('.')])
### /INTERNAL FUNCTIONS ###
0707010000001E000081A40000000000000000000000015EE6923E00000093000000000000000000000000000000000000002500000000python-iptables-1.0.0/iptc/errors.py# -*- coding: utf-8 -*-
class XTablesError(Exception):
"""Raised when an xtables call fails for some reason."""
__all__ = ['XTablesError']
0707010000001F000081A40000000000000000000000015EE6923E00010274000000000000000000000000000000000000002400000000python-iptables-1.0.0/iptc/ip4tc.py# -*- coding: utf-8 -*-
import os
import re
import shlex
import sys
import ctypes as ct
import socket
import struct
import weakref
from .util import find_library, load_kernel, find_libc
from .xtables import (XT_INV_PROTO, NFPROTO_IPV4, XTablesError, xtables,
xt_align, xt_counters, xt_entry_target, xt_entry_match)
__all__ = ["Table", "Chain", "Rule", "Match", "Target", "Policy", "IPTCError"]
try:
load_kernel("ip_tables")
except:
pass
# Add IPPROTO_SCTP to socket module if not available
if not hasattr(socket, 'IPPROTO_SCTP'):
setattr(socket, 'IPPROTO_SCTP', 132)
_IFNAMSIZ = 16
_libc = find_libc()
_get_errno_loc = _libc.__errno_location
_get_errno_loc.restype = ct.POINTER(ct.c_int)
_malloc = _libc.malloc
_malloc.restype = ct.POINTER(ct.c_ubyte)
_malloc.argtypes = [ct.c_size_t]
_free = _libc.free
_free.restype = None
_free.argtypes = [ct.POINTER(ct.c_ubyte)]
# Make sure xt_params is set up.
xtables(NFPROTO_IPV4)
def is_table_available(name):
try:
Table(name)
return True
except IPTCError:
pass
return False
class in_addr(ct.Structure):
"""This class is a representation of the C struct in_addr."""
_fields_ = [("s_addr", ct.c_uint32)]
class ipt_ip(ct.Structure):
"""This class is a representation of the C struct ipt_ip."""
_fields_ = [("src", in_addr),
("dst", in_addr),
("smsk", in_addr),
("dmsk", in_addr),
("iniface", ct.c_char * _IFNAMSIZ),
("outiface", ct.c_char * _IFNAMSIZ),
("iniface_mask", ct.c_char * _IFNAMSIZ),
("outiface_mask", ct.c_char * _IFNAMSIZ),
("proto", ct.c_uint16),
("flags", ct.c_uint8),
("invflags", ct.c_uint8)]
# flags
IPT_F_FRAG = 0x01 # set if rule is a fragment rule
IPT_F_GOTO = 0x02 # set if jump is a goto
IPT_F_MASK = 0x03 # all possible flag bits mask
# invflags
IPT_INV_VIA_IN = 0x01 # invert the sense of IN IFACE
IPT_INV_VIA_OUT = 0x02 # invert the sense of OUT IFACE
IPT_INV_TOS = 0x04 # invert the sense of TOS
IPT_INV_SRCIP = 0x08 # invert the sense of SRC IP
IPT_INV_DSTIP = 0x10 # invert the sense of DST OP
IPT_INV_FRAG = 0x20 # invert the sense of FRAG
IPT_INV_PROTO = XT_INV_PROTO # invert the sense of PROTO (XT_INV_PROTO)
IPT_INV_MASK = 0x7F # all possible flag bits mask
def __init__(self):
# default: full netmask
self.smsk.s_addr = self.dmsk.s_addr = 0xffffffff
class ipt_entry(ct.Structure):
"""This class is a representation of the C struct ipt_entry."""
_fields_ = [("ip", ipt_ip),
("nfcache", ct.c_uint), # mark with fields that we care about
("target_offset", ct.c_uint16), # size of ipt_entry + matches
("next_offset", ct.c_uint16), # size of e + matches + target
("comefrom", ct.c_uint), # back pointer
("counters", xt_counters), # packet and byte counters
("elems", ct.c_ubyte * 0)] # any matches then the target
class IPTCError(Exception):
"""This exception is raised when a low-level libiptc error occurs.
It contains a short description about the error that occurred while
executing an iptables operation.
"""
_libiptc, _ = find_library("ip4tc", "iptc") # old iptables versions use iptc
class iptc(object):
"""This class contains all libiptc API calls."""
iptc_init = _libiptc.iptc_init
iptc_init.restype = ct.POINTER(ct.c_int)
iptc_init.argstype = [ct.c_char_p]
iptc_free = _libiptc.iptc_free
iptc_free.restype = None
iptc_free.argstype = [ct.c_void_p]
iptc_commit = _libiptc.iptc_commit
iptc_commit.restype = ct.c_int
iptc_commit.argstype = [ct.c_void_p]
iptc_builtin = _libiptc.iptc_builtin
iptc_builtin.restype = ct.c_int
iptc_builtin.argstype = [ct.c_char_p, ct.c_void_p]
iptc_first_chain = _libiptc.iptc_first_chain
iptc_first_chain.restype = ct.c_char_p
iptc_first_chain.argstype = [ct.c_void_p]
iptc_next_chain = _libiptc.iptc_next_chain
iptc_next_chain.restype = ct.c_char_p
iptc_next_chain.argstype = [ct.c_void_p]
iptc_is_chain = _libiptc.iptc_is_chain
iptc_is_chain.restype = ct.c_int
iptc_is_chain.argstype = [ct.c_char_p, ct.c_void_p]
iptc_create_chain = _libiptc.iptc_create_chain
iptc_create_chain.restype = ct.c_int
iptc_create_chain.argstype = [ct.c_char_p, ct.c_void_p]
iptc_delete_chain = _libiptc.iptc_delete_chain
iptc_delete_chain.restype = ct.c_int
iptc_delete_chain.argstype = [ct.c_char_p, ct.c_void_p]
iptc_rename_chain = _libiptc.iptc_rename_chain
iptc_rename_chain.restype = ct.c_int
iptc_rename_chain.argstype = [ct.c_char_p, ct.c_char_p, ct.c_void_p]
iptc_flush_entries = _libiptc.iptc_flush_entries
iptc_flush_entries.restype = ct.c_int
iptc_flush_entries.argstype = [ct.c_char_p, ct.c_void_p]
iptc_zero_entries = _libiptc.iptc_zero_entries
iptc_zero_entries.restype = ct.c_int
iptc_zero_entries.argstype = [ct.c_char_p, ct.c_void_p]
# get the policy of a given built-in chain
iptc_get_policy = _libiptc.iptc_get_policy
iptc_get_policy.restype = ct.c_char_p
iptc_get_policy.argstype = [ct.c_char_p, ct.POINTER(xt_counters),
ct.c_void_p]
# Set the policy of a chain
iptc_set_policy = _libiptc.iptc_set_policy
iptc_set_policy.restype = ct.c_int
iptc_set_policy.argstype = [ct.c_char_p, ct.c_char_p,
ct.POINTER(xt_counters), ct.c_void_p]
# Get first rule in the given chain: NULL for empty chain.
iptc_first_rule = _libiptc.iptc_first_rule
iptc_first_rule.restype = ct.POINTER(ipt_entry)
iptc_first_rule.argstype = [ct.c_char_p, ct.c_void_p]
# Returns NULL when rules run out.
iptc_next_rule = _libiptc.iptc_next_rule
iptc_next_rule.restype = ct.POINTER(ipt_entry)
iptc_next_rule.argstype = [ct.POINTER(ipt_entry), ct.c_void_p]
# Returns a pointer to the target name of this entry.
iptc_get_target = _libiptc.iptc_get_target
iptc_get_target.restype = ct.c_char_p
iptc_get_target.argstype = [ct.POINTER(ipt_entry), ct.c_void_p]
# These functions return TRUE for OK or 0 and set errno. If errno ==
# 0, it means there was a version error (ie. upgrade libiptc).
# Rule numbers start at 1 for the first rule.
# Insert the entry `e' in chain `chain' into position `rulenum'.
iptc_insert_entry = _libiptc.iptc_insert_entry
iptc_insert_entry.restype = ct.c_int
iptc_insert_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry),
ct.c_int, ct.c_void_p]
# Atomically replace rule `rulenum' in `chain' with `e'.
iptc_replace_entry = _libiptc.iptc_replace_entry
iptc_replace_entry.restype = ct.c_int
iptc_replace_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry),
ct.c_int, ct.c_void_p]
# Append entry `e' to chain `chain'. Equivalent to insert with
# rulenum = length of chain.
iptc_append_entry = _libiptc.iptc_append_entry
iptc_append_entry.restype = ct.c_int
iptc_append_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry),
ct.c_void_p]
# Delete the first rule in `chain' which matches `e', subject to
# matchmask (array of length == origfw)
iptc_delete_entry = _libiptc.iptc_delete_entry
iptc_delete_entry.restype = ct.c_int
iptc_delete_entry.argstype = [ct.c_char_p, ct.POINTER(ipt_entry),
ct.POINTER(ct.c_ubyte), ct.c_void_p]
# Delete the rule in position `rulenum' in `chain'.
iptc_delete_num_entry = _libiptc.iptc_delete_num_entry
iptc_delete_num_entry.restype = ct.c_int
iptc_delete_num_entry.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p]
# Check the packet `e' on chain `chain'. Returns the verdict, or
# NULL and sets errno.
# iptc_check_packet = _libiptc.iptc_check_packet
# iptc_check_packet.restype = ct.c_char_p
# iptc_check_packet.argstype = [ct.c_char_p, ct.POINTER(ipt), ct.c_void_p]
# Get the number of references to this chain
iptc_get_references = _libiptc.iptc_get_references
iptc_get_references.restype = ct.c_int
iptc_get_references.argstype = [ct.c_uint, ct.c_char_p, ct.c_void_p]
# read packet and byte counters for a specific rule
iptc_read_counter = _libiptc.iptc_read_counter
iptc_read_counter.restype = ct.POINTER(xt_counters)
iptc_read_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p]
# zero packet and byte counters for a specific rule
iptc_zero_counter = _libiptc.iptc_zero_counter
iptc_zero_counter.restype = ct.c_int
iptc_zero_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p]
# set packet and byte counters for a specific rule
iptc_set_counter = _libiptc.iptc_set_counter
iptc_set_counter.restype = ct.c_int
iptc_set_counter.argstype = [ct.c_char_p, ct.c_uint,
ct.POINTER(xt_counters), ct.c_void_p]
# Translates errno numbers into more human-readable form than strerror.
iptc_strerror = _libiptc.iptc_strerror
iptc_strerror.restype = ct.c_char_p
iptc_strerror.argstype = [ct.c_int]
class IPTCModule(object):
"""Superclass for Match and Target."""
pattern = re.compile(
'\s*(!)?\s*--([-\w]+)\s+(!)?\s*"?([^"]*?)"?(?=\s*(?:!?\s*--|$))')
def __init__(self):
self._name = None
self._rule = None
self._module = None
self._revision = None
self._ptr = None
self._ptrptr = None
raise NotImplementedError()
def set_parameter(self, parameter, value=None):
"""
Set a parameter for target or match extension, with an optional value.
@param parameter: name of the parameter to set
@type parameter: C{str}
@param value: optional value of the parameter, defaults to C{None}
@type value: C{str} or a C{list} of C{str}
"""
if value is None:
value = ""
return self.parse(parameter.replace("_", "-"), value)
def parse(self, parameter, value):
# Parameter name must always be a string.
parameter = parameter.encode()
# Check if we are dealing with an inverted parameter value.
inv = ct.c_int(0)
if len(value) > 0 and value[0] == "!":
inv = ct.c_int(1)
value = value[1:]
# Value can be either a string, or a list of strings, e.g. "8888",
# "!0:65535" or ["!", "example_set", "dst"].
args = []
is_str = isinstance(value, str)
try:
if not is_str:
is_str = isinstance(value, unicode)
except:
pass
if is_str:
args = [value.encode()]
else:
try:
args = [val.encode() for val in value]
except:
raise TypeError("Invalid parameter value: "
"must be string or list of strings")
if not self._module.extra_opts and not self._module.x6_options:
raise AttributeError("%s: invalid parameter %s" %
(self._module.name, parameter))
parameter = parameter.strip()
N = len(args)
argv = (ct.c_char_p * (N + 1))()
argv[0] = parameter
for i in range(N):
argv[i + 1] = args[i]
entry = self._rule.entry and ct.pointer(self._rule.entry) or None
self._parse(argv, inv, entry)
def _parse(self, argv, inv, entry):
raise NotImplementedError()
def final_check(self):
if self._module:
self._update_parameters()
self._final_check() # subclasses override this
def _final_check(self):
raise NotImplementedError()
def _get_saved_buf(self, ip):
if not self._module or not self._module.save:
return None
# redirect C stdout to a pipe and read back the output of m->save
# Flush stdout to avoid getting buffered results
sys.stdout.flush()
# Save the current C stdout.
stdout = os.dup(1)
try:
# Create a pipe and use the write end to replace the original C
# stdout.
pipes = os.pipe()
os.dup2(pipes[1], 1)
self._xt.save(self._module, ip, self._ptr)
# Use the read end to read whatever was written.
buf = os.read(pipes[0], 1024)
# Clean up the pipe.
os.close(pipes[0])
os.close(pipes[1])
return buf
finally:
# Put the original C stdout back in place.
os.dup2(stdout, 1)
# Clean up the copy we made.
os.close(stdout)
def save(self, name):
return self._save(name, self.rule.get_ip())
def _save(self, name, ip):
buf = self._get_saved_buf(ip).decode()
if buf is None:
return None
if not self._module or not self._module.save:
return None
if name:
return self._get_value(buf, name)
else:
return self._get_all_values(buf)
def _get_all_values(self, buf):
table = {} # variable -> (value, inverted)
res = re.findall(IPTCModule.pattern, buf)
for x in res:
value, invert = (x[3], x[0] or x[2])
table[x[1].replace("-", "_")] = "%s%s" % (invert and "!" or "",
value)
return table
def _get_value(self, buf, name):
table = {} # variable -> (value, inverted)
res = re.findall(IPTCModule.pattern, buf)
for x in res:
table[x[1]] = (x[3], x[0] or x[2])
try:
value, invert = table[name]
return "%s%s" % (invert and "!" or "", value)
except KeyError:
return None
def get_all_parameters(self):
params = {}
ip = self.rule.get_ip()
buf = self._get_saved_buf(ip)
if buf is None:
return params
if type(buf) != str:
# In Python3, string and bytes are different types.
buf = buf.decode()
res = shlex.split(buf)
res.reverse()
inv = False
key = None
while len(res) > 0:
x = res.pop()
if x == '!':
# Next parameter is negated.
inv = True
continue
if x.startswith('--'): # This is a parameter name.
key = x[2:]
if inv:
params[key] = ['!']
else:
params[key] = []
inv = False
continue
# At this point key should be set, unless the output from save is
# not formatted right. Let's be defensive, since some users
# reported that problem.
if key is not None:
params[key].append(x) # This is a parameter value.
return params
def _update_parameters(self):
params = self.get_all_parameters().items()
self.reset()
for k, v in params:
self.set_parameter(k, v)
def _get_alias_name(self):
if not self._module or not self._ptr:
return None
alias = getattr(self._module, 'alias', None)
if not alias:
return None
return self._module.alias(self._ptr).decode()
def __setattr__(self, name, value):
if not name.startswith('_') and name not in dir(self):
self.parse(name.replace("_", "-"), value)
else:
object.__setattr__(self, name, value)
def __getattr__(self, name):
if not name.startswith('_'):
return self.save(name.replace("_", "-"))
def _get_parameters(self):
return self.save(None)
parameters = property(_get_parameters)
"""Dictionary with all parameters in the form of name -> value. A match or
target might have default parameters as well, so this dictionary will
contain those set by the module by default too."""
def _get_name(self):
alias = self._get_alias_name()
return alias and alias or self._name
name = property(_get_name)
"""Name of this target or match."""
def _get_rule(self):
return self._rule
def _set_rule(self, rule):
self._rule = rule
rule = property(_get_rule, _set_rule)
"""The rule this target or match belong to."""
class _Buffer(object):
def __init__(self, size=0):
if size > 0:
self.buffer = _malloc(size)
if self.buffer is None:
raise Exception("Can't allocate buffer")
else:
self.buffer = None
def __del__(self):
if self.buffer is not None:
_free(self.buffer)
class Match(IPTCModule):
"""Matches are extensions which can match for special header fields or
other attributes of a packet.
Target and match extensions in iptables have parameters. These parameters
are implemented as instance attributes in python. However, to make the
names of parameters legal attribute names they have to be converted. The
rule is to cut the leading double dash from the name, and replace
dashes in parameter names with underscores so they are accepted by
python as attribute names. E.g. the *TOS* target has parameters
*--set-tos*, *--and-tos*, *--or-tos* and *--xor-tos*; they become
*target.set_tos*, *target.and_tos*, *target.or_tos* and *target.xor_tos*,
respectively. The value of a parameter is always a string, if a parameter
does not take any value in the iptables extension, an empty string *""*
should be used.
"""
def __init__(self, rule, name=None, match=None, revision=None):
"""
*rule* is the Rule object this match belongs to; it can be changed
later via *set_rule()*. *name* is the name of the iptables match
extension (in lower case), *match* is the raw buffer of the match
structure if the caller has it. Either *name* or *match* must be
provided. *revision* is the revision number of the extension that
should be used; different revisions use different structures in C and
they usually only work with certain kernel versions. Python-iptables
by default will use the latest revision available.
"""
if not name and not match:
raise ValueError("can't create match based on nothing")
if not name:
name = match.u.user.name.decode()
self._name = name
self._rule = rule
self._orig_parse = None
self._orig_options = None
self._xt = xtables(rule.nfproto)
module = self._xt.find_match(name)
real_name = module and getattr(module[0], 'real_name', None) or None
if real_name:
# Alias name, look up real module.
self._name = real_name.decode()
self._orig_parse = getattr(module[0], 'x6_parse', None)
self._orig_options = getattr(module[0], 'x6_options', None)
module = self._xt.find_match(real_name)
if not module:
raise XTablesError("can't find match %s" % (name))
self._module = module[0]
self._module.mflags = 0
if revision is not None:
self._revision = revision
else:
self._revision = self._module.revision
self._match_buf = (ct.c_ubyte * self.size)()
if match:
ct.memmove(ct.byref(self._match_buf), ct.byref(match), self.size)
self._update_pointers()
self._check_alias()
else:
self.reset()
def _check_alias(self):
name = self._get_alias_name()
if name is None:
return
alias_module = self._xt.find_match(name)
if alias_module is None:
return
self._alias_module = alias_module[0]
self._orig_parse = getattr(self._alias_module, 'x6_parse', None)
self._orig_options = getattr(self._alias_module, 'x6_options', None)
def __eq__(self, match):
basesz = ct.sizeof(xt_entry_match)
if (self.name == match.name and
self.match_buf[basesz:self.usersize] ==
match.match_buf[basesz:match.usersize]):
return True
return False
def __hash__(self):
return (hash(self.match.u.match_size) ^
hash(self.match.u.user.name) ^
hash(self.match.u.user.revision) ^
hash(bytes(self.match_buf)))
def __ne__(self, match):
return not self.__eq__(match)
def _final_check(self):
self._xt.final_check_match(self._module)
def _parse(self, argv, inv, entry):
self._xt.parse_match(argv, inv, self._module, entry,
ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p)),
self._orig_parse, self._orig_options)
def _get_size(self):
return xt_align(self._module.size + ct.sizeof(xt_entry_match))
size = property(_get_size)
"""This is the full size of the underlying C structure."""
def _get_user_size(self):
return self._module.userspacesize + ct.sizeof(xt_entry_match)
usersize = property(_get_user_size)
"""This is the size of the part of the underlying C structure that is used
in userspace."""
def _update_pointers(self):
self._ptr = ct.cast(ct.byref(self._match_buf),
ct.POINTER(xt_entry_match))
self._ptrptr = ct.cast(ct.pointer(self._ptr),
ct.POINTER(ct.POINTER(xt_entry_match)))
self._module.m = self._ptr
self._update_name()
def _update_name(self):
m = self._ptr[0]
m.u.user.name = self.name.encode()
def reset(self):
"""Reset the match.
Parameters are set to their default values, any flags are cleared."""
ct.memset(ct.byref(self._match_buf), 0, self.size)
self._update_pointers()
m = self._ptr[0]
m.u.match_size = self.size
m.u.user.revision = self._revision
if self._module.init:
self._module.init(self._ptr)
self._module.mflags = 0
udata_size = getattr(self._module, 'udata_size', 0)
if udata_size > 0:
udata_buf = (ct.c_ubyte * udata_size)()
self._module.udata = ct.cast(ct.byref(udata_buf), ct.c_void_p)
def _get_match(self):
return ct.cast(ct.byref(self.match_buf), ct.POINTER(xt_entry_match))[0]
match = property(_get_match)
"""This is the C structure used by the extension."""
def _get_match_buf(self):
return self._match_buf
match_buf = property(_get_match_buf)
"""This is the buffer holding the C structure used by the extension."""
class Target(IPTCModule):
"""Targets specify what to do with a packet when a match is found while
traversing the list of rule entries in a chain.
Target and match extensions in iptables have parameters. These parameters
are implemented as instance attributes in python. However, to make the
names of parameters legal attribute names they have to be converted. The
rule is to cut the leading double dash from the name, and replace
dashes in parameter names with underscores so they are accepted by
python as attribute names. E.g. the *TOS* target has parameters
*--set-tos*, *--and-tos*, *--or-tos* and *--xor-tos*; they become
*target.set_tos*, *target.and_tos*, *target.or_tos* and *target.xor_tos*,
respectively. The value of a parameter is always a string, if a parameter
does not take any value in the iptables extension, an empty string i.e. ""
should be used.
"""
def __init__(self, rule, name=None, target=None, revision=None, goto=None):
"""
*rule* is the Rule object this match belongs to; it can be changed
later via *set_rule()*. *name* is the name of the iptables target
extension (in upper case), *target* is the raw buffer of the target
structure if the caller has it. Either *name* or *target* must be
provided. *revision* is the revision number of the extension that
should be used; different revisions use different structures in C and
they usually only work with certain kernel versions. Python-iptables
by default will use the latest revision available.
If goto is True, then it converts '-j' to '-g'.
"""
if name is None and target is None:
raise ValueError("can't create target based on nothing")
if name is None:
name = target.u.user.name.decode()
self._name = name
self._rule = rule
self._orig_parse = None
self._orig_options = None
# NOTE:
# get_ip() returns the 'ip' structure that contains (1)the 'flags' field, and
# (2)the value for the GOTO flag.
# We *must* use get_ip() because the actual name of the field containing the
# structure apparently differs between implementation
ipstruct = rule.get_ip()
f_goto_attrs = [a for a in dir(ipstruct) if a.endswith('_F_GOTO')]
if len(f_goto_attrs) == 0:
raise RuntimeError('What kind of struct is this? It does not have "*_F_GOTO" constant!')
_F_GOTO = getattr(ipstruct, f_goto_attrs[0])
if target is not None or goto is None:
# We are 'decoding' existing Target
self._goto = bool(ipstruct.flags & _F_GOTO)
if goto is not None:
assert isinstance(goto, bool)
self._goto = goto
if goto:
ipstruct.flags |= _F_GOTO
else:
ipstruct.flags &= ~_F_GOTO
self._xt = xtables(rule.nfproto)
module = (self._is_standard_target() and
self._xt.find_target('') or
self._xt.find_target(name))
real_name = module and getattr(module[0], 'real_name', None) or None
if real_name:
# Alias name, look up real module.
self._name = real_name.decode()
self._orig_parse = getattr(module[0], 'x6_parse', None)
self._orig_options = getattr(module[0], 'x6_options', None)
module = self._xt.find_target(real_name)
if not module:
raise XTablesError("can't find target %s" % (name))
self._module = module[0]
self._module.tflags = 0
if revision is not None:
self._revision = revision
else:
self._revision = self._module.revision
self._create_buffer(target)
if self._is_standard_target():
self.standard_target = name
elif target:
self._check_alias()
def _check_alias(self):
name = self._get_alias_name()
if name is None:
return
alias_module = self._xt.find_target(name)
if alias_module is None:
return
self._alias_module = alias_module[0]
self._orig_parse = getattr(self._alias_module, 'x6_parse', None)
self._orig_options = getattr(self._alias_module, 'x6_options', None)
def __eq__(self, targ):
basesz = ct.sizeof(xt_entry_target)
if (self.target.u.target_size != targ.target.u.target_size or
self.target.u.user.name != targ.target.u.user.name or
self.target.u.user.revision != targ.target.u.user.revision):
return False
if (self.target.u.user.name == b"" or
self.target.u.user.name == b"standard" or
self.target.u.user.name == b"ACCEPT" or
self.target.u.user.name == b"DROP" or
self.target.u.user.name == b"RETURN" or
self.target.u.user.name == b"ERROR" or
self._is_standard_target()):
return True
if (self._target_buf[basesz:self.usersize] ==
targ._target_buf[basesz:targ.usersize]):
return True
return False
def __ne__(self, target):
return not self.__eq__(target)
def _create_buffer(self, target):
self._buffer = _Buffer(self.size)
self._target_buf = self._buffer.buffer
if target:
ct.memmove(self._target_buf, ct.byref(target), self.size)
self._update_pointers()
else:
self.reset()
def _is_standard_target(self):
for t in self._rule.tables:
if t.is_chain(self._name):
return True
return False
def _final_check(self):
self._xt.final_check_target(self._module)
def _parse(self, argv, inv, entry):
self._xt.parse_target(argv, inv, self._module, entry,
ct.cast(self._ptrptr, ct.POINTER(ct.c_void_p)),
self._orig_parse, self._orig_options)
self._target_buf = ct.cast(self._module.t, ct.POINTER(ct.c_ubyte))
if self._buffer.buffer != self._target_buf:
self._buffer.buffer = self._target_buf
self._update_pointers()
def _get_size(self):
return xt_align(self._module.size + ct.sizeof(xt_entry_target))
size = property(_get_size)
"""This is the full size of the underlying C structure."""
def _get_user_size(self):
return self._module.userspacesize + ct.sizeof(xt_entry_target)
usersize = property(_get_user_size)
"""This is the size of the part of the underlying C structure that is used
in userspace."""
def _get_standard_target(self):
t = self._ptr[0]
return t.u.user.name.decode()
def _set_standard_target(self, name):
t = self._ptr[0]
if isinstance(name, str):
name = name.encode()
t.u.user.name = name
if isinstance(name, bytes):
name = name.decode()
self._name = name
standard_target = property(_get_standard_target, _set_standard_target)
"""This attribute is used for standard targets. It can be set to
*ACCEPT*, *DROP*, *RETURN* or to a name of a chain the rule should jump
into."""
def _update_pointers(self):
self._ptr = ct.cast(self._target_buf, ct.POINTER(xt_entry_target))
self._ptrptr = ct.cast(ct.pointer(self._ptr),
ct.POINTER(ct.POINTER(xt_entry_target)))
self._module.t = self._ptr
self._update_name()
def _update_name(self):
m = self._ptr[0]
m.u.user.name = self.name.encode()
def reset(self):
"""Reset the target. Parameters are set to their default values, any
flags are cleared."""
ct.memset(self._target_buf, 0, self.size)
self._update_pointers()
t = self._ptr[0]
t.u.target_size = self.size
t.u.user.revision = self._revision
if self._module.init:
self._module.init(self._ptr)
self._module.tflags = 0
udata_size = getattr(self._module, 'udata_size', 0)
if udata_size > 0:
udata_buf = (ct.c_ubyte * udata_size)()
self._module.udata = ct.cast(ct.byref(udata_buf), ct.c_void_p)
def _get_target(self):
return self._ptr[0]
target = property(_get_target)
"""This is the C structure used by the extension."""
def _get_goto(self):
return self._goto
goto = property(_get_goto)
class Policy(object):
"""
If the end of a built-in chain is reached or a rule in a built-in chain
with target RETURN is matched, the target specified by the chain policy
determines the fate of the packet.
"""
ACCEPT = "ACCEPT"
"""If no matching rule has been found so far then accept the packet."""
DROP = "DROP"
"""If no matching rule has been found so far then drop the packet."""
QUEUE = "QUEUE"
"""If no matching rule has been found so far then queue the packet to
userspace."""
RETURN = "RETURN"
"""Return to calling chain."""
_cache = weakref.WeakValueDictionary()
def __new__(cls, name):
obj = Policy._cache.get(name, None)
if not obj:
obj = object.__new__(cls)
Policy._cache[name] = obj
return obj
def __init__(self, name):
self.name = name
def _a_to_i(addr):
return struct.unpack("I", addr)[0]
def _i_to_a(ip):
return struct.pack("I", int(ip.s_addr))
class Rule(object):
"""Rules are entries in chains.
Each rule has three parts:
* An entry with protocol family attributes like source and destination
address, transport protocol, etc. If the packet does not match the
attributes set here, then processing continues with the next rule or
the chain policy is applied at the end of the chain.
* Any number of matches. They are optional, and make it possible to
match for further packet attributes.
* One target. This determines what happens with the packet if it is
matched.
"""
protocols = {0: "all",
socket.IPPROTO_AH: "ah",
socket.IPPROTO_DSTOPTS: "dstopts",
socket.IPPROTO_EGP: "egp",
socket.IPPROTO_ESP: "esp",
socket.IPPROTO_FRAGMENT: "fragment",
socket.IPPROTO_GRE: "gre",
socket.IPPROTO_HOPOPTS: "hopopts",
socket.IPPROTO_ICMP: "icmp",
socket.IPPROTO_ICMPV6: "icmpv6",
socket.IPPROTO_IDP: "idp",
socket.IPPROTO_IGMP: "igmp",
socket.IPPROTO_IP: "ip",
socket.IPPROTO_IPIP: "ipip",
socket.IPPROTO_IPV6: "ipv6",
socket.IPPROTO_NONE: "none",
socket.IPPROTO_PIM: "pim",
socket.IPPROTO_PUP: "pup",
socket.IPPROTO_RAW: "raw",
socket.IPPROTO_ROUTING: "routing",
socket.IPPROTO_RSVP: "rsvp",
socket.IPPROTO_SCTP: "sctp",
socket.IPPROTO_TCP: "tcp",
socket.IPPROTO_TP: "tp",
socket.IPPROTO_UDP: "udp",
}
def __init__(self, entry=None, chain=None):
"""
*entry* is the ipt_entry buffer or None if the caller does not have
it. *chain* is the chain object this rule belongs to.
"""
self.nfproto = NFPROTO_IPV4
self._matches = []
self._target = None
self.chain = chain
self.rule = entry
def __eq__(self, rule):
if self._target != rule._target:
return False
if len(self._matches) != len(rule._matches):
return False
if set(rule._matches) != set([x for x in rule._matches if x in
self._matches]):
return False
if (self.src == rule.src and self.dst == rule.dst and
self.protocol == rule.protocol and
self.fragment == rule.fragment and
self.in_interface == rule.in_interface and
self.out_interface == rule.out_interface):
return True
return False
def __ne__(self, rule):
return not self.__eq__(rule)
def _get_tables(self):
return [Table(t) for t in Table.ALL if is_table_available(t)]
tables = property(_get_tables)
"""This is the list of tables for our protocol."""
def final_check(self):
"""Do a final check on the target and the matches."""
if self.target:
self.target.final_check()
for match in self.matches:
match.final_check()
def create_match(self, name, revision=None):
"""Create a *match*, and add it to the list of matches in this rule.
*name* is the name of the match extension, *revision* is the revision
to use."""
match = Match(self, name=name, revision=revision)
self.add_match(match)
return match
def create_target(self, name, revision=None, goto=False):
"""Create a new *target*, and set it as this rule's target. *name* is
the name of the target extension, *revision* is the revision to
use. *goto* determines if target uses '-j' (default) or '-g'."""
target = Target(self, name=name, revision=revision, goto=goto)
self.target = target
return target
def add_match(self, match):
"""Adds a match to the rule. One can add any number of matches."""
match.rule = self
self._matches.append(match)
def remove_match(self, match):
"""Removes *match* from the list of matches."""
self._matches.remove(match)
def get_ip(self):
return self.entry.ip
def _get_matches(self):
return self._matches[:] # return a copy
matches = property(_get_matches)
"""This is the list of matches held in this rule."""
def _get_target(self):
return self._target
def _set_target(self, target):
target.rule = self
self._target = target
target = property(_get_target, _set_target)
"""This is the target of the rule."""
def get_src(self):
src = ""
if self.entry.ip.invflags & ipt_ip.IPT_INV_SRCIP:
src = "".join([src, "!"])
paddr = _i_to_a(self.entry.ip.src)
try:
addr = socket.inet_ntop(socket.AF_INET, paddr)
except socket.error:
raise IPTCError("error in internal state: invalid address")
src = "".join([src, addr, "/"])
paddr = _i_to_a(self.entry.ip.smsk)
try:
netmask = socket.inet_ntop(socket.AF_INET, paddr)
except socket.error:
raise IPTCError("error in internal state: invalid netmask")
src = "".join([src, netmask])
return src
def set_src(self, src):
if src[0] == "!":
self.entry.ip.invflags |= ipt_ip.IPT_INV_SRCIP
src = src[1:]
else:
self.entry.ip.invflags &= (~ipt_ip.IPT_INV_SRCIP &
ipt_ip.IPT_INV_MASK)
slash = src.find("/")
if slash == -1:
addr = src
netm = "255.255.255.255"
else:
addr = src[:slash]
netm = src[slash + 1:]
try:
saddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr))
except socket.error:
raise ValueError("invalid address %s" % (addr))
if not netm.isdigit():
try:
nmask = _a_to_i(socket.inet_pton(socket.AF_INET, netm))
except socket.error:
raise ValueError("invalid netmask %s" % (netm))
else:
imask = int(netm)
if imask > 32 or imask < 0:
raise ValueError("invalid netmask %s" % (netm))
nmask = socket.htonl((2 ** imask - 1) << (32 - imask))
neta = in_addr()
neta.s_addr = ct.c_uint32(nmask)
self.entry.ip.smsk = neta
# Apply subnet mask to IP address
ina = in_addr()
ina.s_addr = ct.c_uint32(saddr & nmask)
self.entry.ip.src = ina
src = property(get_src, set_src)
"""This is the source network address with an optional network mask in
string form."""
def get_dst(self):
dst = ""
if self.entry.ip.invflags & ipt_ip.IPT_INV_DSTIP:
dst = "".join([dst, "!"])
paddr = _i_to_a(self.entry.ip.dst)
try:
addr = socket.inet_ntop(socket.AF_INET, paddr)
except socket.error:
raise IPTCError("error in internal state: invalid address")
dst = "".join([dst, addr, "/"])
paddr = _i_to_a(self.entry.ip.dmsk)
try:
netmask = socket.inet_ntop(socket.AF_INET, paddr)
except socket.error:
raise IPTCError("error in internal state: invalid netmask")
dst = "".join([dst, netmask])
return dst
def set_dst(self, dst):
if dst[0] == "!":
self.entry.ip.invflags |= ipt_ip.IPT_INV_DSTIP
dst = dst[1:]
else:
self.entry.ip.invflags &= (~ipt_ip.IPT_INV_DSTIP &
ipt_ip.IPT_INV_MASK)
slash = dst.find("/")
if slash == -1:
addr = dst
netm = "255.255.255.255"
else:
addr = dst[:slash]
netm = dst[slash + 1:]
try:
daddr = _a_to_i(socket.inet_pton(socket.AF_INET, addr))
except socket.error:
raise ValueError("invalid address %s" % (addr))
if not netm.isdigit():
try:
nmask = _a_to_i(socket.inet_pton(socket.AF_INET, netm))
except socket.error:
raise ValueError("invalid netmask %s" % (netm))
else:
imask = int(netm)
if imask > 32 or imask < 0:
raise ValueError("invalid netmask %s" % (netm))
nmask = socket.htonl((2 ** imask - 1) << (32 - imask))
neta = in_addr()
neta.s_addr = ct.c_uint32(nmask)
self.entry.ip.dmsk = neta
# Apply subnet mask to IP address
ina = in_addr()
ina.s_addr = ct.c_uint32(daddr & nmask)
self.entry.ip.dst = ina
dst = property(get_dst, set_dst)
"""This is the destination network address with an optional network mask
in string form."""
def get_in_interface(self):
intf = ""
if self.entry.ip.invflags & ipt_ip.IPT_INV_VIA_IN:
intf = "!"
iface = self.entry.ip.iniface.decode()
mask = self.entry.ip.iniface_mask
if len(mask) == 0:
return None
intf += iface
if len(iface) == len(mask):
intf += '+'
intf = intf[:_IFNAMSIZ]
return intf
def set_in_interface(self, intf):
if intf[0] == "!":
self.entry.ip.invflags |= ipt_ip.IPT_INV_VIA_IN
intf = intf[1:]
else:
self.entry.ip.invflags &= (~ipt_ip.IPT_INV_VIA_IN &
ipt_ip.IPT_INV_MASK)
if len(intf) >= _IFNAMSIZ:
raise ValueError("interface name %s too long" % (intf))
masklen = len(intf) + 1
if intf[len(intf) - 1] == "+":
intf = intf[:-1]
masklen -= 2
self.entry.ip.iniface = b"".join([intf.encode(),
b'\x00' * (_IFNAMSIZ - len(intf))])
self.entry.ip.iniface_mask = b"".join([b'\xff' * masklen,
b'\x00' * (_IFNAMSIZ -
masklen)])
in_interface = property(get_in_interface, set_in_interface)
"""This is the input network interface e.g. *eth0*. A wildcard match can
be achieved via *+* e.g. *ppp+* matches any *ppp* interface."""
def get_out_interface(self):
intf = ""
if self.entry.ip.invflags & ipt_ip.IPT_INV_VIA_OUT:
intf = "!"
iface = self.entry.ip.outiface.decode()
mask = self.entry.ip.outiface_mask
if len(mask) == 0:
return None
intf += iface
if len(iface) == len(mask):
intf += '+'
intf = intf[:_IFNAMSIZ]
return intf
def set_out_interface(self, intf):
if intf[0] == "!":
self.entry.ip.invflags |= ipt_ip.IPT_INV_VIA_OUT
intf = intf[1:]
else:
self.entry.ip.invflags &= (~ipt_ip.IPT_INV_VIA_OUT &
ipt_ip.IPT_INV_MASK)
if len(intf) >= _IFNAMSIZ:
raise ValueError("interface name %s too long" % (intf))
masklen = len(intf) + 1
if intf[len(intf) - 1] == "+":
intf = intf[:-1]
masklen -= 2
self.entry.ip.outiface = b"".join([intf.encode(),
b'\x00' * (_IFNAMSIZ - len(intf))])
self.entry.ip.outiface_mask = b"".join([b'\xff' * masklen,
b'\x00' * (_IFNAMSIZ -
masklen)])
out_interface = property(get_out_interface, set_out_interface)
"""This is the output network interface e.g. *eth0*. A wildcard match can
be achieved via *+* e.g. *ppp+* matches any *ppp* interface."""
def get_fragment(self):
frag = bool(self.entry.ip.flags & ipt_ip.IPT_F_FRAG)
if self.entry.ip.invflags & ipt_ip.IPT_INV_FRAG:
frag = not frag
return frag
def set_fragment(self, frag):
self.entry.ip.invflags &= ~ipt_ip.IPT_INV_FRAG & ipt_ip.IPT_INV_MASK
if frag:
self.entry.ip.flags |= ipt_ip.IPT_F_FRAG
else:
self.entry.ip.flags &= ~ipt_ip.IPT_F_FRAG
fragment = property(get_fragment, set_fragment)
"""This means that the rule refers to the second and further fragments of
fragmented packets. It can be *True* or *False*."""
def get_protocol(self):
if self.entry.ip.invflags & ipt_ip.IPT_INV_PROTO:
proto = "!"
else:
proto = ""
proto = "".join([proto, self.protocols.get(self.entry.ip.proto, str(self.entry.ip.proto))])
return proto
def set_protocol(self, proto):
proto = str(proto)
if proto[0] == "!":
self.entry.ip.invflags |= ipt_ip.IPT_INV_PROTO
proto = proto[1:]
else:
self.entry.ip.invflags &= (~ipt_ip.IPT_INV_PROTO &
ipt_ip.IPT_INV_MASK)
if proto.isdigit():
self.entry.ip.proto = int(proto)
return
for p in self.protocols.items():
if proto.lower() == p[1]:
self.entry.ip.proto = p[0]
return
raise ValueError("invalid protocol %s" % (proto))
protocol = property(get_protocol, set_protocol)
"""This is the transport layer protocol."""
def get_counters(self):
"""This method returns a tuple pair of the packet and byte counters of
the rule."""
counters = self.entry.counters
return counters.pcnt, counters.bcnt
def set_counters(self, counters):
"""This method set a tuple pair of the packet and byte counters of
the rule."""
self.entry.counters.pcnt = counters[0]
self.entry.counters.bcnt = counters[1]
counters = property(get_counters, set_counters)
"""This is the packet and byte counters of the rule."""
# override the following three for the IPv6 subclass
def _entry_size(self):
return xt_align(ct.sizeof(ipt_entry))
def _entry_type(self):
return ipt_entry
def _new_entry(self):
return ipt_entry()
def _get_rule(self):
if not self.entry or not self._target or not self._target.target:
return None
entrysz = self._entry_size()
matchsz = 0
for m in self._matches:
matchsz += xt_align(m.size)
targetsz = xt_align(self._target.size)
self.entry.target_offset = entrysz + matchsz
self.entry.next_offset = entrysz + matchsz + targetsz
# allocate array of full length (entry + matches + target)
buf = (ct.c_ubyte * (entrysz + matchsz + targetsz))()
# copy entry to buf
ptr = ct.cast(ct.pointer(self.entry), ct.POINTER(ct.c_ubyte))
buf[:entrysz] = ptr[:entrysz]
# copy matches to buf at offset of entrysz + match size
offset = 0
for m in self._matches:
sz = xt_align(m.size)
buf[entrysz + offset:entrysz + offset + sz] = m.match_buf[:sz]
offset += sz
# copy target to buf at offset of entrysz + matchsz
ptr = ct.cast(ct.pointer(self._target.target), ct.POINTER(ct.c_ubyte))
buf[entrysz + matchsz:entrysz + matchsz + targetsz] = ptr[:targetsz]
return buf
def _set_rule(self, entry):
if not entry:
self.entry = self._new_entry()
return
else:
self.entry = ct.cast(ct.pointer(entry),
ct.POINTER(self._entry_type()))[0]
if not isinstance(entry, self._entry_type()):
raise TypeError("Invalid rule type %s; expected %s" %
(entry, self._entry_type()))
entrysz = self._entry_size()
matchsz = entry.target_offset - entrysz
# targetsz = entry.next_offset - entry.target_offset
# iterate over matches to create blob
if matchsz:
off = 0
while entrysz + off < entry.target_offset:
match = ct.cast(ct.byref(entry.elems, off),
ct.POINTER(xt_entry_match))[0]
m = Match(self, match=match)
self.add_match(m)
off += m.size
target = ct.cast(ct.byref(entry, entry.target_offset),
ct.POINTER(xt_entry_target))[0]
self.target = Target(self, target=target)
jump = self.chain.table.get_target(entry) # standard target is special
if jump:
self._target.standard_target = jump
rule = property(_get_rule, _set_rule)
"""This is the raw rule buffer as iptables expects and returns it."""
def _get_mask(self):
if not self.entry:
return None
entrysz = self._entry_size()
matchsz = self.entry.target_offset - entrysz
targetsz = self.entry.next_offset - self.entry.target_offset
# allocate array for mask
mask = (ct.c_ubyte * (entrysz + matchsz + targetsz))()
# fill it out
pos = 0
for i in range(pos, pos + entrysz):
mask[i] = 0xff
pos += entrysz
for m in self._matches:
for i in range(pos, pos + m.usersize):
mask[i] = 0xff
pos += m.size
for i in range(pos, pos + self._target.usersize):
mask[i] = 0xff
return mask
mask = property(_get_mask)
"""This is the raw mask buffer as iptables uses it when removing rules."""
class Chain(object):
"""Rules are contained by chains.
*iptables* has built-in chains for every table, and users can also create
additional chains. Rule targets can specify to jump into another chain
and continue processing its rules, or return to the caller chain.
"""
_cache = weakref.WeakValueDictionary()
def __new__(cls, table, name):
table_name = type(table).__name__ + "." + table.name
obj = Chain._cache.get(table_name + "." + name, None)
if not obj:
obj = object.__new__(cls)
Chain._cache[table_name + "." + name] = obj
return obj
def __init__(self, table, name):
"""*table* is the table this chain belongs to, *name* is the chain's
name.
If a chain already exists with *name* in *table* it is returned.
"""
self.name = name
self.table = table
def delete(self):
"""Delete chain from its table."""
self.table.delete_chain(self.name)
def rename(self, new_name):
"""Rename chain to *new_name*."""
self.table.rename_chain(self.name, new_name)
def flush(self):
"""Flush all rules from the chain."""
self.table.flush_entries(self.name)
def get_counters(self):
"""This method returns a tuple pair of the packet and byte counters of
the chain."""
policy, counters = self.table.get_policy(self.name)
return counters
def zero_counters(self):
"""This method zeroes the packet and byte counters of the chain."""
self.table.zero_entries(self.name)
def set_policy(self, policy, counters=None):
"""Set the chain policy to *policy*, which should either be a string
or a Policy object. If *counters* is not *None*, the chain counters
are also adjusted. *Counters* is a list or tuple with two elements."""
if isinstance(policy, Policy):
policy = policy.name
self.table.set_policy(self.name, policy, counters)
def get_policy(self):
"""Returns the policy of the chain as a Policy object."""
policy, counters = self.table.get_policy(self.name)
return policy
def is_builtin(self):
"""Returns whether the chain is a built-in one."""
return self.table.builtin_chain(self.name)
def append_rule(self, rule):
"""Append *rule* to the end of the chain."""
rule.final_check()
rbuf = rule.rule
if not rbuf:
raise ValueError("invalid rule")
self.table.append_entry(self.name, rbuf)
def insert_rule(self, rule, position=0):
"""Insert *rule* as the first entry in the chain if *position* is 0 or
not specified, else *rule* is inserted in the given position."""
rule.final_check()
rbuf = rule.rule
if not rbuf:
raise ValueError("invalid rule")
self.table.insert_entry(self.name, rbuf, position)
def replace_rule(self, rule, position=0):
"""Replace existing rule in the chain at *position* with given
*rule*"""
rbuf = rule.rule
if not rbuf:
raise ValueError("invalid rule")
self.table.replace_entry(self.name, rbuf, position)
def delete_rule(self, rule):
"""Removes *rule* from the chain."""
rule.final_check()
rbuf = rule.rule
if not rbuf:
raise ValueError("invalid rule")
self.table.delete_entry(self.name, rbuf, rule.mask)
def get_target(self, rule):
"""This method returns the target of *rule* if it is a standard
target, or *None* if it is not."""
rbuf = rule.rule
if not rbuf:
raise ValueError("invalid rule")
return self.table.get_target(rbuf)
def _get_rules(self):
entries = []
entry = self.table.first_rule(self.name)
while entry:
entries.append(entry)
entry = self.table.next_rule(entry)
return [self.table.create_rule(e, self) for e in entries]
rules = property(_get_rules)
"""This is the list of rules currently in the chain.
The indexes of the Rule items produced from this list *should* correspond
to the IPTables --line-numbers value minus one. Keeping in mind that
iptables rules are 1-indexed whereas the Python list is 0-indexed
"""
def autocommit(fn):
def new(*args):
obj = args[0]
ret = fn(*args)
if obj.autocommit:
obj.refresh()
return ret
return new
class Table(object):
"""A table is the most basic building block in iptables.
There are four fixed tables:
* **Table.FILTER**, the filter table,
* **Table.NAT**, the NAT table,
* **Table.MANGLE**, the mangle table and
* **Table.RAW**, the raw table.
The four tables are cached, so if you create a new Table, and it has been
instantiated before, then it will be reused. To get access to e.g. the
filter table:
>>> table = iptc.Table(iptc.Table.FILTER)
The interface provided by *Table* is rather low-level, in fact it maps to
*libiptc* API calls one by one, and take low-level iptables structs as
parameters. It is encouraged to, when possible, use Chain, Rule, Match
and Target to achieve what is wanted instead, since they hide the
low-level details from the user.
"""
FILTER = "filter"
"""This is the constant for the filter table."""
MANGLE = "mangle"
"""This is the constant for the mangle table."""
RAW = "raw"
"""This is the constant for the raw table."""
NAT = "nat"
"""This is the constant for the nat table."""
SECURITY = "security"
"""This is the constant for the security table."""
ALL = ["filter", "mangle", "raw", "nat", "security"]
"""This is the constant for all tables."""
_cache = dict()
def __new__(cls, name, autocommit=None):
obj = Table._cache.get(name, None)
if not obj:
obj = object.__new__(cls)
if autocommit is None:
autocommit = True
obj._init(name, autocommit)
Table._cache[name] = obj
elif autocommit is not None:
obj.autocommit = autocommit
return obj
def _init(self, name, autocommit):
"""
*name* is the name of the table, if it already exists it is returned.
*autocommit* specifies that any iptables operation that changes a
rule, chain or table should be committed immediately.
"""
self.name = name
self.autocommit = autocommit
self._iptc = iptc() # to keep references to functions
self._handle = None
self.refresh()
def __del__(self):
self.close()
def close(self):
"""Close the underlying connection handle to iptables."""
if self._handle:
self._free()
def commit(self):
"""Commit any pending operation."""
rv = self._iptc.iptc_commit(self._handle)
if rv != 1:
raise IPTCError("can't commit: %s" % (self.strerror()))
def _free(self, ignore_exc=True):
if self._handle is None:
raise IPTCError("table is not initialized")
try:
if self.autocommit:
self.commit()
except IPTCError as e:
if not ignore_exc:
raise e
finally:
self._iptc.iptc_free(self._handle)
self._handle = None
def refresh(self):
"""Commit any pending operation and refresh the status of iptables."""
if self._handle:
self._free()
handle = self._iptc.iptc_init(self.name.encode())
if not handle:
raise IPTCError("can't initialize %s: %s" % (self.name,
self.strerror()))
self._handle = handle
def is_chain(self, chain):
"""Returns *True* if *chain* exists as a chain."""
if isinstance(chain, Chain):
chain = chain.name
if self._iptc.iptc_is_chain(chain.encode(), self._handle):
return True
else:
return False
def builtin_chain(self, chain):
"""Returns *True* if *chain* is a built-in chain."""
if isinstance(chain, Chain):
chain = chain.name
if self._iptc.iptc_builtin(chain.encode(), self._handle):
return True
else:
return False
def strerror(self):
"""Returns any pending iptables error from the previous operation."""
errno = _get_errno_loc()[0]
if errno == 0:
return "libiptc version error"
return self._iptc.iptc_strerror(errno)
@autocommit
def create_chain(self, chain):
"""Create a new chain *chain*."""
if isinstance(chain, Chain):
chain = chain.name
rv = self._iptc.iptc_create_chain(chain.encode(), self._handle)
if rv != 1:
raise IPTCError("can't create chain %s: %s" % (chain,
self.strerror()))
return Chain(self, chain)
@autocommit
def delete_chain(self, chain):
"""Delete chain *chain* from the table."""
if isinstance(chain, Chain):
chain = chain.name
rv = self._iptc.iptc_delete_chain(chain.encode(), self._handle)
if rv != 1:
raise IPTCError("can't delete chain %s: %s" % (chain,
self.strerror()))
@autocommit
def rename_chain(self, chain, new_name):
"""Rename chain *chain* to *new_name*."""
if isinstance(chain, Chain):
chain = chain.name
rv = self._iptc.iptc_rename_chain(chain.encode(), new_name.encode(),
self._handle)
if rv != 1:
raise IPTCError("can't rename chain %s: %s" % (chain,
self.strerror()))
@autocommit
def flush_entries(self, chain):
"""Flush all rules from *chain*."""
if isinstance(chain, Chain):
chain = chain.name
rv = self._iptc.iptc_flush_entries(chain.encode(), self._handle)
if rv != 1:
raise IPTCError("can't flush chain %s: %s" % (chain,
self.strerror()))
@autocommit
def zero_entries(self, chain):
"""Zero the packet and byte counters of *chain*."""
if isinstance(chain, Chain):
chain = chain.name
rv = self._iptc.iptc_zero_entries(chain.encode(), self._handle)
if rv != 1:
raise IPTCError("can't zero chain %s counters: %s" %
(chain, self.strerror()))
@autocommit
def set_policy(self, chain, policy, counters=None):
"""Set the policy of *chain* to *policy*, and also update chain
counters if *counters* is specified."""
if isinstance(chain, Chain):
chain = chain.name
if isinstance(policy, Policy):
policy = policy.name
if counters:
cntrs = xt_counters()
cntrs.pcnt = counters[0]
cntrs.bcnt = counters[1]
cntrs = ct.pointer(cntrs)
else:
cntrs = None
rv = self._iptc.iptc_set_policy(chain.encode(), policy.encode(),
cntrs, self._handle)
if rv != 1:
raise IPTCError("can't set policy %s on chain %s: %s" %
(policy, chain, self.strerror()))
@autocommit
def get_policy(self, chain):
"""Returns the policy of *chain* as a string."""
if isinstance(chain, Chain):
chain = chain.name
if not self.builtin_chain(chain):
return None, None
cntrs = xt_counters()
pol = self._iptc.iptc_get_policy(chain.encode(), ct.pointer(cntrs),
self._handle).decode()
if not pol:
raise IPTCError("can't get policy on chain %s: %s" %
(chain, self.strerror()))
return Policy(pol), (cntrs.pcnt, cntrs.bcnt)
@autocommit
def append_entry(self, chain, entry):
"""Appends rule *entry* to *chain*."""
rv = self._iptc.iptc_append_entry(chain.encode(),
ct.cast(entry, ct.c_void_p),
self._handle)
if rv != 1:
raise IPTCError("can't append entry to chain %s: %s" %
(chain, self.strerror()))
@autocommit
def insert_entry(self, chain, entry, position):
"""Inserts rule *entry* into *chain* at position *position*."""
rv = self._iptc.iptc_insert_entry(chain.encode(),
ct.cast(entry, ct.c_void_p),
position, self._handle)
if rv != 1:
raise IPTCError("can't insert entry into chain %s: %s" %
(chain, self.strerror()))
@autocommit
def replace_entry(self, chain, entry, position):
"""Replace existing rule in *chain* at *position* with given *rule*."""
rv = self._iptc.iptc_replace_entry(chain.encode(),
ct.cast(entry, ct.c_void_p),
position, self._handle)
if rv != 1:
raise IPTCError("can't replace entry in chain %s: %s" %
(chain, self.strerror()))
@autocommit
def delete_entry(self, chain, entry, mask):
"""Removes rule *entry* with *mask* from *chain*."""
rv = self._iptc.iptc_delete_entry(chain.encode(),
ct.cast(entry, ct.c_void_p),
mask, self._handle)
if rv != 1:
raise IPTCError("can't delete entry from chain %s: %s" %
(chain, self.strerror()))
def first_rule(self, chain):
"""Returns the first rule in *chain* or *None* if it is empty."""
rule = self._iptc.iptc_first_rule(chain.encode(), self._handle)
if rule:
return rule[0]
else:
return rule
def next_rule(self, prev_rule):
"""Returns the next rule after *prev_rule*."""
rule = self._iptc.iptc_next_rule(ct.pointer(prev_rule), self._handle)
if rule:
return rule[0]
else:
return rule
def get_target(self, entry):
"""Returns the standard target in *entry*."""
t = self._iptc.iptc_get_target(ct.pointer(entry), self._handle)
# t can be NULL if standard target has a "simple" verdict e.g. ACCEPT
return t
def _get_chains(self):
chains = []
chain = self._iptc.iptc_first_chain(self._handle)
while chain:
chain = chain.decode()
chains.append(Chain(self, chain))
chain = self._iptc.iptc_next_chain(self._handle)
return chains
chains = property(_get_chains)
"""List of chains in the table."""
def flush(self):
"""Flush and delete all non-builtin chains the table."""
for chain in self.chains:
chain.flush()
for chain in self.chains:
if not self.builtin_chain(chain):
self.delete_chain(chain)
def create_rule(self, entry=None, chain=None):
return Rule(entry, chain)
07070100000020000081A40000000000000000000000015EE6923E00005680000000000000000000000000000000000000002400000000python-iptables-1.0.0/iptc/ip6tc.py# -*- coding: utf-8 -*-
import ctypes as ct
import socket
from .ip4tc import Rule, Table, IPTCError
from .util import find_library, load_kernel
from .xtables import (XT_INV_PROTO, NFPROTO_IPV6, xt_align, xt_counters)
__all__ = ["Table6", "Rule6"]
try:
load_kernel("ip6_tables")
except:
pass
_IFNAMSIZ = 16
def is_table6_available(name):
try:
Table6(name)
return True
except IPTCError:
pass
return False
class in6_addr(ct.Structure):
"""This class is a representation of the C struct in6_addr."""
_fields_ = [("s6_addr", ct.c_uint8 * 16)] # IPv6 address
class ip6t_ip6(ct.Structure):
"""This class is a representation of the C struct ip6t_ip6."""
_fields_ = [("src", in6_addr), # Source and destination IP6 addr
("dst", in6_addr), # Mask for src and dest IP6 addr
("smsk", in6_addr),
("dmsk", in6_addr),
("iniface", ct.c_char * _IFNAMSIZ),
("outiface", ct.c_char * _IFNAMSIZ),
("iniface_mask", ct.c_char * _IFNAMSIZ),
("outiface_mask", ct.c_char * _IFNAMSIZ),
("proto", ct.c_uint16), # Upper protocol number
("tos", ct.c_uint8), # TOS, match iff flags & IP6T_F_TOS
("flags", ct.c_uint8), # Flags word
("invflags", ct.c_uint8)] # Inverse flags
# flags
IP6T_F_PROTO = 0x01 # Set if rule cares about upper protocols
IP6T_F_TOS = 0x02 # Match the TOS
IP6T_F_GOTO = 0x04 # Set if jump is a goto
IP6T_F_MASK = 0x07 # All possible flag bits mask
# invflags
IP6T_INV_VIA_IN = 0x01 # Invert the sense of IN IFACE
IP6T_INV_VIA_OUT = 0x02 # Invert the sense of OUT IFACE
IP6T_INV_TOS = 0x04 # Invert the sense of TOS
IP6T_INV_SRCIP = 0x08 # Invert the sense of SRC IP
IP6T_INV_DSTIP = 0x10 # Invert the sense of DST OP
IP6T_INV_FRAG = 0x20 # Invert the sense of FRAG
IP6T_INV_PROTO = XT_INV_PROTO
IP6T_INV_MASK = 0x7F # All possible flag bits mask
def __init__(self):
# default: full netmask
self.smsk.s6_addr = self.dmsk.s6_addr = 0xff * 16
class ip6t_entry(ct.Structure):
"""This class is a representation of the C struct ip6t_entry."""
_fields_ = [("ipv6", ip6t_ip6),
("nfcache", ct.c_uint), # fields that we care about
("target_offset", ct.c_uint16), # size of ip6t_entry + matches
("next_offset", ct.c_uint16), # size of e + matches + target
("comefrom", ct.c_uint), # back pointer
("counters", xt_counters), # packet and byte counters
("elems", ct.c_ubyte * 0)] # the matches then the target
_libiptc, _ = find_library("ip6tc", "iptc") # old iptables versions use iptc
class ip6tc(object):
"""This class contains all libip6tc API calls."""
iptc_init = _libiptc.ip6tc_init
iptc_init.restype = ct.POINTER(ct.c_int)
iptc_init.argstype = [ct.c_char_p]
iptc_free = _libiptc.ip6tc_free
iptc_free.restype = None
iptc_free.argstype = [ct.c_void_p]
iptc_commit = _libiptc.ip6tc_commit
iptc_commit.restype = ct.c_int
iptc_commit.argstype = [ct.c_void_p]
iptc_builtin = _libiptc.ip6tc_builtin
iptc_builtin.restype = ct.c_int
iptc_builtin.argstype = [ct.c_char_p, ct.c_void_p]
iptc_first_chain = _libiptc.ip6tc_first_chain
iptc_first_chain.restype = ct.c_char_p
iptc_first_chain.argstype = [ct.c_void_p]
iptc_next_chain = _libiptc.ip6tc_next_chain
iptc_next_chain.restype = ct.c_char_p
iptc_next_chain.argstype = [ct.c_void_p]
iptc_is_chain = _libiptc.ip6tc_is_chain
iptc_is_chain.restype = ct.c_int
iptc_is_chain.argstype = [ct.c_char_p, ct.c_void_p]
iptc_create_chain = _libiptc.ip6tc_create_chain
iptc_create_chain.restype = ct.c_int
iptc_create_chain.argstype = [ct.c_char_p, ct.c_void_p]
iptc_delete_chain = _libiptc.ip6tc_delete_chain
iptc_delete_chain.restype = ct.c_int
iptc_delete_chain.argstype = [ct.c_char_p, ct.c_void_p]
iptc_rename_chain = _libiptc.ip6tc_rename_chain
iptc_rename_chain.restype = ct.c_int
iptc_rename_chain.argstype = [ct.c_char_p, ct.c_char_p, ct.c_void_p]
iptc_flush_entries = _libiptc.ip6tc_flush_entries
iptc_flush_entries.restype = ct.c_int
iptc_flush_entries.argstype = [ct.c_char_p, ct.c_void_p]
iptc_zero_entries = _libiptc.ip6tc_zero_entries
iptc_zero_entries.restype = ct.c_int
iptc_zero_entries.argstype = [ct.c_char_p, ct.c_void_p]
# Get the policy of a given built-in chain
iptc_get_policy = _libiptc.ip6tc_get_policy
iptc_get_policy.restype = ct.c_char_p
iptc_get_policy.argstype = [ct.c_char_p, ct.POINTER(xt_counters),
ct.c_void_p]
# Set the policy of a chain
iptc_set_policy = _libiptc.ip6tc_set_policy
iptc_set_policy.restype = ct.c_int
iptc_set_policy.argstype = [ct.c_char_p, ct.c_char_p,
ct.POINTER(xt_counters), ct.c_void_p]
# Get first rule in the given chain: NULL for empty chain.
iptc_first_rule = _libiptc.ip6tc_first_rule
iptc_first_rule.restype = ct.POINTER(ip6t_entry)
iptc_first_rule.argstype = [ct.c_char_p, ct.c_void_p]
# Returns NULL when rules run out.
iptc_next_rule = _libiptc.ip6tc_next_rule
iptc_next_rule.restype = ct.POINTER(ip6t_entry)
iptc_next_rule.argstype = [ct.POINTER(ip6t_entry), ct.c_void_p]
# Returns a pointer to the target name of this entry.
iptc_get_target = _libiptc.ip6tc_get_target
iptc_get_target.restype = ct.c_char_p
iptc_get_target.argstype = [ct.POINTER(ip6t_entry), ct.c_void_p]
# These functions return TRUE for OK or 0 and set errno. If errno ==
# 0, it means there was a version error (ie. upgrade libiptc).
# Rule numbers start at 1 for the first rule.
# Insert the entry `e' in chain `chain' into position `rulenum'.
iptc_insert_entry = _libiptc.ip6tc_insert_entry
iptc_insert_entry.restype = ct.c_int
iptc_insert_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry),
ct.c_int, ct.c_void_p]
# Atomically replace rule `rulenum' in `chain' with `e'.
iptc_replace_entry = _libiptc.ip6tc_replace_entry
iptc_replace_entry.restype = ct.c_int
iptc_replace_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry),
ct.c_int, ct.c_void_p]
# Append entry `e' to chain `chain'. Equivalent to insert with
# rulenum = length of chain.
iptc_append_entry = _libiptc.ip6tc_append_entry
iptc_append_entry.restype = ct.c_int
iptc_append_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry),
ct.c_void_p]
# Delete the first rule in `chain' which matches `e', subject to
# matchmask (array of length == origfw)
iptc_delete_entry = _libiptc.ip6tc_delete_entry
iptc_delete_entry.restype = ct.c_int
iptc_delete_entry.argstype = [ct.c_char_p, ct.POINTER(ip6t_entry),
ct.POINTER(ct.c_ubyte), ct.c_void_p]
# Delete the rule in position `rulenum' in `chain'.
iptc_delete_num_entry = _libiptc.ip6tc_delete_num_entry
iptc_delete_num_entry.restype = ct.c_int
iptc_delete_num_entry.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p]
# Check the packet `e' on chain `chain'. Returns the verdict, or
# NULL and sets errno.
# iptc_check_packet = _libiptc.ip6tc_check_packet
# iptc_check_packet.restype = ct.c_char_p
# iptc_check_packet.argstype = [ct.c_char_p, ct.POINTER(ipt), ct.c_void_p]
# Get the number of references to this chain
iptc_get_references = _libiptc.ip6tc_get_references
iptc_get_references.restype = ct.c_int
iptc_get_references.argstype = [ct.c_uint, ct.c_char_p, ct.c_void_p]
# read packet and byte counters for a specific rule
iptc_read_counter = _libiptc.ip6tc_read_counter
iptc_read_counter.restype = ct.POINTER(xt_counters)
iptc_read_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p]
# zero packet and byte counters for a specific rule
iptc_zero_counter = _libiptc.ip6tc_zero_counter
iptc_zero_counter.restype = ct.c_int
iptc_zero_counter.argstype = [ct.c_char_p, ct.c_uint, ct.c_void_p]
# set packet and byte counters for a specific rule
iptc_set_counter = _libiptc.ip6tc_set_counter
iptc_set_counter.restype = ct.c_int
iptc_set_counter.argstype = [ct.c_char_p, ct.c_uint,
ct.POINTER(xt_counters), ct.c_void_p]
# Translates errno numbers into more human-readable form than strerror.
iptc_strerror = _libiptc.ip6tc_strerror
iptc_strerror.restype = ct.c_char_p
iptc_strerror.argstype = [ct.c_int]
class Rule6(Rule):
"""This is an IPv6 rule."""
def __init__(self, entry=None, chain=None):
self.nfproto = NFPROTO_IPV6
self._matches = []
self._target = None
self.chain = chain
self.rule = entry
def __eq__(self, rule):
if self._target != rule._target:
return False
if len(self._matches) != len(rule._matches):
return False
if set(rule._matches) != set([x for x in rule._matches
if x in self._matches]):
return False
if (self.src == rule.src and self.dst == rule.dst and
self.protocol == rule.protocol and
self.in_interface == rule.in_interface and
self.out_interface == rule.out_interface):
return True
return False
def save(self, name):
return self._save(name, self.entry.ipv6)
def _get_tables(self):
return [Table6(t) for t in Table6.ALL if is_table6_available(t)]
tables = property(_get_tables)
"""This is the list of tables for our protocol."""
def _count_bits(self, n):
bits = 0
while n > 0:
if n & 1:
bits += 1
n = n >> 1
return bits
def _create_mask(self, plen):
mask = []
for i in range(16):
if plen >= 8:
mask.append(0xff)
elif plen > 0:
mask.append(0xff>>(8-plen)<<(8-plen))
else:
mask.append(0x00)
plen -= 8
return mask
def get_src(self):
src = ""
if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_SRCIP:
src = "".join([src, "!"])
try:
addr = socket.inet_ntop(socket.AF_INET6,
self.entry.ipv6.src.s6_addr)
except socket.error:
raise IPTCError("error in internal state: invalid address")
src = "".join([src, addr, "/"])
# create prefix length from mask in smsk
plen = 0
for x in self.entry.ipv6.smsk.s6_addr:
if x == 0xff:
plen += 8
else:
plen += self._count_bits(x)
break
src = "".join([src, str(plen)])
return src
def _get_address_netmask(self, a):
slash = a.find("/")
if slash == -1:
addr = a
netm = "ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff"
else:
addr = a[:slash]
netm = a[slash + 1:]
return addr, netm
def _addr2in6addr(self, addr):
arr = ct.c_uint8 * 16
ina = in6_addr()
try:
ina.s6_addr = arr.from_buffer_copy(
socket.inet_pton(socket.AF_INET6, addr))
except socket.error:
raise ValueError("invalid address %s" % (addr))
return arr, ina
def set_src(self, src):
if src[0] == "!":
self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_SRCIP
src = src[1:]
else:
self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_SRCIP &
ip6t_ip6.IP6T_INV_MASK)
addr, netm = self._get_address_netmask(src)
arr, self.entry.ipv6.src = self._addr2in6addr(addr)
# if we got a numeric prefix length
if netm.isdigit():
plen = int(netm)
if plen < 0 or plen > 128:
raise ValueError("invalid prefix length %d" % (plen))
self.entry.ipv6.smsk.s6_addr = arr(*self._create_mask(plen))
return
# nope, we got an IPv6 address-style prefix
neta = in6_addr()
try:
neta.s6_addr = arr.from_buffer_copy(
socket.inet_pton(socket.AF_INET6, netm))
except socket.error:
raise ValueError("invalid netmask %s" % (netm))
self.entry.ipv6.smsk = neta
src = property(get_src, set_src)
"""This is the source network address with an optional prefix length in
string form."""
def get_dst(self):
dst = ""
if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_DSTIP:
dst = "".join([dst, "!"])
try:
addr = socket.inet_ntop(socket.AF_INET6,
self.entry.ipv6.dst.s6_addr)
except socket.error:
raise IPTCError("error in internal state: invalid address")
dst = "".join([dst, addr, "/"])
# create prefix length from mask in dmsk
plen = 0
for x in self.entry.ipv6.dmsk.s6_addr:
if x & 0xff == 0xff:
plen += 8
else:
plen += self._count_bits(x)
break
dst = "".join([dst, str(plen)])
return dst
def set_dst(self, dst):
if dst[0] == "!":
self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_DSTIP
dst = dst[1:]
else:
self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_DSTIP &
ip6t_ip6.IP6T_INV_MASK)
addr, netm = self._get_address_netmask(dst)
arr, self.entry.ipv6.dst = self._addr2in6addr(addr)
# if we got a numeric prefix length
if netm.isdigit():
plen = int(netm)
if plen < 0 or plen > 128:
raise ValueError("invalid prefix length %d" % (plen))
self.entry.ipv6.dmsk.s6_addr = arr(*self._create_mask(plen))
return
# nope, we got an IPv6 address-style prefix
neta = in6_addr()
try:
neta.s6_addr = arr.from_buffer_copy(
socket.inet_pton(socket.AF_INET6, netm))
except socket.error:
raise ValueError("invalid netmask %s" % (netm))
self.entry.ipv6.dmsk = neta
dst = property(get_dst, set_dst)
"""This is the destination network address with an optional network mask
in string form."""
def get_in_interface(self):
intf = ""
if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_VIA_IN:
intf = "".join(["!", intf])
iface = bytearray(_IFNAMSIZ)
iface[:len(self.entry.ipv6.iniface)] = self.entry.ipv6.iniface
mask = bytearray(_IFNAMSIZ)
mask[:len(self.entry.ipv6.iniface_mask)] = self.entry.ipv6.iniface_mask
if mask[0] == 0:
return None
for i in range(_IFNAMSIZ):
if mask[i] != 0:
intf = "".join([intf, chr(iface[i])])
else:
if iface[i - 1] != 0:
intf = "".join([intf, "+"])
else:
intf = intf[:-1]
break
return intf
def set_in_interface(self, intf):
if intf[0] == "!":
self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_VIA_IN
intf = intf[1:]
else:
self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_VIA_IN &
ip6t_ip6.IP6T_INV_MASK)
if len(intf) >= _IFNAMSIZ:
raise ValueError("interface name %s too long" % (intf))
masklen = len(intf) + 1
if intf[len(intf) - 1] == "+":
intf = intf[:-1]
masklen -= 2
self.entry.ipv6.iniface = ("".join(
[intf, '\x00' * (_IFNAMSIZ - len(intf))])).encode()
self.entry.ipv6.iniface_mask = ("".join(
['\x01' * masklen, '\x00' * (_IFNAMSIZ - masklen)])).encode()
in_interface = property(get_in_interface, set_in_interface)
"""This is the input network interface e.g. *eth0*. A wildcard match can
be achieved via *+* e.g. *ppp+* matches any *ppp* interface."""
def get_out_interface(self):
intf = ""
if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_VIA_OUT:
intf = "".join(["!", intf])
iface = bytearray(_IFNAMSIZ)
iface[:len(self.entry.ipv6.outiface)] = self.entry.ipv6.outiface
mask = bytearray(_IFNAMSIZ)
mask[:len(self.entry.ipv6.outiface_mask)] = \
self.entry.ipv6.outiface_mask
if mask[0] == 0:
return None
for i in range(_IFNAMSIZ):
if mask[i] != 0:
intf = "".join([intf, chr(iface[i])])
else:
if iface[i - 1] != 0:
intf = "".join([intf, "+"])
else:
intf = intf[:-1]
break
return intf
def set_out_interface(self, intf):
if intf[0] == "!":
self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_VIA_OUT
intf = intf[1:]
else:
self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_VIA_OUT &
ip6t_ip6.IP6T_INV_MASK)
if len(intf) >= _IFNAMSIZ:
raise ValueError("interface name %s too long" % (intf))
masklen = len(intf) + 1
if intf[len(intf) - 1] == "+":
intf = intf[:-1]
masklen -= 2
self.entry.ipv6.outiface = ("".join(
[intf, '\x00' * (_IFNAMSIZ - len(intf))])).encode()
self.entry.ipv6.outiface_mask = ("".join(
['\x01' * masklen, '\x00' * (_IFNAMSIZ - masklen)])).encode()
out_interface = property(get_out_interface, set_out_interface)
"""This is the output network interface e.g. *eth0*. A wildcard match can
be achieved via *+* e.g. *ppp+* matches any *ppp* interface."""
def get_protocol(self):
if self.entry.ipv6.invflags & ip6t_ip6.IP6T_INV_PROTO:
proto = "!"
else:
proto = ""
proto = "".join([proto, self.protocols.get(self.entry.ipv6.proto, str(self.entry.ipv6.proto))])
return proto
def set_protocol(self, proto):
proto = str(proto)
if proto[0] == "!":
self.entry.ipv6.invflags |= ip6t_ip6.IP6T_INV_PROTO
self.entry.ipv6.flags &= (~ip6t_ip6.IP6T_F_PROTO &
ip6t_ip6.IP6T_F_MASK)
proto = proto[1:]
else:
self.entry.ipv6.invflags &= (~ip6t_ip6.IP6T_INV_PROTO &
ip6t_ip6.IP6T_INV_MASK)
self.entry.ipv6.flags |= ip6t_ip6.IP6T_F_PROTO
if proto.isdigit():
self.entry.ipv6.proto = int(proto)
return
for p in self.protocols.items():
if proto.lower() == p[1]:
self.entry.ipv6.proto = p[0]
return
raise ValueError("invalid protocol %s" % (proto))
protocol = property(get_protocol, set_protocol)
"""This is the transport layer protocol."""
def get_ip(self):
return self.entry.ipv6
def _entry_size(self):
return xt_align(ct.sizeof(ip6t_entry))
def _entry_type(self):
return ip6t_entry
def _new_entry(self):
return ip6t_entry()
class Table6(Table):
"""The IPv6 version of Table.
There are four fixed tables:
* **Table.FILTER**, the filter table,
* **Table.MANGLE**, the mangle table,
* **Table.RAW**, the raw table and
* **Table.SECURITY**, the security table.
The four tables are cached, so if you create a new Table, and it has been
instantiated before, then it will be reused. To get access to e.g. the
filter table:
>>> import iptc
>>> table = iptc.Table6(iptc.Table6.FILTER)
The interface provided by *Table* is rather low-level, in fact it maps to
*libiptc* API calls one by one, and take low-level iptables structs as
parameters. It is encouraged to, when possible, use Chain, Rule, Match
and Target to achieve what is wanted instead, since they hide the
low-level details from the user.
"""
FILTER = "filter"
"""This is the constant for the filter table."""
MANGLE = "mangle"
"""This is the constant for the mangle table."""
RAW = "raw"
"""This is the constant for the raw table."""
NAT = "nat"
"""This is the constant for the nat table."""
SECURITY = "security"
"""This is the constant for the security table."""
ALL = ["filter", "mangle", "raw", "nat", "security"]
"""This is the constant for all tables."""
_cache = dict()
def __new__(cls, name, autocommit=None):
obj = Table6._cache.get(name, None)
if not obj:
obj = object.__new__(cls)
if autocommit is None:
autocommit = True
obj._init(name, autocommit)
Table6._cache[name] = obj
elif autocommit is not None:
obj.autocommit = autocommit
return obj
def _init(self, name, autocommit):
"""
Here *name* is the name of the table to instantiate, if it has already
been instantiated the existing cached object is returned.
*Autocommit* specifies that any low-level iptables operation should be
committed immediately, making changes visible in the kernel.
"""
self._iptc = ip6tc() # to keep references to functions
self._handle = None
self.name = name
self.autocommit = autocommit
self.refresh()
def create_rule(self, entry=None, chain=None):
return Rule6(entry, chain)
07070100000021000081A40000000000000000000000015EE6923E00000DCD000000000000000000000000000000000000002300000000python-iptables-1.0.0/iptc/util.pyimport re
import os
import sys
import ctypes
import ctypes.util
from distutils.sysconfig import get_python_lib
from itertools import product
from subprocess import Popen, PIPE
from sys import version_info
try:
from sysconfig import get_config_var
except ImportError:
def get_config_var(name):
if name == 'SO':
return '.so'
raise Exception('Not implemented')
def _insert_ko(modprobe, modname):
p = Popen([modprobe, modname], stderr=PIPE)
p.wait()
return (p.returncode, p.stderr.read(1024))
def _load_ko(modname):
# only try to load modules on kernels that support them
if not os.path.exists("/proc/modules"):
return (0, None)
# this will return the full path for the modprobe binary
modprobe = "/sbin/modprobe"
try:
proc = open("/proc/sys/kernel/modprobe")
modprobe = proc.read(1024)
except:
pass
if modprobe[-1] == '\n':
modprobe = modprobe[:-1]
return _insert_ko(modprobe, modname)
# Load a kernel module. If it is already loaded modprobe will just return 0.
def load_kernel(name, exc_if_failed=False):
rc, err = _load_ko(name)
if rc:
if not err:
err = "Failed to load the %s kernel module." % (name)
if err[-1] == "\n":
err = err[:-1]
if exc_if_failed:
raise Exception(err)
def _do_find_library(name):
if '/' in name:
try:
return ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL)
except Exception:
return None
p = ctypes.util.find_library(name)
if p:
lib = ctypes.CDLL(p, mode=ctypes.RTLD_GLOBAL)
return lib
# probably we have been installed in a virtualenv
try:
lib = ctypes.CDLL(os.path.join(get_python_lib(), name),
mode=ctypes.RTLD_GLOBAL)
return lib
except:
pass
for p in sys.path:
try:
lib = ctypes.CDLL(os.path.join(p, name), mode=ctypes.RTLD_GLOBAL)
return lib
except:
pass
return None
def _find_library(*names):
exts = []
if version_info >= (3, 3):
exts.append(get_config_var("EXT_SUFFIX"))
else:
exts.append(get_config_var('SO'))
if version_info >= (3, 5):
exts.append('.so')
for name in names:
libnames = [name, "lib" + name]
for ext in exts:
libnames += [name + ext, "lib" + name + ext]
libdir = os.environ.get('IPTABLES_LIBDIR', None)
if libdir is not None:
libdirs = libdir.split(':')
libs = [os.path.join(*p) for p in product(libdirs, libnames)]
libs.extend(libnames)
else:
libs = libnames
for n in libs:
while os.path.islink(n):
n = os.path.realpath(n)
lib = _do_find_library(n)
if lib is not None:
yield lib
def find_library(*names):
for lib in _find_library(*names):
major = 0
m = re.search(r"\.so\.(\d+).?", lib._name)
if m:
major = int(m.group(1))
return lib, major
return None, None
def find_libc():
lib = ctypes.util.find_library('c')
if lib is not None:
return ctypes.CDLL(lib, mode=ctypes.RTLD_GLOBAL)
libnames = ['libc.so.6', 'libc.so.0', 'libc.so']
for name in libnames:
try:
lib = ctypes.CDLL(name, mode=ctypes.RTLD_GLOBAL)
return lib
except:
pass
return None
07070100000022000081A40000000000000000000000015EE6923E0000004F000000000000000000000000000000000000002600000000python-iptables-1.0.0/iptc/version.py# -*- coding: utf-8 -*-
__pkgname__ = "python-iptables"
__version__ = "1.0.0"
07070100000023000081A40000000000000000000000015EE6923E0000CABE000000000000000000000000000000000000002600000000python-iptables-1.0.0/iptc/xtables.py# -*- coding: utf-8 -*-
import ctypes as ct
import os
import sys
import weakref
from . import version
from .util import find_library, find_libc
from .errors import *
XT_INV_PROTO = 0x40 # invert the sense of PROTO
NFPROTO_UNSPEC = 0
NFPROTO_IPV4 = 2
NFPROTO_ARP = 3
NFPROTO_BRIDGE = 7
NFPROTO_IPV6 = 10
NFPROTO_DECNET = 12
NFPROTO_NUMPROTO = 6
XTF_DONT_LOAD = 0x00
XTF_DURING_LOAD = 0x01
XTF_TRY_LOAD = 0x02
XTF_LOAD_MUST_SUCCEED = 0x03
XTOPT_INVERT = 1 << 0
XTOPT_MAND = 1 << 1
XTOPT_MULTI = 1 << 2
XTOPT_PUT = 1 << 3
XTOPT_NBO = 1 << 4
_WORDLEN = ct.sizeof(ct.c_long)
_XT_FUNCTION_MAXNAMELEN = 30
def xt_align(sz):
return ((sz + (_WORDLEN - 1)) & ~(_WORDLEN - 1))
class xt_counters(ct.Structure):
"""This class is a representation of the C struct xt_counters."""
_fields_ = [("pcnt", ct.c_uint64), # packet counter
("bcnt", ct.c_uint64)] # byte counter
class xt_entry_target_user(ct.Structure):
_fields_ = [("target_size", ct.c_uint16),
("name", ct.c_char * (_XT_FUNCTION_MAXNAMELEN - 1)),
("revision", ct.c_uint8)]
class xt_entry_target_u(ct.Union):
_fields_ = [("user", xt_entry_target_user),
("target_size", ct.c_uint16)] # full length
class xt_entry_target(ct.Structure):
"""This class is a representation of the C struct xt_entry_target."""
_fields_ = [("u", xt_entry_target_u),
("data", ct.c_ubyte * 0)]
class xt_entry_match_user(ct.Structure):
_fields_ = [("match_size", ct.c_uint16),
("name", ct.c_char * (_XT_FUNCTION_MAXNAMELEN - 1)),
("revision", ct.c_uint8)]
class xt_entry_match_u(ct.Union):
_fields_ = [("user", xt_entry_match_user),
("match_size", ct.c_uint16)] # full length
class xt_entry_match(ct.Structure):
"""This class is a representation of the C struct xt_entry_match."""
_fields_ = [("u", xt_entry_match_u),
("data", ct.c_ubyte * 0)]
class xtables_globals(ct.Structure):
_fields_ = [("option_offset", ct.c_uint),
("program_name", ct.c_char_p),
("program_version", ct.c_char_p),
("orig_opts", ct.c_void_p),
("opts", ct.c_void_p),
("exit_err", ct.CFUNCTYPE(None, ct.c_int, ct.c_char_p)),
("compat_rev", ct.CFUNCTYPE(ct.c_int, ct.c_char_p, ct.c_uint8,
ct.c_int))]
# struct used by getopt()
class option(ct.Structure):
_fields_ = [("name", ct.c_char_p),
("has_arg", ct.c_int),
("flag", ct.POINTER(ct.c_int)),
("val", ct.c_int)]
class xt_option_entry(ct.Structure):
_fields_ = [("name", ct.c_char_p),
("type", ct.c_int),
("id", ct.c_uint),
("excl", ct.c_uint),
("also", ct.c_uint),
("flags", ct.c_uint),
("ptroff", ct.c_uint),
("size", ct.c_size_t),
("min", ct.c_uint),
("max", ct.c_uint)]
class _U1(ct.Union):
_fields_ = [("match", ct.POINTER(ct.POINTER(xt_entry_match))),
("target", ct.POINTER(ct.POINTER(xt_entry_target)))]
class nf_inet_addr(ct.Union):
_fields_ = [("all", ct.c_uint32 * 4),
("ip", ct.c_uint32),
("ip6", ct.c_uint32 * 4),
("in", ct.c_uint32),
("in6", ct.c_uint8 * 16)]
class _S1(ct.Structure):
_fields_ = [("haddr", nf_inet_addr),
("hmask", nf_inet_addr),
("hlen", ct.c_uint8)]
class _S2(ct.Structure):
_fields_ = [("tos_value", ct.c_uint8),
("tos_mask", ct.c_uint8)]
class _S3(ct.Structure):
_fields_ = [("mark", ct.c_uint32),
("mask", ct.c_uint32)]
class _U_val(ct.Union):
_anonymous_ = ("s1", "s2", "s3")
_fields_ = [("u8", ct.c_uint8),
("u8_range", ct.c_uint8 * 2),
("syslog_level", ct.c_uint8),
("protocol", ct.c_uint8),
("u16", ct.c_uint16),
("u16_range", ct.c_uint16 * 2),
("port", ct.c_uint16),
("port_range", ct.c_uint16 * 2),
("u32", ct.c_uint32),
("u32_range", ct.c_uint32 * 2),
("u64", ct.c_uint64),
("u64_range", ct.c_uint64 * 2),
("double", ct.c_double),
("s1", _S1),
("s2", _S2),
("s3", _S3),
("ethermac", ct.c_uint8 * 6)]
class xt_option_call(ct.Structure):
_anonymous_ = ("u",)
_fields_ = [("arg", ct.c_char_p),
("ext_name", ct.c_char_p),
("entry", ct.POINTER(xt_option_entry)),
("data", ct.c_void_p),
("xflags", ct.c_uint),
("invert", ct.c_uint8),
("nvals", ct.c_uint8),
("val", _U_val),
("u", _U1),
("xt_entry", ct.c_void_p),
("udata", ct.c_void_p)]
class xt_fcheck_call(ct.Structure):
_fields_ = [("ext_name", ct.c_char_p),
("data", ct.c_void_p),
("udata", ct.c_void_p),
("xflags", ct.c_uint)]
class _xtables_match_v1(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert, unsigned int
# *flags, const void *entry, struct xt_entry_match **match)
("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_match)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the match iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match), ct.c_int)),
# saves the match info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
("option_offset", ct.c_uint),
("m", ct.POINTER(xt_entry_match)),
("mflags", ct.c_uint),
("loaded", ct.c_uint)]
x6_parse = None
x6_fcheck = None
x6_options = None
_xtables_match_v2 = _xtables_match_v1
_xtables_match_v4 = _xtables_match_v1
_xtables_match_v5 = _xtables_match_v1
class _xtables_match_v6(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert, unsigned int
# *flags, const void *entry, struct xt_entry_match **match)
("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_match)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the match iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match), ct.c_int)),
# saves the match info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
("option_offset", ct.c_uint),
("m", ct.POINTER(xt_entry_match)),
("mflags", ct.c_uint),
("loaded", ct.c_uint)]
class _xtables_match_v7(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert, unsigned int
# *flags, const void *entry, struct xt_entry_match **match)
("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_match)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the match iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match), ct.c_int)),
# saves the match info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("m", ct.POINTER(xt_entry_match)),
("mflags", ct.c_uint),
("loaded", ct.c_uint)]
class _xtables_match_v9(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("real_name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert, unsigned int
# *flags, const void *entry, struct xt_entry_match **match)
("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_match)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the match iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match), ct.c_int)),
# saves the match info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("m", ct.POINTER(xt_entry_match)),
("mflags", ct.c_uint),
("loaded", ct.c_uint)]
class _xtables_match_v10(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("real_name", ct.c_char_p),
("revision", ct.c_uint8),
("ext_flags", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert, unsigned int
# *flags, const void *entry, struct xt_entry_match **match)
("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_match)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the match iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match), ct.c_int)),
# saves the match info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match))),
# Print match name or alias
("alias", ct.CFUNCTYPE(ct.c_char_p,
ct.POINTER(xt_entry_match))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("m", ct.POINTER(xt_entry_match)),
("mflags", ct.c_uint),
("loaded", ct.c_uint)]
_xtables_match_v11 = _xtables_match_v10
class _xtables_match_v12(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("real_name", ct.c_char_p),
("revision", ct.c_uint8),
("ext_flags", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_match))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert, unsigned int
# *flags, const void *entry, struct xt_entry_match **match)
("parse", ct.CFUNCTYPE(ct.c_int, ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_match)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the match iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match), ct.c_int)),
# saves the match info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_match))),
# Print match name or alias
("alias", ct.CFUNCTYPE(ct.c_char_p,
ct.POINTER(xt_entry_match))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
('xt_xlate', ct.c_int),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("m", ct.POINTER(xt_entry_match)),
("mflags", ct.c_uint),
("loaded", ct.c_uint)]
class xtables_match(ct.Union):
_fields_ = [("v1", _xtables_match_v1),
("v2", _xtables_match_v2),
# Apparently v3 was skipped
("v4", _xtables_match_v4),
("v5", _xtables_match_v5),
("v6", _xtables_match_v6),
("v7", _xtables_match_v7),
# Apparently v8 was skipped
("v9", _xtables_match_v9),
("v10", _xtables_match_v10),
("v11", _xtables_match_v11),
("v12", _xtables_match_v12)]
class _xtables_target_v1(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert,
# unsigned int *flags, const void *entry,
# struct xt_entry_target **target)
("parse", ct.CFUNCTYPE(ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_target)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the target iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target), ct.c_int)),
# saves the target info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
("option_offset", ct.c_uint),
("t", ct.POINTER(xt_entry_target)),
("tflags", ct.c_uint),
("used", ct.c_uint),
("loaded", ct.c_uint)]
x6_parse = None
x6_fcheck = None
x6_options = None
_xtables_target_v2 = _xtables_target_v1
_xtables_target_v4 = _xtables_target_v1
_xtables_target_v5 = _xtables_target_v1
class _xtables_target_v6(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert,
# unsigned int *flags, const void *entry,
# struct xt_entry_target **target)
("parse", ct.CFUNCTYPE(ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_target)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the target iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target), ct.c_int)),
# saves the target info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
("option_offset", ct.c_uint),
("t", ct.POINTER(xt_entry_target)),
("tflags", ct.c_uint),
("used", ct.c_uint),
("loaded", ct.c_uint)]
class _xtables_target_v7(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert,
# unsigned int *flags, const void *entry,
# struct xt_entry_target **target)
("parse", ct.CFUNCTYPE(ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_target)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the target iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target), ct.c_int)),
# saves the target info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("t", ct.POINTER(xt_entry_target)),
("tflags", ct.c_uint),
("used", ct.c_uint),
("loaded", ct.c_uint)]
class _xtables_target_v9(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("real_name", ct.c_char_p),
("revision", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert,
# unsigned int *flags, const void *entry,
# struct xt_entry_target **target)
("parse", ct.CFUNCTYPE(ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_target)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the target iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target), ct.c_int)),
# saves the target info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("t", ct.POINTER(xt_entry_target)),
("tflags", ct.c_uint),
("used", ct.c_uint),
("loaded", ct.c_uint)]
class _xtables_target_v10(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("real_name", ct.c_char_p),
("revision", ct.c_uint8),
("ext_flags", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert,
# unsigned int *flags, const void *entry,
# struct xt_entry_target **target)
("parse", ct.CFUNCTYPE(ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_target)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the target iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target), ct.c_int)),
# saves the target info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target))),
# Print target name or alias
("alias", ct.CFUNCTYPE(ct.c_char_p,
ct.POINTER(xt_entry_target))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("t", ct.POINTER(xt_entry_target)),
("tflags", ct.c_uint),
("used", ct.c_uint),
("loaded", ct.c_uint)]
_xtables_target_v11 = _xtables_target_v10
class _xtables_target_v12(ct.Structure):
_fields_ = [("version", ct.c_char_p),
("next", ct.c_void_p),
("name", ct.c_char_p),
("real_name", ct.c_char_p),
("revision", ct.c_uint8),
("ext_flags", ct.c_uint8),
("family", ct.c_uint16),
("size", ct.c_size_t),
("userspacesize", ct.c_size_t),
("help", ct.CFUNCTYPE(None)),
("init", ct.CFUNCTYPE(None, ct.POINTER(xt_entry_target))),
# fourth parameter entry is struct ipt_entry for example
# int (*parse)(int c, char **argv, int invert,
# unsigned int *flags, const void *entry,
# struct xt_entry_target **target)
("parse", ct.CFUNCTYPE(ct.c_int,
ct.POINTER(ct.c_char_p), ct.c_int,
ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.POINTER(
xt_entry_target)))),
("final_check", ct.CFUNCTYPE(None, ct.c_uint)),
# prints out the target iff non-NULL: put space at end
# first parameter ip is struct ipt_ip * for example
("print", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target), ct.c_int)),
# saves the target info in parsable form to stdout.
# first parameter ip is struct ipt_ip * for example
("save", ct.CFUNCTYPE(None, ct.c_void_p,
ct.POINTER(xt_entry_target))),
# Print target name or alias
("alias", ct.CFUNCTYPE(ct.c_char_p,
ct.POINTER(xt_entry_target))),
# pointer to list of extra command-line options
("extra_opts", ct.POINTER(option)),
# introduced with the new iptables API
("x6_parse", ct.CFUNCTYPE(None, ct.POINTER(xt_option_call))),
("x6_fcheck", ct.CFUNCTYPE(None, ct.POINTER(xt_fcheck_call))),
("x6_options", ct.POINTER(xt_option_entry)),
('xt_xlate', ct.c_int),
# size of per-extension instance extra "global" scratch space
("udata_size", ct.c_size_t),
# ignore these men behind the curtain:
("udata", ct.c_void_p),
("option_offset", ct.c_uint),
("t", ct.POINTER(xt_entry_target)),
("tflags", ct.c_uint),
("used", ct.c_uint),
("loaded", ct.c_uint)]
class xtables_target(ct.Union):
_fields_ = [("v1", _xtables_target_v1),
("v2", _xtables_target_v2),
# Apparently v3 was skipped
("v4", _xtables_target_v4),
("v5", _xtables_target_v5),
("v6", _xtables_target_v6),
("v7", _xtables_target_v7),
# Apparently v8 was skipped
("v9", _xtables_target_v9),
("v10", _xtables_target_v10),
("v11", _xtables_target_v11),
("v12", _xtables_target_v12)]
_libc = find_libc()
_optind = ct.c_long.in_dll(_libc, "optind")
_optarg = ct.c_char_p.in_dll(_libc, "optarg")
xtables_version = os.getenv("PYTHON_IPTABLES_XTABLES_VERSION")
if xtables_version:
_searchlib = "libxtables.so.%s" % (xtables_version,)
else:
_searchlib = "xtables"
_lib_xtables, xtables_version = find_library(_searchlib)
_xtables_libdir = os.getenv("XTABLES_LIBDIR")
if _xtables_libdir is None:
import re
ldconfig_path_regex = re.compile('^(/.*):$')
import subprocess
ldconfig = subprocess.Popen(
('/sbin/ldconfig', '-N', '-v'),
stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True
)
ldconfig_out, ldconfig_err = ldconfig.communicate()
if ldconfig.returncode != 0:
raise XTablesError("ldconfig failed, please set XTABLES_LIBDIR")
for ldconfig_out_line in ldconfig_out.splitlines():
ldconfig_path_regex_match = ldconfig_path_regex.match(ldconfig_out_line)
if ldconfig_path_regex_match is not None:
ldconfig_path = os.path.join(ldconfig_path_regex_match.group(1), 'xtables')
if os.path.isdir(ldconfig_path):
_xtables_libdir = ldconfig_path
break
if _xtables_libdir is None:
raise XTablesError("can't find directory with extensions; "
"please set XTABLES_LIBDIR")
_lib_xtwrapper, _ = find_library("xtwrapper")
_throw = _lib_xtwrapper.throw_exception
_wrap_parse = _lib_xtwrapper.wrap_parse
_wrap_parse.restype = ct.c_int
_wrap_parse.argtypes = [ct.c_void_p, ct.c_int, ct.POINTER(ct.c_char_p),
ct.c_int, ct.POINTER(ct.c_uint), ct.c_void_p,
ct.POINTER(ct.c_void_p)]
_wrap_save = _lib_xtwrapper.wrap_save
_wrap_save.restype = ct.c_void_p
_wrap_save.argtypes = [ct.c_void_p, ct.c_void_p, ct.c_void_p]
_wrap_uintfn = _lib_xtwrapper.wrap_uintfn
_wrap_uintfn.restype = ct.c_int
_wrap_uintfn.argtypes = [ct.c_void_p, ct.c_uint]
_wrap_voidfn = _lib_xtwrapper.wrap_voidfn
_wrap_voidfn.restype = ct.c_int
_wrap_voidfn.argtypes = [ct.c_void_p]
_wrap_x6fn = _lib_xtwrapper.wrap_x6fn
_wrap_x6fn.restype = ct.c_int
_wrap_x6fn.argtypes = [ct.c_void_p, ct.c_void_p]
_kernel_version = ct.c_int.in_dll(_lib_xtwrapper, 'kernel_version')
_get_kernel_version = _lib_xtwrapper.get_kernel_version
_get_kernel_version()
def _xt_exit(status, *args):
_throw(status)
_EXIT_FN = ct.CFUNCTYPE(None, ct.c_int, ct.c_char_p)
_xt_exit = _EXIT_FN(_xt_exit)
def set_nfproto(fn):
def new(*args):
xtobj = args[0]
xtables._xtables_set_nfproto(xtobj.proto)
return fn(*args)
return new
_xt_globals = xtables_globals()
_xt_globals.option_offset = 0
_xt_globals.program_name = version.__pkgname__.encode()
_xt_globals.program_version = version.__version__.encode()
_xt_globals.orig_opts = None
_xt_globals.opts = None
_xt_globals.exit_err = _xt_exit
if xtables_version > 10:
_COMPAT_REV_FN = ct.CFUNCTYPE(ct.c_int, ct.c_char_p, ct.c_uint8, ct.c_int)
_xt_compat_rev = _COMPAT_REV_FN(_lib_xtables.xtables_compatible_revision)
_xt_globals.compat_rev = _xt_compat_rev
_loaded_exts = {}
class xtables(object):
_xtables_init_all = _lib_xtables.xtables_init_all
_xtables_init_all.restype = ct.c_int
_xtables_init_all.argtypes = [ct.POINTER(xtables_globals), ct.c_uint8]
_xtables_find_match = _lib_xtables.xtables_find_match
_xtables_find_match.restype = ct.POINTER(xtables_match)
_xtables_find_match.argtypes = [ct.c_char_p, ct.c_int, ct.c_void_p]
_xtables_find_target = _lib_xtables.xtables_find_target
_xtables_find_target.restype = ct.POINTER(xtables_target)
_xtables_find_target.argtypes = [ct.c_char_p, ct.c_int]
_xtables_set_nfproto = _lib_xtables.xtables_set_nfproto
_xtables_set_nfproto.restype = None
_xtables_set_nfproto.argtypes = [ct.c_uint8]
_xtables_xt_params = ct.c_void_p.in_dll(_lib_xtables, "xt_params")
_xtables_matches = (ct.c_void_p.in_dll(_lib_xtables, "xtables_matches"))
try:
_xtables_pending_matches = (ct.c_void_p.in_dll(
_lib_xtables, "xtables_pending_matches"))
except ValueError:
_xtables_pending_matches = ct.POINTER(None)
_xtables_targets = (ct.c_void_p.in_dll(_lib_xtables, "xtables_targets"))
try:
_xtables_pending_targets = (ct.c_void_p.in_dll(
_lib_xtables, "xtables_pending_targets"))
except ValueError:
_xtables_pending_targets = ct.POINTER(None)
_real_name = {
'state': 'conntrack',
'NOTRACK': 'CT'
}
_cache = weakref.WeakValueDictionary()
def __new__(cls, proto):
obj = xtables._cache.get(proto, None)
if not obj:
obj = object.__new__(cls)
xtables._cache[proto] = obj
obj._xtinit(proto)
return obj
def _xtinit(self, proto, no_alias_check=False):
self.proto = proto
self.no_alias_check = no_alias_check
thismodule = sys.modules[__name__]
matchname = "_xtables_match_v%d" % (xtables_version)
targetname = "_xtables_target_v%d" % (xtables_version)
try:
self._match_struct = getattr(thismodule, matchname)
self._target_struct = getattr(thismodule, targetname)
except:
raise XTablesError("unknown xtables version %d" %
(xtables_version))
rv = xtables._xtables_init_all(ct.pointer(_xt_globals), proto)
if rv:
raise XTablesError("xtables_init_all() failed: %d" % (rv))
def __repr__(self):
return "XTables for protocol %d" % (self.proto)
def _check_extname(self, name):
if name in [b"", b"ACCEPT", b"DROP", b"QUEUE", b"RETURN"]:
name = b"standard"
return name
def _loaded(self, name, ext):
_loaded_exts['%s___%s' % (self.proto, name)] = ext
def _get_initfn_from_lib(self, name, lib):
try:
initfn = getattr(lib, "libxt_%s_init" % (name))
except AttributeError:
prefix = self._get_prefix()
initfn = getattr(lib, "%s%s_init" % (prefix, name), None)
if initfn is None and not self.no_alias_check:
if name in xtables._real_name:
name = xtables._real_name[name]
initfn = self._get_initfn_from_lib(name, lib)
return initfn
def _try_extinit(self, name, lib):
try:
if type(lib) != ct.CDLL:
lib = ct.CDLL(lib)
fn = self._get_initfn_from_lib(name, lib)
if fn:
_wrap_voidfn(fn)
return True
except:
pass
return False
def _get_prefix(self):
if self.proto == NFPROTO_IPV4:
return "libipt_"
elif self.proto == NFPROTO_IPV6:
return "libip6t_"
else:
raise XTablesError("Unknown protocol %d" % (self.proto))
def _try_register(self, name):
if isinstance(name, bytes):
name = name.decode()
if self._try_extinit(name, _lib_xtables):
return
prefix = self._get_prefix()
libs = [os.path.join(_xtables_libdir, "libxt_" + name + ".so"),
os.path.join(_xtables_libdir, prefix + name + ".so")]
for lib in libs:
if self._try_extinit(name, lib):
return
def _get_loaded_ext(self, name):
ext = _loaded_exts.get('%s___%s' % (self.proto, name), None)
return ext
@set_nfproto
def find_match(self, name):
if isinstance(name, str):
name = name.encode()
name = self._check_extname(name)
ext = self._get_loaded_ext(name)
if ext is not None:
return ext
match = xtables._xtables_find_match(name, XTF_TRY_LOAD, None)
if not match:
self._try_register(name)
match = xtables._xtables_find_match(name, XTF_DONT_LOAD, None)
if not match:
return match
m = ct.cast(match, ct.POINTER(self._match_struct))
self._loaded(m[0].name, m)
return m
@set_nfproto
def find_target(self, name):
if isinstance(name, str):
name = name.encode()
name = self._check_extname(name)
ext = self._get_loaded_ext(name)
if ext is not None:
return ext
target = xtables._xtables_find_target(name, XTF_TRY_LOAD)
if not target:
self._try_register(name)
target = xtables._xtables_find_target(name, XTF_DONT_LOAD)
if not target:
return target
t = ct.cast(target, ct.POINTER(self._target_struct))
self._loaded(t[0].name, t)
return t
@set_nfproto
def save(self, module, ip, ptr):
_wrap_save(module.save, ct.cast(ct.pointer(ip), ct.c_void_p), ptr)
def _option_lookup(self, entries, name):
for e in entries:
if not e.name:
break
if e.name == name:
return e
return None
def _parse(self, module, argv, inv, flags, entry, ptr):
for opt in module.extra_opts:
if opt.name == argv[0]:
rv = _wrap_parse(module.parse, opt.val, argv, inv, flags,
entry, ptr)
if rv != 1:
raise ValueError("invalid value %s" % (argv[1]))
return
elif not opt.name:
break
raise AttributeError("invalid parameter %s" % (argv[0]))
# Dispatch arguments to the appropriate parse function, based upon the
# extension's choice of API.
@set_nfproto
def parse_target(self, argv, invert, t, fw, ptr, x6_parse, x6_options):
_optarg.value = len(argv) > 1 and argv[1] or None
_optind.value = len(argv) - 1
try:
# new API?
if x6_options is None:
x6_options = t.x6_options
if x6_parse is None:
x6_parse = t.x6_parse
except AttributeError:
pass
if x6_options and x6_parse:
# new API
entry = self._option_lookup(x6_options, argv[0])
if not entry:
raise XTablesError("%s: no such parameter %s" % (t.name,
argv[0]))
cb = xt_option_call()
cb.entry = ct.pointer(entry)
cb.arg = _optarg
cb.invert = ct.c_uint8(invert.value)
cb.ext_name = t.name
cb.data = ct.cast(t.t[0].data, ct.c_void_p)
cb.xflags = 0
cb.target = ct.pointer(t.t)
cb.xt_entry = ct.cast(fw, ct.c_void_p)
cb.udata = t.udata
rv = _wrap_x6fn(x6_parse, ct.pointer(cb))
if rv != 0:
raise XTablesError("%s: parameter error %d (%s)" % (t.name, rv,
argv[1]))
t.tflags |= cb.xflags
return
# old API
flags = ct.pointer(ct.c_uint(0))
self._parse(t, argv, invert, flags, fw, ptr)
t.tflags |= flags[0]
# Dispatch arguments to the appropriate parse function, based upon the
# extension's choice of API.
@set_nfproto
def parse_match(self, argv, invert, m, fw, ptr, x6_parse, x6_options):
_optarg.value = len(argv) > 1 and argv[1] or None
_optind.value = len(argv) - 1
try:
# new API?
if x6_options is None:
x6_options = m.x6_options
if x6_parse is None:
x6_parse = m.x6_parse
except AttributeError:
pass
if x6_options and x6_parse:
# new API
entry = self._option_lookup(x6_options, argv[0])
if not entry:
raise XTablesError("%s: no such parameter %s" % (m.name,
argv[0]))
cb = xt_option_call()
cb.entry = ct.pointer(entry)
cb.arg = _optarg
cb.invert = ct.c_uint8(invert.value)
cb.ext_name = m.name
cb.data = ct.cast(m.m[0].data, ct.c_void_p)
cb.xflags = 0
cb.match = ct.pointer(m.m)
cb.xt_entry = ct.cast(fw, ct.c_void_p)
cb.udata = m.udata
rv = _wrap_x6fn(x6_parse, ct.pointer(cb))
if rv != 0:
raise XTablesError("%s: parameter '%s' error %d" % (
m.name, len(argv) > 1 and argv[1] or "", rv))
m.mflags |= cb.xflags
return
# old API
flags = ct.pointer(ct.c_uint(0))
self._parse(m, argv, invert, flags, fw, ptr)
m.mflags |= flags[0]
# Check that all option constraints have been met. This effectively
# replaces ->final_check of the older API.
def _options_fcheck(self, name, xflags, table):
for entry in table:
if entry.name is None:
break
if entry.flags & XTOPT_MAND and not xflags & (1 << entry.id):
raise XTablesError("%s: --%s must be specified" % (name,
entry.name))
if not xflags & (1 << entry.id):
continue
# XXX: check for conflicting options
def _fcheck_target_old(self, target):
# old API
if not target.final_check:
return
rv = _wrap_uintfn(target.final_check, target.tflags)
if rv:
raise XTablesError("%s.final_check() has failed" %
(target.name))
def _fcheck_target_new(self, target):
# new API
cb = xt_fcheck_call()
cb.ext_name = target.name
cb.data = ct.cast(target.t[0].data, ct.c_void_p)
cb.xflags = target.tflags
cb.udata = target.udata
rv = _wrap_x6fn(target.x6_fcheck, ct.pointer(cb))
if rv:
raise XTablesError("%s.x6_fcheck has failed" % (target.name))
if target.x6_options:
self._options_fcheck(target.name, target.tflags,
target.x6_options)
# Dispatch arguments to the appropriate final_check function, based upon
# the extension's choice of API.
@set_nfproto
def final_check_target(self, target):
x6_fcheck = None
try:
# new API?
x6_fcheck = target.x6_fcheck
except AttributeError:
# old API
pass
if x6_fcheck:
self._fcheck_target_new(target)
else:
self._fcheck_target_old(target)
def _fcheck_match_old(self, match):
# old API
if not match.final_check:
return
rv = _wrap_uintfn(match.final_check, match.mflags)
if rv:
raise XTablesError("%s.final_check() has failed" %
(match.name))
def _fcheck_match_new(self, match):
# new API
cb = xt_fcheck_call()
cb.ext_name = match.name
cb.data = ct.cast(match.m[0].data, ct.c_void_p)
cb.xflags = match.mflags
cb.udata = match.udata
rv = _wrap_x6fn(match.x6_fcheck, ct.pointer(cb))
if rv:
raise XTablesError("%s.x6_fcheck has failed" % (match.name))
if match.x6_options:
self._options_fcheck(match.name, match.mflags,
match.x6_options)
# Dispatch arguments to the appropriate final_check function, based upon
# the extension's choice of API.
@set_nfproto
def final_check_match(self, match):
x6_fcheck = None
try:
# new API?
x6_fcheck = match.x6_fcheck
except AttributeError:
# old API
pass
if x6_fcheck:
self._fcheck_match_new(match)
else:
self._fcheck_match_old(match)
07070100000024000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000002300000000python-iptables-1.0.0/libxtwrapper07070100000025000081A40000000000000000000000015EE6923E000006B1000000000000000000000000000000000000002D00000000python-iptables-1.0.0/libxtwrapper/wrapper.c#include <errno.h>
#include <setjmp.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/utsname.h>
int kernel_version;
#define LINUX_VERSION(x,y,z) (0x10000*(x) + 0x100*(y) + z)
void get_kernel_version(void)
{
static struct utsname uts;
int x = 0, y = 0, z = 0;
if (uname(&uts) == -1) {
fprintf(stderr, "Unable to retrieve kernel version.\n");
return;
}
sscanf(uts.release, "%d.%d.%d", &x, &y, &z);
kernel_version = LINUX_VERSION(x, y, z);
}
static jmp_buf env;
void throw_exception(int err)
{
longjmp(env, err);
}
int wrap_parse(int (*fn)(int, char **, int, unsigned int *, void *, void **),
int i, char **argv, int inv, unsigned int *flags, void *p,
void **mptr)
{
int rv = -1;
int err;
if ((err = setjmp(env)) == 0) {
rv = fn(i, argv, inv, flags, p, mptr);
} else {
errno = err;
}
return rv;
}
struct ipt_ip;
void wrap_save(int (*fn)(const void *, const void *),
const void *ip, const void *m)
{
fn(ip, m);
fprintf(stdout, "\n"); /* make sure something is written to stdout */
fflush(stdout);
}
int wrap_x6fn(void (*fn)(void *), void *data)
{
int err;
if ((err = setjmp(env)) == 0) {
fn(data);
} else {
errno = err;
return -err;
}
return 0;
}
int wrap_uintfn(void (*fn)(unsigned int), unsigned int data)
{
int err;
if ((err = setjmp(env)) == 0) {
fn(data);
} else {
errno = err;
return -err;
}
return 0;
}
int wrap_voidfn(void (*fn)(void))
{
int err;
if ((err = setjmp(env)) == 0) {
fn();
} else {
errno = err;
return -err;
}
return 0;
}
07070100000026000081A40000000000000000000000015EE6923E000006AE000000000000000000000000000000000000001F00000000python-iptables-1.0.0/setup.py#!/usr/bin/env python
"""python-iptables setup script"""
from setuptools import setup, Extension
#from distutils.core import setup, Extension
# make pyflakes happy
__pkgname__ = None
__version__ = None
exec(open("iptc/version.py").read())
# build/install python-iptables
setup(
name=__pkgname__,
version=__version__,
description="Python bindings for iptables",
author="Vilmos Nebehaj",
author_email="v.nebehaj@gmail.com",
url="https://github.com/ldx/python-iptables",
packages=["iptc"],
package_dir={"iptc": "iptc"},
ext_modules=[Extension("libxtwrapper",
["libxtwrapper/wrapper.c"])],
test_suite="tests",
classifiers=[
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
"Intended Audience :: Developers",
"Intended Audience :: Information Technology",
"Intended Audience :: System Administrators",
"Intended Audience :: Telecommunications Industry",
"License :: OSI Approved :: Apache Software License",
"Natural Language :: English",
"Operating System :: POSIX :: Linux",
"Programming Language :: Python",
"Topic :: Software Development :: Libraries",
"Topic :: System :: Networking :: Firewalls",
"Topic :: System :: Systems Administration",
"Programming Language :: Python :: 2",
"Programming Language :: Python :: 2.7",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.3",
"Programming Language :: Python :: 3.4",
"Programming Language :: Python :: Implementation :: CPython",
],
license="Apache License, Version 2.0",
)
07070100000027000041ED0000000000000000000000015EE6923E00000000000000000000000000000000000000000000001C00000000python-iptables-1.0.0/tests07070100000028000081A40000000000000000000000015EE6923E00000000000000000000000000000000000000000000002800000000python-iptables-1.0.0/tests/__init__.py07070100000029000081ED0000000000000000000000015EE6923E000089DF000000000000000000000000000000000000002900000000python-iptables-1.0.0/tests/test_iptc.py# -*- coding: utf-8 -*-
import unittest
import iptc
is_table_available = iptc.is_table_available
is_table6_available = iptc.is_table6_available
def _check_chains(testcase, *chains):
for chain in chains:
if chain is None:
continue
for ch in [c for c in chains if c != chain and c is not None]:
testcase.assertNotEquals(id(chain), id(ch))
class TestTable6(unittest.TestCase):
def setUp(self):
self.autocommit = iptc.Table(iptc.Table.FILTER).autocommit
def tearDown(self):
iptc.Table(iptc.Table.FILTER, self.autocommit)
def test_table6(self):
filt = None
if is_table6_available(iptc.Table6.FILTER):
filt = iptc.Table6("filter")
self.assertEquals(id(filt), id(iptc.Table6(iptc.Table6.FILTER)))
security = None
if is_table6_available(iptc.Table6.SECURITY):
security = iptc.Table6("security")
self.assertEquals(id(security),
id(iptc.Table6(iptc.Table6.SECURITY)))
mangle = None
if is_table6_available(iptc.Table6.MANGLE):
mangle = iptc.Table6("mangle")
self.assertEquals(id(mangle), id(iptc.Table6(iptc.Table6.MANGLE)))
raw = None
if is_table6_available(iptc.Table6.RAW):
raw = iptc.Table6("raw")
self.assertEquals(id(raw), id(iptc.Table6(iptc.Table6.RAW)))
_check_chains(self, filt, security, mangle, raw)
def test_table6_autocommit(self):
table = iptc.Table(iptc.Table.FILTER, False)
self.assertEquals(table.autocommit, False)
rule = iptc.Rule()
rule.src = "1.2.3.4"
rule.dst = "2.3.4.5"
rule.protocol = "tcp"
self.assertEquals(table.autocommit, False)
rule.create_target('DROP')
self.assertEquals(table.autocommit, False)
match = rule.create_match('tcp')
match.dport = "80:90"
self.assertEquals(table.autocommit, False)
class TestTable(unittest.TestCase):
def setUp(self):
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_chain")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
iptc.Table(iptc.Table.FILTER).flush()
def test_table(self):
filt = None
if is_table_available(iptc.Table.FILTER):
filt = iptc.Table("filter")
self.assertEquals(id(filt), id(iptc.Table(iptc.Table.FILTER)))
nat = None
if is_table_available(iptc.Table.NAT):
nat = iptc.Table("nat")
self.assertEquals(id(nat), id(iptc.Table(iptc.Table.NAT)))
mangle = None
if is_table_available(iptc.Table.MANGLE):
mangle = iptc.Table("mangle")
self.assertEquals(id(mangle), id(iptc.Table(iptc.Table.MANGLE)))
raw = None
if is_table_available(iptc.Table.RAW):
raw = iptc.Table("raw")
self.assertEquals(id(raw), id(iptc.Table(iptc.Table.RAW)))
_check_chains(self, filt, nat, mangle, raw)
def test_refresh(self):
rule = iptc.Rule()
match = iptc.Match(rule, "tcp")
match.dport = "1234"
rule.add_match(match)
try:
self.chain.insert_rule(rule)
iptc.Table(iptc.Table.FILTER).delete_chain(self.chain)
self.fail("inserted invalid rule")
except:
pass
iptc.Table(iptc.Table.FILTER).refresh()
target = iptc.Target(rule, "ACCEPT")
rule.target = target
rule.protocol = "tcp"
self.chain.insert_rule(rule)
self.chain.delete_rule(rule)
def test_flush_user_chains(self):
chain1 = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_flush_chain1")
chain2 = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_flush_chain2")
iptc.Table(iptc.Table.FILTER).create_chain(chain1)
iptc.Table(iptc.Table.FILTER).create_chain(chain2)
rule = iptc.Rule()
rule.target = iptc.Target(rule, chain2.name)
chain1.append_rule(rule)
rule = iptc.Rule()
rule.target = iptc.Target(rule, chain1.name)
chain2.append_rule(rule)
self.assertEquals(len(chain1.rules), 1)
self.assertEquals(len(chain2.rules), 1)
filter_table = iptc.Table(iptc.Table.FILTER)
filter_table.flush()
self.assertTrue(not filter_table.is_chain(chain1.name))
self.assertTrue(not filter_table.is_chain(chain2.name))
def test_flush_builtin(self):
filter_table = iptc.Table(iptc.Table.FILTER)
output_rule_count = len(iptc.Chain(filter_table, "OUTPUT").rules)
rule = iptc.Rule()
rule.target = iptc.Target(rule, "ACCEPT")
iptc.Chain(filter_table, "OUTPUT").append_rule(rule)
self.assertEquals(len(iptc.Chain(filter_table, "OUTPUT").rules),
output_rule_count + 1)
filter_table.flush()
self.assertEquals(len(iptc.Chain(filter_table, "OUTPUT").rules), 0)
class TestChain(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_chain(self):
table = iptc.Table(iptc.Table.FILTER)
input1 = iptc.Chain(table, "INPUT")
input2 = iptc.Chain(table, "INPUT")
forward1 = iptc.Chain(table, "FORWARD")
forward2 = iptc.Chain(table, "FORWARD")
output1 = iptc.Chain(table, "OUTPUT")
output2 = iptc.Chain(table, "OUTPUT")
self.assertEquals(id(input1), id(input2))
self.assertEquals(id(output1), id(output2))
self.assertEquals(id(forward1), id(forward2))
self.assertNotEquals(id(input1), id(output1))
self.assertNotEquals(id(input1), id(output2))
self.assertNotEquals(id(input1), id(forward1))
self.assertNotEquals(id(input1), id(forward2))
self.assertNotEquals(id(input2), id(output1))
self.assertNotEquals(id(input2), id(output2))
self.assertNotEquals(id(input2), id(forward1))
self.assertNotEquals(id(input2), id(forward2))
self.assertNotEquals(id(output1), id(forward1))
self.assertNotEquals(id(output1), id(forward2))
self.assertNotEquals(id(output2), id(forward1))
self.assertNotEquals(id(output2), id(forward2))
def test_is_chain(self):
if is_table_available(iptc.Table.FILTER):
table = iptc.Table(iptc.Table.FILTER)
self.assertTrue(table.is_chain("INPUT"))
self.assertTrue(table.is_chain("FORWARD"))
self.assertTrue(table.is_chain("OUTPUT"))
if is_table_available(iptc.Table.NAT):
table = iptc.Table(iptc.Table.NAT)
self.assertTrue(table.is_chain("PREROUTING"))
self.assertTrue(table.is_chain("POSTROUTING"))
self.assertTrue(table.is_chain("OUTPUT"))
if is_table_available(iptc.Table.MANGLE):
table = iptc.Table(iptc.Table.MANGLE)
self.assertTrue(table.is_chain("INPUT"))
self.assertTrue(table.is_chain("PREROUTING"))
self.assertTrue(table.is_chain("FORWARD"))
self.assertTrue(table.is_chain("POSTROUTING"))
self.assertTrue(table.is_chain("OUTPUT"))
if is_table_available(iptc.Table.RAW):
table = iptc.Table(iptc.Table.RAW)
self.assertTrue(table.is_chain("PREROUTING"))
self.assertTrue(table.is_chain("OUTPUT"))
def test_builtin_chain(self):
if is_table_available(iptc.Table.FILTER):
table = iptc.Table(iptc.Table.FILTER)
self.assertTrue(table.builtin_chain("INPUT"))
self.assertTrue(table.builtin_chain("FORWARD"))
self.assertTrue(table.builtin_chain("OUTPUT"))
if is_table_available(iptc.Table.NAT):
table = iptc.Table(iptc.Table.NAT)
self.assertTrue(table.builtin_chain("PREROUTING"))
self.assertTrue(table.builtin_chain("POSTROUTING"))
self.assertTrue(table.builtin_chain("OUTPUT"))
if is_table_available(iptc.Table.MANGLE):
table = iptc.Table(iptc.Table.MANGLE)
self.assertTrue(table.builtin_chain("INPUT"))
self.assertTrue(table.builtin_chain("PREROUTING"))
self.assertTrue(table.builtin_chain("FORWARD"))
self.assertTrue(table.builtin_chain("POSTROUTING"))
self.assertTrue(table.builtin_chain("OUTPUT"))
if is_table_available(iptc.Table.RAW):
table = iptc.Table(iptc.Table.RAW)
self.assertTrue(table.builtin_chain("PREROUTING"))
self.assertTrue(table.builtin_chain("OUTPUT"))
def test_chain_filter(self):
if is_table_available(iptc.Table.FILTER):
table = iptc.Table(iptc.Table.FILTER)
table.autocommit = True
self.assertTrue(len(table.chains) >= 3)
for chain in table.chains:
if chain.name not in ["INPUT", "FORWARD", "OUTPUT"]:
self.failIf(chain.is_builtin())
def test_chain_nat(self):
if is_table_available(iptc.Table.NAT):
table = iptc.Table(iptc.Table.NAT)
table.autocommit = True
self.assertTrue(len(table.chains) >= 3)
for chain in table.chains:
if chain.name not in ["INPUT", "PREROUTING", "POSTROUTING",
"OUTPUT"]:
self.failIf(chain.is_builtin())
def test_chain_mangle(self):
if is_table_available(iptc.Table.MANGLE):
table = iptc.Table(iptc.Table.MANGLE)
table.autocommit = True
self.assertTrue(len(table.chains) >= 5)
for chain in table.chains:
if chain.name not in ["PREROUTING", "POSTROUTING", "INPUT",
"FORWARD", "OUTPUT"]:
self.failIf(chain.is_builtin())
def test_chain_raw(self):
if is_table_available(iptc.Table.RAW):
table = iptc.Table(iptc.Table.RAW)
table.autocommit = True
self.assertTrue(len(table.chains) >= 2)
for chain in table.chains:
if chain.name not in ["PREROUTING", "OUTPUT"]:
self.failIf(chain.is_builtin())
def _get_tables(self):
tables = []
if is_table_available(iptc.Table.FILTER):
tables.append(iptc.Table(iptc.Table.FILTER))
if is_table_available(iptc.Table.NAT):
tables.append(iptc.Table(iptc.Table.NAT))
if is_table_available(iptc.Table.MANGLE):
tables.append(iptc.Table(iptc.Table.MANGLE))
if is_table_available(iptc.Table.RAW):
tables.append(iptc.Table(iptc.Table.RAW))
return tables
def test_chain_counters(self):
tables = self._get_tables()
for chain in (chain for table in tables for chain in table.chains):
counters = chain.get_counters()
fails = 0
for x in range(3): # try 3 times
chain.zero_counters()
counters = chain.get_counters()
if counters: # only built-in chains
if counters[0] != 0 or counters[1] != 0:
fails += 1
self.failIf(fails > 2)
def test_create_chain(self):
chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_chain")
iptc.Table(iptc.Table.FILTER).create_chain(chain)
self.failUnless(iptc.Table(iptc.Table.FILTER).is_chain(chain))
iptc.Table(iptc.Table.FILTER).delete_chain(chain)
self.failIf(iptc.Table(iptc.Table.FILTER).is_chain(chain))
def test_filter_policy(self):
if is_table_available(iptc.Table.FILTER):
table = iptc.Table(iptc.Table.FILTER)
input_chain = iptc.Chain(table, "INPUT")
pol = iptc.Policy("DROP")
input_chain.set_policy(pol)
rpol = input_chain.get_policy()
self.assertEquals(id(pol), id(rpol))
pol = iptc.Policy("ACCEPT")
input_chain.set_policy(pol)
rpol = input_chain.get_policy()
self.assertEquals(id(pol), id(rpol))
pol = iptc.Policy("RETURN")
try:
input_chain.set_policy(pol)
except iptc.IPTCError:
pass
else:
self.fail("managed to set INPUT policy to RETURN")
def test_nat_policy(self):
if is_table_available(iptc.Table.NAT):
table = iptc.Table(iptc.Table.NAT)
prerouting_chain = iptc.Chain(table, "PREROUTING")
pol = iptc.Policy("DROP")
prerouting_chain.set_policy(pol)
rpol = prerouting_chain.get_policy()
self.assertEquals(id(pol), id(rpol))
pol = iptc.Policy("ACCEPT")
prerouting_chain.set_policy(pol)
rpol = prerouting_chain.get_policy()
self.assertEquals(id(pol), id(rpol))
pol = iptc.Policy("RETURN")
try:
prerouting_chain.set_policy(pol)
except iptc.IPTCError:
pass
else:
self.fail("managed to set PREROUTING policy to RETURN")
if is_table_available(iptc.Table.MANGLE):
table = iptc.Table(iptc.Table.MANGLE)
forward_chain = iptc.Chain(table, "FORWARD")
pol = iptc.Policy("DROP")
forward_chain.set_policy(pol)
rpol = forward_chain.get_policy()
self.assertEquals(id(pol), id(rpol))
pol = iptc.Policy("ACCEPT")
forward_chain.set_policy(pol)
rpol = forward_chain.get_policy()
self.assertEquals(id(pol), id(rpol))
pol = iptc.Policy("RETURN")
try:
forward_chain.set_policy(pol)
except iptc.IPTCError:
pass
else:
self.fail("managed to set FORWARD policy to RETURN")
class TestRule6(unittest.TestCase):
def setUp(self):
self.chain = iptc.Chain(iptc.Table6(iptc.Table6.FILTER),
"iptc_test_chain")
iptc.Table6(iptc.Table6.FILTER).create_chain(self.chain)
def tearDown(self):
self.chain.flush()
self.chain.delete()
def test_create_mask(self):
rule = iptc.Rule6()
# Mask /10 should return \xff\xc0\x00...
mask = rule._create_mask(10)
self.assertEquals(mask[0], 0xff)
self.assertEquals(mask[1], 0xc0)
self.assertEquals(mask[2:], [0x00]*14)
# Mask /27 should return \xff\xff\xff\xe0...
mask = rule._create_mask(27)
self.assertEquals(mask[:3], [0xff, 0xff, 0xff])
self.assertEquals(mask[3], 0xe0)
self.assertEquals(mask[4:], [0x00]*12)
def test_rule_address(self):
# valid addresses
rule = iptc.Rule6()
for addr in ["::/128", "!2000::1/16", "2001::/64", "!2001::1/48"]:
rule.src = addr
self.assertEquals(rule.src, addr)
rule.dst = addr
self.assertEquals(rule.dst, addr)
addr = "::1"
rule.src = addr
self.assertEquals("::1/128", rule.src)
rule.dst = addr
self.assertEquals("::1/128", rule.dst)
# invalid addresses
for addr in ["2001:fg::/::", "2001/ffff::", "2001::/-1", "2001::/129",
"::1/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff",
"::1/ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff:"]:
try:
rule.src = addr
except ValueError:
pass
else:
self.fail("rule accepted invalid address %s" % (addr))
try:
rule.dst = addr
except ValueError:
pass
else:
self.fail("rule accepted invalid address %s" % (addr))
def test_rule_interface(self):
# valid interfaces
rule = iptc.Rule6()
for intf in ["eth0", "eth+", "ip6tnl1", "ip6tnl+", "!ppp0", "!ppp+"]:
rule.in_interface = intf
self.assertEquals(intf, rule.in_interface)
rule.out_interface = intf
self.assertEquals(intf, rule.out_interface)
# invalid interfaces
for intf in ["itsaverylonginterfacename"]:
try:
rule.out_interface = intf
except ValueError:
pass
else:
self.fail("rule accepted invalid interface name %s" % (intf))
try:
rule.in_interface = intf
except ValueError:
pass
else:
self.fail("rule accepted invalid interface name %s" % (intf))
def test_rule_protocol(self):
rule = iptc.Rule6()
for proto in ["tcp", "udp", "icmp", "AH", "ESP", "!TCP", "!UDP",
"!ICMP", "!ah", "!esp", "sctp", "!SCTP"]:
rule.protocol = proto
self.assertEquals(proto.lower(), rule.protocol)
for proto in ["", "asdf", "!"]:
try:
rule.protocol = proto
except ValueError:
pass
except IndexError:
pass
else:
self.fail("rule accepted invalid protocol %s" % (proto))
def test_rule_protocol_numeric(self):
rule = iptc.Rule6()
rule.protocol = 33
self.assertEquals(rule.protocol, '33')
rule.protocol = '!33'
self.assertEquals(rule.protocol, '!33')
def test_rule_compare(self):
r1 = iptc.Rule6()
r1.src = "::1/128"
r1.dst = "2001::/8"
r1.protocol = "tcp"
r1.in_interface = "wlan+"
r1.out_interface = "eth1"
r2 = iptc.Rule6()
r2.src = "::1/128"
r2.dst = "2001::/8"
r2.protocol = "tcp"
r2.in_interface = "wlan+"
r2.out_interface = "eth1"
self.failUnless(r1 == r2)
r1.src = "::1/ffff::"
self.failIf(r1 == r2)
def test_rule_standard_target(self):
try:
target = iptc.Target(iptc.Rule(), "jump_to_chain")
except:
pass
else:
self.fail("target accepted invalid name jump_to_chain")
rule = iptc.Rule6()
rule.protocol = "tcp"
rule.src = "::1"
target = iptc.Target(rule, "RETURN")
self.assertEquals(target.name, "RETURN")
target = iptc.Target(rule, "ACCEPT")
self.assertEquals(target.name, "ACCEPT")
target = iptc.Target(rule, "")
self.assertEquals(target.name, "")
target.standard_target = "ACCEPT"
self.assertEquals(target.name, "ACCEPT")
self.assertEquals(target.standard_target, "ACCEPT")
target = iptc.Target(rule, self.chain.name)
rule.target = target
self.chain.insert_rule(rule)
self.chain.delete_rule(rule)
def test_rule_iterate_filter(self):
if is_table6_available(iptc.Table6.FILTER):
for r in (rule for chain in iptc.Table6(iptc.Table6.FILTER).chains
for rule in chain.rules if rule):
pass
def test_rule_iterate_raw(self):
if is_table6_available(iptc.Table6.RAW):
for r in (rule for chain in iptc.Table6(iptc.Table6.RAW).chains
for rule in chain.rules if rule):
pass
def test_rule_iterate_mangle(self):
if is_table6_available(iptc.Table6.MANGLE):
for r in (rule for chain in iptc.Table6(iptc.Table6.MANGLE).chains
for rule in chain.rules if rule):
pass
def test_rule_iterate_security(self):
if is_table6_available(iptc.Table6.SECURITY):
for r in (rule for chain in
iptc.Table6(iptc.Table6.SECURITY).chains
for rule in chain.rules if rule):
pass
def test_rule_insert(self):
rules = []
rule = iptc.Rule6()
rule.protocol = "tcp"
rule.src = "::1"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
self.chain.insert_rule(rule)
rules.append(rule)
rule = iptc.Rule6()
rule.protocol = "udp"
rule.src = "::1"
target = iptc.Target(rule, "REJECT")
target.reject_with = "addr-unreach"
rule.target = target
self.chain.insert_rule(rule)
rules.append(rule)
rule = iptc.Rule6()
rule.protocol = "tcp"
rule.dst = "2001::/16"
target = iptc.Target(rule, "RETURN")
rule.target = target
self.chain.insert_rule(rule)
rules.append(rule)
crules = self.chain.rules
self.failUnless(len(rules) == len(crules))
for rule in rules:
self.failUnless(rule in crules)
crules.remove(rule)
def test_rule_to_dict(self):
rule = iptc.Rule6()
rule.protocol = "tcp"
rule.src = "::1/128"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
rule_d = iptc.easy.decode_iptc_rule(rule, ipv6=True)
# Remove counters when comparing rules
rule_d.pop('counters', None)
self.assertEqual(rule_d, {"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"})
def test_rule_from_dict(self):
rule = iptc.Rule6()
rule.protocol = "tcp"
rule.src = "::1/128"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "::1/128", "target": "ACCEPT"}, ipv6=True)
self.assertEqual(rule, rule2)
class TestRule(unittest.TestCase):
def setUp(self):
self.table = iptc.Table(iptc.Table.FILTER)
self.chain = iptc.Chain(self.table, "iptc_test_chain")
try:
self.table.create_chain(self.chain)
except:
self.chain.flush()
if is_table_available(iptc.Table.NAT):
self.table_nat = iptc.Table(iptc.Table.NAT)
self.chain_nat = iptc.Chain(self.table_nat, "iptc_test_nat_chain")
try:
self.table_nat.create_chain(self.chain_nat)
except:
self.chain_nat.flush()
def tearDown(self):
self.table.autocommit = True
self.chain.flush()
self.chain.delete()
if is_table_available(iptc.Table.NAT):
self.table_nat.autocommit = True
self.chain_nat.flush()
self.chain_nat.delete()
def test_rule_address(self):
# valid addresses
rule = iptc.Rule()
for addr in [("127.0.0.1/255.255.255.0", "127.0.0.0/255.255.255.0"),
("!127.0.0.1/255.255.255.0", "!127.0.0.0/255.255.255.0"),
("127.0.0.1/255.255.128.0", "127.0.0.0/255.255.128.0"),
("127.0.0.1/16", "127.0.0.0/255.255.0.0"),
("127.0.0.1/24", "127.0.0.0/255.255.255.0"),
("127.0.0.1/17", "127.0.0.0/255.255.128.0"),
("!127.0.0.1/17", "!127.0.0.0/255.255.128.0")]:
rule.src = addr[0]
self.assertEquals(rule.src, addr[1])
rule.dst = addr[0]
self.assertEquals(rule.dst, addr[1])
addr = "127.0.0.1"
rule.src = addr
self.assertEquals("127.0.0.1/255.255.255.255", rule.src)
rule.dst = addr
self.assertEquals("127.0.0.1/255.255.255.255", rule.dst)
# invalid addresses
for addr in ["127.256.0.1/255.255.255.0", "127.0.1/255.255.255.0",
"127.0.0.1/255.255.255.", "127.0.0.1 255.255.255.0",
"127.0.0.1/33", "127.0.0.1/-5", "127.0.0.1/255.5"]:
try:
rule.src = addr
except ValueError:
pass
else:
self.fail("rule accepted invalid address %s" % (addr))
try:
rule.dst = addr
except ValueError:
pass
else:
self.fail("rule accepted invalid address %s" % (addr))
def test_rule_interface(self):
# valid interfaces
rule = iptc.Rule()
for intf in ["eth0", "eth+", "ip6tnl1", "ip6tnl+", "!ppp0", "!ppp+"]:
rule.in_interface = intf
self.assertEquals(intf, rule.in_interface)
rule.out_interface = intf
self.assertEquals(intf, rule.out_interface)
rule.create_target("ACCEPT")
self.chain.insert_rule(rule)
r = self.chain.rules[0]
eq = r == rule
self.chain.flush()
self.assertTrue(eq)
# invalid interfaces
for intf in ["itsaverylonginterfacename"]:
try:
rule.out_interface = intf
except ValueError:
pass
else:
self.fail("rule accepted invalid interface name %s" % (intf))
try:
rule.in_interface = intf
except ValueError:
pass
else:
self.fail("rule accepted invalid interface name %s" % (intf))
def test_rule_fragment(self):
rule = iptc.Rule()
for frag in [("1", True), ("true", True), ("asdf", True), (1, True),
(0, False), ("", False), (None, False)]:
rule.fragment = frag[0]
self.assertEquals(frag[1], rule.fragment)
def test_rule_protocol(self):
rule = iptc.Rule()
for proto in ["tcp", "udp", "icmp", "AH", "ESP", "!TCP", "!UDP",
"!ICMP", "!ah", "!esp"]:
rule.protocol = proto
self.assertEquals(proto.lower(), rule.protocol)
for proto in ["", "asdf", "!"]:
try:
rule.protocol = proto
except ValueError:
pass
except IndexError:
pass
else:
self.fail("rule accepted invalid protocol %s" % (proto))
def test_rule_protocol_numeric(self):
rule = iptc.Rule()
rule.protocol = 33
self.assertEquals(rule.protocol, '33')
rule.protocol = '!33'
self.assertEquals(rule.protocol, '!33')
def test_rule_compare(self):
r1 = iptc.Rule()
r1.src = "127.0.0.2/255.255.255.0"
r1.dst = "224.1.2.3/255.255.0.0"
r1.protocol = "tcp"
r1.fragment = False
r1.in_interface = "wlan+"
r1.out_interface = "eth1"
r2 = iptc.Rule()
r2.src = "127.0.0.2/255.255.255.0"
r2.dst = "224.1.2.3/255.255.0.0"
r2.protocol = "tcp"
r2.fragment = False
r2.in_interface = "wlan+"
r2.out_interface = "eth1"
self.failUnless(r1 == r2)
r1.src = "127.0.0.1"
self.failIf(r1 == r2)
def test_rule_standard_target(self):
try:
target = iptc.Target(iptc.Rule(), "jump_to_chain")
except:
pass
else:
self.fail("target accepted invalid name jump_to_chain")
rule = iptc.Rule()
rule.protocol = "tcp"
rule.src = "127.0.0.1"
target = iptc.Target(rule, "RETURN")
self.assertEquals(target.name, "RETURN")
target = iptc.Target(rule, "ACCEPT")
self.assertEquals(target.name, "ACCEPT")
target = iptc.Target(rule, "")
self.assertEquals(target.name, "")
target.standard_target = "ACCEPT"
self.assertEquals(target.name, "ACCEPT")
self.assertEquals(target.standard_target, "ACCEPT")
target = iptc.Target(rule, self.chain.name)
rule.target = target
self.chain.insert_rule(rule)
self.chain.delete_rule(rule)
def test_rule_iterate_filter(self):
if is_table_available(iptc.Table.FILTER):
for r in (rule for chain in iptc.Table(iptc.Table.FILTER).chains
for rule in chain.rules if rule):
pass
def test_rule_iterate_nat(self):
if is_table_available(iptc.Table.NAT):
for r in (rule for chain in iptc.Table(iptc.Table.NAT).chains
for rule in chain.rules if rule):
pass
def test_rule_iterate_mangle(self):
if is_table_available(iptc.Table.MANGLE):
for r in (rule for chain in iptc.Table(iptc.Table.MANGLE).chains
for rule in chain.rules if rule):
pass
def test_rule_iterate_rulenum(self):
"""Ensure rule numbers are always returned in order"""
insert_rule_count = 3
append_rule_count = 3
for rule_num in range(insert_rule_count, 0, -1):
rule = iptc.Rule()
match = rule.create_match("comment")
match.comment = "rule{rule_num}".format(rule_num=rule_num)
rule.create_target("ACCEPT")
self.chain.insert_rule(rule)
append_rulenum_start = insert_rule_count + 1
append_rulenum_end = append_rulenum_start + 3
for rule_num in range(append_rulenum_start, append_rulenum_end):
rule = iptc.Rule()
match = rule.create_match("comment")
match.comment = "rule{rule_num}".format(rule_num=rule_num)
rule.create_target("ACCEPT")
self.chain.append_rule(rule)
rules = self.chain.rules
assert len(rules) == (insert_rule_count + append_rule_count)
for rule_num, rule in enumerate(rules, start=1):
assert len(rule.matches) == 1
assert rule.matches[0].comment == "rule{rule_num}".format(
rule_num=rule_num), \
"rule[{left_num}] is not new {right_num}".format(
left_num=rule_num,
right_num=rule.matches[0].comment
)
def test_rule_insert(self):
rules = []
rule = iptc.Rule()
rule.protocol = "tcp"
rule.src = "127.0.0.1"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
self.chain.insert_rule(rule)
rules.append(rule)
rule = iptc.Rule()
rule.protocol = "udp"
rule.src = "127.0.0.1"
target = iptc.Target(rule, "REJECT")
target.reject_with = "host-unreach"
rule.target = target
self.chain.insert_rule(rule)
rules.append(rule)
rule = iptc.Rule()
rule.protocol = "tcp"
rule.dst = "10.1.1.0/255.255.255.0"
target = iptc.Target(rule, "RETURN")
rule.target = target
self.chain.insert_rule(rule)
rules.append(rule)
crules = self.chain.rules
self.failUnless(len(rules) == len(crules))
for rule in rules:
self.failUnless(rule in crules)
crules.remove(rule)
def test_rule_replace(self):
rule = iptc.Rule()
rule.protocol = "tcp"
rule.src = "127.0.0.1"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
self.chain.insert_rule(rule, 0)
rule = iptc.Rule()
rule.protocol = "udp"
rule.src = "127.0.0.1"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
self.chain.replace_rule(rule, 0)
self.failUnless(self.chain.rules[0] == rule)
def test_rule_multiple_parameters(self):
self.table.autocommit = False
self.table.refresh()
rule = iptc.Rule()
rule.dst = "127.0.0.1"
rule.protocol = "tcp"
match = rule.create_match('tcp')
match.sport = "1234"
match.dport = "8080"
target = rule.create_target("REJECT")
target.reject_with = "icmp-host-unreachable"
self.chain.insert_rule(rule)
self.table.commit()
self.table.refresh()
self.assertEquals(len(self.chain.rules), 1)
r = self.chain.rules[0]
self.assertEquals(r.src, '0.0.0.0/0.0.0.0')
self.assertEquals(r.dst, '127.0.0.1/255.255.255.255')
self.assertEquals(r.protocol, 'tcp')
self.assertEquals(len(r.matches), 1)
m = r.matches[0]
self.assertEquals(m.name, 'tcp')
self.assertEquals(m.sport, '1234')
self.assertEquals(m.dport, '8080')
def test_rule_delete(self):
self.table.autocommit = False
self.table.refresh()
for p in ['8001', '8002', '8003']:
rule = iptc.Rule()
rule.dst = "127.0.0.1"
rule.protocol = "tcp"
rule.dport = "8080"
target = rule.create_target("REJECT")
target.reject_with = "icmp-host-unreachable"
self.chain.insert_rule(rule)
self.table.commit()
self.table.refresh()
rules = self.chain.rules
for rule in rules:
self.chain.delete_rule(rule)
self.table.commit()
self.table.refresh()
def test_rule_delete_nat(self):
if not is_table_available(iptc.Table.NAT):
return
self.table_nat.autocommit = False
self.table_nat.refresh()
for p in ['8001', '8002', '8003']:
rule = iptc.Rule()
rule.dst = "127.0.0.1"
rule.protocol = "udp"
rule.dport = "8080"
target = rule.create_target("DNAT")
target.to_destination = '127.0.0.0:' + p
self.chain_nat.insert_rule(rule)
self.table_nat.commit()
self.table_nat.refresh()
rules = self.chain_nat.rules
for rule in rules:
self.chain_nat.delete_rule(rule)
self.table_nat.commit()
self.table_nat.refresh()
def test_rule_to_dict(self):
rule = iptc.Rule()
rule.protocol = "tcp"
rule.src = "127.0.0.1/32"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
rule_d = iptc.easy.decode_iptc_rule(rule)
# Remove counters when comparing rules
rule_d.pop('counters', None)
self.assertEqual(rule_d, {"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"})
def test_rule_from_dict(self):
rule = iptc.Rule()
rule.protocol = "tcp"
rule.src = "127.0.0.1/32"
target = iptc.Target(rule, "ACCEPT")
rule.target = target
rule2 = iptc.easy.encode_iptc_rule({"protocol": "tcp", "src": "127.0.0.1/32", "target": "ACCEPT"})
self.assertEqual(rule, rule2)
def suite():
suite_table6 = unittest.TestLoader().loadTestsFromTestCase(TestTable6)
suite_table = unittest.TestLoader().loadTestsFromTestCase(TestTable)
suite_chain = unittest.TestLoader().loadTestsFromTestCase(TestChain)
suite_rule6 = unittest.TestLoader().loadTestsFromTestCase(TestRule6)
suite_rule = unittest.TestLoader().loadTestsFromTestCase(TestRule)
return unittest.TestSuite([suite_table6, suite_table, suite_chain,
suite_rule6, suite_rule])
def run_tests():
result = unittest.TextTestRunner(verbosity=2).run(suite())
if result.errors or result.failures:
return 1
return 0
if __name__ == "__main__":
unittest.main()
0707010000002A000081ED0000000000000000000000015EE6923E0000437F000000000000000000000000000000000000002C00000000python-iptables-1.0.0/tests/test_matches.py# -*- coding: utf-8 -*-
import unittest
import iptc
is_table6_available = iptc.is_table6_available
class TestMatch(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_match_create(self):
rule = iptc.Rule()
match = rule.create_match("udp")
for m in rule.matches:
self.assertEqual(m, match)
# check that we can change match parameters after creation
match.sport = "12345:55555"
match.dport = "!33333"
m = iptc.Match(iptc.Rule(), "udp")
m.sport = "12345:55555"
m.dport = "!33333"
self.assertEqual(m, match)
def test_match_compare(self):
m1 = iptc.Match(iptc.Rule(), "udp")
m1.sport = "12345:55555"
m1.dport = "!33333"
m2 = iptc.Match(iptc.Rule(), "udp")
m2.sport = "12345:55555"
m2.dport = "!33333"
self.assertEqual(m1, m2)
m2.reset()
m2.sport = "12345:55555"
m2.dport = "33333"
self.assertNotEqual(m1, m2)
def test_match_parameters(self):
m = iptc.Match(iptc.Rule(), "udp")
m.sport = "12345:55555"
m.dport = "!33333"
self.assertEqual(len(m.parameters), 2)
for p in m.parameters:
self.assertTrue(p == "sport" or p == "dport")
self.assertEqual(m.parameters["sport"], "12345:55555")
self.assertEqual(m.parameters["dport"], "!33333")
m.reset()
self.assertEqual(len(m.parameters), 0)
def test_get_all_parameters(self):
m = iptc.Match(iptc.Rule(), "udp")
m.sport = "12345:55555"
m.dport = "!33333"
params = m.get_all_parameters()
self.assertEqual(set(params['sport']), set(['12345:55555']))
self.assertEqual(set(params['dport']), set(['!', '33333']))
class TestMultiportMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "udp"
self.rule.create_target("ACCEPT")
self.match = self.rule.create_match("multiport")
table = iptc.Table(iptc.Table.FILTER)
self.chain = iptc.Chain(table, "iptc_test_udp")
try:
self.chain.flush()
self.chain.delete()
except:
pass
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_multiport(self):
self.match.dports = '1111,2222'
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
match = rule.matches[0]
self.assertEqual(match.dports, '1111,2222')
def test_unicode_multiport(self):
self.match.dports = u'1111,2222'
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
match = rule.matches[0]
self.assertEqual(match.dports, '1111,2222')
class TestXTUdpMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "udp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "udp")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER), "iptc_test_udp")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_udp_port(self):
for port in ["12345", "12345:65535", "!12345", "12345:12346",
"!12345:12346", "0:1234", "! 1234", "!0:12345",
"!1234:65535"]:
self.match.sport = port
self.assertEqual(self.match.sport, port.replace(" ", ""))
self.match.dport = port
self.assertEqual(self.match.dport, port.replace(" ", ""))
self.match.reset()
for port in ["-1", "asdf", "!asdf"]:
try:
self.match.sport = port
except Exception:
pass
else:
self.fail("udp accepted invalid source port %s" % (port))
try:
self.match.dport = port
except Exception:
pass
else:
self.fail("udp accepted invalid destination port %s" % (port))
self.match.reset()
def test_udp_insert(self):
self.match.reset()
self.match.dport = "12345"
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestXTMarkMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "tcp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "mark")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_mark")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_mark(self):
for mark in ["0x7b", "! 0x7b", "0x7b/0xfffefffe", "!0x7b/0xff00ff00"]:
self.match.mark = mark
self.assertEqual(self.match.mark, mark.replace(" ", ""))
self.match.reset()
for mark in ["0xffffffffff", "123/0xffffffff1", "!asdf", "1234:1233"]:
try:
self.match.mark = mark
except Exception:
pass
else:
self.fail("mark accepted invalid value %s" % (mark))
self.match.reset()
def test_mark_insert(self):
self.match.reset()
self.match.mark = "0x123"
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestXTLimitMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "tcp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "limit")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_limit")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_limit(self):
for limit in ["1/sec", "5/min", "3/hour"]:
self.match.limit = limit
self.assertEqual(self.match.limit, limit)
self.match.reset()
for limit in ["asdf", "123/1", "!1", "!1/second"]:
try:
self.match.limit = limit
except Exception:
pass
else:
self.fail("limit accepted invalid value %s" % (limit))
self.match.reset()
def test_limit_insert(self):
self.match.reset()
self.match.limit = "1/min"
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestIcmpv6Match(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule6()
self.rule.protocol = "icmpv6"
self.rule.in_interface = "eth0"
self.target = self.rule.create_target("ACCEPT")
self.match = self.rule.create_match("icmp6")
self.match.icmpv6_type = "echo-request"
self.table = iptc.Table6(iptc.Table6.FILTER)
self.chain = iptc.Chain(self.table, "ip6tc_test_icmpv6")
try:
self.table.delete_chain(self.chain)
except:
pass
self.table.create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_icmpv6(self):
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
self.assertEqual(self.rule, rule)
class TestCommentMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "udp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "comment")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_comment")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_comment(self):
comment = "comment test"
self.match.reset()
self.match.comment = comment
self.chain.insert_rule(self.rule)
self.assertEqual(self.match.comment, comment)
class TestIprangeMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.protocol = "tcp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "iprange")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_iprange")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_iprange(self):
self.match.src_range = "192.168.1.100-192.168.1.200"
self.match.dst_range = "172.22.33.106"
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
def test_iprange_tcpdport(self):
self.match.src_range = "192.168.1.100-192.168.1.200"
self.match.dst_range = "172.22.33.106"
self.rule.add_match(self.match)
match = iptc.Match(self.rule, "tcp")
match.dport = "22"
self.rule.add_match(match)
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestXTStateMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "tcp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "state")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_state")
self.table = iptc.Table(iptc.Table.FILTER)
try:
self.chain.flush()
self.chain.delete()
except:
pass
self.table.create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_state(self):
self.match.state = "RELATED,ESTABLISHED"
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
m = rule.matches[0]
self.assertEqual(m.name, "state")
self.assertEqual(m.state, "RELATED,ESTABLISHED")
self.assertEqual(rule.matches[0].name, self.rule.matches[0].name)
self.assertEqual(rule, self.rule)
class TestXTConntrackMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "tcp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "conntrack")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_conntrack")
self.table = iptc.Table(iptc.Table.FILTER)
try:
self.chain.flush()
self.chain.delete()
except:
pass
self.table.create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_state(self):
self.match.ctstate = "NEW,RELATED"
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
m = rule.matches[0]
self.assertTrue(m.name, ["conntrack"])
self.assertEqual(m.ctstate, "NEW,RELATED")
class TestHashlimitMatch(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.src = "127.0.0.1"
self.rule.protocol = "udp"
self.rule.target = iptc.Target(self.rule, "ACCEPT")
self.match = iptc.Match(self.rule, "hashlimit")
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_hashlimit")
self.table = iptc.Table(iptc.Table.FILTER)
try:
self.chain.flush()
self.chain.delete()
except:
pass
self.table.create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_hashlimit(self):
self.match.hashlimit_name = 'foo'
self.match.hashlimit_mode = 'srcip'
self.match.hashlimit_upto = '200/sec'
self.match.hashlimit = '200'
self.match.hashlimit_htable_expire = '100'
self.rule.add_match(self.match)
self.chain.insert_rule(self.rule)
rule = self.chain.rules[0]
m = rule.matches[0]
self.assertTrue(m.name, ["hashlimit"])
self.assertEqual(m.hashlimit_name, "foo")
self.assertEqual(m.hashlimit_mode, "srcip")
self.assertEqual(m.hashlimit_upto, "200/sec")
self.assertEqual(m.hashlimit_burst, "5")
class TestRecentMatch(unittest.TestCase):
def setUp(self):
self.table = 'filter'
self.chain = 'iptc_test_recent'
iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False)
iptc.easy.add_chain(self.table, self.chain, ipv6=False, raise_exc=True)
def tearDown(self):
iptc.easy.delete_chain(self.table, self.chain, ipv6=False, flush=True, raise_exc=False)
def test_recent(self):
rule_d = {
'protocol': 'udp',
'recent': {
'mask': '255.255.255.255',
'update': '',
'seconds': '60',
'rsource': '',
'name': 'UDP-PORTSCAN',
},
'target': {
'REJECT':{
'reject-with': 'icmp-port-unreachable'
}
}
}
iptc.easy.add_rule(self.table, self.chain, rule_d)
rule2_d = iptc.easy.get_rule(self.table, self.chain, -1)
# Remove counters when comparing rules
rule2_d.pop('counters', None)
self.assertEqual(rule_d, rule2_d)
def suite():
suite_match = unittest.TestLoader().loadTestsFromTestCase(TestMatch)
suite_udp = unittest.TestLoader().loadTestsFromTestCase(TestXTUdpMatch)
suite_mark = unittest.TestLoader().loadTestsFromTestCase(TestXTMarkMatch)
suite_limit = unittest.TestLoader().loadTestsFromTestCase(TestXTLimitMatch)
suite_mport = unittest.TestLoader().loadTestsFromTestCase(
TestMultiportMatch)
suite_comment = unittest.TestLoader().loadTestsFromTestCase(
TestCommentMatch)
suite_iprange = unittest.TestLoader().loadTestsFromTestCase(
TestIprangeMatch)
suite_state = unittest.TestLoader().loadTestsFromTestCase(TestXTStateMatch)
suite_conntrack = unittest.TestLoader().loadTestsFromTestCase(
TestXTConntrackMatch)
suite_hashlimit = unittest.TestLoader().loadTestsFromTestCase(
TestHashlimitMatch)
suite_recent = unittest.TestLoader().loadTestsFromTestCase(
TestRecentMatch)
extra_suites = []
if is_table6_available(iptc.Table6.FILTER):
extra_suites += unittest.TestLoader().loadTestsFromTestCase(
TestIcmpv6Match)
return unittest.TestSuite([suite_match, suite_udp, suite_mark,
suite_limit, suite_mport, suite_comment,
suite_iprange, suite_state, suite_conntrack,
suite_hashlimit, suite_recent] + extra_suites)
def run_tests():
result = unittest.TextTestRunner(verbosity=2).run(suite())
if result.errors or result.failures:
return 1
return 0
if __name__ == "__main__":
unittest.main()
0707010000002B000081ED0000000000000000000000015EE6923E0000375D000000000000000000000000000000000000002C00000000python-iptables-1.0.0/tests/test_targets.py# -*- coding: utf-8 -*-
import unittest
import iptc
from iptc.xtables import xtables_version
is_table_available = iptc.is_table_available
class TestTarget(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_target_create(self):
rule = iptc.Rule()
target = rule.create_target("MARK")
self.failUnless(rule.target == target)
target.set_mark = "0x123"
t = iptc.Target(iptc.Rule(), "MARK")
t.set_mark = "0x123"
self.failUnless(t == target)
def test_target_compare(self):
t1 = iptc.Target(iptc.Rule(), "MARK")
t1.set_mark = "0x123"
t2 = iptc.Target(iptc.Rule(), "MARK")
t2.set_mark = "0x123"
self.failUnless(t1 == t2)
t2.reset()
t2.set_mark = "0x124"
self.failIf(t1 == t2)
def test_target_parameters(self):
t = iptc.Target(iptc.Rule(), "CONNMARK")
t.nfmask = "0xdeadbeef"
t.ctmask = "0xfefefefe"
t.save_mark = ""
self.failUnless(len(t.parameters) == 3)
for p in t.parameters:
self.failUnless(p == "ctmask" or p == "nfmask" or
p == "save_mark")
self.failUnless(t.parameters["save_mark"] == "")
self.failUnless(t.parameters["nfmask"] == "0xdeadbeef")
self.failUnless(t.parameters["ctmask"] == "0xfefefefe")
t.reset()
self.failUnless(len(t.parameters) == 1)
class TestXTClusteripTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.in_interface = "eth0"
self.match = iptc.Match(self.rule, "tcp")
self.rule.add_match(self.match)
self.target = iptc.Target(self.rule, "CLUSTERIP")
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.FILTER),
"iptc_test_clusterip")
iptc.Table(iptc.Table.FILTER).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_mode(self):
for hashmode in ["sourceip", "sourceip-sourceport",
"sourceip-sourceport-destport"]:
self.target.new = ""
self.target.hashmode = hashmode
self.assertEquals(self.target.hashmode, hashmode)
self.target.reset()
for hashmode in ["asdf", "1234"]:
self.target.new = ""
try:
self.target.hashmode = hashmode
except Exception:
pass
else:
self.fail("CLUSTERIP accepted invalid value %s" % (hashmode))
self.target.reset()
def test_insert(self):
self.target.reset()
self.target.new = ""
self.target.hashmode = "sourceip"
self.target.clustermac = "01:02:03:04:05:06"
self.target.local_node = "1"
self.target.total_nodes = "2"
self.rule.target = self.target
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestIPTRedirectTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.in_interface = "eth0"
self.match = iptc.Match(self.rule, "tcp")
self.rule.add_match(self.match)
self.target = iptc.Target(self.rule, "REDIRECT")
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.NAT),
"iptc_test_redirect")
iptc.Table(iptc.Table.NAT).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_mode(self):
for port in ["1234", "1234-2345", "65534-65535"]:
self.target.to_ports = port
self.assertEquals(self.target.to_ports, port)
self.target.reset()
self.target.random = ""
self.target.reset()
for port in ["1234567", "2345-1234"]: # ipt bug: it accepts strings
try:
self.target.to_ports = port
except Exception:
pass
else:
self.fail("REDIRECT accepted invalid value %s" % (port))
self.target.reset()
def test_insert(self):
self.target.reset()
self.target.to_ports = "1234-1235"
self.rule.target = self.target
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestXTTosTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.in_interface = "eth0"
self.match = iptc.Match(self.rule, "tcp")
self.rule.add_match(self.match)
self.target = iptc.Target(self.rule, "TOS")
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE), "iptc_test_tos")
iptc.Table(iptc.Table.MANGLE).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_set_tos(self):
for tos in ["0x12/0xff", "0x12/0x0f"]:
self.target.set_tos = tos
self.assertEquals(self.target.set_tos, tos)
self.target.reset()
for tos in [("Minimize-Delay", "0x10/0x3f"),
("Maximize-Throughput", "0x08/0x3f"),
("Maximize-Reliability", "0x04/0x3f"),
("Minimize-Cost", "0x02/0x3f"),
("Normal-Service", "0x00/0x3f")]:
self.target.set_tos = tos[0]
self.assertEquals(self.target.set_tos, tos[1])
self.target.reset()
def test_tos_mode(self):
for tos in ["0x04"]:
self.target.and_tos = tos
self.assertEquals(self.target.set_tos, "0x00/0xfb")
self.target.reset()
self.target.or_tos = tos
self.assertEquals(self.target.set_tos, "0x04/0x04")
self.target.reset()
self.target.xor_tos = tos
self.assertEquals(self.target.set_tos, "0x04/0x00")
self.target.reset()
for tos in ["0x1234", "0x12/0xfff", "asdf", "Minimize-Bullshit"]:
try:
self.target.and_tos = tos
except Exception:
pass
else:
self.fail("TOS accepted invalid value %s" % (tos))
self.target.reset()
try:
self.target.or_tos = tos
except Exception:
pass
else:
self.fail("TOS accepted invalid value %s" % (tos))
self.target.reset()
try:
self.target.xor_tos = tos
except Exception:
pass
else:
self.fail("TOS accepted invalid value %s" % (tos))
self.target.reset()
def test_insert(self):
self.target.reset()
self.target.set_tos = "0x12/0xff"
self.rule.target = self.target
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestDnatTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.in_interface = "eth0"
self.match = iptc.Match(self.rule, "tcp")
self.rule.add_match(self.match)
self.target = iptc.Target(self.rule, "DNAT")
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.MANGLE),
"iptc_test_dnat")
iptc.Table(iptc.Table.MANGLE).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_mode(self):
for dst in ["1.2.3.4", "199.199.199.199-199.199.199.255",
"1.2.3.4:5678", "1.2.3.4:5678-5688"]:
self.target.to_destination = dst
self.assertEquals(self.target.to_destination, dst)
self.target.reset()
self.target.to_destination = dst
self.target.random = "1"
self.assertEquals(self.target.to_destination, dst)
self.target.reset()
self.target.to_destination = dst
self.target.persistent = "1"
self.assertEquals(self.target.to_destination, dst)
self.target.reset()
def test_insert(self):
self.target.reset()
self.target.to_destination = "1.2.3.4"
self.rule.target = self.target
self.chain.insert_rule(self.rule)
for r in self.chain.rules:
if r != self.rule:
self.fail("inserted rule does not match original")
class TestIPTMasqueradeTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.out_interface = "eth0"
self.target = iptc.Target(self.rule, "MASQUERADE")
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.NAT),
"iptc_test_masquerade")
iptc.Table(iptc.Table.NAT).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_mode(self):
for port in ["1234", "1234-2345"]:
self.target.to_ports = port
self.assertEquals(self.target.to_ports, port)
self.target.reset()
self.target.random = ""
self.target.reset()
for port in ["123456", "1234-1233", "asdf"]:
try:
self.target.to_ports = port
except Exception:
pass
else:
self.fail("MASQUERADE accepted invalid value %s" % (port))
self.target.reset()
def test_insert(self):
self.target.reset()
self.target.to_ports = "1234"
self.rule.target = self.target
self.chain.insert_rule(self.rule)
found = False
for r in self.chain.rules:
if r == self.rule:
found = True
break
if not found:
self.fail("inserted rule does not match original")
class TestXTNotrackTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.out_interface = "eth0"
self.target = iptc.Target(self.rule, "NOTRACK")
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.RAW),
"iptc_test_notrack")
try:
self.chain.flush()
self.chain.delete()
except:
pass
iptc.Table(iptc.Table.RAW).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_notrack(self):
self.chain.insert_rule(self.rule)
t = self.chain.rules[0].target
self.assertTrue(t.name in ["NOTRACK", "CT"])
class TestXTCtTarget(unittest.TestCase):
def setUp(self):
self.rule = iptc.Rule()
self.rule.dst = "127.0.0.2"
self.rule.protocol = "tcp"
self.rule.out_interface = "eth0"
self.target = iptc.Target(self.rule, "CT")
self.target.notrack = "true"
self.rule.target = self.target
self.chain = iptc.Chain(iptc.Table(iptc.Table.RAW),
"iptc_test_ct")
try:
self.chain.flush()
self.chain.delete()
except:
pass
iptc.Table(iptc.Table.RAW).create_chain(self.chain)
def tearDown(self):
for r in self.chain.rules:
self.chain.delete_rule(r)
self.chain.flush()
self.chain.delete()
def test_ct(self):
self.chain.insert_rule(self.rule)
t = self.chain.rules[0].target
self.assertEquals(t.name, "CT")
self.assertTrue(t.notrack is not None)
def suite():
suites = []
suite_target = unittest.TestLoader().loadTestsFromTestCase(TestTarget)
suite_tos = unittest.TestLoader().loadTestsFromTestCase(TestXTTosTarget)
suite_cluster = unittest.TestLoader().loadTestsFromTestCase(
TestXTClusteripTarget)
suite_redir = unittest.TestLoader().loadTestsFromTestCase(
TestIPTRedirectTarget)
suite_masq = unittest.TestLoader().loadTestsFromTestCase(
TestIPTMasqueradeTarget)
suite_dnat = unittest.TestLoader().loadTestsFromTestCase(
TestDnatTarget)
suite_notrack = unittest.TestLoader().loadTestsFromTestCase(
TestXTNotrackTarget)
suite_ct = unittest.TestLoader().loadTestsFromTestCase(TestXTCtTarget)
suites.extend([suite_target, suite_cluster, suite_tos])
if is_table_available(iptc.Table.NAT):
suites.extend([suite_redir, suite_masq, suite_dnat])
if is_table_available(iptc.Table.RAW) and xtables_version >= 10:
suites.extend([suite_notrack, suite_ct])
return unittest.TestSuite(suites)
def run_tests():
result = unittest.TextTestRunner(verbosity=2).run(suite())
if result.errors or result.failures:
return 1
return 0
if __name__ == "__main__":
unittest.main()
07070100000000000000000000000000000000000000010000000000000000000000000000000000000000000000000000000B00000000TRAILER!!!611 blocks