Python 使用pcapy在Python中实时捕获数据包

import pcapy
from impacket.ImpactDecoder import *

# list all the network devices
# NOTE(review): the returned list is discarded here; presumably this line is
# meant to be run interactively to look up a device name -- confirm.
pcapy.findalldevs()

# The three values below are passed, in this order, after the device name.
max_bytes = 1024
promiscuous = False
read_timeout = 100 # in milliseconds
# The first argument is a placeholder string: replace it with a real device
# name before running.
pc = pcapy.open_live("name of network device to capture from", max_bytes, promiscuous, read_timeout)

# Apply the capture filter expression 'tcp' to the handle opened above.
pc.setfilter('tcp')

# callback for received packets
def recv_pkts(hdr, data):
    """Decode one captured frame with EthDecoder and print the result.

    hdr  -- capture header handed over by the capture loop (not used here)
    data -- raw packet data passed to EthDecoder().decode()
    """
    decoder = EthDecoder()
    print(decoder.decode(data))

packet_limit = -1 # infinite
# Hand every captured packet to recv_pkts(hdr, data).
# NOTE(review): with the limit above this call presumably never returns.
pc.loop(packet_limit, recv_pkts) # capture packets

Python 简单的 main 入口示例

#!/usr/bin/python

class A():
	"""Empty placeholder class: it declares no attributes and no behaviour."""

	def __init__(self):
		"""Construct an instance; nothing is initialised."""

if __name__ == "__main__":
	# Runs only when the file is executed as a script, not when imported.
	# The instance is created and bound to `a`; nothing else is done with it.
	a = A()

Python ID3标签中的MP3校验和

#!/usr/bin/python

"""mp3md5: MP3 checksums stored in ID3v2

mp3md5 calculates MD5 checksums for all MP3's on the command line
(either individual files, or directories which are recursively
processed).

Checksums are calculated by skipping the ID3v2 tag at the start of the
file and any ID3v1 tag at the end (does not however know about APEv2
tags).  The checksum is stored in an ID3v2 UFID (Unique File ID) frame
with owner 'md5' (the ID3v2 tag is created if necessary).

Usage: mp3md5.py [options] [files or directories]

-h/--help
  Output this message and exit.

-l/--license
  Output license terms for mp3md5 and exit.

-n/--nocheck
  Do not check existing checksums (so no CONFIRMED or BROKEN lines
  will be output). Causes --update to be ignored.

-r/--remove
  Remove checksums, outputting REMOVED lines (outputs NOCHECKSUM for
  files already without them).  Ignores --nocheck and --update.

-u/--update
  Instead of reporting a changed checksum as BROKEN, store the new
  checksum and output UPDATED lines.

Depends on the eyeD3 module (http://eyeD3.nicfit.net)

Copyright 2007 G raham P oulter
"""

__copyright__ = "2007 G raham P oulter"
__author__ = "G raham P oulter"
__license__ = """This program is free software: you can redistribute it and/or
modify it under the terms of the GNU General Public License as published by the
Free Software Foundation, either version 3 of the License, or (at your option)
any later version.

This program is distributed in the hope that it will be useful, but WITHOUT ANY
WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
PARTICULAR PURPOSE. See the GNU General Public License for more details.

You should have received a copy of the GNU General Public License along with
this program. If not, see <http://www.gnu.org/licenses/>."""

import eyeD3
from getopt import getopt
import md5
import os
import struct
import sys

# Module-level switches read by the tag functions below. The option loop in
# the __main__ section assigns nocheck, remove and update; nothing visible in
# this file ever sets pretend, so it stays False.
pretend = False # Whether to pretend to write tags (tag.update() is skipped when true)
nocheck = False # Whether to not check existing sums
remove = False  # Whether to remove checksums
update = False  # Whether to update changed checksums
    
def log(head, body, *args):
    """Print a status line to standard output.

    head -- short status keyword (e.g. "ADDED"); padded with spaces to a
            12-column field, or left as is when it is already 12 or longer
    body -- message text
    args -- optional values interpolated into body with the % operator

    body is treated as a %-format string only when args are supplied.
    Callers pass raw file paths as body, and a path containing '%' must be
    printed verbatim rather than raise a formatting error.
    """
    if args:
        message = body % args
    else:
        message = body
    print(head.ljust(12) + message)

def openTag(fpath):
    """Return the ID3 tag of fpath, building a new ID3v2.3 tag if it has none.

    A newly built tag is written out immediately via tag.update() unless the
    module-level `pretend` flag is set.

    Raises ValueError when eyeD3 does not recognise fpath as an MP3 file or
    reports an invalid audio format while opening it.
    """
    if not eyeD3.tag.isMp3File(fpath):
        raise ValueError("NOT AN MP3: %s" % fpath)
    try:
        mp3 = eyeD3.tag.Mp3AudioFile(fpath, eyeD3.ID3_V2)
    except eyeD3.tag.InvalidAudioFormatException:
        raise ValueError("ERROR IN MP3: %s" % fpath)

    existing = mp3.getTag()
    if existing is not None:
        return existing

    # No tag yet: create one pinned to version 2.3.
    fresh = eyeD3.Tag(fpath)
    fresh.header.setVersion(eyeD3.ID3_V2_3)
    if not pretend:
        fresh.update()
    return fresh

### WARNING: REMEMBER TO UPDATE THE COPY IN MD5DIR
def calculateUID(filepath):
    """Return the hex MD5 digest of an MP3's data, excluding ID3 tags.

    The digest covers the bytes after the ID3v2 tag at the start of the file
    (if present) and before the ID3v1 tag at the end (if present). See
    www.id3.org for the tag format specifications.

    filepath -- path of the file to checksum

    Files shorter than 128 bytes (too small for an ID3v1 tag) and files with
    a truncated ID3v2 header are hashed without tag skipping instead of
    raising. If a corrupt ID3v2 size points past the end of the data, the
    digest of empty input is returned.

    Raises IOError/OSError when the file cannot be opened or read.
    """
    # hashlib replaces the deprecated md5 module and yields the same digests.
    import hashlib

    f = open(filepath, "rb")
    try:
        finish = os.stat(filepath).st_size
        # ID3v1 tag: the last 128 bytes, starting with "TAG". Seeking 128
        # bytes back from the end of a shorter file would raise.
        if finish >= 128:
            f.seek(-128, 2)
            if f.read(3) == b"TAG":
                finish -= 128

        # ID3v2 header is 10 bytes: "ID3" (0-2), major/minor version (3-4),
        # flags (5), synchsafe body size (6-9).
        f.seek(0)
        start = 0
        header = f.read(10)
        if len(header) == 10 and header[:3] == b"ID3":
            flags, b0, b1, b2, b3 = struct.unpack("5xB4B", header)
            bodysize = (b0 << 21) + (b1 << 14) + (b2 << 7) + b3
            start = 10 + bodysize
            # Flag bit 4 means a 10-byte footer follows the tag body.
            if flags & (1 << 4):
                start += 10

        # Hash what lies between the tags, in chunks so large files are not
        # held in memory at once.
        remaining = max(finish - start, 0)
        f.seek(start)
        h = hashlib.md5()
        while remaining > 0:
            chunk = f.read(min(remaining, 65536))
            if not chunk:
                break
            h.update(chunk)
            remaining -= len(chunk)
    finally:
        f.close()
    return h.hexdigest()

def readUID(fpath):
    """Return the id of the first unique-file-ID frame owned by "md5".

    The tag is obtained through openTag(fpath). Returns None when the tag
    holds no such frame.
    """
    md5_ids = (frame.id
               for frame in openTag(fpath).getUniqueFileIDs()
               if frame.owner_id == "md5")
    # The generator is lazy, so scanning stops at the first match.
    return next(md5_ids, None)

def removeUID(fpath):
    """Delete the first "md5"-owned unique-file-ID frame from fpath's tag.

    Returns True when a frame was deleted (the tag is then saved as ID3v2.3
    unless the module-level `pretend` flag is set), False when the tag has
    no such frame.
    """
    tag = openTag(fpath)
    for index, frame in enumerate(tag.frames):
        is_md5_ufid = (isinstance(frame, eyeD3.frames.UniqueFileIDFrame)
                       and frame.owner_id == "md5")
        if not is_md5_ufid:
            continue
        # Returning right after the deletion means the frame list is never
        # iterated again once it has been modified.
        del tag.frames[index]
        if not pretend:
            tag.update(eyeD3.ID3_V2_3)
        return True
    return False

def writeUID(fpath, uid):
    """Store uid as the "md5" unique-file-ID in fpath's ID3v2 tag.

    The first existing "md5"-owned frame is overwritten; when there is none a
    new frame is added. The tag is saved as ID3v2.3 unless the module-level
    `pretend` flag is set.
    """
    tag = openTag(fpath)
    for frame in tag.getUniqueFileIDs():
        if frame.owner_id == "md5":
            frame.id = uid
            break
    else:
        # Loop finished without a match: no md5 frame exists yet.
        tag.addUniqueFileID("md5", uid)
    if not pretend:
        tag.update(eyeD3.ID3_V2_3)

def mungeUID(fpath):
    """Add, check, update or remove the MD5 UID on fpath's tag.

    Behaviour is selected by the module-level flags:

    remove  -- delete the checksum; logs REMOVED, or NOCHECKSUM if none
    (none)  -- a file without a checksum gets one; logs ADDED
    nocheck -- a file that already has a checksum is left alone, silently
    update  -- a checksum that no longer matches is rewritten (UPDATED)
               instead of being reported as BROKEN

    A checksum that still matches is logged as CONFIRMED.

    The path is always passed to log() as a "%s" argument, never as the
    format string itself, so a '%' in a file name cannot break formatting.
    """
    if remove:
        if removeUID(fpath):
            log("REMOVED", "%s", fpath)
        else:
            log("NOCHECKSUM", "%s", fpath)
        return

    cur_uid = readUID(fpath)
    if cur_uid is None:
        new_uid = calculateUID(fpath)
        writeUID(fpath, new_uid)
        log("ADDED", "%s", fpath)
        return

    if nocheck:
        return

    new_uid = calculateUID(fpath)
    if cur_uid == new_uid:
        log("CONFIRMED", "%s", fpath)
    elif update:
        writeUID(fpath, new_uid)
        log("UPDATED", "%s", fpath)
    else:
        log("BROKEN", "%s", fpath)

if __name__ == "__main__":
    # Parse the command line. The flags are assigned at module level because
    # the tag functions read them as globals.
    optlist, args = getopt(sys.argv[1:], "hlnru",
                           ["help", "license", "nocheck", "remove", "update"])
    for key, value in optlist:
        if key in ("-h", "--help"):
            print(__doc__)
            sys.exit(0)
        elif key in ("-l", "--license"):
            # Print this module's own licence text. The bare name `license`
            # resolves to the interpreter's built-in licence object instead.
            print(__license__)
            sys.exit(0)
        elif key in ("-n", "--nocheck"):
            nocheck = True
        elif key in ("-r", "--remove"):
            remove = True
        elif key in ("-u", "--update"):
            update = True

    # Each argument is either a single file or a directory to walk. Only
    # names ending in ".mp3" are processed.
    for start in args:
        if os.path.isfile(start):
            if start.endswith(".mp3"):
                mungeUID(start)
        elif os.path.isdir(start):
            for root, dirs, files in os.walk(start):
                # Sorting in place makes the traversal order deterministic.
                dirs.sort()
                files.sort()
                for fname in files:
                    if fname.endswith(".mp3"):
                        mungeUID(os.path.join(root, fname))
        else:
            log("WARNING", "%s does not exist", start)

Python GData python客户端访问谷歌笔记本

#!/usr/bin/python

import gdata.service

# Get the generic GData service
gservice = gdata.service.GDataService()

# Set the server we are going to work with
gservice.server = "www.google.com"

# User id for google notebook to use: 09641038559637245294
# (see the google notebook sharing options to obtain yours)
# Get the feed for shared notebooks
feed = gservice.GetFeed("/notebook/feeds/09641038559637245294")
# This call is the same as doing this:
# curl http://www.google.com/notebook/feeds/09641038559637245294

# For each notebook in the feed use the gdata api to extract its title and
# self link
for notebook_entry in feed.entry:
    print("Notebook: %s" % notebook_entry.title.text)
    print("URI: %s" % notebook_entry.GetSelfLink().href)

Python 字典类

class dictclass:
	"""Expose the keys of a dictionary as attributes of an object.

	The dictionary passed in is installed as the instance's __dict__ itself,
	not copied, so attribute writes on the object and item writes on the
	dictionary are visible through both.
	"""

	def __init__( self, d ):
		"""Adopt d as this instance's attribute dictionary."""
		self.__dict__ = d

Python 使用Python进行单元测试

"""Unit test for messager.py."""

from messager import messager, Receiver
import unittest

class Logger(Receiver):
    """Receiver that records every call made to its foo and bar methods.

    self.log is a list of 2-tuples in call order: the bound method that was
    invoked, followed by the tuple of positional arguments it received.
    """

    def __init__(self):
        # FIXME: log should be a class variable that all instances log to.
        self.log = []

    def _record(self, method, args):
        """Append one (method, args) entry to the log."""
        self.log.append((method, args))

    def foo(self, *args):
        """Log a call to foo together with its arguments."""
        self._record(self.foo, args)

    def bar(self, *args):
        """Log a call to bar together with its arguments."""
        self._record(self.bar, args)

class MessagerTest(unittest.TestCase):
    """Tests for subscribing to, ignoring and sending messages via messager.

    Every test gets a fresh Logger in self.logger. Because messager is shared
    by all tests, tearDown clears it so subscriptions made in one test cannot
    fire during another.
    """

    def setUp(self):
        self.logger = Logger()

    def tearDown(self):
        # Drop every subscription registered during the test, including those
        # of extra receivers such as second_logger in testIgnoreAll.
        messager.clear()

    @staticmethod
    def _calls(logger, method, *args):
        """Return how often logger recorded a call of method with args."""
        return logger.log.count((method, args))

    # Accepting and sending messages.
    def testMultipleAccept(self):
        """foo and bar are subscribed to message a, when a is sent both foo and
        bar should be called."""

        self.logger.accept('a', self.logger.foo)
        self.logger.accept('a', self.logger.bar)

        messager.send('a')
        # Two methods were called.
        self.assertEqual(len(self.logger.log), 2)
        # foo was called once.
        self.assertEqual(self._calls(self.logger, self.logger.foo), 1)
        # bar was called once.
        self.assertEqual(self._calls(self.logger, self.logger.bar), 1)

    def testAcceptOnce(self):
        """foo is subscribed to message b once only, bar is subscribed to it
        permanently. If b is sent twice, foo and bar should be called the first
        time, only bar should be called the second time."""

        self.logger.acceptOnce('b', self.logger.foo)
        self.logger.accept('b', self.logger.bar)

        messager.send('b')
        # foo should have been called once.
        self.assertEqual(self._calls(self.logger, self.logger.foo), 1)
        # bar should have been called once.
        self.assertEqual(self._calls(self.logger, self.logger.bar), 1)
        messager.send('b')
        # foo should still have been called only once.
        self.assertEqual(self._calls(self.logger, self.logger.foo), 1)
        # bar should now have been called twice.
        self.assertEqual(self._calls(self.logger, self.logger.bar), 2)

    # Ignoring messages.
    def testIgnore(self):
        """foo and bar are subscribed to c, after ignore(c,foo) only bar should
        be called when c is sent."""

        self.logger.accept('c', self.logger.foo)
        self.logger.accept('c', self.logger.bar)
        self.logger.ignore('c', self.logger.foo)
        messager.send('c')
        # Only one method should have been called.
        self.assertEqual(len(self.logger.log), 1)
        # bar should have been called once.
        self.assertEqual(self._calls(self.logger, self.logger.bar), 1)

    def testIgnoreMessage(self):
        """foo and bar are subscribed to c, after ignore(c) neither foo nor bar
        should be called."""

        self.logger.accept('c', self.logger.foo)
        self.logger.accept('c', self.logger.bar)
        self.logger.ignore('c')
        messager.send('c')
        # No methods should have been called.
        self.assertEqual(self.logger.log, [])

    def testIgnoreAll(self):
        """After a Receiver object calls ignoreAll() no methods of this object
        should be called when any message is sent, but methods of other objects
        should continue to be called."""

        self.logger.accept('d', self.logger.foo)
        self.logger.accept('e', self.logger.bar)
        second_logger = Logger()
        second_logger.accept('d', second_logger.foo)
        second_logger.accept('e', second_logger.bar)

        self.logger.ignoreAll()
        messager.send('d')
        messager.send('e')
        # No methods of logger should have been called.
        self.assertEqual(self.logger.log, [])
        # Two methods should have been called on second_logger.
        self.assertEqual(len(second_logger.log), 2)
        # foo should have been called once.
        self.assertEqual(self._calls(second_logger, second_logger.foo), 1)
        # bar should have been called once.
        self.assertEqual(self._calls(second_logger, second_logger.bar), 1)

    # Clear.
    def testClear(self):
        """After clear has been called, sending a message should not result in
        any functions being called."""

        # Subscribe first, so the assertion below can only pass if clear()
        # really removed the subscriptions.
        self.logger.accept('a', self.logger.foo)
        self.logger.accept('b', self.logger.bar)
        self.logger.accept('c', self.logger.foo)

        messager.clear()
        messager.send('a')
        messager.send('b')
        messager.send('c')
        # No methods should have been called.
        self.assertEqual(self.logger.log, [])

    # Sending with arguments.
    def testSendWithTwoArguments(self):
        """If accept is called with an argument and then send is called with an
        argument (and the same message name) the function subscribed via accept
        should be called with accept's argument followed by send's argument."""

        self.logger.accept('f', self.logger.foo, 'accepter arg')
        messager.send('f', 'sender arg')
        # One method should have been called.
        self.assertEqual(len(self.logger.log), 1)
        # foo should have been called with the two arguments in the right order.
        count = self._calls(self.logger, self.logger.foo,
                            'accepter arg', 'sender arg')
        self.assertEqual(count, 1)

    def testSendWithNoAccepterArgument(self):
        """If no argument is given to the accept method, but an argument is
        given to the send method, then the subscribed function(s) should be
        called with the send argument only."""

        self.logger.accept('foo', self.logger.foo)
        messager.send('foo', 'sender arg')
        # One method should have been called.
        self.assertEqual(len(self.logger.log), 1)
        # foo should have been called with the right argument.
        count = self._calls(self.logger, self.logger.foo, 'sender arg')
        self.assertEqual(count, 1)

    def testSendWithNoSenderArgument(self):
        """If no argument is given to the send method, but an argument is given
        to the accept method, then the subscribed function(s) should be called
        with the accept argument only."""

        self.logger.accept('foo', self.logger.foo, 'accepter arg')
        messager.send('foo')
        # One method should have been called.
        self.assertEqual(len(self.logger.log), 1)
        # foo should have been called with the right argument.
        count = self._calls(self.logger, self.logger.foo, 'accepter arg')
        self.assertEqual(count, 1)
        
if __name__ == '__main__':
    unittest.main()

Python 在Python中列出目录

# http://www.diveintopython.org/file_handling/os_module.html

# Use a one-line list comprehension to get all the files in a given directory with a given extension.
import os
# NOTE: the name `dir` shadows the built-in dir() for the rest of this module.
dir = '.'
ext = '.txt'
# Keep only entries of `dir` that are regular files (os.path.isfile is checked
# on the joined path) and whose name ends with `ext`. The result holds the bare
# names returned by os.listdir, not joined paths.
txt_files = [f for f in os.listdir(dir) if os.path.isfile(os.path.join(dir,f)) and f.endswith(ext)]

# os.path.join joins a directory and a filename into a path. You can also split a path name into directory and file with
# os.path.split(), and you can split a filename with extension into filename and extension with os.path.splitext().
# os.path.expanduser() will expand '~' to the absolute path of the current user's home directory on Windows, Linux or Mac.

# The rest of the os.path module:
# http://docs.python.org/lib/module-os.path.html

Python 使用Python OS模块创建和删除文件和目录

import os

# Make a new file.
# Simply opening a file in write mode will create it, if it doesn't exist. (If
# the file does exist, the act of opening it in write mode will completely
# overwrite its contents.)
temp = "file.txt"
try:
    f = open(temp, "w")
except IOError:
    pass
else:
    # Only creation is wanted, so close the handle straight away instead of
    # leaving it open.
    f.close()

# Remove a file.
# `temp` names the file created above. A failure to remove it (for example
# because it was never created) is deliberately ignored.
try:
    os.remove(temp)
except os.error:
    pass

# Make a new directory.
os.mkdir('dirname')

# Recursive directory creation: creates dir_c and if necessary dir_b and dir_a.
os.makedirs('dir_a/dir_b/dir_c')

# Remove an empty directory.
os.rmdir('dirname')
os.rmdir('dir_a/dir_b/dir_c') # Removes dir_c only.

# Recursively remove empty directories.
# removedirs removes the leaf directory and then every parent directory that
# is left empty. dir_c was already removed by the rmdir above, so start from
# dir_b; naming the missing dir_c here would raise OSError.
os.removedirs('dir_a/dir_b')

# Neither rmdir nor removedirs can remove a non-empty directory, for that you need the further file
# operations in the shutil module.
# This removes the directory 'three' and anything beneath it in the filesystem.
import shutil
os.makedirs('one/two/three/four') # Build a non-empty tree to demonstrate on.
shutil.rmtree('one/two/three')
os.removedirs('one/two') # rmtree leaves the parents behind; tidy them up.

Python 获取目录文件扩展名列表

def getFileExtList(dirPath, uniq=True, sorted=True):
    """Collect the file-name extensions found anywhere below dirPath.

    dirPath -- directory that is walked recursively with os.walk
    uniq    -- when true, each extension is reported only once
    sorted  -- when true, the result is sorted in place before returning

    Extensions are what os.path.splitext yields (leading dot included); a
    file without an extension contributes an empty string.
    """
    extList = [os.path.splitext(name)[-1]
               for _root, _subdirs, names in os.walk(dirPath)
               for name in names]
    if uniq:
        extList = list(set(extList))
    if sorted:
        # The parameter shadows the built-in sorted(), hence list.sort().
        extList.sort()
    return extList

Python 仅用6行源代码实现的WiFi信号监视器

import time
from pythonwifi.iwlibs import Wireless
 
# Wireless interface to query; 'eth1' is hard-coded.
wifi = Wireless('eth1')

# Poll forever: print the signal level taken from element 1 of
# getStatistics(), then wait one second before the next reading.
while True:
    print(wifi.getStatistics()[1].getSignallevel())
    time.sleep(1)