Python - Fibonacci iterator

fibonacci.py
class Fib:
    """Iterator over the Fibonacci numbers that do not exceed max."""

    def __init__(self, max):
        self.max = max

    def __iter__(self):
        # (Re)start the sequence whenever iteration begins
        self.a = 0
        self.b = 1
        return self

    def __next__(self):
        fib = self.a
        if fib > self.max:
            raise StopIteration
        self.a, self.b = self.b, self.a + self.b
        return fib
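
A quick usage sketch (Python 3, since the class defines __next__; the bound of 100 is arbitrary):

for n in Fib(100):
    print(n)  # prints 0 1 1 2 3 5 8 13 21 34 55 89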

A very basic linked list in Python.

gistfile1.py
class Node:
    def __init__(self, cargo=None, next=None):
        self.cargo = cargo
        self.next  = next
    def __str__(self):
        return str(self.cargo)

# => Create a list of Node objects, each with .next being None
linked = [Node(x) for x in range(100)]

# => Link each node to the one after it (the last node keeps next=None)
for idx in range(len(linked) - 1):
    linked[idx].next = linked[idx + 1]

# => Traverse the chain, printing each node's cargo
node = linked[0]
while node is not None:
    print node.cargo
    node = node.next
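
For completeness, prepending is just a matter of pointing a new node at the current first node; a minimal sketch using the Node class above (the cargo value -1 is arbitrary):

head = Node(-1, linked[0])  # new node whose .next is the old first node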

Unzip a GBK-encoded zipfile into UTF-8 files

unzipgbk.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
#       Author:  Rex Zhang
#  Create Time:  2013-07-28 12:32
#    File name:  unzipgbk.py

"""
./unzipgbk

unzip gbk zipfile into utf8 files
usage: ./unzipgbk File.zip [File1.zip ... ]

./unzipgbk 吉卜力.-.\[街头自动风琴演奏宫崎骏的世界\].专辑.\(ape\).zip
creating path ストリートオルガンが奏でる宮崎駿の世界
[1/16] ストリートオルガンが奏でる宮崎駿の世界/01.人生のメリーゴーランド.ape ...
[2/16] ストリートオルガンが奏でる宮崎駿の世界/02.世界の約束.ape ...
[3/16] ストリートオルガンが奏でる宮崎駿の世界/03.となりのトトロ.ape ...
[4/16] ストリートオルガンが奏でる宮崎駿の世界/04.さんぽ.ape ...
[5/16] ストリートオルガンが奏でる宮崎駿の世界/05.風のとおり道.ape ...
[6/16] ストリートオルガンが奏でる宮崎駿の世界/06.もののけ姫.ape ...
[7/16] ストリートオルガンが奏でる宮崎駿の世界/07.君をのせて.ape ...
[8/16] ストリートオルガンが奏でる宮崎駿の世界/08.晴れた日に.ape ...
[9/16] ストリートオルガンが奏でる宮崎駿の世界/09.やさしさに包まれたなら.ape ...
[10/16] ストリートオルガンが奏でる宮崎駿の世界/10.カントリーロード.ape ...
[11/16] ストリートオルガンが奏でる宮崎駿の世界/11.アドリアの海へ.ape ...
[12/16] ストリートオルガンが奏でる宮崎駿の世界/12.鳥の人 -エンディング-.ape ...
[13/16] ストリートオルガンが奏でる宮崎駿の世界/13.風の谷のナウシカ.ape ...
[14/16] ストリートオルガンが奏でる宮崎駿の世界/14.ふたたび.ape ...
[15/16] ストリートオルガンが奏でる宮崎駿の世界/15.いつも何度でも.ape ...
"""

from zipfile import ZipFile
import sys
import os

def unzip(zf):

    source = ZipFile(zf, 'r')

    total = len(source.filelist)

    index = 0

    for info in source.filelist:
        if '/' in info.filename:
            dirname = os.path.sep.join(info.filename.split('/')[:-1])
            dirname = dirname.decode('gbk').encode('utf-8')

            if not os.path.exists(dirname):
                print 'creating path %s' % dirname
                try:
                    os.makedirs(dirname)
                except OSError as e:
                    print 'create path (%s) error: %s' % (dirname, str(e))

        # skip directory entries themselves
        if info.filename.endswith('/'):
            continue

        try:
            index += 1
            new_name = info.filename.decode('gbk').encode('utf-8')
            print '[%s/%s] %s ...' % (index, total, new_name)
            fp = open(new_name, 'wb')
            fp.write(source.read(info))
            fp.close()

        except Exception as e:
            print 'error: ', e

    source.close()

if __name__ == '__main__':

    if len(sys.argv) <= 1:

        print '''unzip gbk zipfile into utf8 files\nusage: %s File.zip [File1.zip ... ]''' % sys.argv[0]
        exit(0)

    for f in sys.argv[1:]:
        unzip(f)
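
For reference, under Python 3 the zipfile module decodes entry names that lack the UTF-8 flag as cp437, so the usual trick is to round-trip through cp437; a minimal sketch of the same idea, not a drop-in replacement:

from zipfile import ZipFile
import os

def unzip_gbk_py3(path):
    with ZipFile(path) as source:
        for info in source.infolist():
            # undo the cp437 decode, then decode the raw bytes as GBK
            name = info.filename.encode('cp437').decode('gbk')
            dirname = os.path.dirname(name)
            if dirname:
                os.makedirs(dirname, exist_ok=True)
            if not name.endswith('/'):
                with open(name, 'wb') as fp:
                    fp.write(source.read(info))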

download_voa.py
# -*- coding: utf8 -*-

# Note: download speed is very slow,
import urllib2, urllib
import sys
import os
import socket
import re

import socks
socks.setdefaultproxy(socks.PROXY_TYPE_SOCKS5, '127.0.0.1', 9050, rdns=False)
socket.socket = socks.socksocket

# set urllib2 timeout
socket.setdefaulttimeout(300)

# config
INDEX_PAGE = 'http://learningenglish.voanews.com/programindex.html'
HOST = 'http://learningenglish.voanews.com'
VOA_DIR = os.path.join('D:\\', 'VOA')
RETRY_TIMES = 3

# re patterns
re_themes = re.compile('''<h4><a href=['"](.*?http.*?latest.*?)['"]>(.*?)</a></h4>''')
re_articles = re.compile('<h4.*?(/content/.*?/\d+\.html).*?</h4>')
re_article_title = re.compile('<title>\s+(.*)\s+</title>')
re_article_pdf = re.compile('''href=['"](.*pdf)['"]''')
re_audio_page = re.compile('/audio/Audio/\d+\.html')
re_article_audio = re.compile('(http:.*mp3)')

# helper
def download_data( url ):
    count = 0
    while count < RETRY_TIMES:
        count += 1
        try:
            data = urllib2.urlopen(url).read()
        except Exception:
            # network error: retry up to RETRY_TIMES
            continue
        if data:
            return data
    return ''

def save_url_to_file(url, file_path):
    # If the file already exists and its size matches the server's
    # Content-Length, skip the download. If the server does not return a
    # Content-Length header, also skip re-downloading.
    if os.path.isfile(file_path):
        length_s = urllib.urlopen( url ).info().get('Content-Length', 0)
        length_l = os.path.getsize( file_path )
        if length_s == 0 or long(length_s) == length_l:
            return True

    # Otherwise (re)download the file; if an exception occurs, delete the
    # partially downloaded file before re-raising.
    try:
        urllib.urlretrieve(url, file_path, reporthook)
    except:
        if os.path.isfile(file_path):
            os.remove( file_path)
        raise
    else:
        return True
    
# show download progress
def reporthook(blocks_read, block_size, total_size):
    if not blocks_read:
        print ("Connection opened")
    if total_size < 0:
        # the server did not report a total size
        sys.stdout.write("\rRead %d blocks   " % blocks_read)
        sys.stdout.flush()
    else:
        sys.stdout.write("\rdownloading: %d KB, totalsize: %d KB   " % (blocks_read * block_size / 1024.0, total_size / 1024.0))
        sys.stdout.flush()

# get themes
# theme name and page for latest articles
print 'Parsing themes from %s ...' % INDEX_PAGE
html = download_data(INDEX_PAGE)
themes = re.findall(re_themes, html)
if themes:
    themes = set(themes)
    print 'Got %d themes:' % len(themes)
    for theme in themes:
        print 'Theme: %s. Page: %s.' % (theme[1], theme[0])
else:
    sys.exit()
  
for theme in themes:
    theme_name = theme[1]
    theme_index = theme[0]

    # get article's page
    theme_html = download_data(theme_index)
    if not theme_html: sys.exit()
    article_urls = re.findall(re_articles, theme_html)

    for article in article_urls:
        article_url = HOST + article
        print 'Getting info from %s' % article_url
        article_html = download_data(article_url)
        if not article_html: sys.exit()
        print 'Got it!'
        
        try:
            # get article title          
            article_title = re.search(re_article_title, article_html).groups()
            if not article_title: sys.exit()
            article_title =  '-'.join( re.findall('(\w+)', article_title[0]) )
            #article_title = '-'.join( article_title.split(' ') )
            print 'Got article title: %s' % article_title
          
            # get pdf url
            article_pdf = re.search(re_article_pdf, article_html).groups()
            if not article_pdf: sys.exit()
            article_pdf = article_pdf[0]
            print 'Got pdf url: %s' % article_pdf
            
            # get audio url
            audio_url = HOST + re.search(re_audio_page, article_html).group()
            print 'Getting info from audio_url %s' % audio_url
            audio_html = download_data(audio_url)
            if not audio_html: sys.exit()
            article_audio = re.search(re_article_audio, audio_html).group()
            print 'Got audio url: %s' % article_audio

            print 'Downloading PDF ...'
            file_path = os.path.join(VOA_DIR, article_title + '.pdf')
            if save_url_to_file(article_pdf, file_path):
                print 'OK'
            else:
                print 'Failed'

            print 'Downloading MP3 ...'
            file_path = os.path.join(VOA_DIR, article_title + '.mp3')
            if save_url_to_file(article_audio, file_path):
                print 'OK'
            else:
                print 'Failed'
            
        except AttributeError:
            # one of the re.search calls found nothing; skip this article
            pass
                       
print 'end'
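
The skip-if-already-complete check above is the interesting part; a minimal Python 3 sketch of the same idea (fetch_if_needed is a hypothetical name, and it assumes the server reports Content-Length):

import os
import urllib.request

def fetch_if_needed(url, path):
    if os.path.isfile(path):
        head = urllib.request.Request(url, method='HEAD')
        with urllib.request.urlopen(head) as resp:
            remote_size = int(resp.headers.get('Content-Length', 0))
        if remote_size and remote_size == os.path.getsize(path):
            return  # already fully downloaded, skip
    urllib.request.urlretrieve(url, path)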

From the anti-procrastinator

validation.py
			#======================================
			#        SH-------------------EH------------------------------
			#       *SH-------------------EH------------------------------
			#======================================

			# Check whether an activity with this exact same schedule exists
			at_activities = UserActivity.objects.filter(
				user=user,
				# Same day of the week
				activity__day_of_week=activity.day_of_week,
				# Same start and end hours as the activity being saved
				activity__start_hour=activity.start_hour,
				activity__end_hour=activity.end_hour,
				# The end of the relation with the activity must be at or after the activity's date
				end_time__gte=activity.date,
				# The start of the relation with the activity must be at or before the activity's date
				start_time__lte=activity.date,
				deleted=False)

			#======================================
			# --------------SH--------*SH------------*EH--------EH
			#======================================

			# Check whether an activity exists fully inside this time range
			in_activities = UserActivity.objects.filter(
				user=user,
				# Same day of the week
				activity__day_of_week=activity.day_of_week,
				# The existing activity starts and ends strictly inside the range being saved
				activity__start_hour__gt=activity.start_hour,
				activity__start_hour__lt=activity.end_hour,
				activity__end_hour__lt=activity.end_hour,
				# The end of the relation with the activity must be at or after the activity's date
				end_time__gte=activity.date,
				# The start of the relation with the activity must be at or before the activity's date
				start_time__lte=activity.date,
				deleted=False)

			#======================================
			#                 SH------------------------EH
			#       *SH-------------------*EH------------------------------
			#======================================

			# Check whether an activity overlaps the start of this time range
			cross_left_activities = UserActivity.objects.filter(
				user=user,
				# Same day of the week
				activity__day_of_week=activity.day_of_week,
				# The existing activity starts at or before the new start and ends inside the range
				activity__start_hour__lte=activity.start_hour,
				activity__end_hour__gt=activity.start_hour,
				activity__end_hour__lte=activity.end_hour,
				# The end of the relation with the activity must be at or after the activity's date
				end_time__gte=activity.date,
				# The start of the relation with the activity must be at or before the activity's date
				start_time__lte=activity.date,
				deleted=False)

			#======================================
			#                 SH------------------------EH
			#       *SH---------------------------------------------------*EH
			#======================================

			# Check whether an activity completely wraps this time range
			wrap_activities = UserActivity.objects.filter(
				user=user,
				# Same day of the week
				activity__day_of_week=activity.day_of_week,
				# The existing activity starts before the new start and ends after the new end
				activity__start_hour__lt=activity.start_hour,
				activity__end_hour__gt=activity.end_hour,
				# The end of the relation with the activity must be at or after the activity's date
				end_time__gte=activity.date,
				# The start of the relation with the activity must be at or before the activity's date
				start_time__lte=activity.date,
				deleted=False)

			#======================================
			#                 SH------------------------EH
			#       ----------------------*SH---------------------------------------------------*EH
			#======================================

			# Check whether an activity overlaps the end of this time range
			cross_right_activities = UserActivity.objects.filter(
				user=user,
				# Same day of the week
				activity__day_of_week=activity.day_of_week,
				# The existing activity starts inside the range and ends at or after the new end
				activity__start_hour__lt=activity.end_hour,
				activity__start_hour__gte=activity.start_hour,
				activity__end_hour__gte=activity.end_hour,
				# The end of the relation with the activity must be at or after the activity's date
				end_time__gte=activity.date,
				# The start of the relation with the activity must be at or before the activity's date
				start_time__lte=activity.date,
				deleted=False)
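
Taken together, the five queries above cover every way two time ranges on the same weekday can collide: exact match, contained, overlapping on the left, fully wrapping, and overlapping on the right. Boundary conventions aside, they are equivalent to the single standard interval-overlap test; a minimal sketch, assuming plain comparable hour values:

def overlaps(start_a, end_a, start_b, end_b):
    # Two intervals overlap iff each one starts before the other ends.
    return start_a < end_b and start_b < end_a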

convert_ways.py
# basically the same thing as require()
# in nodejs
from xml.etree.ElementTree import ElementTree
from sys import argv
from datetime import datetime
import time
import json

tree = ElementTree()

if len(argv) < 3:
    print "specify an input & output filename. input is osm, output is geojson"
    exit()

tree.parse(argv[1])

geojson = { "type": "FeatureCollection", "features": [] }

nodeidx = {}

print 'mapping nodes'

for n in tree.iterfind('node'):
    nodeidx[n.attrib['id']] = [float(n.attrib['lon']), float(n.attrib['lat'])]

print 'mapping ways'

for w in tree.iterfind('way'):
    tags = {}
    for t in w.iterfind('tag'):
        tags[t.attrib['k']] = t.attrib['v']
    way = {
        "type": "Feature",
        "geometry": {
            "type": 'LineString',
            "coordinates": []
        },
        "properties": { }
        }
    # resolve each node reference to its [lon, lat] pair
    for n in w.iterfind('nd'):
        way['geometry']['coordinates'].append(nodeidx[n.attrib['ref']])
    # set the way's properties once, outside the node loop
    if 'highway' in tags:
        way['properties']['highway'] = tags['highway']
    way['properties']['user'] = w.attrib['user']
    way['properties']['timestamp'] = time.mktime(datetime.strptime(w.attrib['timestamp'], '%Y-%m-%dT%H:%M:%SZ').utctimetuple())
    geojson['features'].append(way)

print 'saving geojson'

json.dump(geojson, open(argv[2], 'w'))
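
Usage (the filenames are hypothetical): python convert_ways.py map.osm map.geojson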

Simple DNS server (UDP and TCP) in Python using dnslib.py
# coding=utf-8
"""
LICENSE http://www.apache.org/licenses/LICENSE-2.0
"""
import datetime
import sys
import time
import threading
import traceback
import SocketServer
from dnslib import *


class DomainName(str):
    def __getattr__(self, item):
        return DomainName(item + '.' + self)


D = DomainName('example.com')
IP = '127.0.0.1'
TTL = 60 * 5
PORT = 5053

soa_record = SOA(
    mname=D.ns1,  # primary name server
    rname=D.andrei,  # email of the domain administrator
    times=(
        201307231,  # serial number
        60 * 60 * 1,  # refresh
        60 * 60 * 3,  # retry
        60 * 60 * 24,  # expire
        60 * 60 * 1,  # minimum
    )
)
ns_records = [NS(D.ns1), NS(D.ns2)]
records = {
    D: [A(IP), AAAA((0,) * 16), MX(D.mail), soa_record] + ns_records,
    D.ns1: [A(IP)],  # MX and NS records must never point to a CNAME alias (RFC 2181 section 10.3)
    D.ns2: [A(IP)],
    D.mail: [A(IP)],
    D.andrei: [CNAME(D)],
}


def dns_response(data):
    request = DNSRecord.parse(data)

    print request

    reply = DNSRecord(DNSHeader(id=request.header.id, qr=1, aa=1, ra=1), q=request.q)

    qname = request.q.qname
    qn = str(qname)
    qtype = request.q.qtype
    qt = QTYPE[qtype]

    if qn == D or qn.endswith('.' + D):

        for name, rrs in records.iteritems():
            if name == qn:
                for rdata in rrs:
                    rqt = rdata.__class__.__name__
                    if qt in ['*', rqt]:
                        reply.add_answer(RR(rname=qname, rtype=QTYPE[rqt], rclass=1, ttl=TTL, rdata=rdata))

        for rdata in ns_records:
            reply.add_ns(RR(rname=D, rtype=QTYPE.NS, rclass=1, ttl=TTL, rdata=rdata))

        reply.add_ns(RR(rname=D, rtype=QTYPE.SOA, rclass=1, ttl=TTL, rdata=soa_record))

    print "---- Reply:\n", reply

    return reply.pack()


class BaseRequestHandler(SocketServer.BaseRequestHandler):

    def get_data(self):
        raise NotImplementedError

    def send_data(self, data):
        raise NotImplementedError

    def handle(self):
        now = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S.%f')
        print "\n\n%s request %s (%s %s):" % (self.__class__.__name__[:3], now, self.client_address[0],
                                               self.client_address[1])
        try:
            data = self.get_data()
            print len(data), data.encode('hex')  # repr(data).replace('\\x', '')[1:-1]
            self.send_data(dns_response(data))
        except Exception:
            traceback.print_exc(file=sys.stderr)


class TCPRequestHandler(BaseRequestHandler):

    def get_data(self):
        data = self.request.recv(8192).strip()
        sz = int(data[:2].encode('hex'), 16)
        if sz < len(data) - 2:
            raise Exception("Wrong size of TCP packet")
        elif sz > len(data) - 2:
            raise Exception("Too big TCP packet")
        return data[2:]

    def send_data(self, data):
        sz = hex(len(data))[2:].zfill(4).decode('hex')
        return self.request.sendall(sz + data)


class UDPRequestHandler(BaseRequestHandler):

    def get_data(self):
        return self.request[0].strip()

    def send_data(self, data):
        return self.request[1].sendto(data, self.client_address)


if __name__ == '__main__':
    print "Starting nameserver..."

    servers = [
        SocketServer.ThreadingUDPServer(('', PORT), UDPRequestHandler),
        SocketServer.ThreadingTCPServer(('', PORT), TCPRequestHandler),
    ]
    for s in servers:
        thread = threading.Thread(target=s.serve_forever)  # that thread will start one more thread for each request
        thread.daemon = True  # exit the server thread when the main thread terminates
        thread.start()
        print "%s server loop running in thread: %s" % (s.RequestHandlerClass.__name__[:3], thread.name)

    try:
        while 1:
            time.sleep(1)
            sys.stderr.flush()
            sys.stdout.flush()

    except KeyboardInterrupt:
        pass
    finally:
        for s in servers:
            s.shutdown()
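
To poke at the server once it is running, dnslib's own client helpers work from another process; a minimal sketch, assuming the PORT above (and that dnslib's DNSRecord.question/send API is available):

from dnslib import DNSRecord
q = DNSRecord.question('example.com')
print(DNSRecord.parse(q.send('127.0.0.1', 5053)))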

Format a single value for use in an RFC 4180-compliant CSV file. Join multiple values returned by this function with commas to create a row.

csv.py
def _format_csv_value(self, value):
    """Takes a single CSV value string and formats it in compliance with RFC4180.
    Multiple values can be joined together by putting them in a list and using ",".join(the_list).
    http://tools.ietf.org/html/rfc4180#page-3
    
    :param value: A single value destined to be output among multiple in a CSV row
    
    :return: The escaped and/or quoted string if necessary, otherwise simply returns <value>.
    """
    for x in [",", '"', "\n", "\r"]:  # checking "\n" and "\r" separately also covers "\r\n"
        if x in value:
            # Must replace double quotes '"' with two double quotes '""'
            value = value.replace('"', '""')
            # and contain all fields in double quotes if they contain commas or double quotes
            value = '"%s"' % value
            break
    return value
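
A quick usage sketch (treating the method as a free function, i.e. passing None for self):

row = ",".join(_format_csv_value(None, v) for v in ['plain', 'a,b', 'say "hi"'])
# row == 'plain,"a,b","say ""hi"""'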

This script searches the git log of the current directory you are in and starts an interactive rebase on the "base commit", which is simply the first commit whose message does not start with "fixup".

gitfixup.py
#!/usr/bin/env python

import subprocess
p = subprocess.Popen(["git", "log", "--format=%s"], stdout=subprocess.PIPE)
out, err = p.communicate()

# Keep going until we find a commit that is not a "fixup! "
fixup_count = 0
max_list_size = 10
commits = out.split('\n')[:max_list_size]

for c in commits:
    if "fixup! " not in c:
        break
    fixup_count += 1

# Rebase onto the parent of the base commit so the base commit itself
# is included in the interactive rebase
head_str = "HEAD~%d" % (fixup_count + 1)
print("Fixup count: %d" % fixup_count)

if fixup_count == max_list_size:
    print("No base commit found in the last %d commits!" % max_list_size)
    print("You should REALLY consider rebasing...")
elif fixup_count == 0:
    print("No fixup commits found. Nothing to do")
else:
    print("Found base commit %d commits back: %s" % (fixup_count, c)) 
    print("*** Calling interactive rebase on %s..." % head_str)

    subprocess.call(["git", "rebase", "-i", "--autosquash", head_str])
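
For context: subjects beginning with "fixup! " are what git commit --fixup=<commit> produces, and the --autosquash flag makes the interactive rebase squash each one onto its target commit automatically.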

gistfile1.py
import urllib2
import numpy as np

train_data_file = 'http://archive.ics.uci.edu/ml/machine-learning-databases/optdigits/optdigits.tra'
test_data_file = 'http://archive.ics.uci.edu/ml/machine-learning-databases/optdigits/optdigits.tes'

tmp = np.loadtxt(urllib2.urlopen(train_data_file), delimiter=',')
train_data, train_labels = tmp[:, :-1], tmp[:, -1]

tmp = np.loadtxt(urllib2.urlopen(test_data_file), delimiter=',')
test_data, test_labels = tmp[:, :-1], tmp[:, -1]
# pairwise squared distances via the expansion ||x - y||^2 = ||x||^2 + ||y||^2 - 2 x.y
dists = (train_data ** 2).sum(axis=1)[:, np.newaxis] + (test_data ** 2).sum(axis=1) - 2 * np.dot(train_data, test_data.T)
# sort training points by distance for each test column; row 0 holds each test point's nearest neighbour
closest = train_labels[np.argsort(dists, axis=0)]
print "1-NN accuracy: ", (closest[0] == test_labels).sum() / float(len(test_labels))