#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
merge_chains_multi.py
このスクリプトは、modelangeloから出力されたCIFファイルを解析し、
複数の配列を含むFASTAファイルに対応して、
不要な短いチェインを削除し、残ったチェインを統合してPDBファイルとして出力します。
使用方法:
python merge_chains_multi.py
例:
python merge_chains_multi.py protein_sequences.fasta modelangelo_output.cif filtered_output.pdb
"""
import sys
from collections import defaultdict
from Bio import SeqIO
from Bio.PDB import MMCIFParser, PDBIO, Chain
import string
class ChainSelector(Chain):
"""
指定されたチェインのみをPDB出力するためのセレクタークラス。
"""
def __init__(self, selected_chains):
self.selected_chains = selected_chains
def accept_chain(self, chain):
return chain.id in self.selected_chains
def parse_fasta(fasta_file):
"""
複数のFASTA配列を読み込み、配列リストを取得します。
各配列にメインチェインID(A, B, C, ...)を割り当てます。
"""
try:
fasta_records = list(SeqIO.parse(fasta_file, "fasta"))
if not fasta_records:
print(f"FASTAファイル '{fasta_file}' に有効な配列が含まれていません。")
sys.exit(1)
sequence_info = []
main_chain_ids = list(string.ascii_uppercase)
if len(fasta_records) > len(main_chain_ids):
print("メインチェインIDの割り当てに失敗しました。サポートされるメインチェインIDの数を超えています。")
sys.exit(1)
for i, record in enumerate(fasta_records):
main_chain_id = main_chain_ids[i]
fasta_seq = str(record.seq)
seq_length = len(fasta_seq)
sequence_info.append({
'sequence_id': record.id,
'sequence': fasta_seq,
'length': seq_length,
'main_chain_id': main_chain_id
})
print(f"配列 '{record.id}' にメインチェインID '{main_chain_id}' を割り当てました。配列長: {seq_length}")
return sequence_info
except Exception as e:
print(f"FASTAファイルの読み込み中にエラーが発生しました: {e}")
sys.exit(1)
def parse_cif(cif_file):
"""
CIFファイルを解析し、構造情報を取得します。
"""
parser = MMCIFParser(QUIET=True)
try:
structure = parser.get_structure("modelangelo_structure", cif_file)
print(f"CIFファイル '{cif_file}' を解析しました。")
return structure
except Exception as e:
print(f"CIFファイルの解析中にエラーが発生しました: {e}")
sys.exit(1)
def extract_chain_info(structure):
"""
構造から各チェインの情報を抽出します。
チェインの長さが10残基以下の場合は除外します。
"""
chains_info = []
for model in structure:
for chain in model:
residues = [res for res in chain if res.id[0] == ' ']
if not residues:
continue
length = len(residues)
if length <= 10: print(f"チェイン '{chain.id}' は長さ {length} のため削除されます。") continue # 長さ10以下のチェインをスキップ start = residues[0].id[1] end = residues[-1].id[1] chains_info.append({ 'id': chain.id, 'start': start, 'end': end, 'length': length }) return chains_info def select_chains_per_main(sequence_info, chains_info): """ 各メインチェインごとにチェインを選択します。 メインチェインIDごとにチェインをフィルタリングし、重複を避けて選択します。 """ # グループ化されたチェイン情報 grouped_chains = defaultdict(list) # グループ化: メインチェインIDに基づく for chain in chains_info: # メインチェインIDの仮定: 最初の文字がメインチェインID main_chain_id = ''.join(filter(str.isalpha, chain['id']))[0] grouped_chains[main_chain_id].append(chain) # 選択されたチェインを保持 selected_chains = set() for seq_info in sequence_info: main_id = seq_info['main_chain_id'] if main_id not in grouped_chains: print(f"メインチェインID '{main_id}' に対応するチェインがCIFファイルに存在しません。") continue chains = grouped_chains[main_id] # 長さの降順にソート chains_sorted = sorted(chains, key=lambda x: x['length'], reverse=True) print(f"メインチェインID '{main_id}' のチェインを長さの降順にソートしました。") occupied_residues = set() for chain in chains_sorted: res_range = set(range(chain['start'], chain['end'] + 1)) if res_range.isdisjoint(occupied_residues): selected_chains.add(chain['id']) occupied_residues.update(res_range) print(f"チェイン '{chain['id']}' を選択しました。") else: print(f"チェイン '{chain['id']}' は既存のチェインと重複するためスキップされます。") return selected_chains def group_selected_chains(selected_chains): """ 選択されたチェインをメインチェインIDごとにグループ化します。 例: 'Aa', 'Ac' -> 'A'
"""
groups = defaultdict(list)
for cid in selected_chains:
# メインチェインIDの仮定: 最初の文字がメインチェインID
main_id = ''.join(filter(str.isalpha, cid))[0]
groups[main_id].append(cid)
return groups
def merge_chains(structure, groups):
"""
グループ化されたチェインを統合し、新しいチェインとして追加します。
元のチェインは削除されます。
"""
for model in structure:
for main_id, chain_ids in groups.items():
if not chain_ids:
continue
# メインチェインが存在しない場合、新規作成
if main_id in model:
new_chain = model[main_id]
print(f"既存のメインチェイン '{main_id}' にチェイン {', '.join(chain_ids)} をマージします。")
else:
new_chain = Chain.Chain(main_id)
model.add(new_chain)
print(f"新規メインチェイン '{main_id}' を作成し、チェイン {', '.join(chain_ids)} をマージします。")
for cid in chain_ids:
if cid == main_id:
# メインチェイン自体は既に存在する場合
continue
try:
chain = model[cid]
except KeyError:
print(f"チェイン '{cid}' がモデルに存在しません。スキップします。")
continue
for residue in chain:
new_chain.add(residue)
# 元のチェインをモデルから削除
model.detach_child(cid)
print(f"チェイン '{cid}' をモデルから削除しました。")
# 残基番号を再設定
renumber_residues(new_chain)
print(f"統合されたチェイン '{main_id}' の残基番号を再設定しました。")
return structure
def renumber_residues(chain):
"""
統合されたチェインの残基番号を1から連番に再設定します。
"""
new_resid = (' ', 1, ' ')
for residue in chain:
residue.id = new_resid
new_resid = (' ', new_resid[1] + 1, ' ')
def save_structure(structure, output_pdb):
"""
構造をPDB形式で保存します。
"""
io = PDBIO()
io.set_structure(structure)
try:
io.save(output_pdb)
print(f"フィルタリングおよびマージ後のPDBファイルが '{output_pdb}' に保存されました。")
except Exception as e:
print(f"PDBファイルの保存中にエラーが発生しました: {e}")
sys.exit(1)
def main(fasta_file, cif_file, output_pdb):
# 1. FASTAファイルの読み込み
sequence_info = parse_fasta(fasta_file)
# 2. CIFファイルの解析
structure = parse_cif(cif_file)
# 3. 各チェインの情報を収集
chains_info = extract_chain_info(structure)
if not chains_info:
print("有効なチェインが存在しません。処理を終了します。")
sys.exit(0)
# 4. 各メインチェインごとにチェインを選択
selected_chains = select_chains_per_main(sequence_info, chains_info)
if not selected_chains:
print("フィルタリング後に選択されたチェインが存在しません。処理を終了します。")
sys.exit(0)
# 5. 選択されたチェインをグループ化
groups = group_selected_chains(selected_chains)
# 6. チェインを統合
merged_structure = merge_chains(structure, groups)
# 7. PDBファイルとして保存
save_structure(merged_structure, output_pdb)
if __name__ == "__main__":
if len(sys.argv) != 4:
print("使用方法: python merge_chains_multi.py ")
print("例: python merge_chains_multi.py protein_sequences.fasta modelangelo_output.cif filtered_output.pdb")
sys.exit(1)
fasta_file = sys.argv[1]
cif_file = sys.argv[2]
output_pdb = sys.argv[3]
main(fasta_file, cif_file, output_pdb)
コメント